Spring AI 源码解析:MCP链路调用流程及示例
MCP官方文档:https://modelcontextprotocol.io/introduction
java版的MCP源码:https://github.com/modelcontextprotocol/java-sdk
- 本版源码解析,取自mcp/java-sdk(20250322),等正式发版后会再度更新
理论部分
MCP调用链路(核心)
以client-webflux、server-webflux为例
初始化连接链路...

客户端咨询问题,调用服务端工具链路

McpClient(客户端)
用于创建MCP客户端的工厂类,提供了同步、异步客户端的方法,并支持多种配置,提供如下核心功能
- 配置选项:允许设置请求超时、初始化超时、客户端能力、客户端信息、根URI等
- 工具和资源管理:支持工具发现、资源访问、提示模版处理等
- 实时更新:通过变更消费者接收工具、资源和提示的实时更新
- 日志记录:支持结构化日志记录,提供多种日志级别和日志消费者配置
内部类
- AsyncSpec:配置异步 MCP Client 的构建器类
- SyncSpec:配置同步 MCP Client 的构建器类
/**
 * Factory for Model Context Protocol (MCP) clients.
 *
 * <p>
 * Call {@link #sync(McpClientTransport)} or {@link #async(McpClientTransport)} to obtain
 * a fluent builder, configure it, and finish with {@code build()}.
 */
public interface McpClient {

    /**
     * Starts configuring a blocking client.
     * @param transport the transport the client will use; must not be null
     * @return a new {@link SyncSpec}
     */
    static SyncSpec sync(McpClientTransport transport) {
        return new SyncSpec(transport);
    }

    /**
     * Starts configuring a non-blocking client.
     * @param transport the transport the client will use; must not be null
     * @return a new {@link AsyncSpec}
     */
    static AsyncSpec async(McpClientTransport transport) {
        return new AsyncSpec(transport);
    }

    /**
     * Fluent builder for {@link McpSyncClient}. Every setter validates its argument with
     * {@code Assert.notNull} and returns {@code this} for chaining.
     */
    class SyncSpec {

        private final McpClientTransport transport;

        private Duration requestTimeout = Duration.ofSeconds(20); // Default timeout

        private Duration initializationTimeout = Duration.ofSeconds(20);

        private ClientCapabilities capabilities;

        private Implementation clientInfo = new Implementation("Java SDK MCP Client", "1.0.0");

        // Keyed by root URI, so registering the same URI twice keeps the latest Root.
        private final Map<String, Root> roots = new HashMap<>();

        private final List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers = new ArrayList<>();

        private final List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers = new ArrayList<>();

        private final List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers = new ArrayList<>();

        private final List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers = new ArrayList<>();

        private Function<CreateMessageRequest, CreateMessageResult> samplingHandler;

        private SyncSpec(McpClientTransport transport) {
            Assert.notNull(transport, "Transport must not be null");
            this.transport = transport;
        }

        /** Overrides the request timeout (default 20 seconds). */
        public SyncSpec requestTimeout(Duration requestTimeout) {
            Assert.notNull(requestTimeout, "Request timeout must not be null");
            this.requestTimeout = requestTimeout;
            return this;
        }

        /** Overrides the initialization timeout (default 20 seconds). */
        public SyncSpec initializationTimeout(Duration initializationTimeout) {
            Assert.notNull(initializationTimeout, "Initialization timeout must not be null");
            this.initializationTimeout = initializationTimeout;
            return this;
        }

        /** Sets the capabilities this client advertises. */
        public SyncSpec capabilities(ClientCapabilities capabilities) {
            Assert.notNull(capabilities, "Capabilities must not be null");
            this.capabilities = capabilities;
            return this;
        }

        /** Replaces the default client name/version. */
        public SyncSpec clientInfo(Implementation clientInfo) {
            Assert.notNull(clientInfo, "Client info must not be null");
            this.clientInfo = clientInfo;
            return this;
        }

        /** Registers the given roots, keyed by {@code root.uri()}. */
        public SyncSpec roots(List<Root> roots) {
            Assert.notNull(roots, "Roots must not be null");
            for (Root root : roots) {
                this.roots.put(root.uri(), root);
            }
            return this;
        }

        /** Varargs variant of {@link #roots(List)}. */
        public SyncSpec roots(Root... roots) {
            Assert.notNull(roots, "Roots must not be null");
            for (Root root : roots) {
                this.roots.put(root.uri(), root);
            }
            return this;
        }

        /** Sets the handler used to answer sampling (create-message) requests. */
        public SyncSpec sampling(Function<CreateMessageRequest, CreateMessageResult> samplingHandler) {
            Assert.notNull(samplingHandler, "Sampling handler must not be null");
            this.samplingHandler = samplingHandler;
            return this;
        }

        /** Adds a consumer for the tool list; may be called repeatedly. */
        public SyncSpec toolsChangeConsumer(Consumer<List<McpSchema.Tool>> toolsChangeConsumer) {
            Assert.notNull(toolsChangeConsumer, "Tools change consumer must not be null");
            this.toolsChangeConsumers.add(toolsChangeConsumer);
            return this;
        }

        /** Adds a consumer for the resource list; may be called repeatedly. */
        public SyncSpec resourcesChangeConsumer(Consumer<List<McpSchema.Resource>> resourcesChangeConsumer) {
            Assert.notNull(resourcesChangeConsumer, "Resources change consumer must not be null");
            this.resourcesChangeConsumers.add(resourcesChangeConsumer);
            return this;
        }

        /** Adds a consumer for the prompt list; may be called repeatedly. */
        public SyncSpec promptsChangeConsumer(Consumer<List<McpSchema.Prompt>> promptsChangeConsumer) {
            Assert.notNull(promptsChangeConsumer, "Prompts change consumer must not be null");
            this.promptsChangeConsumers.add(promptsChangeConsumer);
            return this;
        }

        /** Adds a single consumer for logging message notifications. */
        public SyncSpec loggingConsumer(Consumer<McpSchema.LoggingMessageNotification> loggingConsumer) {
            Assert.notNull(loggingConsumer, "Logging consumer must not be null");
            this.loggingConsumers.add(loggingConsumer);
            return this;
        }

        /** Adds several consumers for logging message notifications at once. */
        public SyncSpec loggingConsumers(List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers) {
            Assert.notNull(loggingConsumers, "Logging consumers must not be null");
            this.loggingConsumers.addAll(loggingConsumers);
            return this;
        }

        /**
         * Builds the synchronous client. The collected synchronous features are converted
         * with {@code McpClientFeatures.Async.fromSync} and handed to an
         * {@link McpAsyncClient}, which the returned {@link McpSyncClient} wraps.
         * @return the configured synchronous client
         */
        public McpSyncClient build() {
            McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities,
                    this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers,
                    this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler);
            McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
            return new McpSyncClient(
                    new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, asyncFeatures));
        }

    }

    /**
     * Fluent builder for {@link McpAsyncClient}. Mirrors {@link SyncSpec}, except that
     * callbacks are functions returning {@code Mono}.
     */
    class AsyncSpec {

        private final McpClientTransport transport;

        private Duration requestTimeout = Duration.ofSeconds(20); // Default timeout

        private Duration initializationTimeout = Duration.ofSeconds(20);

        private ClientCapabilities capabilities;

        // Same default identity as SyncSpec, so the client announces one name/version
        // no matter which builder produced it (previously a stale "Spring AI" value).
        private Implementation clientInfo = new Implementation("Java SDK MCP Client", "1.0.0");

        // Keyed by root URI, so registering the same URI twice keeps the latest Root.
        private final Map<String, Root> roots = new HashMap<>();

        private final List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumers = new ArrayList<>();

        private final List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers = new ArrayList<>();

        private final List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers = new ArrayList<>();

        private final List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers = new ArrayList<>();

        private Function<CreateMessageRequest, Mono<CreateMessageResult>> samplingHandler;

        private AsyncSpec(McpClientTransport transport) {
            Assert.notNull(transport, "Transport must not be null");
            this.transport = transport;
        }

        /** Overrides the request timeout (default 20 seconds). */
        public AsyncSpec requestTimeout(Duration requestTimeout) {
            Assert.notNull(requestTimeout, "Request timeout must not be null");
            this.requestTimeout = requestTimeout;
            return this;
        }

        /** Overrides the initialization timeout (default 20 seconds). */
        public AsyncSpec initializationTimeout(Duration initializationTimeout) {
            Assert.notNull(initializationTimeout, "Initialization timeout must not be null");
            this.initializationTimeout = initializationTimeout;
            return this;
        }

        /** Sets the capabilities this client advertises. */
        public AsyncSpec capabilities(ClientCapabilities capabilities) {
            Assert.notNull(capabilities, "Capabilities must not be null");
            this.capabilities = capabilities;
            return this;
        }

        /** Replaces the default client name/version. */
        public AsyncSpec clientInfo(Implementation clientInfo) {
            Assert.notNull(clientInfo, "Client info must not be null");
            this.clientInfo = clientInfo;
            return this;
        }

        /** Registers the given roots, keyed by {@code root.uri()}. */
        public AsyncSpec roots(List<Root> roots) {
            Assert.notNull(roots, "Roots must not be null");
            for (Root root : roots) {
                this.roots.put(root.uri(), root);
            }
            return this;
        }

        /** Varargs variant of {@link #roots(List)}. */
        public AsyncSpec roots(Root... roots) {
            Assert.notNull(roots, "Roots must not be null");
            for (Root root : roots) {
                this.roots.put(root.uri(), root);
            }
            return this;
        }

        /** Sets the handler used to answer sampling (create-message) requests. */
        public AsyncSpec sampling(Function<CreateMessageRequest, Mono<CreateMessageResult>> samplingHandler) {
            Assert.notNull(samplingHandler, "Sampling handler must not be null");
            this.samplingHandler = samplingHandler;
            return this;
        }

        /** Adds a callback for the tool list; may be called repeatedly. */
        public AsyncSpec toolsChangeConsumer(Function<List<McpSchema.Tool>, Mono<Void>> toolsChangeConsumer) {
            Assert.notNull(toolsChangeConsumer, "Tools change consumer must not be null");
            this.toolsChangeConsumers.add(toolsChangeConsumer);
            return this;
        }

        /** Adds a callback for the resource list; may be called repeatedly. */
        public AsyncSpec resourcesChangeConsumer(
                Function<List<McpSchema.Resource>, Mono<Void>> resourcesChangeConsumer) {
            Assert.notNull(resourcesChangeConsumer, "Resources change consumer must not be null");
            this.resourcesChangeConsumers.add(resourcesChangeConsumer);
            return this;
        }

        /** Adds a callback for the prompt list; may be called repeatedly. */
        public AsyncSpec promptsChangeConsumer(Function<List<McpSchema.Prompt>, Mono<Void>> promptsChangeConsumer) {
            Assert.notNull(promptsChangeConsumer, "Prompts change consumer must not be null");
            this.promptsChangeConsumers.add(promptsChangeConsumer);
            return this;
        }

        /** Adds a single callback for logging message notifications. */
        public AsyncSpec loggingConsumer(Function<McpSchema.LoggingMessageNotification, Mono<Void>> loggingConsumer) {
            Assert.notNull(loggingConsumer, "Logging consumer must not be null");
            this.loggingConsumers.add(loggingConsumer);
            return this;
        }

        /** Adds several callbacks for logging message notifications at once. */
        public AsyncSpec loggingConsumers(
                List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers) {
            Assert.notNull(loggingConsumers, "Logging consumers must not be null");
            this.loggingConsumers.addAll(loggingConsumers);
            return this;
        }

        /**
         * Builds the asynchronous client from the collected configuration.
         * @return the configured asynchronous client
         */
        public McpAsyncClient build() {
            return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
                    new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
                            this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers,
                            this.loggingConsumers, this.samplingHandler));
        }

    }

}
McpAsyncClient
异步客户端实现,基于Project Reactor的Mono与Flux类型,支持非阻塞操作
初始化与关闭
- initialize:初始化客户端与服务端的连接,协商协议版本、交换能力并共享实现信息
- close:立即关闭客户端连接
- closeGracefully:优雅关闭客户端连接
工具操作
- callTool:调用服务器提供的工具
- listTools:获取服务器提供的工具列表
服务器与客户端信息
- getServerInfo:获取服务器的实现信息
- getServerCapabilities:获取服务器支持的功能和能力
- getClientInfo:获取客户端的实现信息
- getClientCapabilities:获取客户端支持的功能和能力
资源操作
- listResources:获取服务器提供的资源列表
- readResource:获取特定资源的内容
- listResourceTemplates:获取服务器提供的资源模版列表
- subscribeResource:订阅资源变更通知
- unsubscribeResource:取消资源变更订阅
提示操作
- listPrompts:获取服务器提供的提示列表
- getPrompt:获取特定提示的详细信息
日志操作
- setLoggingLevel:设置日志级别
其他工具方法:
- ping:向服务器发送ping请求
- addRoot:添加根路径
- removeRoot:移除根路径
- rootsListChangedNotification:手动发送根路径变更通知
/**
 * Non-blocking MCP client built on Reactor {@code Mono}/{@code Flux}.
 *
 * <p>
 * The constructor wires request and notification handlers into an
 * {@code McpClientSession}. Operations that talk to the server go through
 * {@link #withInitializationCheck(String, Function)}, so they only run once
 * {@link #initialize()} has completed.
 */
public class McpAsyncClient {

    private static final Logger logger = LoggerFactory.getLogger(McpAsyncClient.class);

    private static final TypeReference<Void> VOID_TYPE_REFERENCE = new TypeReference<>() {
    };

    // Emits the initialize result once; server-facing operations wait on it.
    protected final Sinks.One<McpSchema.InitializeResult> initializedSink = Sinks.one();

    private final AtomicBoolean initialized = new AtomicBoolean(false);

    private final Duration initializationTimeout;

    private final McpClientSession mcpSession;

    private final McpSchema.ClientCapabilities clientCapabilities;

    private final McpSchema.Implementation clientInfo;

    // Populated by initialize(); null until the server has answered.
    private McpSchema.ServerCapabilities serverCapabilities;

    // Populated by initialize(); null until the server has answered.
    private McpSchema.Implementation serverInfo;

    private final ConcurrentHashMap<String, Root> roots;

    private Function<CreateMessageRequest, Mono<CreateMessageResult>> samplingHandler;

    private final McpTransport transport;

    private List<String> protocolVersions = List.of(McpSchema.LATEST_PROTOCOL_VERSION);

    /**
     * Creates the client and its session.
     * @param transport transport used by the session and for (un)marshalling payloads
     * @param requestTimeout timeout passed to the session
     * @param initializationTimeout how long operations wait for initialization
     * @param features client info, capabilities, roots, change consumers and the
     * sampling handler
     * @throws McpError if the capabilities include sampling but no sampling handler was
     * supplied
     */
    McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout,
            McpClientFeatures.Async features) {

        Assert.notNull(transport, "Transport must not be null");
        Assert.notNull(requestTimeout, "Request timeout must not be null");
        Assert.notNull(initializationTimeout, "Initialization timeout must not be null");

        this.clientInfo = features.clientInfo();
        this.clientCapabilities = features.clientCapabilities();
        this.transport = transport;
        this.roots = new ConcurrentHashMap<>(features.roots());
        this.initializationTimeout = initializationTimeout;

        // Request Handlers
        Map<String, RequestHandler<?>> requestHandlers = new HashMap<>();

        // Roots List Request Handler
        if (this.clientCapabilities.roots() != null) {
            requestHandlers.put(McpSchema.METHOD_ROOTS_LIST, rootsListRequestHandler());
        }

        // Sampling Handler
        if (this.clientCapabilities.sampling() != null) {
            if (features.samplingHandler() == null) {
                throw new McpError("Sampling handler must not be null when client capabilities include sampling");
            }
            this.samplingHandler = features.samplingHandler();
            requestHandlers.put(McpSchema.METHOD_SAMPLING_CREATE_MESSAGE, samplingCreateMessageHandler());
        }

        // Notification Handlers
        Map<String, NotificationHandler> notificationHandlers = new HashMap<>();

        // Tools Change Notification: a debug-logging consumer always runs first.
        List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumersFinal = new ArrayList<>();
        toolsChangeConsumersFinal
            .add((notification) -> Mono.fromRunnable(() -> logger.debug("Tools changed: {}", notification)));
        if (!Utils.isEmpty(features.toolsChangeConsumers())) {
            toolsChangeConsumersFinal.addAll(features.toolsChangeConsumers());
        }
        notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_TOOLS_LIST_CHANGED,
                asyncToolsChangeNotificationHandler(toolsChangeConsumersFinal));

        // Resources Change Notification
        List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumersFinal = new ArrayList<>();
        resourcesChangeConsumersFinal
            .add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources changed: {}", notification)));
        if (!Utils.isEmpty(features.resourcesChangeConsumers())) {
            resourcesChangeConsumersFinal.addAll(features.resourcesChangeConsumers());
        }
        notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED,
                asyncResourcesChangeNotificationHandler(resourcesChangeConsumersFinal));

        // Prompts Change Notification
        List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumersFinal = new ArrayList<>();
        promptsChangeConsumersFinal
            .add((notification) -> Mono.fromRunnable(() -> logger.debug("Prompts changed: {}", notification)));
        if (!Utils.isEmpty(features.promptsChangeConsumers())) {
            promptsChangeConsumersFinal.addAll(features.promptsChangeConsumers());
        }
        notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_PROMPTS_LIST_CHANGED,
                asyncPromptsChangeNotificationHandler(promptsChangeConsumersFinal));

        // Utility Logging Notification
        List<Function<LoggingMessageNotification, Mono<Void>>> loggingConsumersFinal = new ArrayList<>();
        loggingConsumersFinal.add((notification) -> Mono.fromRunnable(() -> logger.debug("Logging: {}", notification)));
        if (!Utils.isEmpty(features.loggingConsumers())) {
            loggingConsumersFinal.addAll(features.loggingConsumers());
        }
        notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_MESSAGE,
                asyncLoggingNotificationHandler(loggingConsumersFinal));

        this.mcpSession = new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers);
    }

    /** Returns the server capabilities from the initialize response, or null before it arrived. */
    public McpSchema.ServerCapabilities getServerCapabilities() {
        return this.serverCapabilities;
    }

    /** Returns the server implementation info from the initialize response, or null before it arrived. */
    public McpSchema.Implementation getServerInfo() {
        return this.serverInfo;
    }

    /** Returns true once the initialized notification has been sent successfully. */
    public boolean isInitialized() {
        return this.initialized.get();
    }

    /** Returns the capabilities this client was configured with. */
    public ClientCapabilities getClientCapabilities() {
        return this.clientCapabilities;
    }

    /** Returns the implementation info this client was configured with. */
    public McpSchema.Implementation getClientInfo() {
        return this.clientInfo;
    }

    /** Closes the underlying session. */
    public void close() {
        this.mcpSession.close();
    }

    /** Closes the underlying session via its {@code closeGracefully()}. */
    public Mono<Void> closeGracefully() {
        return this.mcpSession.closeGracefully();
    }

    // --------------------------
    // Initialization
    // --------------------------

    /**
     * Performs the initialization handshake: sends the initialize request with the last
     * entry of the supported protocol versions, records the server's capabilities and
     * info, and, if the server's protocol version is supported, sends the initialized
     * notification and marks this client as initialized.
     * @return the server's initialize result, or an error if the server answered with an
     * unsupported protocol version
     */
    public Mono<McpSchema.InitializeResult> initialize() {

        String latestVersion = this.protocolVersions.get(this.protocolVersions.size() - 1);

        McpSchema.InitializeRequest initializeRequest = new McpSchema.InitializeRequest(// @formatter:off
                latestVersion,
                this.clientCapabilities,
                this.clientInfo); // @formatter:on

        Mono<McpSchema.InitializeResult> result = this.mcpSession.sendRequest(McpSchema.METHOD_INITIALIZE,
                initializeRequest, new TypeReference<McpSchema.InitializeResult>() {
                });

        return result.flatMap(initializeResult -> {

            this.serverCapabilities = initializeResult.capabilities();
            this.serverInfo = initializeResult.serverInfo();

            logger.info("Server response with Protocol: {}, Capabilities: {}, Info: {} and Instructions {}",
                    initializeResult.protocolVersion(), initializeResult.capabilities(), initializeResult.serverInfo(),
                    initializeResult.instructions());

            if (!this.protocolVersions.contains(initializeResult.protocolVersion())) {
                return Mono.error(new McpError(
                        "Unsupported protocol version from the server: " + initializeResult.protocolVersion()));
            }

            return this.mcpSession.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null).doOnSuccess(v -> {
                this.initialized.set(true);
                // Releases every operation waiting in withInitializationCheck.
                this.initializedSink.tryEmitValue(initializeResult);
            }).thenReturn(initializeResult);
        });
    }

    /**
     * Runs {@code operation} once initialization has completed. Waits on the
     * initialization sink for at most the initialization timeout; a timeout is turned
     * into an {@link McpError} naming the attempted action.
     * @param actionName description used in the error message
     * @param operation work to run with the initialize result
     */
    private <T> Mono<T> withInitializationCheck(String actionName,
            Function<McpSchema.InitializeResult, Mono<T>> operation) {
        return this.initializedSink.asMono()
            .timeout(this.initializationTimeout)
            .onErrorResume(TimeoutException.class,
                    ex -> Mono.error(new McpError("Client must be initialized before " + actionName)))
            .flatMap(operation);
    }

    // --------------------------
    // Basic Utilities
    // --------------------------

    /** Sends a ping request to the server. */
    public Mono<Object> ping() {
        return this.withInitializationCheck("pinging the server", initializedResult -> this.mcpSession
            .sendRequest(McpSchema.METHOD_PING, null, new TypeReference<Object>() {
            }));
    }

    // --------------------------
    // Roots
    // --------------------------

    /**
     * Adds a root. If the client advertises {@code roots.listChanged} and is already
     * initialized, a roots-list-changed notification is sent.
     * @param root the root to add
     * @return completes when done; errors with {@link McpError} if {@code root} is null,
     * roots capabilities are not configured, or the URI is already registered
     */
    public Mono<Void> addRoot(Root root) {

        if (root == null) {
            return Mono.error(new McpError("Root must not be null"));
        }

        if (this.clientCapabilities.roots() == null) {
            return Mono.error(new McpError("Client must be configured with roots capabilities"));
        }

        // Atomic insert: a separate containsKey/put pair could let two concurrent
        // callers both pass the check for the same URI.
        if (this.roots.putIfAbsent(root.uri(), root) != null) {
            return Mono.error(new McpError("Root with uri '" + root.uri() + "' already exists"));
        }

        logger.debug("Added root: {}", root);

        if (this.clientCapabilities.roots().listChanged()) {
            if (this.isInitialized()) {
                return this.rootsListChangedNotification();
            }
            else {
                logger.warn("Client is not initialized, ignore sending a roots list changed notification");
            }
        }
        return Mono.empty();
    }

    /**
     * Removes a root by URI. If the client advertises {@code roots.listChanged} and is
     * already initialized, a roots-list-changed notification is sent.
     * @param rootUri URI of the root to remove
     * @return completes when done; errors with {@link McpError} if {@code rootUri} is
     * null, roots capabilities are not configured, or no such root exists
     */
    public Mono<Void> removeRoot(String rootUri) {

        if (rootUri == null) {
            return Mono.error(new McpError("Root uri must not be null"));
        }

        if (this.clientCapabilities.roots() == null) {
            return Mono.error(new McpError("Client must be configured with roots capabilities"));
        }

        Root removed = this.roots.remove(rootUri);

        if (removed != null) {
            logger.debug("Removed Root: {}", rootUri);
            if (this.clientCapabilities.roots().listChanged()) {
                if (this.isInitialized()) {
                    return this.rootsListChangedNotification();
                }
                else {
                    logger.warn("Client is not initialized, ignore sending a roots list changed notification");
                }
            }
            return Mono.empty();
        }
        return Mono.error(new McpError("Root with uri '" + rootUri + "' not found"));
    }

    /** Sends a roots-list-changed notification to the server. */
    public Mono<Void> rootsListChangedNotification() {
        return this.withInitializationCheck("sending roots list changed notification",
                initResult -> this.mcpSession.sendNotification(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED));
    }

    /** Handler answering roots/list requests with the currently registered roots. */
    private RequestHandler<McpSchema.ListRootsResult> rootsListRequestHandler() {
        return params -> {
            // Parsed but not used by this handler.
            @SuppressWarnings("unused")
            McpSchema.PaginatedRequest request = transport.unmarshalFrom(params,
                    new TypeReference<McpSchema.PaginatedRequest>() {
                    });

            List<Root> roots = this.roots.values().stream().toList();

            return Mono.just(new McpSchema.ListRootsResult(roots));
        };
    }

    // --------------------------
    // Sampling
    // --------------------------

    /** Handler that unmarshals a create-message request and passes it to the sampling handler. */
    private RequestHandler<CreateMessageResult> samplingCreateMessageHandler() {
        return params -> {
            McpSchema.CreateMessageRequest request = transport.unmarshalFrom(params,
                    new TypeReference<McpSchema.CreateMessageRequest>() {
                    });

            return this.samplingHandler.apply(request);
        };
    }

    // --------------------------
    // Tools
    // --------------------------
    private static final TypeReference<McpSchema.CallToolResult> CALL_TOOL_RESULT_TYPE_REF = new TypeReference<>() {
    };

    private static final TypeReference<McpSchema.ListToolsResult> LIST_TOOLS_RESULT_TYPE_REF = new TypeReference<>() {
    };

    /**
     * Calls a tool on the server.
     * @param callToolRequest the tool call to send
     * @return the tool result; errors with {@link McpError} if the server did not
     * advertise the tools capability
     */
    public Mono<McpSchema.CallToolResult> callTool(McpSchema.CallToolRequest callToolRequest) {
        return this.withInitializationCheck("calling tools", initializedResult -> {
            if (this.serverCapabilities.tools() == null) {
                return Mono.error(new McpError("Server does not provide tools capability"));
            }
            return this.mcpSession.sendRequest(McpSchema.METHOD_TOOLS_CALL, callToolRequest, CALL_TOOL_RESULT_TYPE_REF);
        });
    }

    /** Lists tools with a null cursor. */
    public Mono<McpSchema.ListToolsResult> listTools() {
        return this.listTools(null);
    }

    /**
     * Lists tools using the given cursor.
     * @param cursor cursor passed in the paginated request; may be null
     * @return the tool list; errors with {@link McpError} if the server did not advertise
     * the tools capability
     */
    public Mono<McpSchema.ListToolsResult> listTools(String cursor) {
        return this.withInitializationCheck("listing tools", initializedResult -> {
            if (this.serverCapabilities.tools() == null) {
                return Mono.error(new McpError("Server does not provide tools capability"));
            }
            return this.mcpSession.sendRequest(McpSchema.METHOD_TOOLS_LIST, new McpSchema.PaginatedRequest(cursor),
                    LIST_TOOLS_RESULT_TYPE_REF);
        });
    }

    /**
     * Notification handler that re-lists tools and passes the result to every consumer.
     * Consumer errors are logged and swallowed.
     */
    private NotificationHandler asyncToolsChangeNotificationHandler(
            List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumers) {
        // TODO: params are not used yet
        return params -> this.listTools()
            .flatMap(listToolsResult -> Flux.fromIterable(toolsChangeConsumers)
                .flatMap(consumer -> consumer.apply(listToolsResult.tools()))
                .onErrorResume(error -> {
                    logger.error("Error handling tools list change notification", error);
                    return Mono.empty();
                })
                .then());
    }

    // --------------------------
    // Resources
    // --------------------------

    private static final TypeReference<McpSchema.ListResourcesResult> LIST_RESOURCES_RESULT_TYPE_REF = new TypeReference<>() {
    };

    private static final TypeReference<McpSchema.ReadResourceResult> READ_RESOURCE_RESULT_TYPE_REF = new TypeReference<>() {
    };

    private static final TypeReference<McpSchema.ListResourceTemplatesResult> LIST_RESOURCE_TEMPLATES_RESULT_TYPE_REF = new TypeReference<>() {
    };

    /** Lists resources with a null cursor. */
    public Mono<McpSchema.ListResourcesResult> listResources() {
        return this.listResources(null);
    }

    /**
     * Lists resources using the given cursor.
     * @param cursor cursor passed in the paginated request; may be null
     * @return the resource list; errors with {@link McpError} if the server did not
     * advertise the resources capability
     */
    public Mono<McpSchema.ListResourcesResult> listResources(String cursor) {
        return this.withInitializationCheck("listing resources", initializedResult -> {
            if (this.serverCapabilities.resources() == null) {
                return Mono.error(new McpError("Server does not provide the resources capability"));
            }
            return this.mcpSession.sendRequest(McpSchema.METHOD_RESOURCES_LIST, new McpSchema.PaginatedRequest(cursor),
                    LIST_RESOURCES_RESULT_TYPE_REF);
        });
    }

    /** Reads the resource identified by {@code resource.uri()}. */
    public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.Resource resource) {
        return this.readResource(new McpSchema.ReadResourceRequest(resource.uri()));
    }

    /**
     * Reads a resource.
     * @param readResourceRequest the read request to send
     * @return the resource contents; errors with {@link McpError} if the server did not
     * advertise the resources capability
     */
    public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.ReadResourceRequest readResourceRequest) {
        return this.withInitializationCheck("reading resources", initializedResult -> {
            if (this.serverCapabilities.resources() == null) {
                return Mono.error(new McpError("Server does not provide the resources capability"));
            }
            return this.mcpSession.sendRequest(McpSchema.METHOD_RESOURCES_READ, readResourceRequest,
                    READ_RESOURCE_RESULT_TYPE_REF);
        });
    }

    /** Lists resource templates with a null cursor. */
    public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates() {
        return this.listResourceTemplates(null);
    }

    /**
     * Lists resource templates using the given cursor.
     * @param cursor cursor passed in the paginated request; may be null
     * @return the template list; errors with {@link McpError} if the server did not
     * advertise the resources capability
     */
    public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates(String cursor) {
        return this.withInitializationCheck("listing resource templates", initializedResult -> {
            if (this.serverCapabilities.resources() == null) {
                return Mono.error(new McpError("Server does not provide the resources capability"));
            }
            return this.mcpSession.sendRequest(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST,
                    new McpSchema.PaginatedRequest(cursor), LIST_RESOURCE_TEMPLATES_RESULT_TYPE_REF);
        });
    }

    /** Sends a resource subscribe request. */
    public Mono<Void> subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
        return this.withInitializationCheck("subscribing to resources", initializedResult -> this.mcpSession
            .sendRequest(McpSchema.METHOD_RESOURCES_SUBSCRIBE, subscribeRequest, VOID_TYPE_REFERENCE));
    }

    /** Sends a resource unsubscribe request. */
    public Mono<Void> unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
        return this.withInitializationCheck("unsubscribing from resources", initializedResult -> this.mcpSession
            .sendRequest(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, unsubscribeRequest, VOID_TYPE_REFERENCE));
    }

    /**
     * Notification handler that re-lists resources and passes the result to every
     * consumer. Consumer errors are logged and swallowed.
     */
    private NotificationHandler asyncResourcesChangeNotificationHandler(
            List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers) {
        return params -> listResources().flatMap(listResourcesResult -> Flux.fromIterable(resourcesChangeConsumers)
            .flatMap(consumer -> consumer.apply(listResourcesResult.resources()))
            .onErrorResume(error -> {
                logger.error("Error handling resources list change notification", error);
                return Mono.empty();
            })
            .then());
    }

    // --------------------------
    // Prompts
    // --------------------------
    private static final TypeReference<McpSchema.ListPromptsResult> LIST_PROMPTS_RESULT_TYPE_REF = new TypeReference<>() {
    };

    private static final TypeReference<McpSchema.GetPromptResult> GET_PROMPT_RESULT_TYPE_REF = new TypeReference<>() {
    };

    /** Lists prompts with a null cursor. */
    public Mono<ListPromptsResult> listPrompts() {
        return this.listPrompts(null);
    }

    /** Lists prompts using the given cursor (may be null). */
    public Mono<ListPromptsResult> listPrompts(String cursor) {
        return this.withInitializationCheck("listing prompts", initializedResult -> this.mcpSession
            .sendRequest(McpSchema.METHOD_PROMPT_LIST, new PaginatedRequest(cursor), LIST_PROMPTS_RESULT_TYPE_REF));
    }

    /** Sends a get-prompt request. */
    public Mono<GetPromptResult> getPrompt(GetPromptRequest getPromptRequest) {
        return this.withInitializationCheck("getting prompts", initializedResult -> this.mcpSession
            .sendRequest(McpSchema.METHOD_PROMPT_GET, getPromptRequest, GET_PROMPT_RESULT_TYPE_REF));
    }

    /**
     * Notification handler that re-lists prompts and passes the result to every consumer.
     * Consumer errors are logged and swallowed.
     */
    private NotificationHandler asyncPromptsChangeNotificationHandler(
            List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers) {
        return params -> listPrompts().flatMap(listPromptsResult -> Flux.fromIterable(promptsChangeConsumers)
            .flatMap(consumer -> consumer.apply(listPromptsResult.prompts()))
            .onErrorResume(error -> {
                logger.error("Error handling prompts list change notification", error);
                return Mono.empty();
            })
            .then());
    }

    // --------------------------
    // Logging
    // --------------------------

    /** Notification handler that unmarshals a logging message and passes it to every consumer. */
    private NotificationHandler asyncLoggingNotificationHandler(
            List<Function<LoggingMessageNotification, Mono<Void>>> loggingConsumers) {

        return params -> {
            McpSchema.LoggingMessageNotification loggingMessageNotification = transport.unmarshalFrom(params,
                    new TypeReference<McpSchema.LoggingMessageNotification>() {
                    });

            return Flux.fromIterable(loggingConsumers)
                .flatMap(consumer -> consumer.apply(loggingMessageNotification))
                .then();
        };
    }

    /**
     * Asks the server to change its minimum logging level.
     * @param loggingLevel the level to request
     * @return completes when the server has answered; errors with {@link McpError} if
     * {@code loggingLevel} is null
     */
    public Mono<Void> setLoggingLevel(LoggingLevel loggingLevel) {
        if (loggingLevel == null) {
            return Mono.error(new McpError("Logging level must not be null"));
        }

        return this.withInitializationCheck("setting logging level", initializedResult -> {
            // Converts the enum to its wire (JSON) string form.
            String levelName = this.transport.unmarshalFrom(loggingLevel, new TypeReference<String>() {
            });

            Map<String, Object> params = Map.of("level", levelName);

            // logging/setLevel is defined by the MCP specification as a request with a
            // response, not a notification, so send it as a request and discard the
            // result payload.
            return this.mcpSession
                .sendRequest(McpSchema.METHOD_LOGGING_SET_LEVEL, params, new TypeReference<Object>() {
                })
                .then();
        });
    }

    /**
     * Replaces the supported protocol versions. The last entry is the one offered to the
     * server by {@link #initialize()}.
     */
    void setProtocolVersions(List<String> protocolVersions) {
        this.protocolVersions = protocolVersions;
    }

}
McpSyncClient
同步客户端实现,封装了McpAsyncClient以提供阻塞操作,其余功能方法和McpAsyncClient保持一致
/**
 * Blocking facade over {@link McpAsyncClient}. Each operation calls the same-named
 * method on the wrapped asynchronous client and blocks on the returned publisher.
 */
public class McpSyncClient implements AutoCloseable {

    private static final Logger logger = LoggerFactory.getLogger(McpSyncClient.class);

    // TODO: consider exposing this through client configuration. It only matters
    // because the class is AutoCloseable, which may not actually be required.
    private static final long DEFAULT_CLOSE_TIMEOUT_MS = 10_000L;

    private final McpAsyncClient delegate;

    /**
     * Wraps the given asynchronous client.
     * @param delegate the client all calls are forwarded to; must not be null
     */
    McpSyncClient(McpAsyncClient delegate) {
        Assert.notNull(delegate, "The delegate can not be null");
        this.delegate = delegate;
    }

    // --------------------------
    // Client / server metadata
    // --------------------------

    /** Returns the delegate's server capabilities. */
    public McpSchema.ServerCapabilities getServerCapabilities() {
        return delegate.getServerCapabilities();
    }

    /** Returns the delegate's server implementation info. */
    public McpSchema.Implementation getServerInfo() {
        return delegate.getServerInfo();
    }

    /** Returns the delegate's client capabilities. */
    public ClientCapabilities getClientCapabilities() {
        return delegate.getClientCapabilities();
    }

    /** Returns the delegate's client implementation info. */
    public McpSchema.Implementation getClientInfo() {
        return delegate.getClientInfo();
    }

    // --------------------------
    // Lifecycle
    // --------------------------

    /** Closes the delegate. */
    @Override
    public void close() {
        delegate.close();
    }

    /**
     * Requests a graceful close and waits up to {@code DEFAULT_CLOSE_TIMEOUT_MS} for it.
     * @return {@code true} if the wait finished without a runtime exception,
     * {@code false} otherwise (the exception is logged as a warning)
     */
    public boolean closeGracefully() {
        final Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS);
        try {
            delegate.closeGracefully().block(closeTimeout);
            return true;
        }
        catch (RuntimeException e) {
            logger.warn("Client didn't close within timeout of {} ms.", DEFAULT_CLOSE_TIMEOUT_MS, e);
            return false;
        }
    }

    /** Runs the delegate's initialization and blocks for the result. */
    public McpSchema.InitializeResult initialize() {
        // TODO: no timeout is passed to block() on the assumption that the async
        // client is always configured with a requestTimeout.
        return delegate.initialize().block();
    }

    // --------------------------
    // Roots and utilities
    // --------------------------

    /** Blocking variant of the delegate's {@code rootsListChangedNotification()}. */
    public void rootsListChangedNotification() {
        delegate.rootsListChangedNotification().block();
    }

    /** Blocking variant of the delegate's {@code addRoot(root)}. */
    public void addRoot(McpSchema.Root root) {
        delegate.addRoot(root).block();
    }

    /** Blocking variant of the delegate's {@code removeRoot(rootUri)}. */
    public void removeRoot(String rootUri) {
        delegate.removeRoot(rootUri).block();
    }

    /** Blocking variant of the delegate's {@code ping()}. */
    public Object ping() {
        return delegate.ping().block();
    }

    // --------------------------
    // Tools
    // --------------------------

    /** Blocking variant of the delegate's {@code callTool(callToolRequest)}. */
    public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) {
        return delegate.callTool(callToolRequest).block();
    }

    /** Blocking variant of the delegate's {@code listTools()}. */
    public McpSchema.ListToolsResult listTools() {
        return delegate.listTools().block();
    }

    /** Blocking variant of the delegate's {@code listTools(cursor)}. */
    public McpSchema.ListToolsResult listTools(String cursor) {
        return delegate.listTools(cursor).block();
    }

    // --------------------------
    // Resources
    // --------------------------

    /** Blocking variant of the delegate's {@code listResources(cursor)}. */
    public McpSchema.ListResourcesResult listResources(String cursor) {
        return delegate.listResources(cursor).block();
    }

    /** Blocking variant of the delegate's {@code listResources()}. */
    public McpSchema.ListResourcesResult listResources() {
        return delegate.listResources().block();
    }

    /** Blocking variant of the delegate's {@code readResource(resource)}. */
    public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) {
        return delegate.readResource(resource).block();
    }

    /** Blocking variant of the delegate's {@code readResource(readResourceRequest)}. */
    public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest) {
        return delegate.readResource(readResourceRequest).block();
    }

    /** Blocking variant of the delegate's {@code listResourceTemplates(cursor)}. */
    public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor) {
        return delegate.listResourceTemplates(cursor).block();
    }

    /** Blocking variant of the delegate's {@code listResourceTemplates()}. */
    public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
        return delegate.listResourceTemplates().block();
    }

    /** Blocking variant of the delegate's {@code subscribeResource(subscribeRequest)}. */
    public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
        delegate.subscribeResource(subscribeRequest).block();
    }

    /** Blocking variant of the delegate's {@code unsubscribeResource(unsubscribeRequest)}. */
    public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
        delegate.unsubscribeResource(unsubscribeRequest).block();
    }

    // --------------------------
    // Prompts
    // --------------------------

    /** Blocking variant of the delegate's {@code listPrompts(cursor)}. */
    public ListPromptsResult listPrompts(String cursor) {
        return delegate.listPrompts(cursor).block();
    }

    /** Blocking variant of the delegate's {@code listPrompts()}. */
    public ListPromptsResult listPrompts() {
        return delegate.listPrompts().block();
    }

    /** Blocking variant of the delegate's {@code getPrompt(getPromptRequest)}. */
    public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) {
        return delegate.getPrompt(getPromptRequest).block();
    }

    // --------------------------
    // Logging
    // --------------------------

    /** Blocking variant of the delegate's {@code setLoggingLevel(loggingLevel)}. */
    public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) {
        delegate.setLoggingLevel(loggingLevel).block();
    }

}
McpServer(Server)
用于创建MCP Server 的工厂类,提供了同步、异步 Server 的方法,提供如下核心功能
- 暴露工具:允许AI模型调用 Server 提供的工具来执行特定操作
- 提供资源访问:为AI模型提供上下文数据,如文件、数据库等
- 管理提示模版:提供结构化的提示模版,用于与AI模型的交互
- 处理 Client 连接和请求:管理 Client 的连接,并处理其请求
内部类
- AsyncSpecification:配置异步 MCP Server 的构建器类
- SyncSpecification:配置同步 MCP Server 的构建器类
/**
 * Entry point for configuring MCP servers.
 *
 * <p>
 * {@link #sync(McpServerTransportProvider)} and
 * {@link #async(McpServerTransportProvider)} each return a fluent builder that
 * collects server info, capabilities, tools, resources, resource templates, prompts
 * and roots-change handlers before {@code build()} creates the server.
 */
