测试与调试 本实验内容 本实验提供了关于在生产环境中测试和调试 MCP 服务器的全面指导。您将学习如何实施可靠的测试策略、调试复杂问题,并确保 MCP 服务器在各种条件下可靠运行。 概述 测试 MCP 服务器需要采用多层次的方法,包括单元测试、集成测试、性能验证以及真实场景测试。本实验涵盖了从开发到生产监控的完整测试生命周期。 我们的测试策略强调可靠性、安全性和性能,确保您的 MCP 服务器能够处理生产工作负载,同时保持数据完整性和用户体验质量。
本实验提供了关于在生产环境中测试和调试 MCP 服务器的全面指导。您将学习如何实施可靠的测试策略、调试复杂问题,并确保 MCP 服务器在各种条件下可靠运行。
测试 MCP 服务器需要采用多层次的方法,包括单元测试、集成测试、性能验证以及真实场景测试。本实验涵盖了从开发到生产监控的完整测试生命周期。
我们的测试策略强调可靠性、安全性和性能,确保您的 MCP 服务器能够处理生产工作负载,同时保持数据完整性和用户体验质量。
完成本实验后,您将能够:
┌─────────────────────────────────────────────────┐ │ Unit Tests │ │ • Tool execution logic │ │ • Database query validation │ │ • Authentication/authorization │ │ • Embedding generation │ └─────────────────┬───────────────────────────────┘ │ ┌─────────────────▼───────────────────────────────┐ │ Integration Tests │ │ • End-to-end MCP workflows │ │ • Database schema validation │ │ • API endpoint testing │ │ • Multi-tool interactions │ └─────────────────┬───────────────────────────────┘ │ ┌─────────────────▼───────────────────────────────┐ │ Performance Tests │ │ • Load testing under realistic conditions │ │ • Database performance validation │ │ • Memory and resource usage │ │ • Embedding generation performance │ └─────────────────┬───────────────────────────────┘ │ ┌─────────────────▼───────────────────────────────┐ │ E2E Tests │ │ • Complete user workflows │ │ • VS Code integration testing │ │ • Real-world scenario validation │ │ • Cross-browser compatibility │ └─────────────────────────────────────────────────┘
# tests/conftest.py """ Pytest configuration and shared fixtures for MCP server testing. """ import pytest import asyncio import asyncpg import os from typing import AsyncGenerator, Dict, Any from unittest.mock import AsyncMock, Mock import tempfile import shutil from datetime import datetime # Test configuration TEST_DATABASE_URL = "postgresql://test_user:test_pass@localhost:5432/test_retail_db" TEST_STORE_IDS = ['test_seattle', 'test_redmond', 'test_bellevue'] @pytest.fixture(scope="session") def event_loop(): """Create an instance of the default event loop for the test session.""" loop = asyncio.get_event_loop_policy().new_event_loop() yield loop loop.close() @pytest.fixture(scope="session") async def test_database(): """Set up test database with schema and sample data.""" # Create test database connection sys_conn = await asyncpg.connect( "postgresql://postgres:password@localhost:5432/postgres" ) try: # Create test database await sys_conn.execute("DROP DATABASE IF EXISTS test_retail_db") await sys_conn.execute("CREATE DATABASE test_retail_db") finally: await sys_conn.close() # Connect to test database and set up schema test_conn = await asyncpg.connect(TEST_DATABASE_URL) try: # Load schema schema_sql = await load_sql_file("../scripts/create_schema.sql") await test_conn.execute(schema_sql) # Load sample data sample_data_sql = await load_sql_file("../scripts/sample_data.sql") await test_conn.execute(sample_data_sql) yield test_conn finally: await test_conn.close() # Cleanup test database sys_conn = await asyncpg.connect( "postgresql://postgres:password@localhost:5432/postgres" ) try: await sys_conn.execute("DROP DATABASE IF EXISTS test_retail_db") finally: await sys_conn.close() @pytest.fixture async def db_connection(test_database): """Provide a clean database connection for each test.""" conn = await asyncpg.connect(TEST_DATABASE_URL) # Start transaction for test isolation tx = conn.transaction() await tx.start() try: yield conn finally: # Rollback transaction to maintain test isolation await tx.rollback() await conn.close() @pytest.fixture async def mock_embedding_manager(): """Mock embedding manager for testing without Azure OpenAI calls.""" mock_manager = AsyncMock() # Mock embedding generation mock_manager.generate_embedding.return_value = [0.1] * 1536 # Mock embedding mock_manager.generate_embeddings_batch.return_value = [[0.1] * 1536] * 10 # Mock initialization mock_manager.initialize.return_value = None mock_manager.cleanup.return_value = None return mock_manager @pytest.fixture async def test_mcp_server(db_connection, mock_embedding_manager): """Set up test MCP server instance.""" from mcp_server.server import MCPServer from mcp_server.database import DatabaseProvider from mcp_server.config import Config # Create test configuration config = Config() config.database.connection_string = TEST_DATABASE_URL config.server.enable_debug = True # Create database provider db_provider = DatabaseProvider(config.database.connection_string) await db_provider.initialize() # Create MCP server server = MCPServer(config, db_provider) server.embedding_manager = mock_embedding_manager await server.initialize() yield server await server.cleanup() @pytest.fixture def sample_products(): """Sample product data for testing.""" return [ { 'product_id': 'test-product-1', 'product_name': 'Test Running Shoes', 'brand': 'TestBrand', 'price': 99.99, 'product_description': 'Comfortable running shoes for daily training', 'category_name': 'Electronics', 'current_stock': 50 }, { 'product_id': 'test-product-2', 'product_name': 'Test Laptop', 'brand': 'TestTech', 'price': 1299.99, 'product_description': 'High-performance laptop for professional use', 'category_name': 'Electronics', 'current_stock': 25 } ] async def load_sql_file(file_path: str) -> str: """Load SQL file content.""" with open(file_path, 'r') as file: return file.read() # Test data helpers class TestDataHelper: """Helper class for managing test data.""" @staticmethod async def create_test_store(conn: asyncpg.Connection, store_id: str) -> Dict[str, Any]: """Create a test store.""" store_data = { 'store_id': store_id, 'store_name': f'Test Store {store_id}', 'store_location': 'Test Location', 'store_type': 'test', 'region': 'test' } await conn.execute(""" INSERT INTO retail.stores (store_id, store_name, store_location, store_type, region) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (store_id) DO NOTHING """, *store_data.values()) return store_data @staticmethod async def create_test_customer(conn: asyncpg.Connection, store_id: str) -> str: """Create a test customer.""" customer_id = await conn.fetchval(""" INSERT INTO retail.customers ( store_id, first_name, last_name, email, loyalty_tier ) VALUES ($1, $2, $3, $4, $5) RETURNING customer_id """, store_id, 'Test', 'Customer', 'test@example.com', 'bronze') return customer_id @staticmethod async def create_test_product( conn: asyncpg.Connection, store_id: str, product_data: Dict[str, Any] ) -> str: """Create a test product.""" product_id = await conn.fetchval(""" INSERT INTO retail.products ( store_id, sku, product_name, brand, price, product_description, current_stock ) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING product_id """, store_id, f"TEST-{product_data['product_name'][:10]}", product_data['product_name'], product_data['brand'], product_data['price'], product_data['product_description'], product_data['current_stock'] ) return product_id
# tests/test_tools.py """ Comprehensive unit tests for MCP tools. """ import pytest import asyncio from unittest.mock import AsyncMock, patch from datetime import datetime, timedelta from mcp_server.tools.sales_analysis import SalesAnalysisTool from mcp_server.tools.semantic_search import SemanticProductSearchTool from mcp_server.tools.schema_introspection import SchemaIntrospectionTool from tests.conftest import TestDataHelper class TestSalesAnalysisTool: """Test sales analysis tool functionality.""" @pytest.fixture async def sales_tool(self, test_mcp_server): """Create sales analysis tool for testing.""" return SalesAnalysisTool(test_mcp_server.db_provider) async def test_daily_sales_query(self, sales_tool, db_connection): """Test daily sales analysis query.""" # Set up test data store_id = 'test_seattle' await TestDataHelper.create_test_store(db_connection, store_id) customer_id = await TestDataHelper.create_test_customer(db_connection, store_id) # Create test transaction await db_connection.execute(""" INSERT INTO retail.sales_transactions ( store_id, customer_id, transaction_date, total_amount, transaction_type ) VALUES ($1, $2, $3, $4, $5) """, store_id, customer_id, datetime.now(), 150.00, 'sale') # Execute tool result = await sales_tool.execute( query_type='daily_sales', store_id=store_id, start_date=(datetime.now() - timedelta(days=7)).date(), end_date=datetime.now().date() ) # Validate results assert result.success is True assert len(result.data) > 0 assert 'total_revenue' in result.data[0] assert result.metadata['query_type'] == 'daily_sales' async def test_custom_query_validation(self, sales_tool, db_connection): """Test custom query validation.""" store_id = 'test_seattle' await TestDataHelper.create_test_store(db_connection, store_id) # Test valid query valid_query = "SELECT COUNT(*) as customer_count FROM retail.customers" result = await sales_tool.execute( query_type='custom', store_id=store_id, query=valid_query ) assert result.success is True # Test invalid query (should be blocked) invalid_query = "DROP TABLE retail.customers" result = await sales_tool.execute( query_type='custom', store_id=store_id, query=invalid_query ) assert result.success is False assert 'validation failed' in result.error.lower() async def test_store_isolation(self, sales_tool, db_connection): """Test that store isolation works correctly.""" # Create two different stores store1 = 'test_store1' store2 = 'test_store2' await TestDataHelper.create_test_store(db_connection, store1) await TestDataHelper.create_test_store(db_connection, store2) # Create customers in different stores customer1 = await TestDataHelper.create_test_customer(db_connection, store1) customer2 = await TestDataHelper.create_test_customer(db_connection, store2) # Query from store1 should only see store1 data result1 = await sales_tool.execute( query_type='custom', store_id=store1, query="SELECT COUNT(*) as count FROM retail.customers" ) # Query from store2 should only see store2 data result2 = await sales_tool.execute( query_type='custom', store_id=store2, query="SELECT COUNT(*) as count FROM retail.customers" ) assert result1.success is True assert result2.success is True assert result1.data[0]['count'] == 1 assert result2.data[0]['count'] == 1 class TestSemanticSearchTool: """Test semantic search tool functionality.""" @pytest.fixture async def search_tool(self, test_mcp_server): """Create semantic search tool for testing.""" return SemanticProductSearchTool(test_mcp_server.db_provider) async def test_semantic_search_execution(self, search_tool, db_connection, sample_products): """Test semantic search with mock embeddings.""" store_id = 'test_seattle' await TestDataHelper.create_test_store(db_connection, store_id) # Create test products for product_data in sample_products: product_id = await TestDataHelper.create_test_product( db_connection, store_id, product_data ) # Create mock embedding await db_connection.execute(""" INSERT INTO retail.product_embeddings ( product_id, store_id, embedding_text, embedding ) VALUES ($1, $2, $3, $4) """, product_id, store_id, f"{product_data['product_name']} {product_data['brand']}", '[0.1,0.2,0.3]' # Mock embedding ) # Execute search result = await search_tool.execute( query='running shoes', store_id=store_id, limit=10, similarity_threshold=0.0 ) # Validate results assert result.success is True assert len(result.data) > 0 assert 'similarity_score' in result.data[0] assert result.metadata['search_type'] == 'semantic' async def test_search_parameter_validation(self, search_tool): """Test search parameter validation.""" # Test missing query result = await search_tool.execute(store_id='test_store') assert result.success is False assert 'query is required' in result.error.lower() # Test missing store_id result = await search_tool.execute(query='test query') assert result.success is False assert 'store_id is required' in result.error.lower() class TestSchemaIntrospectionTool: """Test schema introspection tool.""" @pytest.fixture async def schema_tool(self, test_mcp_server): """Create schema introspection tool for testing.""" return SchemaIntrospectionTool(test_mcp_server.db_provider) async def test_single_table_schema(self, schema_tool, db_connection): """Test getting schema for a single table.""" result = await schema_tool.execute( table_name='customers', include_constraints=True, include_indexes=True ) assert result.success is True assert result.data['table_name'] == 'customers' assert len(result.data['columns']) > 0 assert 'customer_id' in [col['column_name'] for col in result.data['columns']] async def test_all_tables_schema(self, schema_tool, db_connection): """Test getting schema for all tables.""" result = await schema_tool.execute() assert result.success is True assert result.data['schema_name'] == 'retail' assert len(result.data['tables']) > 0 table_names = [table['table_name'] for table in result.data['tables']] expected_tables = ['customers', 'products', 'sales_transactions'] for expected_table in expected_tables: assert expected_table in table_names
# tests/test_database.py """ Database layer testing including RLS and security. """ import pytest import asyncpg from datetime import datetime from mcp_server.database import DatabaseProvider from tests.conftest import TestDataHelper class TestRowLevelSecurity: """Test Row Level Security implementation.""" async def test_store_context_setting(self, db_connection): """Test that store context is set correctly.""" store_id = 'test_seattle' await TestDataHelper.create_test_store(db_connection, store_id) # Set store context await db_connection.execute("SELECT retail.set_store_context($1)", store_id) # Verify context is set current_store = await db_connection.fetchval( "SELECT current_setting('app.current_store_id', true)" ) assert current_store == store_id async def test_customer_isolation(self, db_connection): """Test that customers are properly isolated by store.""" # Create two stores store1 = 'test_store1' store2 = 'test_store2' await TestDataHelper.create_test_store(db_connection, store1) await TestDataHelper.create_test_store(db_connection, store2) # Create customers in different stores await TestDataHelper.create_test_customer(db_connection, store1) await TestDataHelper.create_test_customer(db_connection, store2) # Set context to store1 and count customers await db_connection.execute("SELECT retail.set_store_context($1)", store1) store1_count = await db_connection.fetchval("SELECT COUNT(*) FROM retail.customers") # Set context to store2 and count customers await db_connection.execute("SELECT retail.set_store_context($1)", store2) store2_count = await db_connection.fetchval("SELECT COUNT(*) FROM retail.customers") # Each store should only see its own customers assert store1_count == 1 assert store2_count == 1 async def test_invalid_store_context(self, db_connection): """Test that invalid store context raises error.""" with pytest.raises(Exception) as exc_info: await db_connection.execute("SELECT retail.set_store_context($1)", 'invalid_store') assert "Store not found" in str(exc_info.value) async def test_cross_store_data_insertion_blocked(self, db_connection): """Test that users cannot insert data for other stores.""" store_id = 'test_seattle' await TestDataHelper.create_test_store(db_connection, store_id) # Set store context await db_connection.execute("SELECT retail.set_store_context($1)", store_id) # Try to insert customer for different store (should fail) with pytest.raises(Exception): await db_connection.execute(""" INSERT INTO retail.customers (store_id, first_name, last_name, email) VALUES ($1, $2, $3, $4) """, 'different_store', 'Test', 'Customer', 'test@example.com') class TestDatabaseProvider: """Test database provider functionality.""" @pytest.fixture async def db_provider(self): """Create database provider for testing.""" provider = DatabaseProvider(TEST_DATABASE_URL) await provider.initialize() yield provider await provider.cleanup() async def test_connection_pooling(self, db_provider): """Test connection pool functionality.""" # Get multiple connections conn1 = await db_provider.get_connection() conn2 = await db_provider.get_connection() assert conn1 is not None assert conn2 is not None assert conn1 != conn2 # Should be different connection objects # Release connections await db_provider.release_connection(conn1) await db_provider.release_connection(conn2) async def test_health_check(self, db_provider): """Test database health check.""" health_status = await db_provider.health_check() assert health_status['status'] == 'healthy' assert 'connection_pool_size' in health_status assert 'database_version' in health_status async def test_connection_recovery(self, db_provider): """Test connection recovery after database issues.""" # This would test connection recovery scenarios # In a real test, you might temporarily break the connection # and verify that the pool recovers # For now, just verify health check works health_status = await db_provider.health_check() assert health_status['status'] == 'healthy'
# tests/test_integration.py """ Integration tests for complete MCP workflows. """ import pytest import json from datetime import datetime, timedelta from mcp_server.server import MCPServer from tests.conftest import TestDataHelper class TestMCPWorkflows: """Test complete MCP server workflows.""" async def test_product_search_workflow(self, test_mcp_server, db_connection, sample_products): """Test complete product search workflow.""" store_id = 'test_seattle' await TestDataHelper.create_test_store(db_connection, store_id) # Create test products with embeddings for product_data in sample_products: product_id = await TestDataHelper.create_test_product( db_connection, store_id, product_data ) # Create embedding for product await db_connection.execute(""" INSERT INTO retail.product_embeddings ( product_id, store_id, embedding_text, embedding ) VALUES ($1, $2, $3, $4) """, product_id, store_id, f"{product_data['product_name']} {product_data['brand']}", '[' + ','.join(['0.1'] * 1536) + ']' # Mock embedding ) # Test semantic search search_result = await test_mcp_server.execute_tool( 'semantic_search_products', { 'query': 'running shoes', 'store_id': store_id, 'limit': 10 } ) assert search_result['success'] is True assert len(search_result['data']) > 0 # Test schema introspection schema_result = await test_mcp_server.execute_tool( 'get_table_schema', {'table_name': 'products'} ) assert schema_result['success'] is True assert schema_result['data']['table_name'] == 'products' async def test_sales_analysis_workflow(self, test_mcp_server, db_connection): """Test sales analysis workflow.""" store_id = 'test_seattle' await TestDataHelper.create_test_store(db_connection, store_id) # Create test customer and product customer_id = await TestDataHelper.create_test_customer(db_connection, store_id) product_id = await TestDataHelper.create_test_product( db_connection, store_id, { 'product_name': 'Test Product', 'brand': 'TestBrand', 'price': 99.99, 'product_description': 'Test product description', 'current_stock': 50 } ) # Create test transaction transaction_id = await db_connection.fetchval(""" INSERT INTO retail.sales_transactions ( store_id, customer_id, transaction_date, total_amount, subtotal, tax_amount, transaction_type ) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING transaction_id """, store_id, customer_id, datetime.now(), 107.99, 99.99, 8.00, 'sale') # Create transaction item await db_connection.execute(""" INSERT INTO retail.sales_transaction_items ( transaction_id, product_id, quantity, unit_price, total_price ) VALUES ($1, $2, $3, $4, $5) """, transaction_id, product_id, 1, 99.99, 99.99) # Test daily sales analysis sales_result = await test_mcp_server.execute_tool( 'execute_sales_query', { 'query_type': 'daily_sales', 'store_id': store_id, 'start_date': (datetime.now() - timedelta(days=1)).date().isoformat(), 'end_date': datetime.now().date().isoformat() } ) assert sales_result['success'] is True assert len(sales_result['data']) > 0 assert sales_result['data'][0]['total_revenue'] == 107.99 async def test_multi_store_workflow(self, test_mcp_server, db_connection): """Test workflows across multiple stores.""" # Create multiple stores stores = ['test_seattle', 'test_redmond', 'test_bellevue'] for store_id in stores: await TestDataHelper.create_test_store(db_connection, store_id) # Create customer in each store await TestDataHelper.create_test_customer(db_connection, store_id) # Test that each store sees only its own data for store_id in stores: schema_result = await test_mcp_server.execute_tool( 'execute_sales_query', { 'query_type': 'custom', 'store_id': store_id, 'query': 'SELECT COUNT(*) as customer_count FROM retail.customers' } ) assert schema_result['success'] is True assert schema_result['data'][0]['customer_count'] == 1 class TestErrorHandling: """Test error handling and edge cases.""" async def test_database_connection_failure(self, test_mcp_server): """Test behavior when database connection fails.""" # Simulate database failure by using invalid connection with patch.object(test_mcp_server.db_provider, 'get_connection') as mock_conn: mock_conn.side_effect = Exception("Database connection failed") result = await test_mcp_server.execute_tool( 'get_table_schema', {'table_name': 'customers'} ) assert result['success'] is False assert 'connection failed' in result['error'].lower() async def test_invalid_tool_parameters(self, test_mcp_server): """Test handling of invalid tool parameters.""" # Missing required parameter result = await test_mcp_server.execute_tool( 'semantic_search_products', {'query': 'test query'} # Missing store_id ) assert result['success'] is False assert 'store_id is required' in result['error'].lower() # Invalid parameter type result = await test_mcp_server.execute_tool( 'semantic_search_products', { 'query': 'test query', 'store_id': 'test_store', 'limit': 'invalid' # Should be integer } ) assert result['success'] is False async def test_sql_injection_prevention(self, test_mcp_server, db_connection): """Test that SQL injection attempts are blocked.""" store_id = 'test_seattle' await TestDataHelper.create_test_store(db_connection, store_id) # Attempt SQL injection malicious_query = "SELECT * FROM retail.customers; DROP TABLE retail.customers; --" result = await test_mcp_server.execute_tool( 'execute_sales_query', { 'query_type': 'custom', 'store_id': store_id, 'query': malicious_query } ) assert result['success'] is False assert 'validation failed' in result['error'].lower()
# tests/test_performance.py """ Performance and load testing for MCP server. """ import pytest import asyncio import time import statistics from concurrent.futures import ThreadPoolExecutor from typing import List, Dict, Any class TestPerformance: """Performance testing for MCP server operations.""" async def test_concurrent_tool_execution(self, test_mcp_server, db_connection): """Test performance under concurrent tool execution.""" store_id = 'test_seattle' await TestDataHelper.create_test_store(db_connection, store_id) # Create test data for i in range(100): await TestDataHelper.create_test_customer(db_connection, store_id) # Define test scenarios async def execute_tool_scenario(): """Execute a tool and measure performance.""" start_time = time.time() result = await test_mcp_server.execute_tool( 'execute_sales_query', { 'query_type': 'custom', 'store_id': store_id, 'query': 'SELECT COUNT(*) as count FROM retail.customers' } ) execution_time = time.time() - start_time return { 'success': result['success'], 'execution_time': execution_time } # Run concurrent executions concurrent_tasks = 20 tasks = [execute_tool_scenario() for _ in range(concurrent_tasks)] start_time = time.time() results = await asyncio.gather(*tasks) total_time = time.time() - start_time # Analyze results successful_executions = [r for r in results if r['success']] execution_times = [r['execution_time'] for r in successful_executions] assert len(successful_executions) == concurrent_tasks assert statistics.mean(execution_times) < 1.0 # Average under 1 second assert max(execution_times) < 5.0 # No execution over 5 seconds assert total_time < 10.0 # All executions under 10 seconds print(f"Concurrent execution stats:") print(f" Total time: {total_time:.2f}s") print(f" Average execution time: {statistics.mean(execution_times):.3f}s") print(f" Max execution time: {max(execution_times):.3f}s") print(f" Min execution time: {min(execution_times):.3f}s") async def test_database_query_performance(self, test_mcp_server, db_connection): """Test database query performance with large datasets.""" store_id = 'test_seattle' await TestDataHelper.create_test_store(db_connection, store_id) # Create large dataset print("Creating test dataset...") for i in range(1000): await TestDataHelper.create_test_customer(db_connection, store_id) # Test various query patterns query_tests = [ { 'name': 'Simple COUNT', 'query': 'SELECT COUNT(*) FROM retail.customers', 'expected_max_time': 0.1 }, { 'name': 'Filtered SELECT', 'query': "SELECT * FROM retail.customers WHERE loyalty_tier = 'bronze' LIMIT 100", 'expected_max_time': 0.5 }, { 'name': 'Aggregation', 'query': 'SELECT loyalty_tier, COUNT(*) FROM retail.customers GROUP BY loyalty_tier', 'expected_max_time': 0.5 } ] for test_case in query_tests: start_time = time.time() result = await test_mcp_server.execute_tool( 'execute_sales_query', { 'query_type': 'custom', 'store_id': store_id, 'query': test_case['query'] } ) execution_time = time.time() - start_time assert result['success'] is True assert execution_time < test_case['expected_max_time'] print(f"Query '{test_case['name']}': {execution_time:.3f}s") async def test_embedding_generation_performance(self, test_mcp_server): """Test embedding generation performance.""" from mcp_server.embeddings.product_embedder import ProductEmbedder # Test with mock embedding manager (no actual API calls) embedder = ProductEmbedder(test_mcp_server.db_provider) embedder.embedding_manager = test_mcp_server.embedding_manager # Test batch embedding generation test_texts = [f"Test product {i} description" for i in range(100)] start_time = time.time() embeddings = await embedder.embedding_manager.generate_embeddings_batch(test_texts) batch_time = time.time() - start_time assert len(embeddings) == 100 assert batch_time < 5.0 # Should complete in under 5 seconds with mocks print(f"Batch embedding generation (100 items): {batch_time:.3f}s") print(f"Average per embedding: {batch_time/100:.4f}s") @pytest.mark.slow async def test_memory_usage(self, test_mcp_server, db_connection): """Test memory usage under load.""" import psutil import os process = psutil.Process(os.getpid()) initial_memory = process.memory_info().rss / 1024 / 1024 # MB store_id = 'test_seattle' await TestDataHelper.create_test_store(db_connection, store_id) # Create substantial dataset for i in range(500): await TestDataHelper.create_test_customer(db_connection, store_id) # Execute multiple operations for i in range(50): await test_mcp_server.execute_tool( 'execute_sales_query', { 'query_type': 'custom', 'store_id': store_id, 'query': 'SELECT * FROM retail.customers LIMIT 100' } ) final_memory = process.memory_info().rss / 1024 / 1024 # MB memory_increase = final_memory - initial_memory # Memory increase should be reasonable (under 100MB for this test) assert memory_increase < 100 print(f"Memory usage:") print(f" Initial: {initial_memory:.1f} MB") print(f" Final: {final_memory:.1f} MB") print(f" Increase: {memory_increase:.1f} MB") class TestScalability: """Test scalability characteristics.""" async def test_response_time_scaling(self, test_mcp_server, db_connection): """Test how response time scales with data size.""" store_id = 'test_seattle' await TestDataHelper.create_test_store(db_connection, store_id) # Test with different data sizes data_sizes = [100, 500, 1000, 2000] response_times = [] for size in data_sizes: # Clear existing data await db_connection.execute("DELETE FROM retail.customers WHERE store_id = $1", store_id) # Create dataset of specified size for i in range(size): await TestDataHelper.create_test_customer(db_connection, store_id) # Measure query time start_time = time.time() result = await test_mcp_server.execute_tool( 'execute_sales_query', { 'query_type': 'custom', 'store_id': store_id, 'query': 'SELECT COUNT(*) FROM retail.customers' } ) execution_time = time.time() - start_time assert result['success'] is True response_times.append(execution_time) print(f"Data size {size}: {execution_time:.3f}s") # Response time should scale reasonably (not exponentially) # Simple count queries should remain fast even with larger datasets for time_val in response_times: assert time_val < 1.0 # All queries under 1 second
# mcp_server/debugging/debug_tools.py """ Advanced debugging tools for MCP server troubleshooting. """ import asyncio import json import time import traceback from typing import Dict, Any, List, Optional from datetime import datetime import logging from contextlib import asynccontextmanager logger = logging.getLogger(__name__) class MCPDebugger: """Comprehensive debugging utilities for MCP server.""" def __init__(self, server_instance): self.server = server_instance self.debug_logs = [] self.performance_metrics = {} self.active_traces = {} @asynccontextmanager async def trace_execution(self, operation_name: str, context: Dict[str, Any] = None): """Trace operation execution with detailed logging.""" trace_id = f"{operation_name}_{int(time.time() * 1000)}" start_time = time.time() trace_info = { 'trace_id': trace_id, 'operation': operation_name, 'start_time': start_time, 'context': context or {}, 'status': 'running' } self.active_traces[trace_id] = trace_info logger.debug(f"Starting trace: {trace_id} - {operation_name}") try: yield trace_info # Success execution_time = time.time() - start_time trace_info.update({ 'status': 'completed', 'execution_time': execution_time }) logger.debug(f"Completed trace: {trace_id} in {execution_time:.3f}s") except Exception as e: # Error execution_time = time.time() - start_time trace_info.update({ 'status': 'error', 'execution_time': execution_time, 'error': str(e), 'traceback': traceback.format_exc() }) logger.error(f"Error in trace: {trace_id} - {str(e)}") raise finally: # Store completed trace self.debug_logs.append(trace_info.copy()) del self.active_traces[trace_id] # Limit debug log size if len(self.debug_logs) > 1000: self.debug_logs = self.debug_logs[-500:] async def debug_tool_execution(self, tool_name: str, parameters: Dict[str, Any]) -> Dict[str, Any]: """Debug tool execution with comprehensive logging.""" async with self.trace_execution(f"tool_execution_{tool_name}", {'parameters': parameters}) as trace: # Pre-execution validation validation_result = await self._validate_tool_parameters(tool_name, parameters) trace['validation'] = validation_result if not validation_result['valid']: return { 'success': False, 'error': f"Parameter validation failed: {validation_result['errors']}", 'debug_info': trace } # Database connection check db_health = await self._check_database_health() trace['database_health'] = db_health # Execute tool with monitoring try: tool_instance = self.server.get_tool(tool_name) if not tool_instance: return { 'success': False, 'error': f"Tool '{tool_name}' not found", 'debug_info': trace } # Monitor resource usage during execution start_memory = await self._get_memory_usage() result = await tool_instance.call(**parameters) end_memory = await self._get_memory_usage() trace.update({ 'memory_start_mb': start_memory, 'memory_end_mb': end_memory, 'memory_used_mb': end_memory - start_memory, 'result_success': result.success, 'result_row_count': result.row_count }) return { 'success': result.success, 'data': result.data, 'error': result.error, 'metadata': result.metadata, 'debug_info': trace } except Exception as e: trace['exception'] = { 'type': type(e).__name__, 'message': str(e), 'traceback': traceback.format_exc() } return { 'success': False, 'error': f"Tool execution failed: {str(e)}", 'debug_info': trace } async def analyze_performance_bottlenecks(self) -> Dict[str, Any]: """Analyze performance bottlenecks from debug logs.""" if not self.debug_logs: return {'message': 'No debug data available'} # Analyze execution times execution_times = {} error_rates = {} memory_usage = {} for log_entry in self.debug_logs[-100:]: # Last 100 entries operation = log_entry['operation'] # Execution time analysis if 'execution_time' in log_entry: if operation not in execution_times: execution_times[operation] = [] execution_times[operation].append(log_entry['execution_time']) # Error rate analysis if operation not in error_rates: error_rates[operation] = {'total': 0, 'errors': 0} error_rates[operation]['total'] += 1 if log_entry['status'] == 'error': error_rates[operation]['errors'] += 1 # Memory usage analysis if 'memory_used_mb' in log_entry: if operation not in memory_usage: memory_usage[operation] = [] memory_usage[operation].append(log_entry['memory_used_mb']) # Calculate statistics performance_stats = {} for operation, times in execution_times.items(): if times: performance_stats[operation] = { 'avg_execution_time': sum(times) / len(times), 'max_execution_time': max(times), 'min_execution_time': min(times), 'execution_count': len(times), 'error_rate': (error_rates[operation]['errors'] / error_rates[operation]['total'] * 100), 'avg_memory_usage': (sum(memory_usage.get(operation, [0])) / len(memory_usage.get(operation, [1]))) } # Identify bottlenecks bottlenecks = [] for operation, stats in performance_stats.items(): if stats['avg_execution_time'] > 2.0: # Slow operations bottlenecks.append({ 'type': 'slow_execution', 'operation': operation, 'avg_time': stats['avg_execution_time'] }) if stats['error_rate'] > 5.0: # High error rate bottlenecks.append({ 'type': 'high_error_rate', 'operation': operation, 'error_rate': stats['error_rate'] }) if stats['avg_memory_usage'] > 100: # High memory usage bottlenecks.append({ 'type': 'high_memory_usage', 'operation': operation, 'memory_mb': stats['avg_memory_usage'] }) return { 'performance_stats': performance_stats, 'bottlenecks': bottlenecks, 'total_operations': len(self.debug_logs), 'analysis_timestamp': datetime.now().isoformat() } async def _validate_tool_parameters(self, tool_name: str, parameters: Dict[str, Any]) -> Dict[str, Any]: """Validate tool parameters against schema.""" try: tool_instance = self.server.get_tool(tool_name) if not tool_instance: return { 'valid': False, 'errors': [f"Tool '{tool_name}' not found"] } schema = tool_instance.get_input_schema() # Basic validation (in production, use jsonschema library) errors = [] required_props = schema.get('required', []) for prop in required_props: if prop not in parameters: errors.append(f"Missing required parameter: {prop}") return { 'valid': len(errors) == 0, 'errors': errors, 'schema': schema } except Exception as e: return { 'valid': False, 'errors': [f"Validation error: {str(e)}"] } async def _check_database_health(self) -> Dict[str, Any]: """Check database health and connectivity.""" try: health_status = await self.server.db_provider.health_check() return { 'healthy': health_status.get('status') == 'healthy', 'details': health_status } except Exception as e: return { 'healthy': False, 'error': str(e) } async def _get_memory_usage(self) -> float: """Get current memory usage in MB.""" try: import psutil import os process = psutil.Process(os.getpid()) return process.memory_info().rss / 1024 / 1024 except: return 0.0 def get_debug_summary(self) -> Dict[str, Any]: """Get summary of debug information.""" recent_logs = self.debug_logs[-50:] if self.debug_logs else [] return { 'total_operations': len(self.debug_logs), 'active_traces': len(self.active_traces), 'recent_operations': [ { 'operation': log['operation'], 'status': log['status'], 'execution_time': log.get('execution_time', 0), 'timestamp': log.get('start_time', 0) } for log in recent_logs ], 'current_traces': list(self.active_traces.keys()) } # Debug tool for direct use class DebugTool: """Interactive debugging tool for MCP server.""" def __init__(self, server_instance): self.debugger = MCPDebugger(server_instance) async def debug_query(self, query: str, store_id: str) -> Dict[str, Any]: """Debug a specific database query.""" return await self.debugger.debug_tool_execution( 'execute_sales_query', { 'query_type': 'custom', 'store_id': store_id, 'query': query } ) async def debug_search(self, query: str, store_id: str) -> Dict[str, Any]: """Debug a semantic search query.""" return await self.debugger.debug_tool_execution( 'semantic_search_products', { 'query': query, 'store_id': store_id, 'limit': 10 } ) async def get_performance_report(self) -> Dict[str, Any]: """Get comprehensive performance report.""" return await self.debugger.analyze_performance_bottlenecks()
完成本实验后,您应该具备以下能力:
✅ 全面的测试框架:针对所有组件的单元测试、集成测试和性能测试
✅ 高级调试工具:复杂的调试工具,支持执行追踪
✅ 性能验证:负载测试和可扩展性分析能力
✅ 安全测试:防止 SQL 注入和验证行级安全(RLS)
✅ 监控集成:性能指标和瓶颈分析能力
✅ CI/CD 准备:自动化测试工作流,支持持续集成
继续学习 实验 09:VS Code 集成,内容包括:
上一节: 实验 07:语义搜索集成
下一节: 实验 09:VS Code 集成
免责声明:
本文档使用AI翻译服务 Co-op Translator 进行翻译。尽管我们努力确保翻译的准确性,但请注意,自动翻译可能包含错误或不准确之处。原始语言的文档应被视为权威来源。对于关键信息,建议使用专业人工翻译。我们对因使用此翻译而产生的任何误解或误读不承担责任。