- 需要已经安装nodejs以及python环境
- 准备好任意一个大模型的api-key和base_url
- 和上节一样,我们先试试用tool来调用当前时间,使用mcp-server-time 来做演示。
- 使用pip安装一下它。
pip install mcp-server-time
- 安装完成后,我们可以使用下面的代码来测试一下。
import asyncio
from mcp.client.stdio import stdio_client
from mcp import ClientSession, StdioServerParameters
# 连接参数,使用stdio模式连接,即在本地执行本地代码
server_params = StdioServerParameters(
command="python3",
args=[
"-m",
"mcp_server_time",
"--local-timezone=Asia/Shanghai"
],
)
async def main():
async with stdio_client(server_params) as (stdio, write):
async with ClientSession(stdio, write) as session:
# 初始化会话
await session.initialize()
# 列出所有工具
response = await session.list_tools()
print("=============begin list tools =============")
print(response)
print("tools number: ", len(response.tools))
tool_names = [tool.name for tool in response.tools]
print("tools names: ", tool_names)
print("=============end list tools =============")
# 执行其中一个工具,根据上面的描述,以及它需要的参数
response = await session.call_tool("get_current_time", {"timezone": "Asia/Shanghai"})
if not response.isError:
print(response.content[0].text)
else:
print("Error: ", response)
if __name__ == "__main__":
asyncio.run(main())
- 输出结果,可以看到能够准确获取当前时间
=============begin list tools =============
meta=None nextCursor=None tools=[Tool(name='get_current_time', description='Get current time in a specific timezones', inputSchema={'type': 'object', 'properties': {'timezone': {'type': 'string', 'description': "IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use 'Asia/Shanghai' as local timezone if no timezone provided by the user."}}, 'required': ['timezone']}, annotations=None), Tool(name='convert_time', description='Convert time between timezones', inputSchema={'type': 'object', 'properties': {'source_timezone': {'type': 'string', 'description': "Source IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use 'Asia/Shanghai' as local timezone if no source timezone provided by the user."}, 'time': {'type': 'string', 'description': 'Time to convert in 24-hour format (HH:MM)'}, 'target_timezone': {'type': 'string', 'description': "Target IANA timezone name (e.g., 'Asia/Tokyo', 'America/San_Francisco'). Use 'Asia/Shanghai' as local timezone if no target timezone provided by the user."}}, 'required': ['source_timezone', 'time', 'target_timezone']}, annotations=None)]
tools number: 2
tools names: ['get_current_time', 'convert_time']
=============end list tools =============
{
"timezone": "Asia/Shanghai",
"datetime": "2025-05-29T17:53:34+08:00",
"is_dst": false
}
- 不过,上面调用工具这块,不够优雅。只能手动调用固定的tool,这部分应该让AI来完成,就像我们上一节做的那样,我们提供工具和命令,AI执行工具以及获取结果。我们应该将工具和要求传给AI,我们可以回忆一下之前tool_call时,是怎么做的。于是有了下面的代码:
import asyncio
import os
from mcp.client.stdio import stdio_client
from mcp import ClientSession, StdioServerParameters
from openai import OpenAI
from contextlib import AsyncExitStack
import json
# 连接参数,使用stdio模式连接,即在本地执行本地代码
server_params = StdioServerParameters(
command="python3",
args=[
"-m",
"mcp_server_time",
"--local-timezone=Asia/Shanghai"
],
)
class MCPClient:
def __init__(self):
self.session: ClientSession = None
self.exit_stack = AsyncExitStack()
self.client = OpenAI(
api_key=os.environ["DASHSCOPE_API_KEY"], # 设置你自己的api-key
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
self.model = "qwen-max"
async def connect_server(self):
# 连接服务器,将结果异步入栈
(stdio, write) = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
# 将mcp会话异步入栈
self.session = await self.exit_stack.enter_async_context(
ClientSession(stdio, write))
# 初始化会话
await self.session.initialize()
async def process_query(self, query, system_prompt="You are a helpful assistant."):
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": query}
]
# 列出所有工具
tool_response = await self.session.list_tools()
# 生成 tool call 的描述信息
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
# 注意下面的key,qwen要求的是parameters, 可能其它模型不一样
"parameters": tool.inputSchema
}
} for tool in tool_response.tools
]
# 调用模型, 我先试试
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=available_tools
)
# 处理返回的内容
content = response.choices[0]
if content.finish_reason == "tool_calls":
# 如果AI决定执行tool_call
# 如何是需要使用工具,就解析工具
tool_call = content.message.tool_calls[0]
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
# 将结果加入上下文
messages.append({
"role": "assistant",
"tool_calls": [
{
"id": tool_call.id,
"type": "function",
"function": {
"name": tool_name ,
"arguments": tool_call.function.arguments
}
}
]
})
# 执行工具
# print(f"\n\n[Calling tool {tool_name} with args {tool_args}]\n\n")
result = await self.session.call_tool(tool_name, tool_args)
# 将工具执行结果丢给AI处理
messages.append({
"role": "tool",
"content": result.content[0].text,
"tool_call_id": tool_call.id,
})
# 将上面的结果再返回给 AI用于生产最终的结果
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
)
return response.choices[0].message.content
else:
return content.message.content
async def disconnect(self):
# 退出并关闭栈
await self.exit_stack.aclose()
async def main():
client = MCPClient()
# 连接服务
await client.connect_server()
# 查询问题
response = await client.process_query("告诉我当前时间")
print(response)
# 退出连接
await client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
根据您提供的信息,当前时间是2025年6月3日16点47分27秒,时区为亚洲/上海(东八区),并且没有实行夏令时。请注意,这个时间是您给定的时间,可能与实际的当前时间有所不同。
- 上面输入时写死的,如果还要更加灵活的话,我们可以支持用户自定义输入,并且弄一个循环体,并且利用messages来控制上下文,于是就有了下面的代码。
import asyncio
import os
from mcp.client.stdio import stdio_client
from mcp import ClientSession, StdioServerParameters
from openai import OpenAI
from contextlib import AsyncExitStack
import json
# 连接参数,使用stdio模式连接,即在本地执行本地代码
server_params = StdioServerParameters(
command="python3",
args=[
"-m",
"mcp_server_time",
"--local-timezone=Asia/Shanghai"
],
)
class MCPClient:
def __init__(self):
self.session: ClientSession = None
self.exit_stack = AsyncExitStack()
self.client = OpenAI(
api_key=os.environ["DASHSCOPE_API_KEY"], # 设置你自己的api-key
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
self.model = "qwen-max"
async def connect_server(self):
# 连接服务器,将结果异步入栈
(stdio, write) = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
# 将mcp会话异步入栈
self.session = await self.exit_stack.enter_async_context(
ClientSession(stdio, write))
# 初始化会话
await self.session.initialize()
async def process_query(self, messages: list):
# 列出所有工具
tool_response = await self.session.list_tools()
# 生成 tool call 的描述信息
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
# 注意下面的key,qwen要求的是parameters, 可能其它模型不一样
"parameters": tool.inputSchema
}
} for tool in tool_response.tools
]
# 调用模型, 我先试试
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=available_tools
)
# 处理返回的内容
content = response.choices[0]
if content.finish_reason == "tool_calls":
# 如果AI决定执行tool_call
# 如何是需要使用工具,就解析工具
tool_call = content.message.tool_calls[0]
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
# 将结果加入上下文
messages.append({
"role": "assistant",
"tool_calls": [
{
"id": tool_call.id,
"type": "function",
"function": {
"name": tool_name ,
"arguments": tool_call.function.arguments
}
}
]
})
# 执行工具
# print(f"\n\n[Calling tool {tool_name} with args {tool_args}]\n\n")
result = await self.session.call_tool(tool_name, tool_args)
# 将工具执行结果丢给AI处理
messages.append({
"role": "tool",
"content": result.content[0].text,
"tool_call_id": tool_call.id,
})
# 将上面的结果再返回给 AI 用于生成最终结果
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
)
return response.choices[0].message.content
else:
return content.message.content
async def disconnect(self):
# 退出并关闭栈
await self.exit_stack.aclose()
async def main():
client = MCPClient()
# 连接服务
await client.connect_server()
system_prompt = "You are a helpful assistant."
messages = [
{"role": "system", "content": system_prompt},
]
print("=" * 20)
print("欢迎使用AI聊天机器人,你可以问我当前时间相关信息,输入exit或者quit退出,输入clear清空历史记录")
print("=" * 20)
while True:
# 查询问题
query = input("用户:")
if query in ["exit", "quit", "exit()", "quit()"]:
print("response: 即将退出")
break
if query == 'clear':
messages = messages[:1]
print("response: 已清理历史对话信息。")
continue
messages.append({"role": "user", "content": query})
response = await client.process_query(messages)
messages.append({"role": "assistant", "content": query})
print("response: ", response)
# 退出连接
await client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
====================
欢迎使用AI聊天机器人,你可以问我当前时间相关信息,输入exit或者quit退出,输入clear清空历史记录
====================
用户:你好
response: 你好!有什么我能帮助你的吗?
用户:你叫啥?
response: 您可以叫我AI助手。我在这里帮助您解答问题和提供信息。有什么我可以帮您的?
用户:现在几点啦?
response: 根据您提供的信息,现在是北京时间2025年6月3日17点00分53秒。不过请注意,这个时间是基于您给定的数据,实际时间可能有所不同。如果您需要查询实时的时间,可以查看设备上的时间或者使用相应的在线服务。
- 可以看到,AI能够在需要的时候,正确调用获取当前时间tool。不需要的时候,也可以进入闲聊模式。
- 那么,动动手吧,试试上面的代码,顺便把上节的
playwright
操作浏览器这个功能,也用代码实现一下。