public interface McpServer {

    /**
     * Starts configuring a blocking server.
     * @param transportProvider transport provider the server is bound to; must not be
     * null
     * @return a new synchronous builder
     */
    static SyncSpecification sync(McpServerTransportProvider transportProvider) {
        return new SyncSpecification(transportProvider);
    }

    /**
     * Starts configuring a non-blocking server.
     * @param transportProvider transport provider the server is bound to; must not be
     * null
     * @return a new asynchronous builder
     */
    static AsyncSpecification async(McpServerTransportProvider transportProvider) {
        return new AsyncSpecification(transportProvider);
    }

    /**
     * Fluent builder that produces a {@link McpAsyncServer}. Every setter validates
     * its argument with {@code Assert} and returns {@code this} for chaining.
     */
    class AsyncSpecification {

        private static final McpSchema.Implementation DEFAULT_SERVER_INFO = new McpSchema.Implementation("mcp-server",
                "1.0.0");

        private final McpServerTransportProvider transportProvider;

        private ObjectMapper objectMapper;

        private McpSchema.Implementation serverInfo = DEFAULT_SERVER_INFO;

        private McpSchema.ServerCapabilities serverCapabilities;

        private final List<McpServerFeatures.AsyncToolSpecification> tools = new ArrayList<>();

        // Keyed by resource URI.
        private final Map<String, McpServerFeatures.AsyncResourceSpecification> resources = new HashMap<>();

        private final List<ResourceTemplate> resourceTemplates = new ArrayList<>();

        // Keyed by prompt name.
        private final Map<String, McpServerFeatures.AsyncPromptSpecification> prompts = new HashMap<>();

        private final List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeHandlers = new ArrayList<>();

        private AsyncSpecification(McpServerTransportProvider transportProvider) {
            Assert.notNull(transportProvider, "Transport provider must not be null");
            this.transportProvider = transportProvider;
        }

        /** Replaces the default server identity ("mcp-server" / "1.0.0"). */
        public AsyncSpecification serverInfo(McpSchema.Implementation serverInfo) {
            Assert.notNull(serverInfo, "Server info must not be null");
            this.serverInfo = serverInfo;
            return this;
        }

        /** Replaces the default server identity using a plain name and version. */
        public AsyncSpecification serverInfo(String name, String version) {
            Assert.hasText(name, "Name must not be null or empty");
            Assert.hasText(version, "Version must not be null or empty");
            McpSchema.Implementation identity = new McpSchema.Implementation(name, version);
            this.serverInfo = identity;
            return this;
        }

        /** Sets the capabilities the server will be built with. */
        public AsyncSpecification capabilities(McpSchema.ServerCapabilities serverCapabilities) {
            Assert.notNull(serverCapabilities, "Server capabilities must not be null");
            this.serverCapabilities = serverCapabilities;
            return this;
        }

        /** Registers a single tool together with the handler that executes it. */
        public AsyncSpecification tool(McpSchema.Tool tool,
                BiFunction<McpAsyncServerExchange, Map<String, Object>, Mono<CallToolResult>> handler) {
            Assert.notNull(tool, "Tool must not be null");
            Assert.notNull(handler, "Handler must not be null");
            var specification = new McpServerFeatures.AsyncToolSpecification(tool, handler);
            this.tools.add(specification);
            return this;
        }

        /** Appends all given tool specifications, in list order. */
        public AsyncSpecification tools(List<McpServerFeatures.AsyncToolSpecification> toolSpecifications) {
            Assert.notNull(toolSpecifications, "Tool handlers list must not be null");
            this.tools.addAll(toolSpecifications);
            return this;
        }

        /** Varargs variant of {@link #tools(List)}. */
        public AsyncSpecification tools(McpServerFeatures.AsyncToolSpecification... toolSpecifications) {
            Assert.notNull(toolSpecifications, "Tool handlers list must not be null");
            this.tools.addAll(Arrays.asList(toolSpecifications));
            return this;
        }

        /** Merges an already URI-keyed map of resource specifications. */
        public AsyncSpecification resources(
                Map<String, McpServerFeatures.AsyncResourceSpecification> resourceSpecifications) {
            Assert.notNull(resourceSpecifications, "Resource handlers map must not be null");
            this.resources.putAll(resourceSpecifications);
            return this;
        }

        /** Registers each specification under its resource URI; a later entry replaces an earlier one with the same URI. */
        public AsyncSpecification resources(List<McpServerFeatures.AsyncResourceSpecification> resourceSpecifications) {
            Assert.notNull(resourceSpecifications, "Resource handlers list must not be null");
            resourceSpecifications.forEach(spec -> this.resources.put(spec.resource().uri(), spec));
            return this;
        }

        /** Varargs variant of {@link #resources(List)}. */
        public AsyncSpecification resources(McpServerFeatures.AsyncResourceSpecification... resourceSpecifications) {
            Assert.notNull(resourceSpecifications, "Resource handlers list must not be null");
            return this.resources(Arrays.asList(resourceSpecifications));
        }

        /** Appends all given resource templates. */
        public AsyncSpecification resourceTemplates(List<ResourceTemplate> resourceTemplates) {
            Assert.notNull(resourceTemplates, "Resource templates must not be null");
            this.resourceTemplates.addAll(resourceTemplates);
            return this;
        }

        /** Varargs variant of {@link #resourceTemplates(List)}. */
        public AsyncSpecification resourceTemplates(ResourceTemplate... resourceTemplates) {
            Assert.notNull(resourceTemplates, "Resource templates must not be null");
            this.resourceTemplates.addAll(Arrays.asList(resourceTemplates));
            return this;
        }

        /** Merges an already name-keyed map of prompt specifications. */
        public AsyncSpecification prompts(Map<String, McpServerFeatures.AsyncPromptSpecification> prompts) {
            Assert.notNull(prompts, "Prompts map must not be null");
            this.prompts.putAll(prompts);
            return this;
        }

        /** Registers each specification under its prompt name; a later entry replaces an earlier one with the same name. */
        public AsyncSpecification prompts(List<McpServerFeatures.AsyncPromptSpecification> prompts) {
            Assert.notNull(prompts, "Prompts list must not be null");
            prompts.forEach(spec -> this.prompts.put(spec.prompt().name(), spec));
            return this;
        }

        /** Varargs variant of {@link #prompts(List)}. */
        public AsyncSpecification prompts(McpServerFeatures.AsyncPromptSpecification... prompts) {
            Assert.notNull(prompts, "Prompts list must not be null");
            return this.prompts(Arrays.asList(prompts));
        }

        /** Adds one handler to the roots-change handler list. */
        public AsyncSpecification rootsChangeHandler(
                BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>> handler) {
            Assert.notNull(handler, "Consumer must not be null");
            this.rootsChangeHandlers.add(handler);
            return this;
        }

        /** Adds several roots-change handlers at once. */
        public AsyncSpecification rootsChangeHandlers(
                List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> handlers) {
            Assert.notNull(handlers, "Handlers list must not be null");
            this.rootsChangeHandlers.addAll(handlers);
            return this;
        }

        /** Varargs variant of {@link #rootsChangeHandlers(List)}. */
        public AsyncSpecification rootsChangeHandlers(
                @SuppressWarnings("unchecked") BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>... handlers) {
            Assert.notNull(handlers, "Handlers list must not be null");
            this.rootsChangeHandlers.addAll(Arrays.asList(handlers));
            return this;
        }

        /** Supplies the Jackson mapper to use; when never called, {@link #build()} creates a fresh one. */
        public AsyncSpecification objectMapper(ObjectMapper objectMapper) {
            Assert.notNull(objectMapper, "ObjectMapper must not be null");
            this.objectMapper = objectMapper;
            return this;
        }

        /**
         * Bundles everything configured so far into {@code McpServerFeatures.Async}
         * and creates the server.
         * @return the configured asynchronous server
         */
        public McpAsyncServer build() {
            McpServerFeatures.Async features = new McpServerFeatures.Async(this.serverInfo, this.serverCapabilities,
                    this.tools, this.resources, this.resourceTemplates, this.prompts, this.rootsChangeHandlers);
            ObjectMapper mapper = this.objectMapper;
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            return new McpAsyncServer(this.transportProvider, mapper, features);
        }

    }

    /**
     * Fluent builder that produces a {@link McpSyncServer}. It mirrors
     * {@link AsyncSpecification} but accepts blocking handlers.
     */
    class SyncSpecification {

        private static final McpSchema.Implementation DEFAULT_SERVER_INFO = new McpSchema.Implementation("mcp-server",
                "1.0.0");

        private final McpServerTransportProvider transportProvider;

        private ObjectMapper objectMapper;

        private McpSchema.Implementation serverInfo = DEFAULT_SERVER_INFO;

        private McpSchema.ServerCapabilities serverCapabilities;

        private final List<McpServerFeatures.SyncToolSpecification> tools = new ArrayList<>();

        // Keyed by resource URI.
        private final Map<String, McpServerFeatures.SyncResourceSpecification> resources = new HashMap<>();

        private final List<ResourceTemplate> resourceTemplates = new ArrayList<>();

        // Keyed by prompt name.
        private final Map<String, McpServerFeatures.SyncPromptSpecification> prompts = new HashMap<>();

        private final List<BiConsumer<McpSyncServerExchange, List<McpSchema.Root>>> rootsChangeHandlers = new ArrayList<>();

        private SyncSpecification(McpServerTransportProvider transportProvider) {
            Assert.notNull(transportProvider, "Transport provider must not be null");
            this.transportProvider = transportProvider;
        }

        /** Replaces the default server identity ("mcp-server" / "1.0.0"). */
        public SyncSpecification serverInfo(McpSchema.Implementation serverInfo) {
            Assert.notNull(serverInfo, "Server info must not be null");
            this.serverInfo = serverInfo;
            return this;
        }

        /** Replaces the default server identity using a plain name and version. */
        public SyncSpecification serverInfo(String name, String version) {
            Assert.hasText(name, "Name must not be null or empty");
            Assert.hasText(version, "Version must not be null or empty");
            McpSchema.Implementation identity = new McpSchema.Implementation(name, version);
            this.serverInfo = identity;
            return this;
        }

        /** Sets the capabilities the server will be built with. */
        public SyncSpecification capabilities(McpSchema.ServerCapabilities serverCapabilities) {
            Assert.notNull(serverCapabilities, "Server capabilities must not be null");
            this.serverCapabilities = serverCapabilities;
            return this;
        }

        /** Registers a single tool together with the blocking handler that executes it. */
        public SyncSpecification tool(McpSchema.Tool tool,
                BiFunction<McpSyncServerExchange, Map<String, Object>, McpSchema.CallToolResult> handler) {
            Assert.notNull(tool, "Tool must not be null");
            Assert.notNull(handler, "Handler must not be null");
            var specification = new McpServerFeatures.SyncToolSpecification(tool, handler);
            this.tools.add(specification);
            return this;
        }

        /** Appends all given tool specifications, in list order. */
        public SyncSpecification tools(List<McpServerFeatures.SyncToolSpecification> toolSpecifications) {
            Assert.notNull(toolSpecifications, "Tool handlers list must not be null");
            this.tools.addAll(toolSpecifications);
            return this;
        }

        /** Varargs variant of {@link #tools(List)}. */
        public SyncSpecification tools(McpServerFeatures.SyncToolSpecification... toolSpecifications) {
            Assert.notNull(toolSpecifications, "Tool handlers list must not be null");
            this.tools.addAll(Arrays.asList(toolSpecifications));
            return this;
        }

        /** Merges an already URI-keyed map of resource specifications. */
        public SyncSpecification resources(
                Map<String, McpServerFeatures.SyncResourceSpecification> resourceSpecifications) {
            Assert.notNull(resourceSpecifications, "Resource handlers map must not be null");
            this.resources.putAll(resourceSpecifications);
            return this;
        }

        /** Registers each specification under its resource URI; a later entry replaces an earlier one with the same URI. */
        public SyncSpecification resources(List<McpServerFeatures.SyncResourceSpecification> resourceSpecifications) {
            Assert.notNull(resourceSpecifications, "Resource handlers list must not be null");
            resourceSpecifications.forEach(spec -> this.resources.put(spec.resource().uri(), spec));
            return this;
        }

        /** Varargs variant of {@link #resources(List)}. */
        public SyncSpecification resources(McpServerFeatures.SyncResourceSpecification... resourceSpecifications) {
            Assert.notNull(resourceSpecifications, "Resource handlers list must not be null");
            return this.resources(Arrays.asList(resourceSpecifications));
        }

        /** Appends all given resource templates. */
        public SyncSpecification resourceTemplates(List<ResourceTemplate> resourceTemplates) {
            Assert.notNull(resourceTemplates, "Resource templates must not be null");
            this.resourceTemplates.addAll(resourceTemplates);
            return this;
        }

        /** Varargs variant of {@link #resourceTemplates(List)}. */
        public SyncSpecification resourceTemplates(ResourceTemplate... resourceTemplates) {
            Assert.notNull(resourceTemplates, "Resource templates must not be null");
            this.resourceTemplates.addAll(Arrays.asList(resourceTemplates));
            return this;
        }

        /** Merges an already name-keyed map of prompt specifications. */
        public SyncSpecification prompts(Map<String, McpServerFeatures.SyncPromptSpecification> prompts) {
            Assert.notNull(prompts, "Prompts map must not be null");
            this.prompts.putAll(prompts);
            return this;
        }

        /** Registers each specification under its prompt name; a later entry replaces an earlier one with the same name. */
        public SyncSpecification prompts(List<McpServerFeatures.SyncPromptSpecification> prompts) {
            Assert.notNull(prompts, "Prompts list must not be null");
            prompts.forEach(spec -> this.prompts.put(spec.prompt().name(), spec));
            return this;
        }

        /** Varargs variant of {@link #prompts(List)}. */
        public SyncSpecification prompts(McpServerFeatures.SyncPromptSpecification... prompts) {
            Assert.notNull(prompts, "Prompts list must not be null");
            return this.prompts(Arrays.asList(prompts));
        }

        /** Adds one handler to the roots-change handler list. */
        public SyncSpecification rootsChangeHandler(BiConsumer<McpSyncServerExchange, List<McpSchema.Root>> handler) {
            Assert.notNull(handler, "Consumer must not be null");
            this.rootsChangeHandlers.add(handler);
            return this;
        }

        /** Adds several roots-change handlers at once. */
        public SyncSpecification rootsChangeHandlers(
                List<BiConsumer<McpSyncServerExchange, List<McpSchema.Root>>> handlers) {
            Assert.notNull(handlers, "Handlers list must not be null");
            this.rootsChangeHandlers.addAll(handlers);
            return this;
        }

        /**
         * Varargs variant of {@link #rootsChangeHandlers(List)}. The array is copied
         * through {@code List.of}, so a null element is rejected with a
         * {@code NullPointerException}.
         */
        public SyncSpecification rootsChangeHandlers(
                BiConsumer<McpSyncServerExchange, List<McpSchema.Root>>... handlers) {
            Assert.notNull(handlers, "Handlers list must not be null");
            this.rootsChangeHandlers.addAll(List.of(handlers));
            return this;
        }

        /** Supplies the Jackson mapper to use; when never called, {@link #build()} creates a fresh one. */
        public SyncSpecification objectMapper(ObjectMapper objectMapper) {
            Assert.notNull(objectMapper, "ObjectMapper must not be null");
            this.objectMapper = objectMapper;
            return this;
        }

        /**
         * Converts the collected synchronous features to their asynchronous form,
         * builds a {@link McpAsyncServer} from them and wraps it in a
         * {@link McpSyncServer}.
         * @return the configured synchronous server
         */
        public McpSyncServer build() {
            var blockingFeatures = new McpServerFeatures.Sync(this.serverInfo, this.serverCapabilities, this.tools,
                    this.resources, this.resourceTemplates, this.prompts, this.rootsChangeHandlers);
            var reactiveFeatures = McpServerFeatures.Async.fromSync(blockingFeatures);
            ObjectMapper mapper = this.objectMapper;
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            McpAsyncServer reactiveServer = new McpAsyncServer(this.transportProvider, mapper, reactiveFeatures);
            return new McpSyncServer(reactiveServer);
        }

    }

}
McpAsyncServer
异步服务端的实现,基于Project Reactor的Mono与Flux类型,支持非阻塞操作
关闭
- close:立即关闭服务端连接
- closeGracefully:优雅关闭服务端连接
工具操作
- addTool:动态添加工具
- removeTool:动态移除指定名称的工具
- notifyToolsListChanged:通知客户端工具列表已发生变化
服务器信息获取
- getServerCapabilities:获取服务器的能力配置
- getServerInfo:获取服务器的实现信息
资源管理
- addResource:动态添加资源
- removeResource:动态移除指定URI的资源
- notifyResourcesListChanged:通知客户端资源列表已发生变化
提示管理
- addPrompt:动态添加提示
- removePrompt:动态移除指定名称的提示
- notifyPromptsListChanged:通知客户端提示列表已发生变化
日志管理
- loggingNotification:向所有连接的客户端发送日志通知
/**
 * Non-blocking MCP server. Operations that may involve I/O return a Reactor
 * {@link Mono}.
 *
 * <p>
 * The public type is a thin facade: every method forwards to {@link #delegate}, an
 * {@code AsyncServerImpl} that owns the registered tools, resources and prompts and
 * installs the request/notification handlers on the transport provider.
 */
public class McpAsyncServer {

    private static final Logger logger = LoggerFactory.getLogger(McpAsyncServer.class);

    /**
     * Implementation all calls are forwarded to. It is {@code null} only when this
     * object is the implementation subclass itself, which overrides every forwarding
     * method.
     */
    private final McpAsyncServer delegate;

    /** Used by the implementation subclass, which has no delegate of its own. */
    McpAsyncServer() {
        this.delegate = null;
    }

    /**
     * Creates the server and its backing implementation.
     * @param mcpTransportProvider provider on which the session factory is installed
     * @param objectMapper mapper used to convert request parameters
     * @param features server info, capabilities and the initial tools, resources,
     * templates, prompts and roots-change consumers
     */
    McpAsyncServer(McpServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper,
            McpServerFeatures.Async features) {
        this.delegate = new AsyncServerImpl(mcpTransportProvider, objectMapper, features);
    }

    /** @return the capabilities this server was configured with */
    public McpSchema.ServerCapabilities getServerCapabilities() {
        return this.delegate.getServerCapabilities();
    }

    /** @return the implementation info (name/version) of this server */
    public McpSchema.Implementation getServerInfo() {
        return this.delegate.getServerInfo();
    }

    /** @return a {@code Mono} that completes when the transport provider has closed gracefully */
    public Mono<Void> closeGracefully() {
        return this.delegate.closeGracefully();
    }

    /** Closes the transport provider immediately. */
    public void close() {
        this.delegate.close();
    }

    // ---------------------------------------
    // Tool Management
    // ---------------------------------------

    /**
     * Registers a tool at runtime. The returned {@code Mono} errors with
     * {@code McpError} when the specification is incomplete, the server has no tool
     * capability, or a tool with the same name already exists.
     */
    public Mono<Void> addTool(McpServerFeatures.AsyncToolSpecification toolSpecification) {
        return this.delegate.addTool(toolSpecification);
    }

    /** Removes the tool with the given name; errors with {@code McpError} when it is not registered. */
    public Mono<Void> removeTool(String toolName) {
        return this.delegate.removeTool(toolName);
    }

    /** Notifies clients that the tool list changed. */
    public Mono<Void> notifyToolsListChanged() {
        return this.delegate.notifyToolsListChanged();
    }

    // ---------------------------------------
    // Resource Management
    // ---------------------------------------

    /** Registers a resource at runtime, keyed by its URI; errors with {@code McpError} on duplicates. */
    public Mono<Void> addResource(McpServerFeatures.AsyncResourceSpecification resourceHandler) {
        return this.delegate.addResource(resourceHandler);
    }

    /** Removes the resource with the given URI; errors with {@code McpError} when it is not registered. */
    public Mono<Void> removeResource(String resourceUri) {
        return this.delegate.removeResource(resourceUri);
    }

    /** Notifies clients that the resource list changed. */
    public Mono<Void> notifyResourcesListChanged() {
        return this.delegate.notifyResourcesListChanged();
    }

    // ---------------------------------------
    // Prompt Management
    // ---------------------------------------

    /** Registers a prompt at runtime, keyed by its name; errors with {@code McpError} on duplicates. */
    public Mono<Void> addPrompt(McpServerFeatures.AsyncPromptSpecification promptSpecification) {
        return this.delegate.addPrompt(promptSpecification);
    }

    /** Removes the prompt with the given name; errors with {@code McpError} when it is not registered. */
    public Mono<Void> removePrompt(String promptName) {
        return this.delegate.removePrompt(promptName);
    }

    /** Notifies clients that the prompt list changed. */
    public Mono<Void> notifyPromptsListChanged() {
        return this.delegate.notifyPromptsListChanged();
    }

    // ---------------------------------------
    // Logging Management
    // ---------------------------------------

