使用 Model Context Protocol (MCP) 的 HTTPS 流式传输 本章全面介绍了如何使用 HTTPS 结合 Model Context Protocol (MCP) 实现安全、可扩展且实时的流式传输。内容涵盖了流式传输的动机、可用的传输机制、如何在 MCP 中实现可流式的 HTTP、安全最佳实践、从 SSE 的迁移以及构建流式 MCP 应用的实用指导。 MCP 中的传输机制与流式传输 本节探讨 MCP 中可用的不同传输机制及其在实现客户端与服务器实时通信流式能力中的作用。 什么是传输机制? 传输机制定义了客户端和服务器之间数据交换的方式。MCP 支持多种传输类型,以适应不同环境和需求: stdio:标准输入/输出,适合本地和命令行工具。简单但不适合 Web 或云环境。
本章全面介绍了如何使用 HTTPS 结合 Model Context Protocol (MCP) 实现安全、可扩展且实时的流式传输。内容涵盖了流式传输的动机、可用的传输机制、如何在 MCP 中实现可流式的 HTTP、安全最佳实践、从 SSE 的迁移以及构建流式 MCP 应用的实用指导。
本节探讨 MCP 中可用的不同传输机制及其在实现客户端与服务器实时通信流式能力中的作用。
传输机制定义了客户端和服务器之间数据交换的方式。MCP 支持多种传输类型,以适应不同环境和需求:
请查看下表,了解这些传输机制的区别:
| 传输方式 | 实时更新 | 流式传输 | 可扩展性 | 使用场景 |
|---|---|---|---|---|
| stdio | 否 | 否 | 低 | 本地命令行工具 |
| SSE | 是 | 是 | 中 | Web,实时更新 |
| 可流式 HTTP | 是 | 是 | 高 | 云环境,多客户端 |
提示: 选择合适的传输方式会影响性能、可扩展性和用户体验。可流式 HTTP 推荐用于现代、可扩展且云就绪的应用。
请注意前几章中介绍的 stdio 和 SSE 传输方式,本章重点介绍的是可流式 HTTP 传输。
理解流式传输的基本概念和动机,对于实现高效的实时通信系统至关重要。
流式传输 是网络编程中的一种技术,它允许数据分批或作为事件序列发送和接收,而不必等待整个响应准备完毕。该技术特别适用于:
以下是流式传输的核心要点:
使用流式传输的原因包括:
下面是一个简单的流式传输实现示例:
服务器(Python,使用 FastAPI 和 StreamingResponse):
from fastapi import FastAPI from fastapi.responses import StreamingResponse import time app = FastAPI() async def event_stream(): for i in range(1, 6): yield f"data: Message {i}\n\n" time.sleep(1) @app.get("/stream") def stream(): return StreamingResponse(event_stream(), media_type="text/event-stream")
客户端(Python,使用 requests):
import requests with requests.get("http://localhost:8000/stream", stream=True) as r: for line in r.iter_lines(): if line: print(line.decode())
该示例展示了服务器如何在消息准备好时逐条发送给客户端,而不是等待所有消息准备完毕。
工作原理:
要求:
StreamingResponse in FastAPI).stream=True in requests).text/event-stream or application/octet-stream)服务器(Java,使用 Spring Boot 和服务器发送事件 SSE):
@RestController public class CalculatorController { @GetMapping(value = "/calculate", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<String>> calculate(@RequestParam double a, @RequestParam double b, @RequestParam String op) { double result; switch (op) { case "add": result = a + b; break; case "sub": result = a - b; break; case "mul": result = a * b; break; case "div": result = b != 0 ? a / b : Double.NaN; break; default: result = Double.NaN; } return Flux.<ServerSentEvent<String>>just( ServerSentEvent.<String>builder() .event("info") .data("Calculating: " + a + " " + op + " " + b) .build(), ServerSentEvent.<String>builder() .event("result") .data(String.valueOf(result)) .build() ) .delayElements(Duration.ofSeconds(1)); } }
客户端(Java,使用 Spring WebFlux WebClient):
@SpringBootApplication public class CalculatorClientApplication implements CommandLineRunner { private final WebClient client = WebClient.builder() .baseUrl("http://localhost:8080") .build(); @Override public void run(String... args) { client.get() .uri(uriBuilder -> uriBuilder .path("/calculate") .queryParam("a", 7) .queryParam("b", 5) .queryParam("op", "mul") .build()) .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(String.class) .doOnNext(System.out::println) .blockLast(); } }
Java 实现说明:
Flux for streamingServerSentEvent provides structured event streaming with event typesWebClient with bodyToFlux() enables reactive streaming consumptiondelayElements() simulates processing time between eventsinfo, result) for better client handlingThe differences between how streaming works in a "classical" manner versus how it works in MCP can be depicted like so:
| Feature | Classic HTTP Streaming | MCP Streaming (Notifications) |
|---|---|---|
| Main response | Chunked | Single, at end |
| Progress updates | Sent as data chunks | Sent as notifications |
| Client requirements | Must process stream | Must implement message handler |
| Use case | Large files, AI token streams | Progress, logs, real-time feedback |
Additionally, here are some key differences:
Communication Pattern:
Message Format:
Client Implementation:
Progress Updates:
There are some things we recommend when it comes to choosing between implementing classical streaming (as an endpoint we showed you above using /stream,与选择 MCP 流式传输进行对比。
简单流式需求: 传统 HTTP 流式传输更易实现,适合基本流式需求。
复杂交互应用: MCP 流式传输提供更结构化的方式,拥有丰富的元数据,区分通知和最终结果。
AI 应用: MCP 的通知系统特别适合长时间运行的 AI 任务,方便持续向用户反馈进度。
到目前为止,你已经了解了经典流式传输和 MCP 流式传输的推荐与比较。接下来详细介绍如何在 MCP 中利用流式传输。
理解 MCP 框架中的流式传输机制,对于构建在长时间运行操作中向用户实时反馈的响应式应用至关重要。
在 MCP 中,流式传输并非将主响应分块发送,而是在工具处理请求时向客户端发送通知。这些通知可以包括进度更新、日志或其他事件。
主结果仍作为单个响应发送。但在处理过程中,可以发送单独的通知消息,从而实时更新客户端。客户端必须能处理并显示这些通知。
我们提到了“通知”,在 MCP 中这指的是什么?
通知是服务器向客户端发送的消息,用于告知长时间运行操作中的进度、状态或其他事件。通知提升了透明度和用户体验。
例如,客户端应在与服务器完成初始握手后发送一条通知。
通知的 JSON 消息示例如下:
{ jsonrpc: "2.0"; method: string; params?: { [key: string]: unknown; }; }
通知属于 MCP 中称为"Logging" 的主题。
要启用日志功能,服务器需将其作为特性/能力开启,示例如下:
{ "capabilities": { "logging": {} } }
[!NOTE]
根据所用 SDK,日志功能可能默认开启,或需在服务器配置中显式启用。
通知类型如下:
| 级别 | 描述 | 示例用例 |
|---|---|---|
| debug | 详细调试信息 | 函数入口/出口点 |
| info | 一般信息消息 | 操作进度更新 |
| notice | 普通但重要事件 | 配置变更 |
| warning | 警告条件 | 过时功能使用 |
| error | 错误条件 | 操作失败 |
| critical | 严重条件 | 系统组件故障 |
| alert | 必须立即采取行动 | 发现数据损坏 |
| emergency | 系统不可用 | 完全系统故障 |
要在 MCP 中实现通知,需要同时设置服务器端和客户端以处理实时更新,从而让应用在长时间运行操作中即时反馈给用户。
先从服务器端开始。在 MCP 中,你定义的工具可以在处理请求时发送通知。服务器使用上下文对象(通常是 ctx)向客户端发送消息。
@mcp.tool(description="A tool that sends progress notifications") async def process_files(message: str, ctx: Context) -> TextContent: await ctx.info("Processing file 1/3...") await ctx.info("Processing file 2/3...") await ctx.info("Processing file 3/3...") return TextContent(type="text", text=f"Done: {message}")
在上例中,process_files tool sends three notifications to the client as it processes each file. The ctx.info() method is used to send informational messages.
Additionally, to enable notifications, ensure your server uses a streaming transport (like streamable-http) and your client implements a message handler to process notifications. Here's how you can set up the server to use the streamable-http 传输:
mcp.run(transport="streamable-http")
[Tool("A tool that sends progress notifications")] public async Task<TextContent> ProcessFiles(string message, ToolContext ctx) { await ctx.Info("Processing file 1/3..."); await ctx.Info("Processing file 2/3..."); await ctx.Info("Processing file 3/3..."); return new TextContent { Type = "text", Text = $"Done: {message}" }; }
在该 .NET 示例中,使用了 ProcessFiles tool is decorated with the Tool attribute and sends three notifications to the client as it processes each file. The ctx.Info() 方法发送信息消息。
确保你的 .NET MCP 服务器使用流式传输以启用通知:
var builder = McpBuilder.Create(); await builder .UseStreamableHttp() // Enable streamable HTTP transport .Build() .RunAsync();
客户端必须实现消息处理器,用于处理并显示接收到的通知。
async def message_handler(message): if isinstance(message, types.ServerNotification): print("NOTIFICATION:", message) else: print("SERVER MESSAGE:", message) async with ClientSession( read_stream, write_stream, logging_callback=logging_collector, message_handler=message_handler, ) as session:
上述代码中,message_handler function checks if the incoming message is a notification. If it is, it prints the notification; otherwise, it processes it as a regular server message. Also note how the ClientSession is initialized with the message_handler 用于处理接收的通知。
// Define a message handler void MessageHandler(IJsonRpcMessage message) { if (message is ServerNotification notification) { Console.WriteLine($"NOTIFICATION: {notification}"); } else { Console.WriteLine($"SERVER MESSAGE: {message}"); } } // Create and use a client session with the message handler var clientOptions = new ClientSessionOptions { MessageHandler = MessageHandler, LoggingCallback = (level, message) => Console.WriteLine($"[{level}] {message}") }; using var client = new ClientSession(readStream, writeStream, clientOptions); await client.InitializeAsync(); // Now the client will process notifications through the MessageHandler
该 .NET 示例中,MessageHandler function checks if the incoming message is a notification. If it is, it prints the notification; otherwise, it processes it as a regular server message. The ClientSession is initialized with the message handler via the ClientSessionOptions.
To enable notifications, ensure your server uses a streaming transport (like streamable-http,客户端实现了消息处理器来处理通知。
本节介绍 MCP 中进度通知的概念、重要性及如何使用可流式 HTTP 实现。还包含一个实操练习,帮助加深理解。
进度通知是在长时间运行操作中,服务器向客户端实时发送的消息。服务器无需等待整个过程完成,即可持续向客户端更新当前状态。这提升了透明度、用户体验,也方便调试。
示例:
"Processing document 1/10" "Processing document 2/10" ... "Processing complete!"
进度通知的重要性体现在:
实现进度通知的方法:
ctx.info() or ctx.log() 在处理每个项目时发送通知。这些消息在主结果准备好之前发送给客户端。服务器示例:
@mcp.tool(description="A tool that sends progress notifications") async def process_files(message: str, ctx: Context) -> TextContent: for i in range(1, 11): await ctx.info(f"Processing document {i}/10") await ctx.info("Processing complete!") return TextContent(type="text", text=f"Done: {message}")
客户端示例:
async def message_handler(message): if isinstance(message, types.ServerNotification): print("NOTIFICATION:", message) else: print("SERVER MESSAGE:", message)
在使用基于 HTTP 的传输实现 MCP 服务器时,安全问题至关重要,需要关注多种攻击向量和防护机制。
公开 MCP 服务器时,安全性尤为关键。可流式 HTTP 引入了新的攻击面,需谨慎配置。
Origin header to prevent DNS rebinding attacks.localhost to avoid exposing them to the public internet.For applications currently using Server-Sent Events (SSE), migrating to Streamable HTTP provides enhanced capabilities and better long-term sustainability for your MCP implementations.
transport="streamable-http" in mcp.run().streamablehttp_client instead of SSE client.Security should be a top priority when implementing any server, especially when using HTTP-based transports like Streamable HTTP in MCP.
When implementing MCP servers with HTTP-based transports, security becomes a paramount concern that requires careful attention to multiple attack vectors and protection mechanisms.
Security is critical when exposing MCP servers over HTTP. Streamable HTTP introduces new attack surfaces and requires careful configuration.
Here are some key security considerations:
Origin header to prevent DNS rebinding attacks.localhost to avoid exposing them to the public internet.Additionally, here are some best practices to follow when implementing security in your MCP streaming server:
You will face some challenges when implementing security in MCP streaming servers:
For applications currently using Server-Sent Events (SSE), migrating to Streamable HTTP provides enhanced capabilities and better long-term sustainability for your MCP implementations.
There are two compelling reasons to upgrade from SSE to Streamable HTTP:
Here's how you can migrate from SSE to Streamable HTTP in your MCP applications:
transport="streamable-http" in mcp.run().streamablehttp_client 而非 SSE 客户端。迁移过程中建议保持与现有 SSE 客户端的兼容性。策略包括:
迁移时需解决以下问题:
场景:
构建一个 MCP 服务器和客户端,服务器处理一组项目(如文件或文档),并在处理每个项目时发送通知。客户端应实时显示每条通知。
步骤:
继续深入 MCP 流式传输的学习,本节提供额外资源和建议的下一步行动,助力构建更高级的应用。
免责声明:
本文件使用AI翻译服务Co-op Translator进行翻译。虽然我们力求准确,但请注意自动翻译可能包含错误或不准确之处。原始语言的文件应被视为权威来源。对于重要信息,建议采用专业人工翻译。我们不对因使用本翻译而产生的任何误解或误释承担责任。