MCP (Model Context Protocol) 深度解析

Dec 24, 2025·
1ch0
1ch0
· 7 min read

MCP (Model Context Protocol) 深度解析

MCP 简介

Model Context Protocol (MCP) 是由Anthropic开发的开放标准,用于AI模型与外部数据源和工具的安全、标准化通信。MCP使AI助手能够安全地访问本地和远程资源,同时保持用户控制和透明度。

核心概念

1. 协议架构

┌─────────────────┐    MCP Protocol    ┌─────────────────┐
│   MCP Client    │◄──────────────────►│   MCP Server    │
│  (AI Assistant) │                    │ (Tool Provider) │
└─────────────────┘                    └─────────────────┘

2. 主要组件

  • MCP Client: AI助手或应用程序
  • MCP Server: 提供工具和资源的服务
  • Transport Layer: 通信传输层(stdio, HTTP, WebSocket等)

MCP 协议规范

1. 消息格式

{
  "jsonrpc": "2.0",
  "id": "unique-request-id",
  "method": "method_name",
  "params": {
    "parameter1": "value1",
    "parameter2": "value2"
  }
}

2. 核心方法

初始化

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2024-11-05",
    "capabilities": {
      "tools": {},
      "resources": {},
      "prompts": {}
    },
    "clientInfo": {
      "name": "example-client",
      "version": "1.0.0"
    }
  }
}

工具列表

{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "tools/list",
  "params": {}
}

工具调用

{
  "jsonrpc": "2.0",
  "id": 3,
  "method": "tools/call",
  "params": {
    "name": "get_weather",
    "arguments": {
      "location": "San Francisco",
      "units": "celsius"
    }
  }
}

构建MCP服务器

1. Python实现示例

import asyncio
import json
from typing import Any, Dict, List
from mcp import Server, types
from mcp.server.models import InitializationOptions
import httpx

class WeatherMCPServer:
    def __init__(self):
        self.server = Server("weather-server")
        self.setup_handlers()
    
    def setup_handlers(self):
        @self.server.list_tools()
        async def handle_list_tools() -> List[types.Tool]:
            """返回可用工具列表"""
            return [
                types.Tool(
                    name="get_weather",
                    description="获取指定城市的天气信息",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "location": {
                                "type": "string",
                                "description": "城市名称"
                            },
                            "units": {
                                "type": "string",
                                "enum": ["celsius", "fahrenheit"],
                                "description": "温度单位",
                                "default": "celsius"
                            }
                        },
                        "required": ["location"]
                    }
                ),
                types.Tool(
                    name="get_forecast",
                    description="获取未来几天的天气预报",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "location": {"type": "string"},
                            "days": {
                                "type": "integer",
                                "minimum": 1,
                                "maximum": 7,
                                "default": 3
                            }
                        },
                        "required": ["location"]
                    }
                )
            ]
        
        @self.server.call_tool()
        async def handle_call_tool(
            name: str, 
            arguments: Dict[str, Any]
        ) -> List[types.TextContent]:
            """处理工具调用"""
            if name == "get_weather":
                return await self.get_weather(arguments)
            elif name == "get_forecast":
                return await self.get_forecast(arguments)
            else:
                raise ValueError(f"未知工具: {name}")
    
    async def get_weather(self, args: Dict[str, Any]) -> List[types.TextContent]:
        """获取当前天气"""
        location = args["location"]
        units = args.get("units", "celsius")
        
        # 模拟API调用
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"https://api.weather.com/current",
                params={"location": location, "units": units}
            )
            weather_data = response.json()
        
        result = f"📍 {location}\n"
        result += f"🌡️ 温度: {weather_data['temperature']}°{'C' if units == 'celsius' else 'F'}\n"
        result += f"☁️ 天气: {weather_data['condition']}\n"
        result += f"💨 风速: {weather_data['wind_speed']} km/h"
        
        return [types.TextContent(type="text", text=result)]
    
    async def get_forecast(self, args: Dict[str, Any]) -> List[types.TextContent]:
        """获取天气预报"""
        location = args["location"]
        days = args.get("days", 3)
        
        # 模拟预报数据
        forecast_data = []
        for i in range(days):
            forecast_data.append({
                "date": f"2025-12-{24+i}",
                "high": 20 + i,
                "low": 10 + i,
                "condition": "晴朗"
            })
        
        result = f"📍 {location} 未来{days}天预报:\n\n"
        for day in forecast_data:
            result += f"📅 {day['date']}\n"
            result += f"🌡️ {day['low']}°C - {day['high']}°C\n"
            result += f"☀️ {day['condition']}\n\n"
        
        return [types.TextContent(type="text", text=result)]

# 运行服务器
async def main():
    server = WeatherMCPServer()
    
    # 使用stdio传输
    from mcp.server.stdio import stdio_server
    async with stdio_server() as (read_stream, write_stream):
        await server.server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="weather-server",
                server_version="1.0.0",
                capabilities=server.server.get_capabilities()
            )
        )

if __name__ == "__main__":
    asyncio.run(main())