    /** Sends a log message notification to clients unless it is below the current minimum level. */
    public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageNotification) {
        return this.delegate.loggingNotification(loggingMessageNotification);
    }

    // ---------------------------------------
    // Protocol Versions
    // ---------------------------------------

    /** Replaces the list of protocol versions offered during initialization (package-private). */
    void setProtocolVersions(List<String> protocolVersions) {
        this.delegate.setProtocolVersions(protocolVersions);
    }

    /**
     * Actual server implementation. Keeps the registries in concurrent collections
     * and wires one handler per supported MCP method into every new session.
     */
    private static class AsyncServerImpl extends McpAsyncServer {

        private final McpServerTransportProvider mcpTransportProvider;

        private final ObjectMapper objectMapper;

        private final McpSchema.ServerCapabilities serverCapabilities;

        private final McpSchema.Implementation serverInfo;

        private final CopyOnWriteArrayList<McpServerFeatures.AsyncToolSpecification> tools = new CopyOnWriteArrayList<>();

        private final CopyOnWriteArrayList<McpSchema.ResourceTemplate> resourceTemplates = new CopyOnWriteArrayList<>();

        private final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceSpecification> resources = new ConcurrentHashMap<>();

        private final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();

        // Written by the set-level request handler and read by loggingNotification;
        // volatile so an update made while serving one request is visible to other
        // threads.
        private volatile LoggingLevel minLoggingLevel = LoggingLevel.DEBUG;

        private List<String> protocolVersions = List.of(McpSchema.LATEST_PROTOCOL_VERSION);

        /**
         * Copies the initial features into the registries, builds the request and
         * notification handler tables and installs a session factory on the
         * transport provider. The capabilities object is dereferenced here, so
         * {@code features.serverCapabilities()} must be non-null.
         */
        AsyncServerImpl(McpServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper,
                McpServerFeatures.Async features) {
            this.mcpTransportProvider = mcpTransportProvider;
            this.objectMapper = objectMapper;
            this.serverInfo = features.serverInfo();
            this.serverCapabilities = features.serverCapabilities();
            this.tools.addAll(features.tools());
            this.resources.putAll(features.resources());
            this.resourceTemplates.addAll(features.resourceTemplates());
            this.prompts.putAll(features.prompts());

            Map<String, McpServerSession.RequestHandler<?>> requestHandlers = new HashMap<>();

            // Initialize request handlers for standard MCP methods
            // Ping MUST respond with an empty data, but not NULL response.
            requestHandlers.put(McpSchema.METHOD_PING, (exchange, params) -> Mono.just(Map.of()));

            // Add tools API handlers if the tool capability is enabled
            if (this.serverCapabilities.tools() != null) {
                requestHandlers.put(McpSchema.METHOD_TOOLS_LIST, toolsListRequestHandler());
                requestHandlers.put(McpSchema.METHOD_TOOLS_CALL, toolsCallRequestHandler());
            }

            // Add resources API handlers if the resource capability is enabled
            if (this.serverCapabilities.resources() != null) {
                requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler());
                requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler());
                requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler());
            }

            // Add prompts API handlers if the prompt capability is enabled
            if (this.serverCapabilities.prompts() != null) {
                requestHandlers.put(McpSchema.METHOD_PROMPT_LIST, promptsListRequestHandler());
                requestHandlers.put(McpSchema.METHOD_PROMPT_GET, promptsGetRequestHandler());
            }

            // Add logging API handlers if the logging capability is enabled
            if (this.serverCapabilities.logging() != null) {
                requestHandlers.put(McpSchema.METHOD_LOGGING_SET_LEVEL, setLoggerRequestHandler());
            }

            Map<String, McpServerSession.NotificationHandler> notificationHandlers = new HashMap<>();

            notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty());

            List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers = features
                .rootsChangeConsumers();

            // Without user-supplied consumers, fall back to one that only logs a
            // warning.
            if (Utils.isEmpty(rootsChangeConsumers)) {
                rootsChangeConsumers = List.of((exchange,
                        roots) -> Mono.fromRunnable(() -> logger.warn(
                                "Roots list changed notification, but no consumers provided. Roots list changed: {}",
                                roots)));
            }

            notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED,
                    asyncRootsListChangedNotificationHandler(rootsChangeConsumers));

            // Each new transport gets its own session with a random UUID; all
            // sessions share the handler tables built above.
            mcpTransportProvider
                .setSessionFactory(transport -> new McpServerSession(UUID.randomUUID().toString(), transport,
                        this::asyncInitializeRequestHandler, Mono::empty, requestHandlers, notificationHandlers));
        }

        // ---------------------------------------
        // Lifecycle Management
        // ---------------------------------------

        /**
         * Answers the client's initialize request. The requested protocol version is
         * echoed when supported; otherwise the last entry of
         * {@link #protocolVersions} is proposed and a warning is logged.
         */
        private Mono<McpSchema.InitializeResult> asyncInitializeRequestHandler(
                McpSchema.InitializeRequest initializeRequest) {
            return Mono.defer(() -> {
                logger.info("Client initialize request - Protocol: {}, Capabilities: {}, Info: {}",
                        initializeRequest.protocolVersion(), initializeRequest.capabilities(),
                        initializeRequest.clientInfo());

                // The server MUST respond with the highest protocol version it
                // supports if it does not support the requested (e.g. Client)
                // version.
                String serverProtocolVersion = this.protocolVersions.get(this.protocolVersions.size() - 1);

                if (this.protocolVersions.contains(initializeRequest.protocolVersion())) {
                    // If the server supports the requested protocol version, it
                    // MUST respond with the same version.
                    serverProtocolVersion = initializeRequest.protocolVersion();
                }
                else {
                    logger.warn(
                            "Client requested unsupported protocol version: {}, so the server will suggest the {} version instead",
                            initializeRequest.protocolVersion(), serverProtocolVersion);
                }

                return Mono.just(new McpSchema.InitializeResult(serverProtocolVersion, this.serverCapabilities,
                        this.serverInfo, null));
            });
        }

        @Override
        public McpSchema.ServerCapabilities getServerCapabilities() {
            return this.serverCapabilities;
        }

        @Override
        public McpSchema.Implementation getServerInfo() {
            return this.serverInfo;
        }

        @Override
        public Mono<Void> closeGracefully() {
            return this.mcpTransportProvider.closeGracefully();
        }

        @Override
        public void close() {
            this.mcpTransportProvider.close();
        }

        /**
         * Builds the handler for roots-list-changed notifications: it asks the
         * client for its roots and passes them to every consumer. A consumer error
         * is logged and swallowed.
         */
        private McpServerSession.NotificationHandler asyncRootsListChangedNotificationHandler(
                List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers) {
            return (exchange, params) -> exchange.listRoots()
                .flatMap(listRootsResult -> Flux.fromIterable(rootsChangeConsumers)
                    .flatMap(consumer -> consumer.apply(exchange, listRootsResult.roots()))
                    .onErrorResume(error -> {
                        logger.error("Error handling roots list change notification", error);
                        return Mono.empty();
                    })
                    .then());
        }

        // ---------------------------------------
        // Tool Management
        // ---------------------------------------

        @Override
        public Mono<Void> addTool(McpServerFeatures.AsyncToolSpecification toolSpecification) {
            if (toolSpecification == null) {
                return Mono.error(new McpError("Tool specification must not be null"));
            }
            if (toolSpecification.tool() == null) {
                return Mono.error(new McpError("Tool must not be null"));
            }
            if (toolSpecification.call() == null) {
                return Mono.error(new McpError("Tool call handler must not be null"));
            }
            if (this.serverCapabilities.tools() == null) {
                return Mono.error(new McpError("Server must be configured with tool capabilities"));
            }

            return Mono.defer(() -> {
                // Check for duplicate tool names
                if (this.tools.stream().anyMatch(th -> th.tool().name().equals(toolSpecification.tool().name()))) {
                    return Mono
                        .error(new McpError("Tool with name '" + toolSpecification.tool().name() + "' already exists"));
                }

                this.tools.add(toolSpecification);
                logger.debug("Added tool handler: {}", toolSpecification.tool().name());

                if (this.serverCapabilities.tools().listChanged()) {
                    return notifyToolsListChanged();
                }
                return Mono.empty();
            });
        }

        @Override
        public Mono<Void> removeTool(String toolName) {
            if (toolName == null) {
                return Mono.error(new McpError("Tool name must not be null"));
            }
            if (this.serverCapabilities.tools() == null) {
                return Mono.error(new McpError("Server must be configured with tool capabilities"));
            }

            return Mono.defer(() -> {
                boolean removed = this.tools
                    .removeIf(toolSpecification -> toolSpecification.tool().name().equals(toolName));
                if (removed) {
                    logger.debug("Removed tool handler: {}", toolName);
                    if (this.serverCapabilities.tools().listChanged()) {
                        return notifyToolsListChanged();
                    }
                    return Mono.empty();
                }
                return Mono.error(new McpError("Tool with name '" + toolName + "' not found"));
            });
        }

        @Override
        public Mono<Void> notifyToolsListChanged() {
            return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_TOOLS_LIST_CHANGED, null);
        }

        /** Handler that lists the currently registered tools (no pagination cursor). */
        private McpServerSession.RequestHandler<McpSchema.ListToolsResult> toolsListRequestHandler() {
            return (exchange, params) -> {
                List<Tool> tools = this.tools.stream().map(McpServerFeatures.AsyncToolSpecification::tool).toList();

                return Mono.just(new McpSchema.ListToolsResult(tools, null));
            };
        }

        /**
         * Handler that looks a tool up by name and invokes it with the request
         * arguments. An unknown tool yields a {@code McpError}.
         */
        private McpServerSession.RequestHandler<CallToolResult> toolsCallRequestHandler() {
            return (exchange, params) -> {
                McpSchema.CallToolRequest callToolRequest = objectMapper.convertValue(params,
                        new TypeReference<McpSchema.CallToolRequest>() {
                        });

                // orElseGet keeps the error lazy: no McpError is constructed when
                // the tool is found.
                return this.tools.stream()
                    .filter(tr -> callToolRequest.name().equals(tr.tool().name()))
                    .findAny()
                    .map(tool -> tool.call().apply(exchange, callToolRequest.arguments()))
                    .orElseGet(() -> Mono.error(new McpError("Tool not found: " + callToolRequest.name())));
            };
        }

        // ---------------------------------------
        // Resource Management
        // ---------------------------------------

        @Override
        public Mono<Void> addResource(McpServerFeatures.AsyncResourceSpecification resourceSpecification) {
            if (resourceSpecification == null || resourceSpecification.resource() == null) {
                return Mono.error(new McpError("Resource must not be null"));
            }

            if (this.serverCapabilities.resources() == null) {
                return Mono.error(new McpError("Server must be configured with resource capabilities"));
            }

            return Mono.defer(() -> {
                // putIfAbsent makes the duplicate check and the insert one atomic
                // step.
                if (this.resources.putIfAbsent(resourceSpecification.resource().uri(), resourceSpecification) != null) {
                    return Mono.error(new McpError(
                            "Resource with URI '" + resourceSpecification.resource().uri() + "' already exists"));
                }
                logger.debug("Added resource handler: {}", resourceSpecification.resource().uri());
                if (this.serverCapabilities.resources().listChanged()) {
                    return notifyResourcesListChanged();
                }
                return Mono.empty();
            });
        }

        @Override
        public Mono<Void> removeResource(String resourceUri) {
            if (resourceUri == null) {
                return Mono.error(new McpError("Resource URI must not be null"));
            }
            if (this.serverCapabilities.resources() == null) {
                return Mono.error(new McpError("Server must be configured with resource capabilities"));
            }

            return Mono.defer(() -> {
                McpServerFeatures.AsyncResourceSpecification removed = this.resources.remove(resourceUri);
                if (removed != null) {
                    logger.debug("Removed resource handler: {}", resourceUri);
                    if (this.serverCapabilities.resources().listChanged()) {
                        return notifyResourcesListChanged();
                    }
                    return Mono.empty();
                }
                return Mono.error(new McpError("Resource with URI '" + resourceUri + "' not found"));
            });
        }

        @Override
        public Mono<Void> notifyResourcesListChanged() {
            return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED, null);
        }

        /** Handler that lists the currently registered resources (no pagination cursor). */
        private McpServerSession.RequestHandler<McpSchema.ListResourcesResult> resourcesListRequestHandler() {
            return (exchange, params) -> {
                var resourceList = this.resources.values()
                    .stream()
                    .map(McpServerFeatures.AsyncResourceSpecification::resource)
                    .toList();
                return Mono.just(new McpSchema.ListResourcesResult(resourceList, null));
            };
        }

        /** Handler that returns the registered resource templates. */
        private McpServerSession.RequestHandler<McpSchema.ListResourceTemplatesResult> resourceTemplateListRequestHandler() {
            return (exchange, params) -> Mono
                .just(new McpSchema.ListResourceTemplatesResult(this.resourceTemplates, null));
        }

        /** Handler that resolves a resource by URI and delegates to its read handler. */
        private McpServerSession.RequestHandler<McpSchema.ReadResourceResult> resourcesReadRequestHandler() {
            return (exchange, params) -> {
                McpSchema.ReadResourceRequest resourceRequest = objectMapper.convertValue(params,
                        new TypeReference<McpSchema.ReadResourceRequest>() {
                        });
                var resourceUri = resourceRequest.uri();
                McpServerFeatures.AsyncResourceSpecification specification = this.resources.get(resourceUri);
                if (specification != null) {
                    return specification.readHandler().apply(exchange, resourceRequest);
                }
                return Mono.error(new McpError("Resource not found: " + resourceUri));
            };
        }

        // ---------------------------------------
        // Prompt Management
        // ---------------------------------------

        @Override
        public Mono<Void> addPrompt(McpServerFeatures.AsyncPromptSpecification promptSpecification) {
            if (promptSpecification == null) {
                return Mono.error(new McpError("Prompt specification must not be null"));
            }
            // Same guard addTool/addResource apply to their payloads; without it a
            // missing prompt would surface as a NullPointerException instead of a
            // McpError.
            if (promptSpecification.prompt() == null) {
                return Mono.error(new McpError("Prompt must not be null"));
            }
            if (this.serverCapabilities.prompts() == null) {
                return Mono.error(new McpError("Server must be configured with prompt capabilities"));
            }

            return Mono.defer(() -> {
                McpServerFeatures.AsyncPromptSpecification specification = this.prompts
                    .putIfAbsent(promptSpecification.prompt().name(), promptSpecification);
                if (specification != null) {
                    return Mono.error(new McpError(
                            "Prompt with name '" + promptSpecification.prompt().name() + "' already exists"));
                }

                logger.debug("Added prompt handler: {}", promptSpecification.prompt().name());

                // Servers that declared the listChanged capability SHOULD send a
                // notification, when the list of available prompts changes
                if (this.serverCapabilities.prompts().listChanged()) {
                    return notifyPromptsListChanged();
                }
                return Mono.empty();
            });
        }

        @Override
        public Mono<Void> removePrompt(String promptName) {
            if (promptName == null) {
                return Mono.error(new McpError("Prompt name must not be null"));
            }
            if (this.serverCapabilities.prompts() == null) {
                return Mono.error(new McpError("Server must be configured with prompt capabilities"));
            }

            return Mono.defer(() -> {
                McpServerFeatures.AsyncPromptSpecification removed = this.prompts.remove(promptName);

                if (removed != null) {
                    logger.debug("Removed prompt handler: {}", promptName);
                    // Servers that declared the listChanged capability SHOULD send a
                    // notification, when the list of available prompts changes
                    if (this.serverCapabilities.prompts().listChanged()) {
                        return this.notifyPromptsListChanged();
                    }
                    return Mono.empty();
                }
                return Mono.error(new McpError("Prompt with name '" + promptName + "' not found"));
            });
        }

        @Override
        public Mono<Void> notifyPromptsListChanged() {
            return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_PROMPTS_LIST_CHANGED, null);
        }

        /** Handler that lists the currently registered prompts (no pagination cursor). */
        private McpServerSession.RequestHandler<McpSchema.ListPromptsResult> promptsListRequestHandler() {
            return (exchange, params) -> {
                // TODO: Implement pagination
                var promptList = this.prompts.values()
                    .stream()
                    .map(McpServerFeatures.AsyncPromptSpecification::prompt)
                    .toList();

                return Mono.just(new McpSchema.ListPromptsResult(promptList, null));
            };
        }

        /** Handler that resolves a prompt by name and delegates to its prompt handler. */
        private McpServerSession.RequestHandler<McpSchema.GetPromptResult> promptsGetRequestHandler() {
            return (exchange, params) -> {
                McpSchema.GetPromptRequest promptRequest = objectMapper.convertValue(params,
                        new TypeReference<McpSchema.GetPromptRequest>() {
                        });

                McpServerFeatures.AsyncPromptSpecification specification = this.prompts.get(promptRequest.name());
                if (specification == null) {
                    return Mono.error(new McpError("Prompt not found: " + promptRequest.name()));
                }

                return specification.promptHandler().apply(exchange, promptRequest);
            };
        }

        // ---------------------------------------
        // Logging Management
        // ---------------------------------------

        @Override
        public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageNotification) {
            if (loggingMessageNotification == null) {
                return Mono.error(new McpError("Logging message must not be null"));
            }

            // Filter first so messages below the threshold never pay for the
            // object-to-map conversion.
            if (loggingMessageNotification.level().level() < minLoggingLevel.level()) {
                return Mono.empty();
            }

            Map<String, Object> params = this.objectMapper.convertValue(loggingMessageNotification,
                    new TypeReference<Map<String, Object>>() {
                    });

            return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_MESSAGE, params);
        }

        /**
         * Handler for the set-logging-level request. An MCP {@code logging/setLevel}
         * request wraps the level in an object ({@code {"level": "..."}}), so the
         * {@code level} entry is unwrapped when present; a bare level value is still
         * accepted as before.
         */
        private McpServerSession.RequestHandler<Void> setLoggerRequestHandler() {
            return (exchange, params) -> {
                Object requestedLevel = params;
                if (params instanceof Map<?, ?> paramMap && paramMap.containsKey("level")) {
                    requestedLevel = paramMap.get("level");
                }
                this.minLoggingLevel = objectMapper.convertValue(requestedLevel, new TypeReference<LoggingLevel>() {
                });

                // NOTE(review): this completes empty while the ping handler
                // deliberately returns a non-null value - confirm the session still
                // sends a JSON-RPC response for an empty result.
                return Mono.empty();
            };
        }

        // ---------------------------------------
        // Protocol Versions
        // ---------------------------------------

        @Override
        void setProtocolVersions(List<String> protocolVersions) {
            this.protocolVersions = protocolVersions;
        }

    }

}
McpSyncServer
同步服务端的实现,封装了McpAsyncServer以提供阻塞操作,其余功能方法和McpAsyncServer保持一致
/**
 * Blocking facade over {@link McpAsyncServer}. Each operation forwards to the
 * wrapped asynchronous server and, where that returns a {@code Mono}, blocks until
 * it completes.
 */
public class McpSyncServer {

    /** The asynchronous server that does the actual work. */
    private final McpAsyncServer asyncServer;

    /**
     * @param asyncServer server to wrap; must not be null
     */
    public McpSyncServer(McpAsyncServer asyncServer) {
        Assert.notNull(asyncServer, "Async server must not be null");
        this.asyncServer = asyncServer;
    }

    /** Converts the synchronous tool specification to its async form and registers it, blocking until done. */
    public void addTool(McpServerFeatures.SyncToolSpecification toolHandler) {
        var asyncSpecification = McpServerFeatures.AsyncToolSpecification.fromSync(toolHandler);
        this.asyncServer.addTool(asyncSpecification).block();
    }

    /** Removes the named tool, blocking until done. */
    public void removeTool(String toolName) {
        var removal = this.asyncServer.removeTool(toolName);
        removal.block();
    }

    /** Converts the synchronous resource specification to its async form and registers it, blocking until done. */
    public void addResource(McpServerFeatures.SyncResourceSpecification resourceHandler) {
        var asyncSpecification = McpServerFeatures.AsyncResourceSpecification.fromSync(resourceHandler);
        this.asyncServer.addResource(asyncSpecification).block();
    }

    /** Removes the resource with the given URI, blocking until done. */
    public void removeResource(String resourceUri) {
        var removal = this.asyncServer.removeResource(resourceUri);
        removal.block();
    }

