MCP (Model Context Protocol) 深度解析
MCP (Model Context Protocol) 深度解析
MCP 简介
Model Context Protocol (MCP) 是由Anthropic开发的开放标准,用于AI模型与外部数据源和工具的安全、标准化通信。MCP使AI助手能够安全地访问本地和远程资源,同时保持用户控制和透明度。
核心概念
1. 协议架构
┌─────────────────┐ MCP Protocol ┌─────────────────┐
│ MCP Client │◄──────────────────►│ MCP Server │
│ (AI Assistant) │ │ (Tool Provider) │
└─────────────────┘ └─────────────────┘
2. 主要组件
- MCP Client: AI助手或应用程序
- MCP Server: 提供工具和资源的服务
- Transport Layer: 通信传输层(stdio, HTTP, WebSocket等)
MCP 协议规范
1. 消息格式
{
"jsonrpc": "2.0",
"id": "unique-request-id",
"method": "method_name",
"params": {
"parameter1": "value1",
"parameter2": "value2"
}
}
2. 核心方法
初始化
{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {},
"resources": {},
"prompts": {}
},
"clientInfo": {
"name": "example-client",
"version": "1.0.0"
}
}
}
工具列表
{
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
"params": {}
}
工具调用
{
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {
"name": "get_weather",
"arguments": {
"location": "San Francisco",
"units": "celsius"
}
}
}
构建MCP服务器
1. Python实现示例
import asyncio
import json
from typing import Any, Dict, List
from mcp import Server, types
from mcp.server.models import InitializationOptions
import httpx
class WeatherMCPServer:
def __init__(self):
self.server = Server("weather-server")
self.setup_handlers()
def setup_handlers(self):
@self.server.list_tools()
async def handle_list_tools() -> List[types.Tool]:
"""返回可用工具列表"""
return [
types.Tool(
name="get_weather",
description="获取指定城市的天气信息",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "城市名称"
},
"units": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "温度单位",
"default": "celsius"
}
},
"required": ["location"]
}
),
types.Tool(
name="get_forecast",
description="获取未来几天的天气预报",
inputSchema={
"type": "object",
"properties": {
"location": {"type": "string"},
"days": {
"type": "integer",
"minimum": 1,
"maximum": 7,
"default": 3
}
},
"required": ["location"]
}
)
]
@self.server.call_tool()
async def handle_call_tool(
name: str,
arguments: Dict[str, Any]
) -> List[types.TextContent]:
"""处理工具调用"""
if name == "get_weather":
return await self.get_weather(arguments)
elif name == "get_forecast":
return await self.get_forecast(arguments)
else:
raise ValueError(f"未知工具: {name}")
async def get_weather(self, args: Dict[str, Any]) -> List[types.TextContent]:
"""获取当前天气"""
location = args["location"]
units = args.get("units", "celsius")
# 模拟API调用
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.weather.com/current",
params={"location": location, "units": units}
)
weather_data = response.json()
result = f"📍 {location}\n"
result += f"🌡️ 温度: {weather_data['temperature']}°{'C' if units == 'celsius' else 'F'}\n"
result += f"☁️ 天气: {weather_data['condition']}\n"
result += f"💨 风速: {weather_data['wind_speed']} km/h"
return [types.TextContent(type="text", text=result)]
async def get_forecast(self, args: Dict[str, Any]) -> List[types.TextContent]:
"""获取天气预报"""
location = args["location"]
days = args.get("days", 3)
# 模拟预报数据
forecast_data = []
for i in range(days):
forecast_data.append({
"date": f"2025-12-{24+i}",
"high": 20 + i,
"low": 10 + i,
"condition": "晴朗"
})
result = f"📍 {location} 未来{days}天预报:\n\n"
for day in forecast_data:
result += f"📅 {day['date']}\n"
result += f"🌡️ {day['low']}°C - {day['high']}°C\n"
result += f"☀️ {day['condition']}\n\n"
return [types.TextContent(type="text", text=result)]
# 运行服务器
async def main():
server = WeatherMCPServer()
# 使用stdio传输
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await server.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="weather-server",
server_version="1.0.0",
capabilities=server.server.get_capabilities()
)
)
if __name__ == "__main__":
asyncio.run(main())
2. 资源提供示例
@server.list_resources()
async def handle_list_resources() -> List[types.Resource]:
"""提供可访问的资源"""
return [
types.Resource(
uri="file:///weather-data/current.json",
name="当前天气数据",
description="实时天气数据文件",
mimeType="application/json"
),
types.Resource(
uri="database://weather/historical",
name="历史天气数据",
description="历史天气数据库",
mimeType="application/json"
)
]
@server.read_resource()
async def handle_read_resource(uri: str) -> str:
"""读取资源内容"""
if uri == "file:///weather-data/current.json":
with open("/path/to/weather-data/current.json", "r") as f:
return f.read()
elif uri == "database://weather/historical":
# 从数据库查询历史数据
return json.dumps(query_historical_weather())
else:
raise ValueError(f"未知资源: {uri}")
MCP客户端实现
1. 基础客户端
import asyncio
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class MCPClient:
def __init__(self):
self.session = None
async def connect(self, server_command: List[str]):
"""连接到MCP服务器"""
server_params = StdioServerParameters(
command=server_command[0],
args=server_command[1:] if len(server_command) > 1 else []
)
self.session = await stdio_client(server_params)
# 初始化连接
await self.session.initialize()
async def list_tools(self) -> List[Dict]:
"""获取可用工具列表"""
if not self.session:
raise RuntimeError("未连接到服务器")
result = await self.session.list_tools()
return [tool.model_dump() for tool in result.tools]
async def call_tool(self, name: str, arguments: Dict) -> str:
"""调用工具"""
if not self.session:
raise RuntimeError("未连接到服务器")
result = await self.session.call_tool(name, arguments)
# 提取文本内容
content_parts = []
for content in result.content:
if hasattr(content, 'text'):
content_parts.append(content.text)
return "\n".join(content_parts)
async def disconnect(self):
"""断开连接"""
if self.session:
await self.session.close()
# 使用示例
async def main():
client = MCPClient()
try:
# 连接到天气服务器
await client.connect(["python", "weather_server.py"])
# 获取工具列表
tools = await client.list_tools()
print("可用工具:")
for tool in tools:
print(f"- {tool['name']}: {tool['description']}")
# 调用天气工具
weather = await client.call_tool("get_weather", {
"location": "北京",
"units": "celsius"
})
print(f"\n天气信息:\n{weather}")
# 获取天气预报
forecast = await client.call_tool("get_forecast", {
"location": "上海",
"days": 5
})
print(f"\n天气预报:\n{forecast}")
finally:
await client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
实际应用场景
1. 文件系统访问
class FileSystemMCPServer:
def __init__(self, allowed_paths: List[str]):
self.server = Server("filesystem-server")
self.allowed_paths = allowed_paths
self.setup_handlers()
def setup_handlers(self):
@self.server.list_tools()
async def handle_list_tools():
return [
types.Tool(
name="read_file",
description="读取文件内容",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"}
},
"required": ["path"]
}
),
types.Tool(
name="write_file",
description="写入文件内容",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string"},
"content": {"type": "string"}
},
"required": ["path", "content"]
}
),
types.Tool(
name="list_directory",
description="列出目录内容",
inputSchema={
"type": "object",
"properties": {
"path": {"type": "string"}
},
"required": ["path"]
}
)
]
@self.server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]):
if not self.is_path_allowed(arguments.get("path", "")):
raise PermissionError("路径不在允许范围内")
if name == "read_file":
return await self.read_file(arguments["path"])
elif name == "write_file":
return await self.write_file(arguments["path"], arguments["content"])
elif name == "list_directory":
return await self.list_directory(arguments["path"])
def is_path_allowed(self, path: str) -> bool:
"""检查路径是否被允许"""
import os
abs_path = os.path.abspath(path)
return any(abs_path.startswith(allowed) for allowed in self.allowed_paths)
2. 数据库访问
class DatabaseMCPServer:
def __init__(self, connection_string: str):
self.server = Server("database-server")
self.connection_string = connection_string
self.setup_handlers()
def setup_handlers(self):
@self.server.list_tools()
async def handle_list_tools():
return [
types.Tool(
name="execute_query",
description="执行SQL查询",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "SQL查询语句"},
"params": {
"type": "array",
"items": {"type": "string"},
"description": "查询参数"
}
},
"required": ["query"]
}
),
types.Tool(
name="get_schema",
description="获取数据库架构",
inputSchema={
"type": "object",
"properties": {
"table_name": {"type": "string"}
}
}
)
]
@self.server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]):
if name == "execute_query":
return await self.execute_query(
arguments["query"],
arguments.get("params", [])
)
elif name == "get_schema":
return await self.get_schema(arguments.get("table_name"))
3. Web API集成
class WebAPIMCPServer:
def __init__(self, api_key: str):
self.server = Server("webapi-server")
self.api_key = api_key
self.setup_handlers()
def setup_handlers(self):
@self.server.list_tools()
async def handle_list_tools():
return [
types.Tool(
name="search_web",
description="搜索网页内容",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string"},
"limit": {"type": "integer", "default": 10}
},
"required": ["query"]
}
),
types.Tool(
name="get_news",
description="获取最新新闻",
inputSchema={
"type": "object",
"properties": {
"category": {"type": "string"},
"country": {"type": "string", "default": "us"}
}
}
)
]
安全考虑
1. 权限控制
class SecureMCPServer:
def __init__(self, permissions: Dict[str, List[str]]):
self.permissions = permissions
self.current_user = None
def check_permission(self, tool_name: str) -> bool:
"""检查用户权限"""
if not self.current_user:
return False
user_permissions = self.permissions.get(self.current_user, [])
return tool_name in user_permissions or "admin" in user_permissions
@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]):
if not self.check_permission(name):
raise PermissionError(f"用户无权限使用工具: {name}")
# 执行工具逻辑
return await self.execute_tool(name, arguments)
2. 输入验证
def validate_input(schema: Dict, data: Dict) -> bool:
"""验证输入数据"""
import jsonschema
try:
jsonschema.validate(data, schema)
return True
except jsonschema.ValidationError:
return False
@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]):
tool_schema = get_tool_schema(name)
if not validate_input(tool_schema["inputSchema"], arguments):
raise ValueError("输入参数不符合要求")
return await execute_tool(name, arguments)
部署和配置
1. 配置文件示例
{
"servers": {
"weather": {
"command": "python",
"args": ["weather_server.py"],
"env": {
"API_KEY": "your-weather-api-key"
}
},
"filesystem": {
"command": "python",
"args": ["fs_server.py"],
"env": {
"ALLOWED_PATHS": "/home/user/documents,/tmp"
}
}
},
"security": {
"auto_approve": ["weather.get_weather"],
"require_confirmation": ["filesystem.write_file"]
}
}
2. Docker部署
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8080
CMD ["python", "mcp_server.py", "--transport", "http", "--port", "8080"]
最佳实践
1. 错误处理
@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]):
try:
return await execute_tool(name, arguments)
except FileNotFoundError as e:
raise MCPError(f"文件未找到: {e}")
except PermissionError as e:
raise MCPError(f"权限不足: {e}")
except Exception as e:
logger.error(f"工具执行失败: {e}")
raise MCPError("内部服务器错误")
2. 日志记录
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]):
logger.info(f"调用工具: {name}, 参数: {arguments}")
start_time = time.time()
try:
result = await execute_tool(name, arguments)
execution_time = time.time() - start_time
logger.info(f"工具执行成功: {name}, 耗时: {execution_time:.2f}s")
return result
except Exception as e:
logger.error(f"工具执行失败: {name}, 错误: {e}")
raise
3. 性能优化
from functools import lru_cache
import asyncio
class OptimizedMCPServer:
def __init__(self):
self.cache = {}
self.rate_limiter = {}
@lru_cache(maxsize=100)
def get_cached_result(self, tool_name: str, args_hash: str):
"""缓存工具结果"""
return self.cache.get(f"{tool_name}:{args_hash}")
async def rate_limit_check(self, user_id: str) -> bool:
"""速率限制检查"""
current_time = time.time()
user_requests = self.rate_limiter.get(user_id, [])
# 清理过期请求
user_requests = [t for t in user_requests if current_time - t < 60]
if len(user_requests) >= 100: # 每分钟最多100次请求
return False
user_requests.append(current_time)
self.rate_limiter[user_id] = user_requests
return True
总结
MCP协议为AI模型与外部工具的集成提供了标准化的解决方案。通过MCP,我们可以:
- 安全地扩展AI模型的能力
- 标准化工具接口和通信协议
- 模块化构建复杂的AI应用
- 透明地控制AI对外部资源的访问
关键优势:
- 🔒 安全性: 细粒度权限控制
- 🔌 可扩展性: 标准化接口
- 🔍 可观测性: 完整的调用日志
- 🛠️ 易用性: 简单的开发体验
MCP正在成为AI生态系统中的重要标准,值得深入学习和应用。
参考资源: