Skip to content

feat(mcp): refactor logging to use exchange for targeted client notifications #132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

Expand Down Expand Up @@ -453,15 +454,10 @@ void testLoggingLevelsWithoutInitialization() {
@Test
void testLoggingLevels() {
withClient(createMcpTransport(), mcpAsyncClient -> {
Mono<Void> testAllLevels = mcpAsyncClient.initialize().then(Mono.defer(() -> {
Mono<Void> chain = Mono.empty();
for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) {
chain = chain.then(mcpAsyncClient.setLoggingLevel(level));
}
return chain;
}));

StepVerifier.create(testAllLevels).verifyComplete();
StepVerifier
.create(mcpAsyncClient.initialize()
.thenMany(Flux.fromArray(McpSchema.LoggingLevel.values()).flatMap(mcpAsyncClient::setLoggingLevel)))
.verifyComplete();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,53 +416,4 @@ void testRootsChangeHandlers() {
.doesNotThrowAnyException();
}

// ---------------------------------------
// Logging Tests
// ---------------------------------------

@Test
void testLoggingLevels() {
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())
.serverInfo("test-server", "1.0.0")
.capabilities(ServerCapabilities.builder().logging().build())
.build();

// Test all logging levels
for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) {
var notification = McpSchema.LoggingMessageNotification.builder()
.level(level)
.logger("test-logger")
.data("Test message with level " + level)
.build();

StepVerifier.create(mcpAsyncServer.loggingNotification(notification)).verifyComplete();
}
}

@Test
void testLoggingWithoutCapability() {
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())
.serverInfo("test-server", "1.0.0")
.capabilities(ServerCapabilities.builder().build()) // No logging capability
.build();

var notification = McpSchema.LoggingMessageNotification.builder()
.level(McpSchema.LoggingLevel.INFO)
.logger("test-logger")
.data("Test log message")
.build();

StepVerifier.create(mcpAsyncServer.loggingNotification(notification)).verifyComplete();
}

@Test
void testLoggingWithNullNotification() {
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())
.serverInfo("test-server", "1.0.0")
.capabilities(ServerCapabilities.builder().logging().build())
.build();

StepVerifier.create(mcpAsyncServer.loggingNotification(null)).verifyError(McpError.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -388,53 +388,4 @@ void testRootsChangeHandlers() {
assertThatCode(() -> noConsumersServer.closeGracefully()).doesNotThrowAnyException();
}

// ---------------------------------------
// Logging Tests
// ---------------------------------------

@Test
void testLoggingLevels() {
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())
.serverInfo("test-server", "1.0.0")
.capabilities(ServerCapabilities.builder().logging().build())
.build();

// Test all logging levels
for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) {
var notification = McpSchema.LoggingMessageNotification.builder()
.level(level)
.logger("test-logger")
.data("Test message with level " + level)
.build();

assertThatCode(() -> mcpSyncServer.loggingNotification(notification)).doesNotThrowAnyException();
}
}

@Test
void testLoggingWithoutCapability() {
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())
.serverInfo("test-server", "1.0.0")
.capabilities(ServerCapabilities.builder().build()) // No logging capability
.build();

var notification = McpSchema.LoggingMessageNotification.builder()
.level(McpSchema.LoggingLevel.INFO)
.logger("test-logger")
.data("Test log message")
.build();

assertThatCode(() -> mcpSyncServer.loggingNotification(notification)).doesNotThrowAnyException();
}

@Test
void testLoggingWithNullNotification() {
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())
.serverInfo("test-server", "1.0.0")
.capabilities(ServerCapabilities.builder().logging().build())
.build();