    /** Converts the synchronous prompt specification to its async form and registers it, blocking until done. */
    public void addPrompt(McpServerFeatures.SyncPromptSpecification promptSpecification) {
        var asyncSpecification = McpServerFeatures.AsyncPromptSpecification.fromSync(promptSpecification);
        this.asyncServer.addPrompt(asyncSpecification).block();
    }

    /** Removes the named prompt, blocking until done. */
    public void removePrompt(String promptName) {
        var removal = this.asyncServer.removePrompt(promptName);
        removal.block();
    }

    /** Sends the tools-list-changed notification and waits for it to complete. */
    public void notifyToolsListChanged() {
        var notification = this.asyncServer.notifyToolsListChanged();
        notification.block();
    }

    /** @return the capabilities reported by the wrapped server */
    public McpSchema.ServerCapabilities getServerCapabilities() {
        McpSchema.ServerCapabilities capabilities = this.asyncServer.getServerCapabilities();
        return capabilities;
    }

    /** @return the implementation info reported by the wrapped server */
    public McpSchema.Implementation getServerInfo() {
        McpSchema.Implementation info = this.asyncServer.getServerInfo();
        return info;
    }

    /** Sends the resources-list-changed notification and waits for it to complete. */
    public void notifyResourcesListChanged() {
        var notification = this.asyncServer.notifyResourcesListChanged();
        notification.block();
    }

    /** Sends the prompts-list-changed notification and waits for it to complete. */
    public void notifyPromptsListChanged() {
        var notification = this.asyncServer.notifyPromptsListChanged();
        notification.block();
    }

    /** Forwards a log message notification and waits for it to complete. */
    public void loggingNotification(LoggingMessageNotification loggingMessageNotification) {
        var notification = this.asyncServer.loggingNotification(loggingMessageNotification);
        notification.block();
    }

    /** Closes the wrapped server gracefully, blocking until shutdown completes. */
    public void closeGracefully() {
        var shutdown = this.asyncServer.closeGracefully();
        shutdown.block();
    }

    /** Closes the wrapped server immediately. */
    public void close() {
        this.asyncServer.close();
    }

    /** @return the wrapped asynchronous server */
    public McpAsyncServer getAsyncServer() {
        return this.asyncServer;
    }

}
McpTransport(传输层接口)
MCP中定义异步传输层的核心接口,负责管理客户端和服务器端之间的双向通信,提供了自定义传输机制的基础,具体功能如下:
- 管理传输连接的生命周期:包括连接的建立、关闭和资源释放
- 处理来自服务器的消息和错误
- 将客户端生成的消息发送到服务器
// Base contract of an MCP transport: closing the connection, sending messages,
// and converting raw payload data into typed objects.
public interface McpTransport {
// Closes the transport connection and releases related resources.
// Implemented by subscribing to closeGracefully(); the call returns without
// waiting for the graceful close to finish.
default void close() {
this.closeGracefully().subscribe();
}
// Asynchronously closes the transport connection and releases related resources.
Mono<Void> closeGracefully();
// Asynchronously sends a message to the server.
Mono<Void> sendMessage(JSONRPCMessage message);
// Deserializes the given data into an object of the type described by typeRef.
<T> T unmarshalFrom(Object data, TypeReference<T> typeRef);
}
McpClientTransport(客户端传输层接口)
通过connect方法,建立 Client 与 MCP Server 之间的连接
// Client-side transport: extends McpTransport with the connect operation.
public interface McpClientTransport extends McpTransport {
// Connects to the server. `handler` is the function applied to inbound JSON-RPC messages
// (it maps a Mono of an incoming message to a Mono of a message).
// NOTE(review): when the returned Mono completes is implementation-specific — confirm per transport.
Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler);
}
StdioClientTransport
通过标准输入输出流(stdin/stdout)与 Server 进程进行通信
- connect:启动服务器进程并初始化消息处理流。设置进程的命令、参数和环境,然后启动输入、输出和错误处理线程
- sendMessage:发送JSON-RPC消息到 Server 进程
- closeGracefully:优雅地关闭传输、销毁进程并释放调度器资源,发送TERM信号给进程并等待其退出
/**
 * MCP client transport that launches the server as a child process and exchanges
 * newline-delimited JSON-RPC messages with it over the process's stdin/stdout.
 * Lines read from the process's stderr are forwarded to a configurable handler.
 *
 * <p>Three single-threaded schedulers are used: one reads stdout, one writes stdin,
 * one reads stderr.
 */
public class StdioClientTransport implements McpClientTransport {

    private static final Logger logger = LoggerFactory.getLogger(StdioClientTransport.class);

    /** Messages parsed from the process's stdout, consumed by the inbound handler. */
    private final Sinks.Many<JSONRPCMessage> inboundSink;

    /** Messages queued by {@link #sendMessage} and written to the process's stdin. */
    private final Sinks.Many<JSONRPCMessage> outboundSink;

    /** The server child process; assigned in {@link #connect}, null before that. */
    private Process process;

    private final ObjectMapper objectMapper;

    private final Scheduler inboundScheduler;

    private final Scheduler outboundScheduler;

    private final Scheduler errorScheduler;

    /** Command, arguments and environment used to start the server process. */
    private final ServerParameters params;

    /** Raw lines read from the process's stderr. */
    private final Sinks.Many<String> errorSink;

    /** Set once shutdown starts or any processing loop ends; suppresses error logging and further writes. */
    private volatile boolean isClosing = false;

    // visible for tests
    private Consumer<String> stdErrorHandler = error -> logger.info("STDERR Message received: {}", error);

    /**
     * Creates a transport with a default {@link ObjectMapper}.
     *
     * @param params how to launch the server process; must not be null
     */
    public StdioClientTransport(ServerParameters params) {
        this(params, new ObjectMapper());
    }

    /**
     * Creates a transport.
     *
     * @param params how to launch the server process; must not be null
     * @param objectMapper mapper used to (de)serialize JSON-RPC messages; must not be null
     */
    public StdioClientTransport(ServerParameters params, ObjectMapper objectMapper) {
        Assert.notNull(params, "The params can not be null");
        Assert.notNull(objectMapper, "The ObjectMapper can not be null");

        this.inboundSink = Sinks.many().unicast().onBackpressureBuffer();
        this.outboundSink = Sinks.many().unicast().onBackpressureBuffer();
        this.params = params;
        this.objectMapper = objectMapper;
        this.errorSink = Sinks.many().unicast().onBackpressureBuffer();

        // One dedicated single-thread executor per stream (stdout, stdin, stderr).
        this.inboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "inbound");
        this.outboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "outbound");
        this.errorScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "error");
    }

    /**
     * Subscribes the inbound/stderr handlers, starts the server process and starts the
     * three stream-processing tasks. The work runs on the bounded-elastic scheduler when
     * the returned Mono is subscribed.
     *
     * @param handler function applied to every inbound JSON-RPC message
     * @return a Mono that completes once the process is started and processing is scheduled;
     * errors with a RuntimeException if the process cannot be started or its streams are null
     */
    @Override
    public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
        return Mono.<Void>fromRunnable(() -> {
            handleIncomingMessages(handler);
            handleIncomingErrors();

            // Prepare command and environment
            List<String> fullCommand = new ArrayList<>();
            fullCommand.add(params.getCommand());
            fullCommand.addAll(params.getArgs());

            ProcessBuilder processBuilder = this.getProcessBuilder();
            processBuilder.command(fullCommand);
            processBuilder.environment().putAll(params.getEnv());

            // Start the process
            try {
                this.process = processBuilder.start();
            }
            catch (IOException e) {
                throw new RuntimeException("Failed to start process with command: " + fullCommand, e);
            }

            // Validate process streams
            if (this.process.getInputStream() == null || process.getOutputStream() == null) {
                this.process.destroy();
                throw new RuntimeException("Process input or output stream is null");
            }

            // Start threads
            startInboundProcessing();
            startOutboundProcessing();
            startErrorProcessing();
        }).subscribeOn(Schedulers.boundedElastic());
    }

    /** Factory hook for the {@link ProcessBuilder}; overridable by subclasses. */
    protected ProcessBuilder getProcessBuilder() {
        return new ProcessBuilder();
    }

    /**
     * Replaces the consumer that receives each line read from the process's stderr.
     *
     * @param errorHandler the new stderr line consumer
     */
    public void setStdErrorHandler(Consumer<String> errorHandler) {
        this.stdErrorHandler = errorHandler;
    }

    /**
     * Blocks the calling thread until the server process exits.
     *
     * @throws RuntimeException if the waiting thread is interrupted; the thread's
     * interrupt flag is restored before throwing
     */
    public void awaitForExit() {
        try {
            this.process.waitFor();
        }
        catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Process interrupted", e);
        }
    }

    /** Schedules the task that reads the process's stderr line by line into {@code errorSink}. */
    private void startErrorProcessing() {
        this.errorScheduler.schedule(() -> {
            // stderr is decoded as UTF-8 explicitly instead of the platform default charset.
            try (BufferedReader processErrorReader = new BufferedReader(
                    new InputStreamReader(process.getErrorStream(), StandardCharsets.UTF_8))) {
                String line;
                while (!isClosing && (line = processErrorReader.readLine()) != null) {
                    try {
                        if (!this.errorSink.tryEmitNext(line).isSuccess()) {
                            if (!isClosing) {
                                logger.error("Failed to emit error message");
                            }
                            break;
                        }
                    }
                    catch (Exception e) {
                        if (!isClosing) {
                            logger.error("Error processing error message", e);
                        }
                        break;
                    }
                }
            }
            catch (IOException e) {
                if (!isClosing) {
                    logger.error("Error reading from error stream", e);
                }
            }
            finally {
                // NOTE(review): the end of the stderr loop marks the whole transport as closing,
                // after which the outbound handler drops messages — confirm this is intended.
                isClosing = true;
                errorSink.tryEmitComplete();
            }
        });
    }

    /** Subscribes to the inbound sink and runs every message through the supplied handler. */
    private void handleIncomingMessages(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> inboundMessageHandler) {
        this.inboundSink.asFlux()
            .flatMap(message -> Mono.just(message)
                .transform(inboundMessageHandler)
                .contextWrite(ctx -> ctx.put("observation", "myObservation")))
            .subscribe();
    }

    /** Subscribes to the stderr sink and passes every line to the current stderr handler. */
    private void handleIncomingErrors() {
        this.errorSink.asFlux().subscribe(e -> {
            this.stdErrorHandler.accept(e);
        });
    }

    /**
     * Enqueues a message for writing to the process's stdin.
     *
     * @param message the JSON-RPC message to send
     * @return an empty Mono if the message was enqueued, otherwise a Mono that errors
     * with a RuntimeException
     */
    @Override
    public Mono<Void> sendMessage(JSONRPCMessage message) {
        if (this.outboundSink.tryEmitNext(message).isSuccess()) {
            // TODO: essentially we could reschedule ourselves in some time and make
            // another attempt with the already read data but pause reading until
            // success
            // In this approach we delegate the retry and the backpressure onto the
            // caller. This might be enough for most cases.
            return Mono.empty();
        }
        else {
            return Mono.error(new RuntimeException("Failed to enqueue message"));
        }
    }

    /** Schedules the task that reads stdout line by line, parses each line and emits it to {@code inboundSink}. */
    private void startInboundProcessing() {
        this.inboundScheduler.schedule(() -> {
            // Decode as UTF-8 explicitly: outbound messages are written as UTF-8, so reading with
            // the platform default charset would corrupt non-ASCII content on non-UTF-8 hosts.
            try (BufferedReader processReader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while (!isClosing && (line = processReader.readLine()) != null) {
                    try {
                        JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, line);
                        if (!this.inboundSink.tryEmitNext(message).isSuccess()) {
                            if (!isClosing) {
                                logger.error("Failed to enqueue inbound message: {}", message);
                            }
                            break;
                        }
                    }
                    catch (Exception e) {
                        if (!isClosing) {
                            logger.error("Error processing inbound message for line: " + line, e);
                        }
                        break;
                    }
                }
            }
            catch (IOException e) {
                if (!isClosing) {
                    logger.error("Error reading from input stream", e);
                }
            }
            finally {
                isClosing = true;
                inboundSink.tryEmitComplete();
            }
        });
    }

    /** Wires the outbound sink to the writer that serializes each message onto the process's stdin. */
    private void startOutboundProcessing() {
        this.handleOutbound(messages -> messages
            // this bit is important since writes come from user threads and we
            // want to ensure that the actual writing happens on a dedicated thread
            .publishOn(outboundScheduler)
            .handle((message, s) -> {
                // Messages arriving after shutdown started are dropped without being written.
                if (message != null && !isClosing) {
                    try {
                        String jsonMessage = objectMapper.writeValueAsString(message);
                        // Escape any embedded newlines in the JSON message as per spec:
                        // https://spec.modelcontextprotocol.io/specification/basic/transports/#stdio
                        // - Messages are delimited by newlines, and MUST NOT contain
                        // embedded newlines.
                        // NOTE(review): this rewrites every raw line break in the serialized text,
                        // not only those inside string values — verify against the mapper config.
                        jsonMessage = jsonMessage.replace("\r\n", "\\n").replace("\n", "\\n").replace("\r", "\\n");

                        var os = this.process.getOutputStream();
                        synchronized (os) {
                            os.write(jsonMessage.getBytes(StandardCharsets.UTF_8));
                            os.write("\n".getBytes(StandardCharsets.UTF_8));
                            os.flush();
                        }
                        s.next(message);
                    }
                    catch (IOException e) {
                        s.error(new RuntimeException(e));
                    }
                }
            }));
    }

    /**
     * Applies the given pipeline to the outbound message flux and subscribes to it.
     * On completion or (first) error the transport is marked as closing and the
     * outbound sink is completed.
     *
     * @param outboundConsumer pipeline applied to the outbound message flux
     */
    protected void handleOutbound(Function<Flux<JSONRPCMessage>, Flux<JSONRPCMessage>> outboundConsumer) {
        outboundConsumer.apply(outboundSink.asFlux()).doOnComplete(() -> {
            isClosing = true;
            outboundSink.tryEmitComplete();
        }).doOnError(e -> {
            if (!isClosing) {
                logger.error("Error in outbound processing", e);
                isClosing = true;
                outboundSink.tryEmitComplete();
            }
        }).subscribe();
    }

    /**
     * Shuts the transport down: marks it closing, completes all sinks, waits 100 ms,
     * destroys the process (if started) and waits for its exit, then disposes the
     * three schedulers.
     *
     * @return a Mono that completes when the shutdown sequence has finished
     */
    @Override
    public Mono<Void> closeGracefully() {
        return Mono.fromRunnable(() -> {
            isClosing = true;
            logger.debug("Initiating graceful shutdown");
        }).then(Mono.defer(() -> {
            // First complete all sinks to stop accepting new messages
            inboundSink.tryEmitComplete();
            outboundSink.tryEmitComplete();
            errorSink.tryEmitComplete();

            // Give a short time for any pending messages to be processed
            return Mono.delay(Duration.ofMillis(100));
        })).then(Mono.defer(() -> {
            logger.debug("Sending TERM to process");
            if (this.process != null) {
                this.process.destroy();
                return Mono.fromFuture(process.onExit());
            }
            else {
                logger.warn("Process not started");
                return Mono.empty();
            }
        })).doOnNext(exited -> {
            if (exited.exitValue() != 0) {
                logger.warn("Process terminated with code {}", exited.exitValue());
            }
        }).then(Mono.fromRunnable(() -> {
            try {
                // The Threads are blocked on readLine so disposeGracefully would not
                // interrupt them, therefore we issue an async hard dispose.
                inboundScheduler.dispose();
                errorScheduler.dispose();
                outboundScheduler.dispose();

                logger.debug("Graceful shutdown completed");
            }
            catch (Exception e) {
                logger.error("Error during graceful shutdown", e);
            }
        })).then().subscribeOn(Schedulers.boundedElastic());
    }

    /** @return the sink carrying lines read from the process's stderr */
    public Sinks.Many<String> getErrorSink() {
        return this.errorSink;
    }

    /**
     * Converts the given data to the requested type using the configured {@link ObjectMapper}.
     *
     * @param data the value to convert
     * @param typeRef the target type
     * @return the converted value
     */
    @Override
    public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
        return this.objectMapper.convertValue(data, typeRef);
    }
}
WebFluxSseClientTransport
基于Server-Sent Events(SSE)的MCP客户端传输实现类,用于与 MCP Server 建立双向通信通道,基于Spring WebFlux的响应式编程
- connect:与 MCP Server 建立SSE连接,并设置消息处理管道
  - 建立SSE连接
  - 等待 Server 发送包含端点的endpoint事件
  - 设置处理传入JSON-RPC消息的处理器
