Skip to content

Commit 9ebcf33

Browse files
authored
Implement MCP on the server side (#6513)
Motivation: Instead of designing all MCP API and implmenting it, using [the official Java SDK](https://github.com/modelcontextprotocol/java-sdk?rgh-link-date=2025-11-11T16%3A09%3A53Z) for MCP support will be a more reasonable choice. Related: #6179 Modifications: - Forked the two upstream spring-webflux implementations and adjusted to work with Armeria - `ArmeriaStatelessServerTransport` supports simple stateless MCP. - `ArmeriaStreamableServerTransportProvider` fully support HTTP streamable protocol through server-sent events. - Modified the upstream `testCreateElicitationSuccess ` test since it internally blocks Armeria event loops when asserting. Result: - You can now serve MCP using the MCP Java SDK on an Armeria server. Future work: - Client integration - Add examples - Support custom annotations such as `@Tool`, `@Resource` to easily integrate POJO with MCP.
1 parent 2bcdb85 commit 9ebcf33

12 files changed

+1277
-0
lines changed

ai/mcp/build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
dependencies {
2+
implementation project(":jsonrpc")
3+
api libs.mcp.core
4+
testImplementation libs.mcp.test
5+
testImplementation libs.mcp.spring.webflux
6+
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Copyright 2025 LY Corporation
3+
*
4+
* LY Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
/*
17+
* Copyright 2025-2025 the original author or authors.
18+
*/
19+
20+
package com.linecorp.armeria.server.ai.mcp;
21+
22+
import static com.linecorp.armeria.server.ai.mcp.ArmeriaStreamableServerTransportProvider.canAcceptSse;
23+
import static com.linecorp.armeria.server.ai.mcp.ArmeriaStreamableServerTransportProvider.writeContext;
24+
25+
import java.io.IOException;
26+
import java.util.concurrent.CompletableFuture;
27+
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import com.linecorp.armeria.common.HttpRequest;
32+
import com.linecorp.armeria.common.HttpResponse;
33+
import com.linecorp.armeria.common.HttpStatus;
34+
import com.linecorp.armeria.common.MediaType;
35+
import com.linecorp.armeria.common.ResponseHeaders;
36+
import com.linecorp.armeria.common.annotation.Nullable;
37+
import com.linecorp.armeria.common.annotation.UnstableApi;
38+
import com.linecorp.armeria.common.jsonrpc.JsonRpcError;
39+
import com.linecorp.armeria.common.jsonrpc.JsonRpcResponse;
40+
import com.linecorp.armeria.common.util.UnmodifiableFuture;
41+
import com.linecorp.armeria.server.AbstractHttpService;
42+
import com.linecorp.armeria.server.HttpService;
43+
import com.linecorp.armeria.server.Server;
44+
import com.linecorp.armeria.server.ServiceRequestContext;
45+
46+
import io.modelcontextprotocol.common.McpTransportContext;
47+
import io.modelcontextprotocol.json.McpJsonMapper;
48+
import io.modelcontextprotocol.server.McpStatelessServerHandler;
49+
import io.modelcontextprotocol.server.McpTransportContextExtractor;
50+
import io.modelcontextprotocol.spec.McpSchema;
51+
import io.modelcontextprotocol.spec.McpStatelessServerTransport;
52+
import reactor.core.publisher.Mono;
53+
54+
/**
55+
* Implementation of {@link McpStatelessServerTransport} for Armeria server.
56+
*/
57+
@UnstableApi
58+
public final class ArmeriaStatelessServerTransport implements McpStatelessServerTransport {
59+
60+
// Forked from https://github.com/modelcontextprotocol/java-sdk/blob/80d0ad82a6b88a8ce8756dad3d4c90c4ae62ca69/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStatelessServerTransport.java
61+
// with modifications to work with Armeria
62+
63+
private static final Logger logger = LoggerFactory.getLogger(ArmeriaStatelessServerTransport.class);
64+
65+
/**
66+
* Returns a new {@link ArmeriaStatelessServerTransport} with default settings.
67+
*/
68+
public static ArmeriaStatelessServerTransport of() {
69+
return builder().build();
70+
}
71+
72+
/**
73+
* Returns a new builder for {@link ArmeriaStatelessServerTransport}.
74+
*/
75+
public static ArmeriaStatelessServerTransportBuilder builder() {
76+
return new ArmeriaStatelessServerTransportBuilder();
77+
}
78+
79+
private final HttpService httpService = new McpStatelessService();
80+
private final McpJsonMapper jsonMapper;
81+
private final McpTransportContextExtractor<ServiceRequestContext> contextExtractor;
82+
@Nullable
83+
private McpStatelessServerHandler mcpHandler;
84+
85+
private volatile boolean isClosing;
86+
87+
ArmeriaStatelessServerTransport(McpJsonMapper jsonMapper,
88+
McpTransportContextExtractor<ServiceRequestContext> contextExtractor) {
89+
90+
this.jsonMapper = jsonMapper;
91+
this.contextExtractor = contextExtractor;
92+
}
93+
94+
@Override
95+
public void setMcpHandler(McpStatelessServerHandler mcpHandler) {
96+
this.mcpHandler = mcpHandler;
97+
}
98+
99+
@Override
100+
public Mono<Void> closeGracefully() {
101+
return Mono.fromRunnable(() -> isClosing = true);
102+
}
103+
104+
/**
105+
* Returns the {@link HttpService} that defines the transport's HTTP endpoints.
106+
* This {@link HttpService} should be registered to Armeria {@link Server}.
107+
*
108+
* <p>The {@link HttpService} defines one endpoint with two methods:
109+
* <ul>
110+
* <li>GET - Unsupported, returns 405 METHOD NOT ALLOWED</li>
111+
* <li>POST - For handling client requests and notifications</li>
112+
* </ul>
113+
*/
114+
public HttpService httpService() {
115+
return httpService;
116+
}
117+
118+
private class McpStatelessService extends AbstractHttpService {
119+
120+
@Override
121+
protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) throws Exception {
122+
if (isClosing) {
123+
return HttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE, MediaType.PLAIN_TEXT,
124+
"Server is shutting down");
125+
}
126+
127+
if (!canAcceptSse(req.headers())) {
128+
return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.PLAIN_TEXT,
129+
"Accept header must include both application/json " +
130+
"and text/event-stream");
131+
}
132+
133+
return HttpResponse.of(req.aggregate().thenComposeAsync(agg -> {
134+
try {
135+
return handlePost(ctx, agg.contentUtf8());
136+
} catch (IllegalArgumentException | IOException e) {
137+
logger.debug("Failed to deserialize message: {}", e.getMessage(), e);
138+
final HttpResponse response =
139+
HttpResponse.ofJson(HttpStatus.BAD_REQUEST, JsonRpcResponse.ofFailure(
140+
JsonRpcError.PARSE_ERROR.withData("Invalid message format")));
141+
return UnmodifiableFuture.completedFuture(response);
142+
}
143+
}, ctx.blockingTaskExecutor()));
144+
}
145+
146+
private CompletableFuture<HttpResponse> handlePost(ServiceRequestContext ctx, String jsonText)
147+
throws IOException {
148+
final McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper, jsonText);
149+
final McpTransportContext mcpContext = contextExtractor.extract(ctx);
150+
151+
assert mcpHandler != null;
152+
if (message instanceof McpSchema.JSONRPCRequest rpcRequest) {
153+
return mcpHandler.handleRequest(mcpContext, rpcRequest)
154+
.contextWrite(writeContext(mcpContext))
155+
.toFuture().thenApply(rpcResponse -> {
156+
try {
157+
final String json = jsonMapper.writeValueAsString(rpcResponse);
158+
return HttpResponse.of(HttpStatus.OK, MediaType.JSON, json);
159+
} catch (IOException e) {
160+
logger.warn("Failed to serialize response: {}", e.getMessage(), e);
161+
return HttpResponse.ofJson(HttpStatus.INTERNAL_SERVER_ERROR,
162+
JsonRpcResponse.ofFailure(
163+
JsonRpcError.INTERNAL_ERROR.withData(
164+
"Failed to serialize response")));
165+
}
166+
});
167+
} else if (message instanceof McpSchema.JSONRPCNotification notification) {
168+
return mcpHandler.handleNotification(mcpContext, notification)
169+
.contextWrite(writeContext(mcpContext))
170+
.toFuture()
171+
.thenApply(unused -> HttpResponse.of(ResponseHeaders.of(HttpStatus.ACCEPTED)));
172+
} else {
173+
final HttpResponse response = HttpResponse.ofJson(
174+
HttpStatus.BAD_REQUEST,
175+
JsonRpcResponse.ofFailure(JsonRpcError.INVALID_REQUEST.withData(
176+
"The server accepts either requests or notifications")));
177+
return UnmodifiableFuture.completedFuture(response);
178+
}
179+
}
180+
}
181+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2025 LY Corporation
3+
*
4+
* LY Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.linecorp.armeria.server.ai.mcp;
18+
19+
import static java.util.Objects.requireNonNull;
20+
21+
import com.fasterxml.jackson.databind.json.JsonMapper;
22+
23+
import com.linecorp.armeria.common.annotation.UnstableApi;
24+
import com.linecorp.armeria.server.ServiceRequestContext;
25+
26+
import io.modelcontextprotocol.common.McpTransportContext;
27+
import io.modelcontextprotocol.json.McpJsonMapper;
28+
import io.modelcontextprotocol.server.McpTransportContextExtractor;
29+
30+
/**
31+
* A builder for creating an instance of {@link ArmeriaStatelessServerTransport}.
32+
*/
33+
@UnstableApi
34+
public final class ArmeriaStatelessServerTransportBuilder {
35+
36+
private McpJsonMapper jsonMapper = McpJsonMapper.getDefault();
37+
private McpTransportContextExtractor<ServiceRequestContext> contextExtractor =
38+
serverRequest -> McpTransportContext.EMPTY;
39+
40+
ArmeriaStatelessServerTransportBuilder() {}
41+
42+
/**
43+
* Sets the {@link JsonMapper} to use for JSON serialization/deserialization of MCP
44+
* messages.
45+
*/
46+
public ArmeriaStatelessServerTransportBuilder jsonMapper(McpJsonMapper jsonMapper) {
47+
this.jsonMapper = requireNonNull(jsonMapper, "jsonMapper");
48+
return this;
49+
}
50+
51+
/**
52+
* Sets the {@link McpTransportContextExtractor} that allows providing the MCP feature
53+
* implementations to inspect HTTP transport level metadata that was present at
54+
* HTTP request processing time. This allows to extract custom headers and other
55+
* useful data for use during execution later on in the process.
56+
*/
57+
public ArmeriaStatelessServerTransportBuilder contextExtractor(
58+
McpTransportContextExtractor<ServiceRequestContext> contextExtractor) {
59+
this.contextExtractor = requireNonNull(contextExtractor, "contextExtractor");
60+
return this;
61+
}
62+
63+
/**
64+
* Builds a new instance of {@link ArmeriaStatelessServerTransport} with the configured settings.
65+
*/
66+
public ArmeriaStatelessServerTransport build() {
67+
return new ArmeriaStatelessServerTransport(jsonMapper, contextExtractor);
68+
}
69+
}

0 commit comments

Comments
 (0)