assertThatThrownBy(() -> mcpSyncServer.loggingNotification(null)).isInstanceOf(McpError.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -786,10 +786,9 @@ public Mono<Void> setLoggingLevel(LoggingLevel loggingLevel) {
}

return this.withInitializationCheck("setting logging level", initializedResult -> {
String levelName = this.transport.unmarshalFrom(loggingLevel, new TypeReference<String>() {
});
Map<String, Object> params = Map.of("level", levelName);
return this.mcpSession.sendNotification(McpSchema.METHOD_LOGGING_SET_LEVEL, params);
var params = new McpSchema.SetLevelRequest(loggingLevel);
return this.mcpSession.sendRequest(McpSchema.METHOD_LOGGING_SET_LEVEL, params, new TypeReference<Object>() {
}).then();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import java.time.Duration;

import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.modelcontextprotocol.spec.McpSchema.CallToolResult;
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest;
import io.modelcontextprotocol.spec.McpSchema.Tool;
import io.modelcontextprotocol.spec.McpServerSession;
import io.modelcontextprotocol.spec.McpServerTransportProvider;
Expand Down Expand Up @@ -216,11 +217,17 @@ public Mono<Void> notifyPromptsListChanged() {
// ---------------------------------------

/**
* Send a logging message notification to all connected clients. Messages below the
* current minimum logging level will be filtered out.
* This implementation would, incorrectly, broadcast the logging message to all
* connected clients, using a single minLoggingLevel for all of them. Similar to the
* sampling and roots, the logging level should be set per client session and use the
* ServerExchange to send the logging message to the right client.
* @param loggingMessageNotification The logging message to send
* @return A Mono that completes when the notification has been sent
* @deprecated Use
* {@link McpAsyncServerExchange#loggingNotification(LoggingMessageNotification)}
* instead.
*/
@Deprecated
public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageNotification) {
return this.delegate.loggingNotification(loggingMessageNotification);
}
Expand Down Expand Up @@ -257,6 +264,8 @@ private static class AsyncServerImpl extends McpAsyncServer {

private final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();

// FIXME: this field is deprecated and should be remvoed together with the
// broadcasting loggingNotification.
private LoggingLevel minLoggingLevel = LoggingLevel.DEBUG;

private List<String> protocolVersions = List.of(McpSchema.LATEST_PROTOCOL_VERSION);
Expand Down Expand Up @@ -677,12 +686,22 @@ public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageN
loggingMessageNotification);
}

private McpServerSession.RequestHandler<Void> setLoggerRequestHandler() {
private McpServerSession.RequestHandler<Object> setLoggerRequestHandler() {
return (exchange, params) -> {
this.minLoggingLevel = objectMapper.convertValue(params, new TypeReference<LoggingLevel>() {
});
return Mono.defer(() -> {

return Mono.empty();
SetLevelRequest newMinLoggingLevel = objectMapper.convertValue(params,
new TypeReference<SetLevelRequest>() {
});

exchange.setMinLoggingLevel(newMinLoggingLevel.level());

// FIXME: this field is deprecated and should be removed together
// with the broadcasting loggingNotification.
this.minLoggingLevel = newMinLoggingLevel.level();

return Mono.just(Map.of());
});
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
/*
* Copyright 2024-2024 the original author or authors.
*/

package io.modelcontextprotocol.server;

import com.fasterxml.jackson.core.type.TypeReference;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
import io.modelcontextprotocol.spec.McpServerSession;
import io.modelcontextprotocol.util.Assert;
import reactor.core.publisher.Mono;

/**
* Represents an asynchronous exchange with a Model Context Protocol (MCP) client. The
* exchange provides methods to interact with the client and query its capabilities.
*
* @author Dariusz Jędrzejczyk
* @author Christian Tzolov
*/
public class McpAsyncServerExchange {

Expand All @@ -20,6 +28,8 @@ public class McpAsyncServerExchange {

private final McpSchema.Implementation clientInfo;

private volatile LoggingLevel minLoggingLevel = LoggingLevel.INFO;

private static final TypeReference<McpSchema.CreateMessageResult> CREATE_MESSAGE_RESULT_TYPE_REF = new TypeReference<>() {
};

Expand Down Expand Up @@ -101,4 +111,38 @@ public Mono<McpSchema.ListRootsResult> listRoots(String cursor) {
LIST_ROOTS_RESULT_TYPE_REF);
}

/**
* Send a logging message notification to all connected clients. Messages below the
* current minimum logging level will be filtered out.
* @param loggingMessageNotification The logging message to send
* @return A Mono that completes when the notification has been sent
*/
public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageNotification) {

if (loggingMessageNotification == null) {
return Mono.error(new McpError("Logging message must not be null"));
}

return Mono.defer(() -> {
if (this.isNotificationForLevelAllowed(loggingMessageNotification.level())) {
return this.session.sendNotification(McpSchema.METHOD_NOTIFICATION_MESSAGE, loggingMessageNotification);
}
return Mono.empty();
});
}

/**
* Set the minimum logging level for the client. Messages below this level will be
* filtered out.
* @param minLoggingLevel The minimum logging level
*/
void setMinLoggingLevel(LoggingLevel minLoggingLevel) {
Assert.notNull(minLoggingLevel, "minLoggingLevel must not be null");
this.minLoggingLevel = minLoggingLevel;
}

private boolean isNotificationForLevelAllowed(LoggingLevel loggingLevel) {
return loggingLevel.level() >= this.minLoggingLevel.level();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

package io.modelcontextprotocol.server;

import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
import io.modelcontextprotocol.util.Assert;

Expand Down Expand Up @@ -151,9 +149,16 @@ public void notifyPromptsListChanged() {
}

/**
* Send a logging message notification to all clients.
* @param loggingMessageNotification The logging message notification to send
* This implementation would, incorrectly, broadcast the logging message to all
* connected clients, using a single minLoggingLevel for all of them. Similar to the
* sampling and roots, the logging level should be set per client session and use the
* ServerExchange to send the logging message to the right client.
* @param loggingMessageNotification The logging message to send
* @deprecated Use
* {@link McpSyncServerExchange#loggingNotification(LoggingMessageNotification)}
* instead.
*/
@Deprecated
public void loggingNotification(LoggingMessageNotification loggingMessageNotification) {
this.asyncServer.loggingNotification(loggingMessageNotification).block();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
/*
* Copyright 2024-2024 the original author or authors.
*/

package io.modelcontextprotocol.server;

import com.fasterxml.jackson.core.type.TypeReference;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;

/**
* Represents a synchronous exchange with a Model Context Protocol (MCP) client. The
* exchange provides methods to interact with the client and query its capabilities.
*
* @author Dariusz Jędrzejczyk
* @author Christian Tzolov
*/
public class McpSyncServerExchange {

Expand Down Expand Up @@ -75,4 +81,13 @@ public McpSchema.ListRootsResult listRoots(String cursor) {
return this.exchange.listRoots(cursor).block();
}

/**
* Send a logging message notification to all connected clients. Messages below the
* current minimum logging level will be filtered out.
* @param loggingMessageNotification The logging message to send
*/
public void loggingNotification(LoggingMessageNotification loggingMessageNotification) {
this.exchange.loggingNotification(loggingMessageNotification).block();
}

}
5 changes: 5 additions & 0 deletions mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,11 @@ public int level() {

} // @formatter:on

@JsonInclude(JsonInclude.Include.NON_ABSENT)
@JsonIgnoreProperties(ignoreUnknown = true)
public record SetLevelRequest(@JsonProperty("level") LoggingLevel level) {
}

// ---------------------------
// Autocomplete
// ---------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

Expand Down Expand Up @@ -454,15 +455,10 @@ void testLoggingLevelsWithoutInitialization() {
@Test
void testLoggingLevels() {
withClient(createMcpTransport(), mcpAsyncClient -> {
Mono<Void> testAllLevels = mcpAsyncClient.initialize().then(Mono.defer(() -> {
Mono<Void> chain = Mono.empty();
for (McpSchema.LoggingLevel level : McpSchema.LoggingLevel.values()) {
chain = chain.then(mcpAsyncClient.setLoggingLevel(level));
}
return chain;
}));

StepVerifier.create(testAllLevels).verifyComplete();
StepVerifier
.create(mcpAsyncClient.initialize()
.thenMany(Flux.fromArray(McpSchema.LoggingLevel.values()).flatMap(mcpAsyncClient::setLoggingLevel)))
.verifyComplete();
});
}

Expand Down
Loading