- sendMessage:将JSON-RPC消息发送到 Server
- eventStream:初始化并启动SSE事件处理流
- closeGracefully:优雅地关闭传输,清理所有资源(如订阅和调度器)
/**
 * MCP client transport built on Spring WebFlux: inbound messages arrive as
 * Server-Sent Events from the {@code /sse} endpoint, outbound messages are sent
 * with HTTP POST to the endpoint URI announced by the server's {@code endpoint} event.
 */
public class WebFluxSseClientTransport implements McpClientTransport {

    private static final Logger logger = LoggerFactory.getLogger(WebFluxSseClientTransport.class);

    /** SSE event type carrying a JSON-RPC message. */
    private static final String MESSAGE_EVENT_TYPE = "message";

    /** SSE event type carrying the URI to POST outbound messages to. */
    private static final String ENDPOINT_EVENT_TYPE = "endpoint";

    private static final String SSE_ENDPOINT = "/sse";

    private static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {
    };

    private final WebClient webClient;

    protected ObjectMapper objectMapper;

    /** Subscription to the SSE stream; disposed on close. */
    private Disposable inboundSubscription;

    private volatile boolean isClosing = false;

    /** Emits the message endpoint URI once the server has announced it. */
    protected final Sinks.One<String> messageEndpointSink = Sinks.one();

    /**
     * Creates a transport with a default {@link ObjectMapper}.
     *
     * @param webClientBuilder builder for the WebClient used for all requests; must not be null
     */
    public WebFluxSseClientTransport(WebClient.Builder webClientBuilder) {
        this(webClientBuilder, new ObjectMapper());
    }

    /**
     * Creates a transport.
     *
     * @param webClientBuilder builder for the WebClient used for all requests; must not be null
     * @param objectMapper mapper used to (de)serialize JSON-RPC messages; must not be null
     */
    public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper) {
        Assert.notNull(objectMapper, "ObjectMapper must not be null");
        Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
        this.objectMapper = objectMapper;
        this.webClient = webClientBuilder.build();
    }

    /**
     * Subscribes to the SSE stream and routes each event: endpoint events publish the
     * message endpoint URI, message events are parsed and passed through {@code handler}.
     *
     * @param handler function applied to every inbound JSON-RPC message
     * @return a Mono that completes once the endpoint event has been received
     */
    @Override
    public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
        Flux<ServerSentEvent<String>> sseEvents = eventStream();
        this.inboundSubscription = sseEvents
            .concatMap(sseEvent -> Mono.just(sseEvent)
                .<JSONRPCMessage>handle(this::dispatchSseEvent)
                .transform(handler))
            .subscribe();

        // The connection is established once the server sends the endpoint event
        return messageEndpointSink.asMono().then();
    }

    /**
     * Handles a single SSE event: stores the endpoint URI, emits a parsed JSON-RPC
     * message, or signals an error for a failed endpoint emission, an unparsable
     * message, or an unknown event type.
     */
    private void dispatchSseEvent(ServerSentEvent<String> sseEvent, SynchronousSink<JSONRPCMessage> sink) {
        if (ENDPOINT_EVENT_TYPE.equals(sseEvent.event())) {
            String messageEndpointUri = sseEvent.data();
            if (!messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
                // TODO: clarify with the spec if multiple events can be
                // received
                sink.error(new McpError("Failed to handle SSE endpoint event"));
                return;
            }
            sink.complete();
            return;
        }

        if (MESSAGE_EVENT_TYPE.equals(sseEvent.event())) {
            try {
                JSONRPCMessage parsed = McpSchema.deserializeJsonRpcMessage(this.objectMapper, sseEvent.data());
                sink.next(parsed);
            }
            catch (IOException ioException) {
                sink.error(ioException);
            }
            return;
        }

        sink.error(new McpError("Received unrecognized SSE event type: " + sseEvent.event()));
    }

    /**
     * Serializes the message and POSTs it to the message endpoint once that endpoint is
     * known. While closing, nothing is sent and the Mono completes empty.
     *
     * @param message the JSON-RPC message to send
     * @return a Mono that completes when the POST has finished, or errors if
     * serialization or the request fails
     */
    @Override
    public Mono<Void> sendMessage(JSONRPCMessage message) {
        // The messageEndpoint is the endpoint URI to send the messages
        // It is provided by the server as part of the endpoint event
        return messageEndpointSink.asMono().flatMap(messageEndpointUri -> {
            if (isClosing) {
                return Mono.empty();
            }

            final String jsonText;
            try {
                jsonText = this.objectMapper.writeValueAsString(message);
            }
            catch (IOException e) {
                // Serialization failures are surfaced unless the transport is shutting down.
                if (isClosing) {
                    return Mono.empty();
                }
                return Mono.error(new RuntimeException("Failed to serialize message", e));
            }

            return webClient.post()
                .uri(messageEndpointUri)
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(jsonText)
                .retrieve()
                .toBodilessEntity()
                .doOnSuccess(response -> {
                    logger.debug("Message sent successfully");
                })
                .doOnError(error -> {
                    if (!isClosing) {
                        logger.error("Error sending message: {}", error.getMessage());
                    }
                });
        }).then(); // TODO: Consider non-200-ok response
    }

    /**
     * Builds the SSE event stream: a GET on {@code /sse} accepting
     * {@code text/event-stream}, with retries decided by {@code inboundRetryHandler}.
     */
    // visible for tests
    protected Flux<ServerSentEvent<String>> eventStream() {// @formatter:off
        return this.webClient
            .get()
            .uri(SSE_ENDPOINT)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(SSE_TYPE)
            .retryWhen(Retry.from(retrySignal -> retrySignal.handle(inboundRetryHandler)));
    } // @formatter:on

    /**
     * Retry policy for the SSE stream: no retry while closing, retry on
     * {@link IOException}, and propagate every other failure.
     */
    private BiConsumer<RetrySignal, SynchronousSink<Object>> inboundRetryHandler = (retrySpec, sink) -> {
        if (isClosing) {
            logger.debug("SSE connection closed during shutdown");
            sink.error(retrySpec.failure());
            return;
        }
        if (retrySpec.failure() instanceof IOException) {
            logger.debug("Retrying SSE connection after IO error");
            sink.next(retrySpec);
            return;
        }
        logger.error("Fatal SSE error, not retrying: {}", retrySpec.failure().getMessage());
        sink.error(retrySpec.failure());
    };

    /**
     * Marks the transport as closing and disposes the SSE subscription if one exists.
     *
     * @return a Mono that completes after the subscription has been disposed
     */
    @Override
    public Mono<Void> closeGracefully() { // @formatter:off
        Runnable shutdown = () -> {
            isClosing = true;
            if (inboundSubscription != null) {
                inboundSubscription.dispose();
            }
        };
        return Mono.fromRunnable(shutdown)
            .then()
            .subscribeOn(Schedulers.boundedElastic());
    } // @formatter:on

    /**
     * Converts the given data to the requested type using the configured {@link ObjectMapper}.
     *
     * @param data the value to convert
     * @param typeRef the target type
     * @return the converted value
     */
    @Override
    public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
        return this.objectMapper.convertValue(data, typeRef);
    }
}
HttpClientSseClientTransport
基于Server-Sent Events(SSE)的MCP客户端传输实现类,用于与 MCP Server 建立双向通信通道。使用Java的HttpClient进行通信
- connect:与 MCP Server 建立SSE连接,并设置消息处理管道
  - 建立SSE连接(订阅SSE事件 + 处理SSE事件)
  - 等待服务器发送包含端点的endpoint事件
  - 设置处理传入JSON-RPC消息的处理器
- sendMessage:将JSON-RPC消息发送到服务器
- closeGracefully:优雅地关闭传输,清理所有资源(如订阅和调度器)
public class HttpClientSseClientTransport implements McpClientTransport {
private static final Logger logger = LoggerFactory.getLogger(HttpClientSseClientTransport.class);
private static final String MESSAGE_EVENT_TYPE = "message";
private static final String ENDPOINT_EVENT_TYPE = "endpoint";
private static final String SSE_ENDPOINT = "/sse";
private final String baseUri;
private final FlowSseClient sseClient;
private final HttpClient httpClient;
protected ObjectMapper objectMapper;
private volatile boolean isClosing = false;
private final CountDownLatch closeLatch = new CountDownLatch(1);
private final AtomicReference<String> messageEndpoint = new AtomicReference<>();
private final AtomicReference<CompletableFuture<Void>> connectionFuture = new AtomicReference<>();
public HttpClientSseClientTransport(String baseUri) {
this(HttpClient.newBuilder(), baseUri, new ObjectMapper());
}
public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, String baseUri, ObjectMapper objectMapper) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
Assert.hasText(baseUri, "baseUri must not be empty");
Assert.notNull(clientBuilder, "clientBuilder must not be null");
this.baseUri = baseUri;
this.objectMapper = objectMapper;
this.httpClient = clientBuilder.connectTimeout(Duration.ofSeconds(10)).build();
this.sseClient = new FlowSseClient(this.httpClient);
}
@Override
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
CompletableFuture<Void> future = new CompletableFuture<>();
connectionFuture.set(future);
sseClient.subscribe(this.baseUri + SSE_ENDPOINT, new FlowSseClient.SseEventHandler() {
@Override
public void onEvent(SseEvent event) {
if (isClosing) {
return;
}
try {
if (ENDPOINT_EVENT_TYPE.equals(event.type())) {
String endpoint = event.data();
messageEndpoint.set(endpoint);
closeLatch.countDown();
future.complete(null);
}
else if (MESSAGE_EVENT_TYPE.equals(event.type())) {
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, event.data());
handler.apply(Mono.just(message)).subscribe();
}
else {
logger.error("Received unrecognized SSE event type: {}", event.type());
}
}
catch (IOException e) {
logger.error("Error processing SSE event", e);
future.completeExceptionally(e);
}
}
@Override
public void onError(Throwable error) {
if (!isClosing) {
logger.error("SSE connection error", error);
future.completeExceptionally(error);
}
}
});
return Mono.fromFuture(future);
}
@Override
public Mono<Void> sendMessage(JSONRPCMessage message) {
if (isClosing) {
return Mono.empty();
}
try {
if (!closeLatch.await(10, TimeUnit.SECONDS)) {
return Mono.error(new McpError("Failed to wait for the message endpoint"));
}
}
catch (InterruptedException e) {
return Mono.error(new McpError("Failed to wait for the message endpoint"));
}
String endpoint = messageEndpoint.get();
if (endpoint == null) {
return Mono.error(new McpError("No message endpoint available"));
}
try {
String jsonText = this.objectMapper.writeValueAsString(message);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(this.baseUri + endpoint))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(jsonText))
.build();
return Mono.fromFuture(
httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding()).thenAccept(response -> {
if (response.statusCode() != 200 && response.statusCode() != 201 && response.statusCode() != 202
&& response.statusCode() != 206) {
logger.error("Error sending message: {}", response.statusCode());
}
}));
}
catch (IOException e) {
if (!isClosing) {
return Mono.error(new RuntimeException("Failed to serialize message", e));
}
return Mono.empty();
}
}
@Override
public Mono<Void> closeGracefully() {
return Mono.fromRunnable(() -> {
isClosing = true;
CompletableFuture<Void> future = connectionFuture.get();
if (future != null && !future.isDone()) {
future.cancel(true);
}
});
}
@Override
public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
return this.objectMapper.convertValue(data, typeRef);
}
}