2. 资源提供示例

@server.list_resources()
async def handle_list_resources() -> List[types.Resource]:
    """提供可访问的资源"""
    return [
        types.Resource(
            uri="file:///weather-data/current.json",
            name="当前天气数据",
            description="实时天气数据文件",
            mimeType="application/json"
        ),
        types.Resource(
            uri="database://weather/historical",
            name="历史天气数据",
            description="历史天气数据库",
            mimeType="application/json"
        )
    ]

@server.read_resource()
async def handle_read_resource(uri: str) -> str:
    """读取资源内容"""
    if uri == "file:///weather-data/current.json":
        with open("/path/to/weather-data/current.json", "r") as f:
            return f.read()
    elif uri == "database://weather/historical":
        # 从数据库查询历史数据
        return json.dumps(query_historical_weather())
    else:
        raise ValueError(f"未知资源: {uri}")

MCP客户端实现

1. 基础客户端

import asyncio
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

class MCPClient:
    def __init__(self):
        self.session = None
    
    async def connect(self, server_command: List[str]):
        """连接到MCP服务器"""
        server_params = StdioServerParameters(
            command=server_command[0],
            args=server_command[1:] if len(server_command) > 1 else []
        )
        
        self.session = await stdio_client(server_params)
        
        # 初始化连接
        await self.session.initialize()
    
    async def list_tools(self) -> List[Dict]:
        """获取可用工具列表"""
        if not self.session:
            raise RuntimeError("未连接到服务器")
        
        result = await self.session.list_tools()
        return [tool.model_dump() for tool in result.tools]
    
    async def call_tool(self, name: str, arguments: Dict) -> str:
        """调用工具"""
        if not self.session:
            raise RuntimeError("未连接到服务器")
        
        result = await self.session.call_tool(name, arguments)
        
        # 提取文本内容
        content_parts = []
        for content in result.content:
            if hasattr(content, 'text'):
                content_parts.append(content.text)
        
        return "\n".join(content_parts)
    
    async def disconnect(self):
        """断开连接"""
        if self.session:
            await self.session.close()

# 使用示例
async def main():
    client = MCPClient()
    
    try:
        # 连接到天气服务器
        await client.connect(["python", "weather_server.py"])
        
        # 获取工具列表
        tools = await client.list_tools()
        print("可用工具:")
        for tool in tools:
            print(f"- {tool['name']}: {tool['description']}")
        
        # 调用天气工具
        weather = await client.call_tool("get_weather", {
            "location": "北京",
            "units": "celsius"
        })
        print(f"\n天气信息:\n{weather}")
        
        # 获取天气预报
        forecast = await client.call_tool("get_forecast", {
            "location": "上海",
            "days": 5
        })
        print(f"\n天气预报:\n{forecast}")
        
    finally:
        await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

实际应用场景

1. 文件系统访问

class FileSystemMCPServer:
    def __init__(self, allowed_paths: List[str]):
        self.server = Server("filesystem-server")
        self.allowed_paths = allowed_paths
        self.setup_handlers()
    
    def setup_handlers(self):
        @self.server.list_tools()
        async def handle_list_tools():
            return [
                types.Tool(
                    name="read_file",
                    description="读取文件内容",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {"type": "string", "description": "文件路径"}
                        },
                        "required": ["path"]
                    }
                ),
                types.Tool(
                    name="write_file",
                    description="写入文件内容",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {"type": "string"},
                            "content": {"type": "string"}
                        },
                        "required": ["path", "content"]
                    }
                ),
                types.Tool(
                    name="list_directory",
                    description="列出目录内容",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "path": {"type": "string"}
                        },
                        "required": ["path"]
                    }
                )
            ]
        
        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: Dict[str, Any]):
            if not self.is_path_allowed(arguments.get("path", "")):
                raise PermissionError("路径不在允许范围内")
            
            if name == "read_file":
                return await self.read_file(arguments["path"])
            elif name == "write_file":
                return await self.write_file(arguments["path"], arguments["content"])
            elif name == "list_directory":
                return await self.list_directory(arguments["path"])
    
    def is_path_allowed(self, path: str) -> bool:
        """检查路径是否被允许"""
        import os
        abs_path = os.path.abspath(path)
        return any(abs_path.startswith(allowed) for allowed in self.allowed_paths)

2. 数据库访问

class DatabaseMCPServer:
    def __init__(self, connection_string: str):
        self.server = Server("database-server")
        self.connection_string = connection_string
        self.setup_handlers()
    
    def setup_handlers(self):
        @self.server.list_tools()
        async def handle_list_tools():
            return [
                types.Tool(
                    name="execute_query",
                    description="执行SQL查询",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "description": "SQL查询语句"},
                            "params": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "查询参数"
                            }
                        },
                        "required": ["query"]
                    }
                ),
                types.Tool(
                    name="get_schema",
                    description="获取数据库架构",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "table_name": {"type": "string"}
                        }
                    }
                )
            ]
        
        @self.server.call_tool()
        async def handle_call_tool(name: str, arguments: Dict[str, Any]):
            if name == "execute_query":
                return await self.execute_query(
                    arguments["query"],
                    arguments.get("params", [])
                )
            elif name == "get_schema":
                return await self.get_schema(arguments.get("table_name"))

