前言
很多文档和博客都只介绍如何开发MCP Server,然后集成到VS Code或者Cursor等程序,很少涉及如何开发MCP Host和MCP Client。如果你想要在自己的服务中集成完整的MCP功能,光看这些是远远不够的。所以本文及后续的MCP系列文章都会带你深入了解如何开发MCP Client,让你真正掌握这项技术。
准备开发环境
MCP官方SDK主要支持Python和TypeScript,当然也有其他语言的实现,不过我这里就以Python为例了。我的Python版本是3.13.5,但其实只要高于3.11应该都没问题。
我个人推荐使用uv来管理依赖,当然你也可以用传统的pip。Python SDK有官方的mcp包和社区的FastMCP包。官方SDK其实也内置了FastMCP,不过是v1版本,而FastMCP官网已经更新到了v2版本。作为学习,两个都装上试试也无妨。
# 使用 uv uv add mcp fastmcp # 使用 pip python -m pip install mcp fastmcp
第一个MCP项目:你好,MCP世界!
在第一个MCP项目中,我们实现一个简单的MCP Client和MCP Server,但还没集成LLM。在这个阶段,Client调用Server的tool或resource都需要手动指定。
MCP Server
下面的MCP Server示例代码定义了一些prompts、resources和tools。这里有个小贴士:函数参数的类型注解、返回类型和docstring都一定要写清楚,否则后续集成LLM时,LLM就无法正确理解如何调用你的工具了。
这段Server可以通过stdio方式被Client调用。在正式让Client调用之前,建议你先手动运行一下Server,测试它能否正常启动,避免Client启动时报一堆让人摸不着头脑的错误。
from mcp.server.fastmcp import FastMCP from datetime import datetime import asyncssh from typing import TypeAlias, Union mcp = FastMCP("custom") @mcp.prompt() def greet_user(name: str, style: str = "formal") -> str: """Greet a user with a specified style.""" if style == "formal": return f"Good day, {name}. How do you do?" elif style == "friendly": return f"Hey {name}! What's up?" elif style == "casual": return f"Yo {name}, how's it going?" else: return f"Hello, {name}!" @mcp.resource("greeting://{name}") def greeting_resource(name: str) -> str: """A simple greeting resource.""" return f"Hello, {name}!" @mcp.resource("config://app") def get_config() -> str: """Static configuration data""" return "App configuration here" @mcp.tool() def add(a: int, b: int) -> int: """Add two numbers""" return a + b @mcp.tool() def multiply(a: int, b: int) -> int: """Multiply two numbers""" return a * b Number: TypeAlias = Union[int, float] @mcp.tool() def is_greater_than(a: Number, b: Number) -> Number: """Check if a is greater than b""" return a > b @mcp.tool() async def get_weather(city: str) -> str: """Get weather for a given city.""" return f"It's always sunny in {city}!" @mcp.tool() async def get_date() -> str: """Get today's date.""" return datetime.now().strftime("%Y-%m-%d") @mcp.tool() async def execute_ssh_command_remote(hostname: str, command: str) -> str: """Execute an SSH command on a remote host. Args: hostname (str): The hostname of the remote host. command (str): The SSH command to execute. Returns: str: The output of the SSH command. """ async with asyncssh.connect(hostname, username="rainux", connect_timeout=10) as conn: result = await conn.run(command, timeout=10) stdout = result.stdout stderr = result.stderr content = str(stdout if stdout else stderr) return content if __name__ == "__main__": mcp.run(transport="stdio")
MCP Client
Client通过STDIO方式调用MCP Server,server_params中指定了如何运行Server,包括python解释器路径、Server文件名和运行位置。需要注意的是,Client启动时也会启动Server,如果Server报错,Client也会跟着无法启动。
import asyncio from pathlib import Path from pydantic import AnyUrl from mcp import ClientSession, StdioServerParameters, types from mcp.client.stdio import stdio_client server_params = StdioServerParameters( command=str(Path(__file__).parent / ".venv" / "bin" / "python"), args=[str(Path(__file__).parent / "demo1-server.py")], cwd=str(Path(__file__).parent), ) async def run(): async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # Initialize the connection await session.initialize() # List available prompts prompts = await session.list_prompts() print(f"Available prompts: {[p.name for p in prompts.prompts]}") # Get a prompt (greet_user prompt from fastmcp_quickstart) if prompts.prompts: prompt = await session.get_prompt("greet_user", arguments={"name": "Alice", "style": "friendly"}) print(f"Prompt result: {prompt.messages[0].content}") # List available resources resources = await session.list_resources() print(f"Available resources: {[r.uri for r in resources.resources]}") # List available tools tools = await session.list_tools() print(f"Available tools: {[t.name for t in tools.tools]}") # Read a resource (greeting resource from fastmcp_quickstart) resource_content = await session.read_resource(AnyUrl("greeting://World")) content_block = resource_content.contents[0] if isinstance(content_block, types.TextResourceContents): print(f"Resource content: {content_block.text}") # Call a tool (add tool from fastmcp_quickstart) result = await session.call_tool("add", arguments={"a": 5, "b": 3}) result_unstructured = result.content[0] if isinstance(result_unstructured, types.TextContent): print(f"Tool result: {result_unstructured.text}") result_structured = result.structuredContent print(f"Structured tool result: {result_structured}") if __name__ == "__main__": asyncio.run(run())
运行Client,输出如下:
Processing request of type ListPromptsRequest Available prompts: ['greet_user'] Processing request of type GetPromptRequest Prompt result: type='text' text="Hey Alice! What's up?" annotations=None meta=None Processing request of type ListResourcesRequest Available resources: [AnyUrl('config://app')] Processing request of type ListToolsRequest Available tools: ['add', 'multiply', 'get_weather', 'get_date', 'execute_ssh_command_remote'] Processing request of type ReadResourceRequest Resource content: Hello, World! Processing request of type CallToolRequest Tool result: 8 Structured tool result: {'result': 8}
可以看到,Client成功地调用了Server上的各种功能,包括获取提示、读取资源和调用工具。
使用streamable-http远程调用:让MCP飞起来!
上面的例子中,Client通过STDIO方式在本地调用Server。现在我们稍作修改,让它可以通过HTTP远程调用Server,这样就更加灵活了。
MCP Server
只列出修改的部分:
mcp = FastMCP("custom", host="localhost", port=8001) if __name__ == "__main__": mcp.run(transport="streamable-http")
修改完成后,启动Server,它会监听在localhost:8001地址上,就像一个小小的Web服务(其实就是个Web服务,暴露的api为/mcp)。
MCP Client
同样只列出修改的部分。Client需要指定MCP Server的地址。streamablehttp_client返回的第三个参数get_session_id用于会话管理,大多数情况下你不需要直接使用它,所以在一些文档中这里会用_来占位。
from mcp.client.streamable_http import streamablehttp_client server_uri = "http://localhost:8001/mcp" async def main(): async with streamablehttp_client(server_uri) as (read, write, get_session_id): # 获取当前会话ID session_id = get_session_id() print(f"Session ID before initialization: {session_id}") async with ClientSession(read, write) as session: # Initialize the connection await session.initialize() # 初始化后再次获取会话ID session_id = get_session_id() print(f"Session ID after initialization: {session_id}")
client运行输出:
Session ID before initialization: None Session ID after initialization: 60ce4204b907469e9eb46e7e01df040d Available prompts: ['greet_user'] Prompt result: type='text' text="Hey Alice! What's up?" annotations=None meta=None Available resources: [AnyUrl('config://app')] Available tools: ['add', 'multiply', 'get_weather', 'get_date', 'execute_ssh_command_remote'] Resource content: Hello, World! Tool result: 8 Structured tool result: {'result': 8}
现在我们的MCP应用已经可以通过网络进行远程调用了,架构变得更加灵活。
集成LLM:让AI自己做决定!
前面两个示例中,我们都需要在Client中手动控制调用Server的tool,这在实际应用中显然是不现实的。我们需要集成LLM,让AI自己决定该调用哪个工具。
MCP Server
Server端不需要做任何变更,Client还是通过HTTP方式调用我们之前创建的Server。
MCP Client
这里我们选用阿里的通义千问(Qwen)。Qwen的API Key可以自行申请,氪个5块钱就够个人开发用很久了。为了便于后续开发,我把配置功能单独放到了一个模块里,下面代码中直接使用了,相关模块放在"补充"部分。
""" MCP (Model Context Protocol) 客户端示例 该客户端演示了如何使用 MCP 协议与 MCP 服务器进行交互,并通过 LLM 调用服务器提供的工具。 工作流程: 1. 连接到 MCP 服务器 2. 获取服务器提供的工具列表 3. 用户输入查询 4. 将查询发送给 LLM,LLM 可能会调用 MCP 服务器提供的工具 5. 执行工具调用并获取结果 6. 将结果返回给 LLM 进行最终回答 """ import asyncio # JSON 处理 import json # 增强输入功能(在某些系统上提供命令历史等功能) import readline # 引入readline模块用于增强python的input功能, Windows下的python标准库可能不包含 # 异常追踪信息 import traceback # 异步上下文管理器,用于资源管理 from contextlib import AsyncExitStack # 类型提示支持 from typing import List, Optional, cast # MCP 客户端会话和 HTTP 传输 from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client # OpenAI 异步客户端,用于与 LLM 通信 from openai import AsyncOpenAI # OpenAI 聊天完成相关的类型定义 from openai.types.chat import (ChatCompletionAssistantMessageParam, ChatCompletionMessageFunctionToolCall, ChatCompletionMessageParam, ChatCompletionMessageToolCall, ChatCompletionToolMessageParam, ChatCompletionToolParam, ChatCompletionUserMessageParam) # 项目配置和日志模块 from pkg.config import cfg from pkg.log import logger class MCPClient: """ MCP 客户端类,负责管理与 MCP 服务器的连接和交互 """ def __init__(self): """ 初始化 MCP 客户端 """ # 客户端会话,初始为空 self.session: Optional[ClientSession] = None # 异步上下文管理栈,用于管理异步资源的生命周期 self.exit_stack = AsyncExitStack() # OpenAI 异步客户端,用于与 LLM 通信 self.client = AsyncOpenAI( base_url=cfg.llm_base_url, api_key=cfg.llm_api_key, ) async def connect_to_server(self, server_uri: str): """ 连接到 MCP 服务器 Args: server_uri (str): MCP 服务器的 URI """ # 创建 Streamable HTTP 传输连接 http_transport = await self.exit_stack.enter_async_context( streamablehttp_client(server_uri) ) # 获取读写流 self.read, self.write, _ = http_transport # 创建并初始化客户端会话 self.session = await self.exit_stack.enter_async_context( ClientSession(self.read, self.write) ) # 初始化会话 await self.session.initialize() # 检查会话是否成功初始化 if self.session is None: raise RuntimeError("Failed to initialize session") # 获取服务器提供的工具列表 response = await self.session.list_tools() tools = response.tools logger.info(f"nConnected to server with tools: {[tool.name for tool in tools]}") async def process_query(self, query: str) -> str: """ 处理用户查询 Args: query (str): 用户的查询 Returns: str: 处理结果 """ # 初始化消息历史,包含用户的查询 messages: List[ChatCompletionMessageParam] = [ ChatCompletionUserMessageParam( role="user", content=query ) ] # 确保会话已初始化 if self.session is None: raise RuntimeError("Session not initialized. Please connect to server first.") # 获取服务器提供的工具列表 response = await self.session.list_tools() # 构建工具列表,处理可能为None的字段 # 这些工具将被传递给 LLM,以便 LLM 知道可以调用哪些工具 available_tools: List[ChatCompletionToolParam] = [] for tool in response.tools: tool_def: ChatCompletionToolParam = { "type": "function", "function": { "name": tool.name, "description": tool.description or "", "parameters": tool.inputSchema or {} } } available_tools.append(tool_def) logger.info(f"Available tools: {available_tools}") # 调用 LLM 进行聊天完成 response = await self.client.chat.completions.create( model=cfg.llm_model, messages=messages, tools=available_tools, ) # 存储最终输出文本 final_text = [] # 获取 LLM 的响应消息 message = response.choices[0].message final_text.append(message.content or "") # 如果 LLM 要求调用工具,则处理工具调用 while message.tool_calls: # 处理每个工具调用 for tool_call in message.tool_calls: # 确保我们处理的是正确的工具调用类型 if hasattr(tool_call, 'function'): # 这是一个函数工具调用 function_call = cast(ChatCompletionMessageFunctionToolCall, tool_call) function = function_call.function tool_name = function.name # 解析工具参数 tool_args = json.loads(function.arguments) else: # 跳过不支持的工具调用类型 continue # 执行工具调用 if self.session is None: raise RuntimeError("Session not initialized. Cannot call tool.") # 调用 MCP 服务器上的工具 result = await self.session.call_tool(tool_name, tool_args) final_text.append(f"[Calling tool {tool_name} with args {tool_args}]") # 将工具调用和结果添加到消息历史 # 这样 LLM 可以知道它之前调用了哪些工具 assistant_msg: ChatCompletionAssistantMessageParam = { "role": "assistant", "tool_calls": [ { "id": tool_call.id, "type": "function", "function": { "name": tool_name, "arguments": json.dumps(tool_args) } } ] } messages.append(assistant_msg) # 添加工具调用结果到消息历史 tool_msg: ChatCompletionToolMessageParam = { "role": "tool", "tool_call_id": tool_call.id, "content": str(result.content) if result.content else "" } messages.append(tool_msg) # 将工具调用的结果交给 LLM,让 LLM 生成最终回答 response = await self.client.chat.completions.create( model=cfg.llm_model, messages=messages, tools=available_tools ) # 获取新的响应消息 message = response.choices[0].message if message.content: final_text.append(message.content) # 返回最终结果 return "n".join(final_text) async def chat_loop(self): """ 运行交互式聊天循环 """ print("nMCP Client Started!") print("Type your queries or 'quit' to exit.") # 持续接收用户输入 while True: try: # 获取用户输入 query = input("nQuery: ").strip() # 检查是否退出 if query.lower() == 'quit': break # 忽略空输入 if not query: continue # 处理用户查询并输出结果 response = await self.process_query(query) print("n" + response) # 异常处理 except Exception as e: print(f"nError: {str(e)}") print(traceback.format_exc()) async def cleanup(self): """ 清理资源 """ await self.exit_stack.aclose() async def main(): """ 主函数 """ # 创建 MCP 客户端实例 client = MCPClient() try: # 连接到 MCP 服务器 await client.connect_to_server("http://localhost:8001/mcp") # 运行聊天循环 await client.chat_loop() except Exception as e: print(f"Error: {str(e)}") finally: # 清理资源 await client.cleanup() # 程序入口点 if __name__ == "__main__": asyncio.run(main())
client运行输出:
MCP Client Started! Type your queries or 'quit' to exit. Query: 今天的日期是什么 [Calling tool get_date with args {}] 今天的日期是2025年9月13日。 Query: 合肥的天气怎么样? [Calling tool get_weather with args {'city': '合肥'}] 合肥的天气总是阳光明媚! Query: 0.11比0.9大吗 [Calling tool is_greater_than with args {'a': 0.11, 'b': 0.9}] 0.11 不比 0.9 大。0.11 小于 0.9。 Query: quit
现在AI可以自己决定调用哪个工具了。当你问"今天的日期是什么"时,它会自动调用get_date工具;当你问"合肥的天气怎么样"时,它会自动调用get_weather工具。这才是真正的智能!
小结
通过这篇文章,我们从零开始构建了一个完整的MCP应用,涵盖了从基础的Client-Server通信到集成LLM的全过程。我们学习了:
- 如何搭建MCP开发环境
- 如何创建MCP Server并定义tools、resources和prompts
- 如何编写MCP Client并通过stdio和HTTP两种方式与Server通信
- 如何集成LLM,让AI自主决定调用哪个工具
整个过程就像搭积木一样,每一步都有其特定的作用:
- Server负责提供功能(工具和资源)
- Client负责协调和调用这些功能
- LLM负责智能决策,决定何时以及如何使用这些功能
这种架构的优势在于功能扩展非常灵活。当你需要添加新功能时,只需要在Server端添加新的tools或resources,Client和LLM会自动发现并使用它们,而不需要修改Client端的代码。
MCP真正实现了"上下文协议"的概念,让AI可以像人类一样访问和操作各种工具和资源,这是迈向更强大AI应用的重要一步。接下来你可以尝试添加更多有趣的工具,比如文件操作、数据库查询、API调用等,让你的AI助手变得更加强大!
补充
配置模块
pkg/config.py
import json from pathlib import Path class Config: def __init__(self): p = Path(__file__).parent.parent / "conf" / "config.json" if not p.exists(): raise FileNotFoundError(f"Config file not found: {p}") self.data = self.read_json(str(p)) def read_json(self, filepath: str) -> dict: with open(filepath, "r") as f: return json.load(f) @property def llm_model(self) -> str: return self.data["llm"]["model"] @property def llm_api_key(self): return self.data["llm"]["api_key"] @property def llm_base_url(self) -> str: return self.data["llm"]["base_url"] @property def server_host(self) -> str: return self.data["server"]["host"] @property def server_port(self) -> int: return self.data["server"]["port"] cfg = Config()
配置文件conf/config.json
{ "llm": { "model": "qwen-plus", "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1", "api_key": "your token" }, "server": { "host": "127.0.0.1", "port": 8000 } }
日志模块
pkg/log.py
import logging import sys def set_formatter(): """设置formatter""" fmt = "%(asctime)s | %(name)s | %(levelname)s | %(filename)s:%(lineno)d | %(funcName)s | %(message)s" datefmt = "%Y-%m-%d %H:%M:%S" return logging.Formatter(fmt, datefmt=datefmt) def set_stream_handler(): return logging.StreamHandler(sys.stdout) def set_file_handler(): return logging.FileHandler("app.log", mode="a", encoding="utf-8") def get_logger(name: str = "mylogger", level=logging.DEBUG): logger = logging.getLogger(name) formatter = set_formatter() # handler = set_stream_handler() handler = set_file_handler() handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(level) return logger logger = get_logger()