08 - MCP Development Best Practices



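MCP Development Best Practices

Overview

This lesson covers advanced best practices for developing, testing, and deploying MCP servers and features in production environments. As the MCP ecosystem grows in complexity and importance, following established patterns helps ensure reliability, maintainability, and interoperability. The lesson distills practical experience from real-world MCP implementations to guide you in building robust, efficient servers along with their resources, prompts, and tools.

Learning Objectives

By the end of this lesson, you will be able to:

  • Apply industry best practices to the design of MCP servers and features
  • Create a comprehensive testing strategy for MCP servers
  • Design efficient, reusable workflow patterns for complex MCP applications
  • Implement proper error handling, logging, and observability in MCP servers
  • Optimize MCP implementations for performance, security, and maintainability

Additional References

For the latest information on MCP best practices, refer to the MCP documentation, the MCP specification, and the GitHub repository.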

MCP Tool Development Best Practices

Architectural Principles

1. Single Responsibility Principle

Each MCP feature should have a clear, focused purpose. Rather than building one monolithic tool that tries to handle many responsibilities, develop specialized tools that each handle a specific task.

Good example:

    // A focused tool that does one thing well
    public class WeatherForecastTool : ITool
    {
        private readonly IWeatherService _weatherService;

        public WeatherForecastTool(IWeatherService weatherService)
        {
            _weatherService = weatherService;
        }

        public string Name => "weatherForecast";
        public string Description => "Gets weather forecast for a specific location";

        public ToolDefinition GetDefinition()
        {
            return new ToolDefinition
            {
                Name = Name,
                Description = Description,
                Parameters = new Dictionary<string, ParameterDefinition>
                {
                    ["location"] = new ParameterDefinition
                    {
                        Type = ParameterType.String,
                        Description = "City or location name"
                    },
                    ["days"] = new ParameterDefinition
                    {
                        Type = ParameterType.Integer,
                        Description = "Number of forecast days",
                        Default = 3
                    }
                },
                Required = new[] { "location" }
            };
        }

        public async Task<ToolResponse> ExecuteAsync(IDictionary<string, object> parameters)
        {
            var location = parameters["location"].ToString();
            var days = parameters.ContainsKey("days")
                ? Convert.ToInt32(parameters["days"])
                : 3;

            var forecast = await _weatherService.GetForecastAsync(location, days);

            return new ToolResponse
            {
                Content = new List<ContentItem>
                {
                    new TextContent(JsonSerializer.Serialize(forecast))
                }
            };
        }
    }

Bad example:

    // A tool trying to do too many things
    public class WeatherToolSuite : ITool
    {
        public string Name => "weather";
        public string Description => "Weather-related functionality";

        public ToolDefinition GetDefinition()
        {
            return new ToolDefinition
            {
                Name = Name,
                Description = Description,
                Parameters = new Dictionary<string, ParameterDefinition>
                {
                    ["action"] = new ParameterDefinition
                    {
                        Type = ParameterType.String,
                        Description = "Weather action to perform",
                        Enum = new[] { "forecast", "history", "alerts", "radar" }
                    },
                    ["location"] = new ParameterDefinition
                    {
                        Type = ParameterType.String,
                        Description = "City or location name"
                    },
                    // Many more properties for different actions...
                },
                Required = new[] { "action", "location" }
            };
        }

        public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
        {
            // Complex conditional logic to handle different actions
            var action = request.Parameters.GetProperty("action").GetString();
            var location = request.Parameters.GetProperty("location").GetString();

            switch (action)
            {
                case "forecast":
                    // Forecast logic
                    break;
                case "history":
                    // Historical data logic
                    break;
                // More cases...
                default:
                    throw new ToolExecutionException($"Unknown action: {action}");
            }

            // Result processing
            // ...
        }
    }

2. Dependency Injection and Testability

Design tools to receive their dependencies through constructor injection, which makes them easier to test and configure:

    // Java example with dependency injection
    public class CurrencyConversionTool implements Tool {
        private final ExchangeRateService exchangeService;
        private final CacheService cacheService;
        private final Logger logger;

        // Dependencies injected through constructor
        public CurrencyConversionTool(
                ExchangeRateService exchangeService,
                CacheService cacheService,
                Logger logger) {
            this.exchangeService = exchangeService;
            this.cacheService = cacheService;
            this.logger = logger;
        }

        // Tool implementation
        // ...
    }

3. Composable Tools

Design tools that can be combined to build more complex workflows:

    # Python example showing composable tools
    class DataFetchTool(Tool):
        def get_name(self):
            return "dataFetch"

        # Implementation...

    class DataAnalysisTool(Tool):
        def get_name(self):
            return "dataAnalysis"

        # This tool can use results from the dataFetch tool
        async def execute_async(self, request):
            # Implementation...
            pass

    class DataVisualizationTool(Tool):
        def get_name(self):
            return "dataVisualize"

        # This tool can use results from the dataAnalysis tool
        async def execute_async(self, request):
            # Implementation...
            pass

    # These tools can be used independently or as part of a workflow

Schema Design Best Practices

The schema is the contract between the model and the tool. A well-designed schema makes a tool easier to use.

1. Clear Parameter Descriptions

Always provide a detailed description for every parameter:

    public object GetSchema()
    {
        return new
        {
            type = "object",
            properties = new
            {
                query = new
                {
                    type = "string",
                    description = "Search query text. Use precise keywords for better results."
                },
                filters = new
                {
                    type = "object",
                    description = "Optional filters to narrow down search results",
                    properties = new
                    {
                        dateRange = new
                        {
                            type = "string",
                            description = "Date range in format YYYY-MM-DD:YYYY-MM-DD"
                        },
                        category = new
                        {
                            type = "string",
                            description = "Category name to filter by"
                        }
                    }
                },
                limit = new
                {
                    type = "integer",
                    description = "Maximum number of results to return (1-50)",
                    // "default" is a C# keyword, so it needs the @ prefix;
                    // the serialized property name is still "default"
                    @default = 10
                }
            },
            required = new[] { "query" }
        };
    }

2. Validation Constraints

Add validation constraints to prevent invalid input:

    Map<String, Object> getSchema() {
        Map<String, Object> schema = new HashMap<>();
        schema.put("type", "object");

        Map<String, Object> properties = new HashMap<>();

        // Email property with format validation
        Map<String, Object> email = new HashMap<>();
        email.put("type", "string");
        email.put("format", "email");
        email.put("description", "User email address");

        // Age property with numeric constraints
        Map<String, Object> age = new HashMap<>();
        age.put("type", "integer");
        age.put("minimum", 13);
        age.put("maximum", 120);
        age.put("description", "User age in years");

        // Enumerated property
        Map<String, Object> subscription = new HashMap<>();
        subscription.put("type", "string");
        subscription.put("enum", Arrays.asList("free", "basic", "premium"));
        subscription.put("default", "free");
        subscription.put("description", "Subscription tier");

        properties.put("email", email);
        properties.put("age", age);
        properties.put("subscription", subscription);

        schema.put("properties", properties);
        schema.put("required", Arrays.asList("email"));

        return schema;
    }
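To see how such constraints reject bad input, here is a minimal Python sketch that applies a small subset of JSON Schema keywords (required, type, minimum, maximum, enum) to the same three fields. The names USER_SCHEMA and validate_params are hypothetical, and the format keyword is deliberately not checked; a production server would more likely delegate this to a full JSON Schema validator.

```python
# Hypothetical schema mirroring the email / age / subscription fields above.
USER_SCHEMA = {
    "type": "object",
    "properties": {
        "email": {"type": "string", "format": "email"},
        "age": {"type": "integer", "minimum": 13, "maximum": 120},
        "subscription": {"type": "string", "enum": ["free", "basic", "premium"]},
    },
    "required": ["email"],
}

_PY_TYPES = {"string": str, "integer": int}


def validate_params(schema, params):
    """Return a list of violations; an empty list means the input passed."""
    errors = []
    for name in schema.get("required", []):
        if name not in params:
            errors.append(f"missing required parameter: {name}")

    for name, value in params.items():
        rules = schema["properties"].get(name)
        if rules is None:
            continue  # unknown parameters are ignored in this sketch
        expected = _PY_TYPES.get(rules.get("type"))
        # bool is a subclass of int in Python, so reject it explicitly
        if expected is not None and (
            isinstance(value, bool) or not isinstance(value, expected)
        ):
            errors.append(f"{name}: expected {rules['type']}")
            continue
        if "minimum" in rules and value < rules["minimum"]:
            errors.append(f"{name}: must be >= {rules['minimum']}")
        if "maximum" in rules and value > rules["maximum"]:
            errors.append(f"{name}: must be <= {rules['maximum']}")
        if "enum" in rules and value not in rules["enum"]:
            errors.append(f"{name}: must be one of {rules['enum']}")
    return errors
```

Returning every violation at once, rather than failing on the first, gives the model enough information to correct all parameters in a single retry.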

3. Consistent Return Structures

Keep response structures consistent so the model can parse results easily:

    import time

    async def execute_async(self, request):
        started = time.perf_counter()
        try:
            # Process request
            results = await self._search_database(request.parameters["query"])
            elapsed_ms = round((time.perf_counter() - started) * 1000)

            # Always return a consistent structure
            return ToolResponse(
                result={
                    "matches": [self._format_item(item) for item in results],
                    "totalCount": len(results),
                    "queryTime": elapsed_ms,
                    "status": "success"
                }
            )
        except Exception as e:
            return ToolResponse(
                result={
                    "matches": [],
                    "totalCount": 0,
                    "queryTime": 0,
                    "status": "error",
                    "error": str(e)
                }
            )

    def _format_item(self, item):
        """Ensures each item has a consistent structure"""
        return {
            "id": item.id,
            "title": item.title,
            "summary": item.summary[:100] + "..." if len(item.summary) > 100 else item.summary,
            "url": item.url,
            "relevance": item.score
        }

Error Handling

Robust error handling is essential to the reliability of MCP tools.

1. Graceful Error Handling

Handle errors at the appropriate level and provide informative error messages:

    public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
    {
        try
        {
            string fileId = request.Parameters.GetProperty("fileId").GetString();

            try
            {
                var fileData = await _fileService.GetFileAsync(fileId);
                return new ToolResponse
                {
                    Result = JsonSerializer.SerializeToElement(fileData)
                };
            }
            catch (FileNotFoundException)
            {
                throw new ToolExecutionException($"File not found: {fileId}");
            }
            catch (UnauthorizedAccessException)
            {
                throw new ToolExecutionException("You don't have permission to access this file");
            }
            catch (Exception ex) when (ex is IOException || ex is TimeoutException)
            {
                _logger.LogError(ex, "Error accessing file {FileId}", fileId);
                throw new ToolExecutionException("Error accessing file: The service is temporarily unavailable");
            }
        }
        catch (ToolExecutionException)
        {
            // Already carries a user-facing message; without this clause the
            // generic handler below would replace it with "unexpected error"
            throw;
        }
        catch (JsonException)
        {
            throw new ToolExecutionException("Invalid file ID format");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Unexpected error in FileAccessTool");
            throw new ToolExecutionException("An unexpected error occurred");
        }
    }

2. Structured Error Responses

Return structured error information whenever possible:

    @Override
    public ToolResponse execute(ToolRequest request) {
        try {
            // Implementation
        } catch (Exception ex) {
            Map<String, Object> errorResult = new HashMap<>();
            errorResult.put("success", false);

            if (ex instanceof ValidationException) {
                ValidationException validationEx = (ValidationException) ex;
                errorResult.put("errorType", "validation");
                errorResult.put("errorMessage", validationEx.getMessage());
                errorResult.put("validationErrors", validationEx.getErrors());

                return new ToolResponse.Builder()
                    .setResult(errorResult)
                    .build();
            }

            // Re-throw other exceptions as ToolExecutionException
            throw new ToolExecutionException("Tool execution failed: " + ex.getMessage(), ex);
        }
    }

3. Retry Logic

Implement appropriate retry logic for transient failures:

    import asyncio
    import logging

    async def execute_async(self, request):
        max_retries = 3
        retry_count = 0
        base_delay = 1  # seconds

        while retry_count < max_retries:
            try:
                # Call external API
                return await self._call_api(request.parameters)
            except TransientError as e:
                retry_count += 1
                if retry_count >= max_retries:
                    raise ToolExecutionException(
                        f"Operation failed after {max_retries} attempts: {str(e)}"
                    ) from e

                # Exponential backoff
                delay = base_delay * (2 ** (retry_count - 1))
                logging.warning(f"Transient error, retrying in {delay}s: {str(e)}")
                await asyncio.sleep(delay)
            except Exception as e:
                # Non-transient error, don't retry
                raise ToolExecutionException(f"Operation failed: {str(e)}") from e
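The same backoff schedule can be factored into a reusable helper. The sketch below is one possible shape, not part of any MCP SDK: TransientError and call_with_retry are hypothetical names, and the sleep function is passed in as a parameter so that tests can run without real delays.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, HTTP 503, ...)."""


async def call_with_retry(operation, max_retries=3, base_delay=1.0,
                          sleep=asyncio.sleep):
    """Await operation(); on TransientError wait base_delay, 2x, 4x, ... and retry.

    The last TransientError is re-raised once max_retries attempts have failed.
    Any other exception propagates immediately without a retry.
    """
    for attempt in range(1, max_retries + 1):
        try:
            return await operation()
        except TransientError:
            if attempt == max_retries:
                raise
            await sleep(base_delay * (2 ** (attempt - 1)))
```

With the defaults, a call that fails twice and then succeeds waits 1 second and then 2 seconds. In production it is common to add random jitter to each delay so that many clients do not retry in lockstep; that is omitted here to keep the schedule easy to verify.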

Performance Optimization

1. Caching

Cache the results of expensive operations:

    public class CachedDataTool : IMcpTool
    {
        private readonly IDatabase _database;
        private readonly IMemoryCache _cache;

        public CachedDataTool(IDatabase database, IMemoryCache cache)
        {
            _database = database;
            _cache = cache;
        }

        public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
        {
            var query = request.Parameters.GetProperty("query").GetString();

            // Create cache key based on parameters
            var cacheKey = $"data_query_{ComputeHash(query)}";

            // Try to get from cache first
            if (_cache.TryGetValue(cacheKey, out JsonElement cachedResult))
            {
                return new ToolResponse { Result = cachedResult };
            }

            // Cache miss - perform actual query
            var result = await _database.QueryAsync(query);
            var serialized = JsonSerializer.SerializeToElement(result);

            // Store in cache with expiration
            var cacheOptions = new MemoryCacheEntryOptions()
                .SetAbsoluteExpiration(TimeSpan.FromMinutes(15));
            _cache.Set(cacheKey, serialized, cacheOptions);

            return new ToolResponse { Result = serialized };
        }

        private string ComputeHash(string input)
        {
            // One possible stable hash: SHA-256 of the UTF-8 bytes (.NET 5+)
            var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(input));
            return Convert.ToHexString(bytes);
        }
    }
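When a tool takes several parameters rather than a single query string, the cache key has to be stable regardless of the order in which the parameters arrive. The following Python sketch shows one way to do this, assuming the parameters are JSON-serializable; cache_key is a hypothetical helper name.

```python
import hashlib
import json


def cache_key(tool_name, params):
    """Build a deterministic cache key from a tool name and its parameters.

    Keys are sorted and whitespace is removed before hashing, so two dicts
    with the same content always produce the same key.
    """
    canonical = json.dumps(
        params, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{tool_name}:{digest}"
```

A cryptographic digest is used here instead of Python's built-in hash() because string hashing is randomized per process by default, which would make keys useless for a cache shared across processes or restarts.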

2. Asynchronous Processing

Use asynchronous programming patterns for I/O-bound operations:

    public class AsyncDocumentProcessingTool implements Tool {
        private final DocumentService documentService;
        private final ExecutorService executorService;
        // Assumed status store; the repository type is illustrative
        private final ProcessStatusRepository processStatusRepository;

        public AsyncDocumentProcessingTool(
                DocumentService documentService,
                ExecutorService executorService,
                ProcessStatusRepository processStatusRepository) {
            this.documentService = documentService;
            this.executorService = executorService;
            this.processStatusRepository = processStatusRepository;
        }

        @Override
        public ToolResponse execute(ToolRequest request) {
            String documentId = request.getParameters().get("documentId").asText();

            // For long-running operations, return a processing ID immediately
            String processId = UUID.randomUUID().toString();
            processStatusRepository.updateStatus(processId, "processing");

            // Start async processing
            CompletableFuture.runAsync(() -> {
                try {
                    // Perform long-running operation
                    documentService.processDocument(documentId);

                    // Update status (would typically be stored in a database)
                    processStatusRepository.updateStatus(processId, "completed");
                } catch (Exception ex) {
                    processStatusRepository.updateStatus(processId, "failed", ex.getMessage());
                }
            }, executorService);

            // Return immediate response with process ID
            Map<String, Object> result = new HashMap<>();
            result.put("processId", processId);
            result.put("status", "processing");
            result.put("estimatedCompletionTime", ZonedDateTime.now().plusMinutes(5));

            return new ToolResponse.Builder().setResult(result).build();
        }

        // Companion status check tool
        public class ProcessStatusTool implements Tool {
            @Override
            public ToolResponse execute(ToolRequest request) {
                String processId = request.getParameters().get("processId").asText();
                ProcessStatus status = processStatusRepository.getStatus(processId);

                return new ToolResponse.Builder().setResult(status).build();
            }
        }
    }

3. Resource Throttling

Implement resource throttling to prevent system overload:

    import asyncio
    import time

    class ThrottledApiTool(Tool):
        def __init__(self):
            self.rate_limiter = TokenBucketRateLimiter(
                tokens_per_second=5,  # Allow 5 requests per second
                bucket_size=10        # Allow bursts up to 10 requests
            )

        async def execute_async(self, request):
            # Check if we can proceed or need to wait
            # (get_delay_time is a coroutine, so it must be awaited)
            delay = await self.rate_limiter.get_delay_time()

            if delay > 0:
                if delay > 2.0:  # If wait is too long
                    raise ToolExecutionException(
                        f"Rate limit exceeded. Please try again in {delay:.1f} seconds."
                    )
                else:
                    # Wait for the appropriate delay time
                    await asyncio.sleep(delay)

            # Consume a token and proceed with the request
            await self.rate_limiter.consume()

            # Call API
            result = await self._call_api(request.parameters)
            return ToolResponse(result=result)

    class TokenBucketRateLimiter:
        def __init__(self, tokens_per_second, bucket_size):
            self.tokens_per_second = tokens_per_second
            self.bucket_size = bucket_size
            self.tokens = bucket_size
            self.last_refill = time.time()
            self.lock = asyncio.Lock()

        async def get_delay_time(self):
            async with self.lock:
                self._refill()
                if self.tokens >= 1:
                    return 0

                # Calculate time until next token available
                return (1 - self.tokens) / self.tokens_per_second

        async def consume(self):
            async with self.lock:
                self._refill()
                self.tokens -= 1

        def _refill(self):
            now = time.time()
            elapsed = now - self.last_refill

            # Add new tokens based on elapsed time
            new_tokens = elapsed * self.tokens_per_second
            self.tokens = min(self.bucket_size, self.tokens + new_tokens)
            self.last_refill = now
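In the limiter above, checking the delay and consuming a token are two separate steps, so two concurrent requests can both see a token as available before either one takes it. A variant that checks and consumes in a single call avoids this. The sketch below is a hypothetical simplification (TokenBucket and try_acquire are illustrative names) with an injectable clock so the behavior can be verified deterministically.

```python
import time


class TokenBucket:
    """Token bucket whose check and consume happen in one synchronous call."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate              # tokens added per second
        self.capacity = capacity      # maximum burst size
        self.tokens = float(capacity)
        self.clock = clock
        self.last = clock()

    def try_acquire(self):
        """Take one token and return 0.0, or return the seconds until one is ready."""
        now = self.clock()
        # Refill based on elapsed time, never exceeding the bucket capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return 0.0
        return (1 - self.tokens) / self.rate
```

Because try_acquire contains no await, other coroutines on the same event loop cannot interleave with it. A server that handles requests on multiple threads would still need to guard the call with a lock.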

Security Best Practices

1. Input Validation

Always validate input parameters thoroughly:

    public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
    {
        // Validate parameters exist
        if (!request.Parameters.TryGetProperty("query", out var queryProp))
        {
            throw new ToolExecutionException("Missing required parameter: query");
        }

        // Validate correct type
        if (queryProp.ValueKind != JsonValueKind.String)
        {
            throw new ToolExecutionException("Query parameter must be a string");
        }

        var query = queryProp.GetString();

        // Validate string content
        if (string.IsNullOrWhiteSpace(query))
        {
            throw new ToolExecutionException("Query parameter cannot be empty");
        }

        if (query.Length > 500)
        {
            throw new ToolExecutionException("Query parameter exceeds maximum length of 500 characters");
        }

        // Check for SQL injection attacks if applicable
        if (ContainsSqlInjection(query))
        {
            throw new ToolExecutionException("Invalid query: contains potentially unsafe SQL");
        }

        // Proceed with execution
        // ...
    }
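Pattern-based checks such as ContainsSqlInjection are easy to bypass, so they work best as a secondary guard. The primary defense is to never build SQL by string concatenation. The Python sketch below illustrates this with the standard-library sqlite3 module; the documents table and the search_documents function are hypothetical.

```python
import sqlite3


def search_documents(conn, query, limit=10):
    """Search document titles using a parameterized query."""
    if not isinstance(query, str) or not query.strip():
        raise ValueError("query must be a non-empty string")
    if len(query) > 500:
        raise ValueError("query exceeds maximum length of 500 characters")
    limit = max(1, min(int(limit), 50))  # clamp to the documented 1-50 range

    # The ? placeholders send values separately from the SQL text,
    # so user input is never interpreted as SQL.
    rows = conn.execute(
        "SELECT title FROM documents WHERE title LIKE ? ORDER BY title LIMIT ?",
        (f"%{query}%", limit),
    ).fetchall()
    return [row[0] for row in rows]
```

With this approach, input such as `x'; DROP TABLE documents; --` is treated as an ordinary search string that simply matches nothing.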

2. Authorization Checks

Implement proper authorization checks:

    @Override
    public ToolResponse execute(ToolRequest request) {
        // Get user context from request
        UserContext user = request.getContext().getUserContext();

        // Check if user has required permissions
        if (!authorizationService.hasPermission(user, "documents:read")) {
            throw new ToolExecutionException("User does not have permission to access documents");
        }

        // For specific resources, check access to that resource
        String documentId = request.getParameters().get("documentId").asText();
        if (!documentService.canUserAccess(user.getId(), documentId)) {
            throw new ToolExecutionException("Access denied to the requested document");
        }

        // Proceed with tool execution
        // ...
    }

3. Sensitive Data Handling

Handle sensitive data with care:

    class SecureDataTool(Tool):
        def get_schema(self):
            return {
                "type": "object",
                "properties": {
                    "userId": {"type": "string"},
                    "includeSensitiveData": {"type": "boolean", "default": False}
                },
                "required": ["userId"]
            }

        async def execute_async(self, request):
            user_id = request.parameters["userId"]
            include_sensitive = request.parameters.get("includeSensitiveData", False)

            # Get user data
            user_data = await self.user_service.get_user_data(user_id)

            # Filter sensitive fields unless explicitly requested AND authorized
            if not include_sensitive or not self._is_authorized_for_sensitive_data(request):
                user_data = self._redact_sensitive_fields(user_data)

            return ToolResponse(result=user_data)

        def _is_authorized_for_sensitive_data(self, request):
            # Check authorization level in request context
            auth_level = request.context.get("authorizationLevel")
            return auth_level == "admin"

        def _redact_sensitive_fields(self, user_data):
            # Create a copy to avoid modifying the original
            redacted = user_data.copy()

            # Redact specific sensitive fields
            sensitive_fields = ["ssn", "creditCardNumber", "password"]
            for field in sensitive_fields:
                if field in redacted:
                    redacted[field] = "REDACTED"

            # Redact nested sensitive data
            if "financialInfo" in redacted:
                redacted["financialInfo"] = {"available": True, "accessRestricted": True}

            return redacted
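The redaction above only inspects top-level keys, so a sensitive field nested inside a list or sub-object would pass through unchanged. A recursive variant, sketched below under the assumption that the data consists of plain dicts, lists, and scalar values, closes that gap. SENSITIVE_KEYS and redact are hypothetical names.

```python
SENSITIVE_KEYS = {"ssn", "creditCardNumber", "password"}


def redact(value):
    """Return a redacted deep copy of value; the input is never mutated."""
    if isinstance(value, dict):
        return {
            key: "REDACTED" if key in SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value  # scalars are returned as-is
```

Building new containers at every level also means the caller's original data stays intact, which matters when the same record is later returned unredacted to an authorized user.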

MCP Tool Testing Best Practices

Comprehensive testing ensures that MCP tools function correctly, handle edge cases, and integrate well with the rest of the system.

Unit Testing

1. Test Each Tool in Isolation

Write focused tests for each tool's functionality:

    [Fact]
    public async Task WeatherTool_ValidLocation_ReturnsCorrectForecast()
    {
        // Arrange
        var mockWeatherService = new Mock<IWeatherService>();
        mockWeatherService
            .Setup(s => s.GetForecastAsync("Seattle", 3))
            .ReturnsAsync(new WeatherForecast(/* test data */));

        var tool = new WeatherForecastTool(mockWeatherService.Object);

        var request = new ToolRequest(
            toolName: "weatherForecast",
            parameters: JsonSerializer.SerializeToElement(new
            {
                location = "Seattle",
                days = 3
            })
        );

        // Act
        var response = await tool.ExecuteAsync(request);

        // Assert
        Assert.NotNull(response);
        var result = JsonSerializer.Deserialize<WeatherForecast>(response.Result);
        Assert.Equal("Seattle", result.Location);
        Assert.Equal(3, result.DailyForecasts.Count);
    }

    [Fact]
    public async Task WeatherTool_InvalidLocation_ThrowsToolExecutionException()
    {
        // Arrange
        var mockWeatherService = new Mock<IWeatherService>();
        mockWeatherService
            .Setup(s => s.GetForecastAsync("InvalidLocation", It.IsAny<int>()))
            .ThrowsAsync(new LocationNotFoundException("Location not found"));

        var tool = new WeatherForecastTool(mockWeatherService.Object);

        var request = new ToolRequest(
            toolName: "weatherForecast",
            parameters: JsonSerializer.SerializeToElement(new
            {
                location = "InvalidLocation",
                days = 3
            })
        );

        // Act & Assert
        var exception = await Assert.ThrowsAsync<ToolExecutionException>(
            () => tool.ExecuteAsync(request)
        );

        Assert.Contains("Location not found", exception.Message);
    }

2. Schema Validation Testing

Test that schemas are valid and that constraints are enforced correctly:

    @Test
    public void testSchemaValidation() throws Exception {
        // Create tool instance
        SearchTool searchTool = new SearchTool();

        // Get schema
        Object schema = searchTool.getSchema();

        // Convert schema to a JSON tree for validation
        JsonNode schemaNode = objectMapper.valueToTree(schema);

        // Validate schema is valid JSONSchema
        JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
        JsonSchema jsonSchema = factory.getJsonSchema(schemaNode);

        // Test valid parameters
        JsonNode validParams = objectMapper.createObjectNode()
            .put("query", "test query")
            .put("limit", 5);

        ProcessingReport validReport = jsonSchema.validate(validParams);
        assertTrue(validReport.isSuccess());

        // Test missing required parameter
        JsonNode missingRequired = objectMapper.createObjectNode()
            .put("limit", 5);

        ProcessingReport missingReport = jsonSchema.validate(missingRequired);
        assertFalse(missingReport.isSuccess());

        // Test invalid parameter type
        JsonNode invalidType = objectMapper.createObjectNode()
            .put("query", "test")
            .put("limit", "not-a-number");

        ProcessingReport invalidReport = jsonSchema.validate(invalidType);
        assertFalse(invalidReport.isSuccess());
    }

3. Error Handling Tests

Write dedicated tests for error conditions:

    @pytest.mark.asyncio
    async def test_api_tool_handles_timeout():
        # Arrange
        tool = ApiTool(timeout=0.1)  # Very short timeout

        # Mock a request that will time out
        with aioresponses() as mocked:
            mocked.get(
                "https://api.example.com/data",
                callback=lambda *args, **kwargs: asyncio.sleep(0.5)  # Longer than timeout
            )

            request = ToolRequest(
                tool_name="apiTool",
                parameters={"url": "https://api.example.com/data"}
            )

            # Act & Assert
            with pytest.raises(ToolExecutionException) as exc_info:
                await tool.execute_async(request)

            # Verify exception message
            assert "timed out" in str(exc_info.value).lower()

    @pytest.mark.asyncio
    async def test_api_tool_handles_rate_limiting():
        # Arrange
        tool = ApiTool()

        # Mock a rate-limited response
        with aioresponses() as mocked:
            mocked.get(
                "https://api.example.com/data",
                status=429,
                headers={"Retry-After": "2"},
                body=json.dumps({"error": "Rate limit exceeded"})
            )

            request = ToolRequest(
                tool_name="apiTool",
                parameters={"url": "https://api.example.com/data"}
            )

            # Act & Assert
            with pytest.raises(ToolExecutionException) as exc_info:
                await tool.execute_async(request)

            # Verify exception contains rate limit information
            error_msg = str(exc_info.value).lower()
            assert "rate limit" in error_msg
            assert "try again" in error_msg

Integration Testing

1. Tool Chain Testing

Test that tools work together as expected when combined:

    [Fact]
    public async Task DataProcessingWorkflow_CompletesSuccessfully()
    {
        // Arrange
        var dataFetchTool = new DataFetchTool(mockDataService.Object);
        var analysisTools = new DataAnalysisTool(mockAnalysisService.Object);
        var visualizationTool = new DataVisualizationTool(mockVisualizationService.Object);

        var toolRegistry = new ToolRegistry();
        toolRegistry.RegisterTool(dataFetchTool);
        toolRegistry.RegisterTool(analysisTools);
        toolRegistry.RegisterTool(visualizationTool);

        var workflowExecutor = new WorkflowExecutor(toolRegistry);

        // Act
        var result = await workflowExecutor.ExecuteWorkflowAsync(new[]
        {
            new ToolCall("dataFetch", new { source = "sales2023" }),
            new ToolCall("dataAnalysis", ctx => new
            {
                data = ctx.GetResult("dataFetch"),
                analysis = "trend"
            }),
            new ToolCall("dataVisualize", ctx => new
            {
                analysisResult = ctx.GetResult("dataAnalysis"),
                type = "line-chart"
            })
        });

        // Assert
        Assert.NotNull(result);
        Assert.True(result.Success);
        Assert.NotNull(result.GetResult("dataVisualize"));
        Assert.Contains("chartUrl", result.GetResult("dataVisualize").ToString());
    }

2. MCP Server Testing

Test the MCP server with full tool registration and execution:

    @SpringBootTest
    @AutoConfigureMockMvc
    public class McpServerIntegrationTest {

        @Autowired
        private MockMvc mockMvc;

        @Autowired
        private ObjectMapper objectMapper;

        @Test
        public void testToolDiscovery() throws Exception {
            // Test the discovery endpoint
            mockMvc.perform(get("/mcp/tools"))
                .andExpect(status().isOk())
                .andExpect(jsonPath("$.tools").isArray())
                .andExpect(jsonPath("$.tools[*].name").value(hasItems(
                    "weatherForecast", "calculator", "documentSearch"
                )));
        }

        @Test
        public void testToolExecution() throws Exception {
            // Create tool request
            Map<String, Object> request = new HashMap<>();
            request.put("toolName", "calculator");

            Map<String, Object> parameters = new HashMap<>();
            parameters.put("operation", "add");
            parameters.put("a", 5);
            parameters.put("b", 7);
            request.put("parameters", parameters);

            // Send request and verify response
            mockMvc.perform(post("/mcp/execute")
                    .contentType(MediaType.APPLICATION_JSON)
                    .content(objectMapper.writeValueAsString(request)))
                .andExpect(status().isOk())
                .andExpect(jsonPath("$.result.value").value(12));
        }

        @Test
        public void testToolValidation() throws Exception {
            // Create invalid tool request
            Map<String, Object> request = new HashMap<>();
            request.put("toolName", "calculator");

            Map<String, Object> parameters = new HashMap<>();
            parameters.put("operation", "divide");
            parameters.put("a", 10);
            // Missing parameter "b"
            request.put("parameters", parameters);

            // Send request and verify error response
            mockMvc.perform(post("/mcp/execute")
                    .contentType(MediaType.APPLICATION_JSON)
                    .content(objectMapper.writeValueAsString(request)))
                .andExpect(status().isBadRequest())
                .andExpect(jsonPath("$.error").exists());
        }
    }

3. End-to-End Testing

Test the complete workflow from model prompt to tool execution:

    @pytest.mark.asyncio
    async def test_model_interaction_with_tool():
        # Arrange - Set up MCP client and mock model
        mcp_client = McpClient(server_url="http://localhost:5000")

        # Mock model responses
        mock_model = MockLanguageModel([
            MockResponse(
                "What's the weather in Seattle?",
                tool_calls=[{
                    "tool_name": "weatherForecast",
                    "parameters": {"location": "Seattle", "days": 3}
                }]
            ),
            MockResponse(
                "Here's the weather forecast for Seattle:\n- Today: 65°F, Partly Cloudy\n- Tomorrow: 68°F, Sunny\n- Day after: 62°F, Rain",
                tool_calls=[]
            )
        ])

        # Mock weather tool response
        with aioresponses() as mocked:
            mocked.post(
                "http://localhost:5000/mcp/execute",
                payload={
                    "result": {
                        "location": "Seattle",
                        "forecast": [
                            {"date": "2023-06-01", "temperature": 65, "conditions": "Partly Cloudy"},
                            {"date": "2023-06-02", "temperature": 68, "conditions": "Sunny"},
                            {"date": "2023-06-03", "temperature": 62, "conditions": "Rain"}
                        ]
                    }
                }
            )

            # Act
            response = await mcp_client.send_prompt(
                "What's the weather in Seattle?",
                model=mock_model,
                allowed_tools=["weatherForecast"]
            )

            # Assert
            assert "Seattle" in response.generated_text
            assert "65" in response.generated_text
            assert "Sunny" in response.generated_text
            assert "Rain" in response.generated_text
            assert len(response.tool_calls) == 1
            assert response.tool_calls[0].tool_name == "weatherForecast"

Performance Testing

1. Load Testing

Test how many concurrent requests the MCP server can handle:

    [Fact]
    public async Task McpServer_HandlesHighConcurrency()
    {
        // Arrange
        var server = new McpServer(
            name: "TestServer",
            version: "1.0",
            maxConcurrentRequests: 100
        );

        server.RegisterTool(new FastExecutingTool());
        await server.StartAsync();

        var client = new McpClient("http://localhost:5000");

        // Act
        var tasks = new List<Task<McpResponse>>();
        for (int i = 0; i < 1000; i++)
        {
            tasks.Add(client.ExecuteToolAsync("fastTool", new { iteration = i }));
        }

        var results = await Task.WhenAll(tasks);

        // Assert
        Assert.Equal(1000, results.Length);
        Assert.All(results, r => Assert.NotNull(r));
    }

2. Stress Testing

Test how the system behaves under extreme load:

    // Illustrative outline only: programmatic JMeter use needs additional
    // initialization and result collection that is not shown here
    @Test
    public void testServerUnderStress() {
        int maxUsers = 1000;
        int rampUpTimeSeconds = 60;
        int testDurationSeconds = 300;

        // Set up JMeter for stress testing
        StandardJMeterEngine jmeter = new StandardJMeterEngine();

        // Configure JMeter test plan
        HashTree testPlanTree = new HashTree();

        // Create test plan, thread group, samplers, etc.
        TestPlan testPlan = new TestPlan("MCP Server Stress Test");
        testPlanTree.add(testPlan);

        ThreadGroup threadGroup = new ThreadGroup();
        threadGroup.setNumThreads(maxUsers);
        threadGroup.setRampUp(rampUpTimeSeconds);
        threadGroup.setScheduler(true);
        threadGroup.setDuration(testDurationSeconds);

        testPlanTree.add(threadGroup);

        // Add HTTP sampler for tool execution
        HTTPSampler toolExecutionSampler = new HTTPSampler();
        toolExecutionSampler.setDomain("localhost");
        toolExecutionSampler.setPort(5000);
        toolExecutionSampler.setPath("/mcp/execute");
        toolExecutionSampler.setMethod("POST");
        toolExecutionSampler.addArgument("toolName", "calculator");
        toolExecutionSampler.addArgument("parameters", "{\"operation\":\"add\",\"a\":5,\"b\":7}");

        threadGroup.add(toolExecutionSampler);

        // Add listeners
        SummaryReport summaryReport = new SummaryReport();
        threadGroup.add(summaryReport);

        // Run test
        jmeter.configure(testPlanTree);
        jmeter.run();

        // Validate results
        assertEquals(0, summaryReport.getErrorCount());
        assertTrue(summaryReport.getAverage() < 200);          // Average response time < 200ms
        assertTrue(summaryReport.getPercentile(90.0) < 500);   // 90th percentile < 500ms
    }

3. Monitoring and Profiling

Set up monitoring for long-term performance analysis:

    from prometheus_client import Counter, Histogram, generate_latest

    # Configure monitoring for an MCP server
    def configure_monitoring(server):
        # Set up Prometheus metrics
        prometheus_metrics = {
            "request_count": Counter("mcp_requests_total", "Total MCP requests"),
            "request_latency": Histogram(
                "mcp_request_duration_seconds",
                "Request duration in seconds",
                buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]
            ),
            "tool_execution_count": Counter(
                "mcp_tool_executions_total",
                "Tool execution count",
                labelnames=["tool_name"]
            ),
            "tool_execution_latency": Histogram(
                "mcp_tool_duration_seconds",
                "Tool execution duration in seconds",
                labelnames=["tool_name"],
                buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]
            ),
            "tool_errors": Counter(
                "mcp_tool_errors_total",
                "Tool execution errors",
                labelnames=["tool_name", "error_type"]
            )
        }

        # Add middleware for timing and recording metrics
        server.add_middleware(PrometheusMiddleware(prometheus_metrics))

        # Expose metrics endpoint
        @server.router.get("/metrics")
        async def metrics():
            return generate_latest()

        return server

MCP Workflow Design Patterns

Well-designed MCP workflows improve efficiency, reliability, and maintainability. The key patterns are:

1. Chain of Tools Pattern

Connect multiple tools in sequence so that each tool's output becomes the next tool's input:

    # Python Chain of Tools implementation
    class ChainWorkflow:
        def __init__(self, tools_chain):
            self.tools_chain = tools_chain  # List of tool names to execute in sequence

        async def execute(self, mcp_client, initial_input):
            current_result = initial_input
            all_results = {"input": initial_input}

            for tool_name in self.tools_chain:
                # Execute each tool in the chain, passing previous result
                response = await mcp_client.execute_tool(tool_name, current_result)

                # Store result and use as input for next tool
                all_results[tool_name] = response.result
                current_result = response.result

            return {
                "final_result": current_result,
                "all_results": all_results
            }

    # Example usage (run inside an async function)
    data_processing_chain = ChainWorkflow([
        "dataFetch",
        "dataCleaner",
        "dataAnalyzer",
        "dataVisualizer"
    ])

    result = await data_processing_chain.execute(
        mcp_client,
        {"source": "sales_database", "table": "transactions"}
    )
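To experiment with the chain pattern without a running server, a stub client is enough. The sketch below is self-contained: StubClient, run_chain, and the three lambda "tools" are hypothetical stand-ins, and run_chain repeats the same loop as the workflow class in compact form.

```python
import asyncio
from types import SimpleNamespace


class StubClient:
    """Hypothetical stand-in for an MCP client; maps tool names to local functions."""

    def __init__(self, tools):
        self.tools = tools

    async def execute_tool(self, tool_name, payload):
        # Wrap the return value so callers can read response.result
        return SimpleNamespace(result=self.tools[tool_name](payload))


async def run_chain(client, tool_names, initial_input):
    """Feed each tool's result into the next tool and keep a trace of every step."""
    current = initial_input
    trace = {"input": initial_input}
    for name in tool_names:
        response = await client.execute_tool(name, current)
        trace[name] = response.result
        current = response.result
    return current, trace


client = StubClient({
    "dataFetch": lambda p: {"rows": [3, 1, 2], "source": p["source"]},
    "dataCleaner": lambda p: {"rows": sorted(p["rows"])},
    "dataAnalyzer": lambda p: {"max": p["rows"][-1], "count": len(p["rows"])},
})

final, trace = asyncio.run(
    run_chain(client, ["dataFetch", "dataCleaner", "dataAnalyzer"],
              {"source": "sales_database"})
)
```

Note that each tool must accept exactly the shape the previous tool returns. That coupling is the main cost of the chain pattern, and the per-step trace makes shape mismatches easier to locate.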

2. Dispatcher Pattern

Use a central tool that dispatches to different specialized tools based on the input:

    public class ContentDispatcherTool : IMcpTool
    {
        private readonly IMcpClient _mcpClient;

        public ContentDispatcherTool(IMcpClient mcpClient)
        {
            _mcpClient = mcpClient;
        }

        public string Name => "contentProcessor";
        public string Description => "Processes content of various types";

        public object GetSchema()
        {
            return new
            {
                type = "object",
                properties = new
                {
                    content = new { type = "string" },
                    contentType = new
                    {
                        type = "string",
                        // "enum" is a C# keyword, so it needs the @ prefix
                        @enum = new[] { "text", "html", "markdown", "csv", "code" }
                    },
                    operation = new
                    {
                        type = "string",
                        @enum = new[] { "summarize", "analyze", "extract", "convert" }
                    }
                },
                required = new[] { "content", "contentType", "operation" }
            };
        }

        public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
        {
            var content = request.Parameters.GetProperty("content").GetString();
            var contentType = request.Parameters.GetProperty("contentType").GetString();
            var operation = request.Parameters.GetProperty("operation").GetString();

            // Determine which specialized tool to use
            string targetTool = DetermineTargetTool(contentType, operation);

            // Forward to the specialized tool
            var specializedResponse = await _mcpClient.ExecuteToolAsync(
                targetTool,
                new { content, options = GetOptionsForTool(targetTool, operation) }
            );

            return new ToolResponse { Result = specializedResponse.Result };
        }

        private string DetermineTargetTool(string contentType, string operation)
        {
            return (contentType, operation) switch
            {
                ("text", "summarize") => "textSummarizer",
                ("text", "analyze") => "textAnalyzer",
                ("html", _) => "htmlProcessor",
                ("markdown", _) => "markdownProcessor",
                ("csv", _) => "csvProcessor",
                ("code", _) => "codeAnalyzer",
                _ => throw new ToolExecutionException($"No tool available for {contentType}/{operation}")
            };
        }

        private object GetOptionsForTool(string toolName, string operation)
        {
            // Return appropriate options for each specialized tool
            return toolName switch
            {
                "textSummarizer" => new { length = "medium" },
                "htmlProcessor" => new { cleanUp = true, operation },
                // Options for other tools...
                _ => new { }
            };
        }
    }

3. Parallel Processing Pattern

Execute multiple tools simultaneously to improve efficiency:

    public class ParallelDataProcessingWorkflow {
        private final McpClient mcpClient;

        public ParallelDataProcessingWorkflow(McpClient mcpClient) {
            this.mcpClient = mcpClient;
        }

        public WorkflowResult execute(String datasetId) {
            // Step 1: Fetch dataset metadata (synchronous)
            ToolResponse metadataResponse = mcpClient.executeTool("datasetMetadata",
                Map.of("datasetId", datasetId));

            // Step 2: Launch multiple analyses in parallel
            CompletableFuture<ToolResponse> statisticalAnalysis = CompletableFuture.supplyAsync(() ->
                mcpClient.executeTool("statisticalAnalysis", Map.of(
                    "datasetId", datasetId,
                    "type", "comprehensive"
                ))
            );

            CompletableFuture<ToolResponse> correlationAnalysis = CompletableFuture.supplyAsync(() ->
                mcpClient.executeTool("correlationAnalysis", Map.of(
                    "datasetId", datasetId,
                    "method", "pearson"
                ))
            );

            CompletableFuture<ToolResponse> outlierDetection = CompletableFuture.supplyAsync(() ->
                mcpClient.executeTool("outlierDetection", Map.of(
                    "datasetId", datasetId,
                    "sensitivity", "medium"
                ))
            );

            // Wait for all parallel tasks to complete
            CompletableFuture<Void> allAnalyses = CompletableFuture.allOf(
                statisticalAnalysis, correlationAnalysis, outlierDetection
            );

            allAnalyses.join();  // Wait for completion

            // Step 3: Combine results
            Map<String, Object> combinedResults = new HashMap<>();
            combinedResults.put("metadata", metadataResponse.getResult());
            combinedResults.put("statistics", statisticalAnalysis.join().getResult());
            combinedResults.put("correlations", correlationAnalysis.join().getResult());
            combinedResults.put("outliers", outlierDetection.join().getResult());

            // Step 4: Generate summary report
            ToolResponse summaryResponse = mcpClient.executeTool("reportGenerator",
                Map.of("analysisResults", combinedResults));

            // Return complete workflow result
            WorkflowResult result = new WorkflowResult();
            result.setDatasetId(datasetId);
            result.setAnalysisResults(combinedResults);
            result.setSummaryReport(summaryResponse.getResult());

            return result;
        }
    }
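In the Java version, a single failed analysis makes join() throw, which aborts the whole workflow. When partial results are still useful, the fan-out can collect failures instead of propagating them. The Python sketch below shows this with asyncio.gather; run_analysis is a hypothetical stand-in for a real tool call, and the "brokenTool" name exists only to demonstrate the failure path.

```python
import asyncio

ANALYSES = ["statisticalAnalysis", "correlationAnalysis", "outlierDetection"]


async def run_analysis(name, dataset_id):
    """Hypothetical stand-in for a tool call; sleeps briefly to simulate I/O."""
    await asyncio.sleep(0.01)
    if name == "brokenTool":
        raise RuntimeError("analysis backend unavailable")
    return {"tool": name, "datasetId": dataset_id}


async def analyze_in_parallel(dataset_id, names=ANALYSES):
    """Run all analyses concurrently and separate successes from failures."""
    results = await asyncio.gather(
        *(run_analysis(name, dataset_id) for name in names),
        return_exceptions=True,  # failures are returned as values, not raised
    )
    combined, failures = {}, {}
    # gather returns results in the same order as its inputs
    for name, result in zip(names, results):
        if isinstance(result, Exception):
            failures[name] = str(result)
        else:
            combined[name] = result
    return combined, failures
```

Whether to continue with partial results or fail fast is a design decision for each workflow; a report generator, for example, might need all three analyses, while a dashboard could render whatever succeeded.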

4. Error Recovery Pattern

Implement graceful fallbacks for tool failures:

import logging

# ToolExecutionException and WorkflowExecutionException are assumed to be
# provided by the MCP client library or defined elsewhere in the application.

class ResilientWorkflow:
    def __init__(self, mcp_client):
        self.client = mcp_client

    async def execute_with_fallback(self, primary_tool, fallback_tool, parameters):
        try:
            # Try primary tool first
            response = await self.client.execute_tool(primary_tool, parameters)
            return {
                "result": response.result,
                "source": "primary",
                "tool": primary_tool
            }
        except ToolExecutionException as e:
            # Log the failure
            logging.warning(f"Primary tool '{primary_tool}' failed: {str(e)}")

            # Fall back to secondary tool
            try:
                # Might need to transform parameters for fallback tool
                fallback_params = self._adapt_parameters(parameters, primary_tool, fallback_tool)

                response = await self.client.execute_tool(fallback_tool, fallback_params)
                return {
                    "result": response.result,
                    "source": "fallback",
                    "tool": fallback_tool,
                    "primaryError": str(e)
                }
            except ToolExecutionException as fallback_error:
                # Both tools failed
                logging.error(f"Both primary and fallback tools failed. Fallback error: {str(fallback_error)}")
                raise WorkflowExecutionException(
                    f"Workflow failed: primary error: {str(e)}; fallback error: {str(fallback_error)}"
                ) from fallback_error

    def _adapt_parameters(self, params, from_tool, to_tool):
        """Adapt parameters between different tools if needed"""
        # This implementation would depend on the specific tools
        # For this example, we'll just return the original parameters
        return params

# Example usage
async def get_weather(workflow, location):
    return await workflow.execute_with_fallback(
        "premiumWeatherService",  # Primary (paid) weather API
        "basicWeatherService",    # Fallback (free) weather API
        {"location": location}
    )

5. Workflow Composition Pattern

Build complex workflows by composing simpler ones:

public class CompositeWorkflow : IWorkflow
{
    private readonly List<IWorkflow> _workflows;

    public CompositeWorkflow(IEnumerable<IWorkflow> workflows)
    {
        _workflows = new List<IWorkflow>(workflows);
    }

    public async Task<WorkflowResult> ExecuteAsync(WorkflowContext context)
    {
        var results = new Dictionary<string, object>();

        foreach (var workflow in _workflows)
        {
            var workflowResult = await workflow.ExecuteAsync(context);

            // Store each workflow's result
            results[workflow.Name] = workflowResult;

            // Update context with the result for the next workflow
            context = context.WithResult(workflow.Name, workflowResult);
        }

        return new WorkflowResult(results);
    }

    public string Name => "CompositeWorkflow";
    public string Description => "Executes multiple workflows in sequence";
}

// Example usage
var documentWorkflow = new CompositeWorkflow(new IWorkflow[] {
    new DocumentFetchWorkflow(),
    new DocumentProcessingWorkflow(),
    new InsightGenerationWorkflow(),
    new ReportGenerationWorkflow()
});

var result = await documentWorkflow.ExecuteAsync(new WorkflowContext {
    Parameters = new { documentId = "12345" }
});

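Testing MCP Servers: Best Practices and Practical Tips

Overview

Testing is essential to building reliable, high-quality MCP servers. This guide covers best practices and tips for every stage, from unit and integration tests through end-to-end validation.

Why Testing Matters for MCP Servers

MCP servers act as critical middleware between AI models and client applications. Thorough testing ensures:

  • Reliability in production
  • Accurate handling of requests and responses
  • Correct implementation of the MCP specification
  • Resilience to failures and edge cases
  • Stable performance under varying load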

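Unit Testing for MCP Servers

Unit Testing (Foundation Layer)

Unit tests verify that each component of the MCP server works correctly in isolation.

What to Test

  1. Resource handlers: test the logic of each resource handler independently
  2. Tool implementations: verify tool behavior across a range of inputs
  3. Prompt templates: confirm that prompt templates render correctly
  4. Schema validation: test the parameter validation logic
  5. Error handling: verify the error responses returned for invalid input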

Unit Testing Best Practices

// Example unit test for a calculator tool in C#
[Fact]
public async Task CalculatorTool_Add_ReturnsCorrectSum()
{
    // Arrange
    var calculator = new CalculatorTool();
    var parameters = new Dictionary<string, object>
    {
        ["operation"] = "add",
        ["a"] = 5,
        ["b"] = 7
    };

    // Act
    var response = await calculator.ExecuteAsync(parameters);
    var result = JsonSerializer.Deserialize<CalculationResult>(response.Content[0].ToString());

    // Assert
    Assert.Equal(12, result.Value);
}

# Example unit test for a calculator tool in Python
import json

def test_calculator_tool_add():
    # Arrange
    calculator = CalculatorTool()
    parameters = {
        "operation": "add",
        "a": 5,
        "b": 7
    }

    # Act
    response = calculator.execute(parameters)
    result = json.loads(response.content[0].text)

    # Assert
    assert result["value"] == 12

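Integration Testing (Middle Layer)

Integration tests verify how the components of the MCP server interact.

What to Test

  1. Server initialization: test server startup under different configurations
  2. Route registration: verify that every endpoint is registered correctly
  3. Request processing: test the complete request-response flow
  4. Error propagation: confirm that errors are passed between components and handled correctly
  5. Authentication and authorization: test the security mechanisms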

Integration Testing Best Practices

// Example integration test for MCP server in C#
[Fact]
public async Task Server_ProcessToolRequest_ReturnsValidResponse()
{
    // Arrange
    var server = new McpServer();
    server.RegisterTool(new CalculatorTool());
    await server.StartAsync();

    var request = new McpRequest
    {
        Tool = "calculator",
        Parameters = new Dictionary<string, object>
        {
            ["operation"] = "multiply",
            ["a"] = 6,
            ["b"] = 7
        }
    };

    try
    {
        // Act
        var response = await server.ProcessRequestAsync(request);

        // Assert
        Assert.NotNull(response);
        Assert.Equal(McpStatusCodes.Success, response.StatusCode);
        // Additional assertions for response content
    }
    finally
    {
        // Cleanup: stop the server even when an assertion fails
        await server.StopAsync();
    }
}

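End-to-End Testing (Top Layer)

End-to-end tests verify the behavior of the whole system, from client to server.

What to Test

  1. Client-server communication: test the complete request-response cycle
  2. Real client SDKs: test with actual client implementations
  3. Performance under load: verify behavior with many concurrent requests
  4. Error recovery: test how the system recovers from failures
  5. Long-running operations: verify the handling of streaming and long-running operations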

End-to-End Testing Best Practices

// Example E2E test with a client in TypeScript
describe('MCP Server E2E Tests', () => {
  let client: McpClient;

  beforeAll(async () => {
    // Start server in test environment
    await startTestServer();
    client = new McpClient('http://localhost:5000');
  });

  afterAll(async () => {
    await stopTestServer();
  });

  test('Client can invoke calculator tool and get correct result', async () => {
    // Act
    const response = await client.invokeToolAsync('calculator', {
      operation: 'divide',
      a: 20,
      b: 4
    });

    // Assert
    expect(response.statusCode).toBe(200);
    expect(response.content[0].text).toContain('5');
  });
});

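Mocking Strategies for MCP Testing

Mocking is essential for testing components in isolation.

Components to Mock

  1. External AI models: mock model responses so that tests are predictable
  2. External services: mock API dependencies (databases, third-party services)
  3. Authentication services: mock identity providers
  4. Resource providers: mock resource handlers that are expensive to run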

Example: Mocking an AI Model Response

// C# example with Moq
var mockModel = new Mock<ILanguageModel>();
mockModel
    .Setup(m => m.GenerateResponseAsync(
        It.IsAny<string>(),
        It.IsAny<McpRequestContext>()))
    .ReturnsAsync(new ModelResponse
    {
        Text = "Mocked model response",
        FinishReason = FinishReason.Completed
    });

var server = new McpServer(modelClient: mockModel.Object);

# Python example with unittest.mock
from unittest.mock import patch

@patch('mcp_server.models.OpenAIModel')
def test_with_mock_model(mock_model):
    # Configure the mocked model instance
    mock_model.return_value.generate_response.return_value = {
        "text": "Mocked model response",
        "finish_reason": "completed"
    }

    # Pass the configured instance (not the patched class) to the server
    server = McpServer(model_client=mock_model.return_value)

    # Continue with test

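Performance Testing

Performance testing is critical for MCP servers running in production.

Metrics to Measure

  1. Latency: time from request to response
  2. Throughput: requests handled per second
  3. Resource utilization: CPU, memory, and network usage
  4. Concurrency handling: behavior under parallel requests
  5. Scaling characteristics: how performance changes as load increases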
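
As a rough illustration of the first two metrics, the sketch below times repeated calls from a thread pool and reports mean latency, 95th-percentile latency, and throughput. It is only a sketch under stated assumptions: `fake_tool_call` is a hypothetical stand-in that sleeps for 10 ms, and `measure` and `percentile` are illustrative helpers, not part of any MCP SDK. In practice you would replace the stand-in with a real client call.

```python
import math
import statistics
import time
from concurrent.futures import ThreadPoolExecutor


def percentile(sorted_values, pct):
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(pct / 100 * len(sorted_values)))
    return sorted_values[rank - 1]


def measure(call, total_requests=50, concurrency=5):
    """Invoke `call` repeatedly from a thread pool and summarize the timings."""

    def timed_call(_):
        start = time.perf_counter()
        call()
        return time.perf_counter() - start

    wall_start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        latencies = sorted(pool.map(timed_call, range(total_requests)))
    wall_elapsed = time.perf_counter() - wall_start

    return {
        "requests": total_requests,
        "mean_ms": statistics.mean(latencies) * 1000,
        "p95_ms": percentile(latencies, 95) * 1000,
        "throughput_rps": total_requests / wall_elapsed,
    }


def fake_tool_call():
    """Hypothetical stand-in for a real MCP tool invocation."""
    time.sleep(0.01)


summary = measure(fake_tool_call)
print(summary)
```

Reporting a high percentile alongside the mean matters because a small number of slow requests can hide behind a healthy-looking average.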

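Performance Testing Tools

  • k6: open-source load testing tool
  • JMeter: full-featured performance testing tool
  • Locust: Python-based load testing
  • Azure Load Testing: cloud-based performance testing service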

Example: Basic Load Test with k6

// k6 script for load testing MCP server
import http from 'k6/http';
import { check, sleep } from 'k6';

export const options = {
  vus: 10,  // 10 virtual users
  duration: '30s',
};

export default function () {
  const payload = JSON.stringify({
    tool: 'calculator',
    parameters: {
      operation: 'add',
      a: Math.floor(Math.random() * 100),
      b: Math.floor(Math.random() * 100)
    }
  });

  const params = {
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer test-token'
    },
  };

  const res = http.post('http://localhost:5000/api/tools/invoke', payload, params);

  check(res, {
    'status is 200': (r) => r.status === 200,
    'response time < 500ms': (r) => r.timings.duration < 500,
  });

  sleep(1);
}

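Test Automation for MCP Servers

Automated tests keep quality consistent and shorten feedback loops.

CI/CD Integration

  1. Run unit tests on pull requests: make sure code changes do not break existing functionality
  2. Run integration tests in staging: verify that the integrated system behaves as expected
  3. Maintain performance baselines: keep benchmarks so that regressions are caught
  4. Security scanning: include security testing in the pipeline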

Example CI Pipeline (GitHub Actions)

name: MCP Server Tests

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v4

    - name: Set up Runtime
      uses: actions/setup-dotnet@v4
      with:
        dotnet-version: '8.0.x'

    - name: Restore dependencies
      run: dotnet restore

    - name: Build
      run: dotnet build --no-restore

    - name: Unit Tests
      run: dotnet test --no-build --filter Category=Unit

    - name: Integration Tests
      run: dotnet test --no-build --filter Category=Integration

    - name: Performance Tests
      run: dotnet run --project tests/PerformanceTests/PerformanceTests.csproj
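
One possible way to enforce the performance baseline mentioned in the CI/CD list is a small comparison step that fails when a metric drifts too far from its stored value. The sketch below assumes lower-is-better metrics and a 10% tolerance. The metric names, the numbers, and the `check_against_baseline` helper are all hypothetical and only illustrate the idea.

```python
import json


def check_against_baseline(current, baseline, tolerance=0.10):
    """Return a list of metrics that regressed by more than `tolerance`.

    Both arguments map metric names to values where lower is better
    (for example latency in milliseconds).
    """
    regressions = []
    for name, baseline_value in baseline.items():
        current_value = current.get(name)
        if current_value is None:
            regressions.append(f"{name}: missing from current run")
            continue
        allowed = baseline_value * (1 + tolerance)
        if current_value > allowed:
            regressions.append(
                f"{name}: {current_value:.1f} exceeds allowed {allowed:.1f}"
            )
    return regressions


# Hypothetical numbers; in a pipeline these would be loaded from files.
baseline = json.loads('{"p50_ms": 40.0, "p95_ms": 120.0}')
current = {"p50_ms": 42.0, "p95_ms": 150.0}

problems = check_against_baseline(current, baseline)
for problem in problems:
    print("REGRESSION", problem)
```

A pipeline step could exit with a non-zero status whenever the returned list is non-empty, which would block the merge until the regression is investigated or the baseline is deliberately updated.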

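Testing for Compliance with the MCP Specification

Verify that your server implements the MCP specification correctly.

Key Compliance Areas

  1. API endpoints: test the required endpoints (/resources, /tools, etc.)
  2. Request/response format: verify schema compliance
  3. Error codes: confirm that the correct status codes are returned in each scenario
  4. Content types: test the handling of different content types
  5. Authentication flow: verify that the authentication mechanism conforms to the specification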

Compliance Test Suite

[Fact]
public async Task Server_ResourceEndpoint_ReturnsCorrectSchema()
{
    // Arrange
    var client = new HttpClient();
    client.DefaultRequestHeaders.Add("Authorization", "Bearer test-token");

    // Act
    var response = await client.GetAsync("http://localhost:5000/api/resources");
    var content = await response.Content.ReadAsStringAsync();
    var resources = JsonSerializer.Deserialize<ResourceList>(content);

    // Assert
    Assert.Equal(HttpStatusCode.OK, response.StatusCode);
    Assert.NotNull(resources);
    Assert.All(resources.Resources, resource =>
    {
        Assert.NotNull(resource.Id);
        Assert.NotNull(resource.Type);
        // Additional schema validation
    });
}
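
MCP messages are carried as JSON-RPC 2.0, so one transport-independent format check is to confirm that every response has the basic JSON-RPC shape. The sketch below is a minimal, hand-written validator for that shape only. The function name `validate_jsonrpc_response` is hypothetical, and the check does not cover the MCP-specific schemas for individual methods.

```python
def validate_jsonrpc_response(message):
    """Return a list of problems found in a JSON-RPC 2.0 response object."""
    if not isinstance(message, dict):
        return ["response must be a JSON object"]

    problems = []
    if message.get("jsonrpc") != "2.0":
        problems.append('"jsonrpc" must be the string "2.0"')
    if "id" not in message:
        problems.append('"id" is required')

    has_result = "result" in message
    has_error = "error" in message
    if has_result == has_error:
        problems.append('exactly one of "result" or "error" must be present')

    if has_error:
        error = message["error"]
        if not isinstance(error, dict):
            problems.append('"error" must be an object')
        else:
            code = error.get("code")
            # bool is a subclass of int in Python, so exclude it explicitly
            if not isinstance(code, int) or isinstance(code, bool):
                problems.append('"error.code" must be an integer')
            if not isinstance(error.get("message"), str):
                problems.append('"error.message" must be a string')
    return problems


ok_response = {"jsonrpc": "2.0", "id": 1, "result": {"tools": []}}
bad_response = {"jsonrpc": "2.0", "id": 2, "error": {"code": "oops"}}

print(validate_jsonrpc_response(ok_response))
print(validate_jsonrpc_response(bad_response))
```

Returning a list of problems rather than raising on the first one tends to produce more useful test failure messages, because a single run shows everything that is wrong with a response.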

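Top 10 Practical Tips for MCP Server Testing

  1. Test tool definitions separately: verify schema definitions independently of tool logic
  2. Use parameterized tests: exercise tools with varied inputs, including edge cases
  3. Check error responses: confirm that every error case is handled correctly
  4. Test authorization logic: verify access control for different user roles
  5. Monitor test coverage: aim for high coverage of critical code paths
  6. Test streaming responses: verify the handling of streamed content
  7. Simulate network issues: test behavior under poor network conditions
  8. Test resource limits: verify behavior when quotas or rate limits are reached
  9. Automate regression tests: build a suite that runs on every code change
  10. Document test cases: keep clear documentation of the test scenarios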
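
Tips 2 and 3 can be combined in a table-driven test. The sketch below uses `subTest` from Python's standard `unittest` module so that each input is reported separately when it fails. The `calculate` function is a hypothetical stand-in for real tool logic and exists only to make the example self-contained.

```python
import unittest


def calculate(operation, a, b):
    """Hypothetical tool logic used only to illustrate the test style."""
    if operation == "add":
        return a + b
    if operation == "divide":
        if b == 0:
            raise ValueError("division by zero")
        return a / b
    raise ValueError(f"unsupported operation: {operation}")


class CalculatorToolTests(unittest.TestCase):
    def test_valid_inputs(self):
        # Each tuple is (operation, a, b, expected result)
        cases = [
            ("add", 5, 7, 12),
            ("add", -1, 1, 0),
            ("add", 0, 0, 0),
            ("divide", 20, 4, 5),
            ("divide", 1, 4, 0.25),
        ]
        for operation, a, b, expected in cases:
            with self.subTest(operation=operation, a=a, b=b):
                self.assertEqual(calculate(operation, a, b), expected)

    def test_invalid_inputs(self):
        # Edge cases that should be rejected rather than silently handled
        cases = [("divide", 1, 0), ("modulo", 1, 2)]
        for operation, a, b in cases:
            with self.subTest(operation=operation, a=a, b=b):
                with self.assertRaises(ValueError):
                    calculate(operation, a, b)
```

Saved to a file, the tests can be run with `python -m unittest`. Adding a new edge case is then a one-line change to the relevant table.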

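Common Testing Pitfalls

  • Over-reliance on happy-path tests: test error cases thoroughly as well
  • Neglecting performance testing: find bottlenecks early, before they cause problems in production
  • Testing only in isolation: combine unit, integration, and end-to-end tests
  • Incomplete API coverage: make sure every endpoint and feature is tested
  • Inconsistent test environments: use containers to keep environments consistent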
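
To avoid the first pitfall, failure paths can be forced deterministically with a mock instead of waiting for a real outage. The sketch below uses `AsyncMock` from the standard library to make the first tool call raise a timeout and the second succeed, then checks that the fallback was used. The `call_with_fallback` helper and the returned payload are hypothetical. The tool names are borrowed from the error recovery example earlier in this lesson.

```python
import asyncio
from unittest.mock import AsyncMock


async def call_with_fallback(client, primary, fallback, params):
    """Try `primary`; on a timeout, record the error and try `fallback`."""
    try:
        result = await client.execute_tool(primary, params)
        return {"result": result, "source": "primary"}
    except TimeoutError as error:
        result = await client.execute_tool(fallback, params)
        return {"result": result, "source": "fallback", "primary_error": str(error)}


async def run_failure_scenario():
    # A list side_effect is consumed one item per call: the first call
    # raises the exception, the second call returns the dictionary.
    client = AsyncMock()
    client.execute_tool.side_effect = [
        TimeoutError("primary timed out"),
        {"temp_c": 21},
    ]
    outcome = await call_with_fallback(
        client, "premiumWeatherService", "basicWeatherService", {"location": "Oslo"}
    )
    return client, outcome


client, outcome = asyncio.run(run_failure_scenario())
print(outcome)
```

Because the mock records every awaited call, a test can also assert how many attempts were made and which tool each attempt targeted.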

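Conclusion

A comprehensive testing strategy is the foundation of a reliable, high-quality MCP server. By applying the practices and tips in this guide, you can hold your MCP implementation to a high standard of quality, reliability, and performance.

Key Takeaways

  1. Tool design: follow the single responsibility principle, use dependency injection, and design tools to be composable
  2. Schema design: create clear, well-documented schemas with appropriate validation constraints
  3. Error handling: implement graceful error handling, structured error responses, and retry logic
  4. Performance: use caching, asynchronous processing, and resource throttling
  5. Security: apply thorough input validation, authorization checks, and careful handling of sensitive data
  6. Testing: build comprehensive unit, integration, and end-to-end tests
  7. Workflow patterns: apply established patterns such as chains, dispatchers, and parallel processing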

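Exercise

Design the MCP tools and workflow for a document processing system that:

  1. Accepts documents in multiple formats (PDF, DOCX, TXT)
  2. Extracts text and key information from the documents
  3. Classifies documents by type and content
  4. Generates a summary of each document

Implement the tool schemas and error handling, and design the workflow pattern that best fits this scenario. Consider how you would test the implementation.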

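Resources

  1. Join the MCP community to follow the latest developments: Azure AI Foundry Discord Community
  2. Contribute to open-source MCP projects
  3. Apply MCP principles in your own organization's AI initiatives
  4. Explore specialized MCP implementations for your industry
  5. Consider taking advanced courses on specific MCP topics, such as multimodal integration or enterprise application integration
  6. Practice building MCP tools and workflows in the Hands on Lab

Next chapter: Best Practices Case Studies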

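Disclaimer
This document was translated using the AI translation service Co-op Translator. While we strive for accuracy, automated translations may contain errors or inaccuracies. The document in its original language should be considered the authoritative source. For critical information, professional human translation is recommended. We are not liable for any misunderstandings or misinterpretations arising from the use of this translation.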