3. Web API集成

class WebAPIMCPServer:
    def __init__(self, api_key: str):
        self.server = Server("webapi-server")
        self.api_key = api_key
        self.setup_handlers()
    
    def setup_handlers(self):
        @self.server.list_tools()
        async def handle_list_tools():
            return [
                types.Tool(
                    name="search_web",
                    description="搜索网页内容",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {"type": "string"},
                            "limit": {"type": "integer", "default": 10}
                        },
                        "required": ["query"]
                    }
                ),
                types.Tool(
                    name="get_news",
                    description="获取最新新闻",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "category": {"type": "string"},
                            "country": {"type": "string", "default": "us"}
                        }
                    }
                )
            ]

安全考虑

1. 权限控制

class SecureMCPServer:
    def __init__(self, permissions: Dict[str, List[str]]):
        self.permissions = permissions
        self.current_user = None
    
    def check_permission(self, tool_name: str) -> bool:
        """检查用户权限"""
        if not self.current_user:
            return False
        
        user_permissions = self.permissions.get(self.current_user, [])
        return tool_name in user_permissions or "admin" in user_permissions
    
    @server.call_tool()
    async def handle_call_tool(name: str, arguments: Dict[str, Any]):
        if not self.check_permission(name):
            raise PermissionError(f"用户无权限使用工具: {name}")
        
        # 执行工具逻辑
        return await self.execute_tool(name, arguments)

2. 输入验证

def validate_input(schema: Dict, data: Dict) -> bool:
    """验证输入数据"""
    import jsonschema
    try:
        jsonschema.validate(data, schema)
        return True
    except jsonschema.ValidationError:
        return False

@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]):
    tool_schema = get_tool_schema(name)
    if not validate_input(tool_schema["inputSchema"], arguments):
        raise ValueError("输入参数不符合要求")
    
    return await execute_tool(name, arguments)

部署和配置

1. 配置文件示例

{
  "servers": {
    "weather": {
      "command": "python",
      "args": ["weather_server.py"],
      "env": {
        "API_KEY": "your-weather-api-key"
      }
    },
    "filesystem": {
      "command": "python",
      "args": ["fs_server.py"],
      "env": {
        "ALLOWED_PATHS": "/home/user/documents,/tmp"
      }
    }
  },
  "security": {
    "auto_approve": ["weather.get_weather"],
    "require_confirmation": ["filesystem.write_file"]
  }
}

2. Docker部署

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

EXPOSE 8080

CMD ["python", "mcp_server.py", "--transport", "http", "--port", "8080"]

最佳实践

1. 错误处理

@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]):
    try:
        return await execute_tool(name, arguments)
    except FileNotFoundError as e:
        raise MCPError(f"文件未找到: {e}")
    except PermissionError as e:
        raise MCPError(f"权限不足: {e}")
    except Exception as e:
        logger.error(f"工具执行失败: {e}")
        raise MCPError("内部服务器错误")

2. 日志记录

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]):
    logger.info(f"调用工具: {name}, 参数: {arguments}")
    
    start_time = time.time()
    try:
        result = await execute_tool(name, arguments)
        execution_time = time.time() - start_time
        logger.info(f"工具执行成功: {name}, 耗时: {execution_time:.2f}s")
        return result
    except Exception as e:
        logger.error(f"工具执行失败: {name}, 错误: {e}")
        raise

3. 性能优化

from functools import lru_cache
import asyncio

class OptimizedMCPServer:
    def __init__(self):
        self.cache = {}
        self.rate_limiter = {}
    
    @lru_cache(maxsize=100)
    def get_cached_result(self, tool_name: str, args_hash: str):
        """缓存工具结果"""
        return self.cache.get(f"{tool_name}:{args_hash}")
    
    async def rate_limit_check(self, user_id: str) -> bool:
        """速率限制检查"""
        current_time = time.time()
        user_requests = self.rate_limiter.get(user_id, [])
        
        # 清理过期请求
        user_requests = [t for t in user_requests if current_time - t < 60]
        
        if len(user_requests) >= 100:  # 每分钟最多100次请求
            return False
        
        user_requests.append(current_time)
        self.rate_limiter[user_id] = user_requests
        return True

总结

MCP协议为AI模型与外部工具的集成提供了标准化的解决方案。通过MCP,我们可以:

  • 安全地扩展AI模型的能力
  • 标准化工具接口和通信协议
  • 模块化构建复杂的AI应用
  • 透明地控制AI对外部资源的访问

关键优势:

  • 🔒 安全性: 细粒度权限控制
  • 🔌 可扩展性: 标准化接口
  • 🔍 可观测性: 完整的调用日志
  • 🛠️ 易用性: 简单的开发体验

MCP正在成为AI生态系统中的重要标准,值得深入学习和应用。


参考资源: