logo 🤗

技术视野

聚焦科技前沿,分享技术解析,洞见未来趋势。在这里,与您一起探索人工智能的无限可能,共赴技术盛宴。

logo
  • 需要已经安装nodejs以及python环境
  • 准备好任意一个大模型的api-key和base_url
  1. 和上节一样,我们先试试用tool来调用当前时间,使用mcp-server-time 来做演示。
  2. 使用pip安装一下它。
pip install mcp-server-time
  1. 安装完成后,我们可以使用下面的代码来测试一下。
import asyncio

from mcp.client.stdio import stdio_client
from mcp import ClientSession, StdioServerParameters

# 连接参数,使用stdio模式连接,即在本地执行本地代码
server_params = StdioServerParameters(
    command="python3",
    args=[
        "-m",
        "mcp_server_time",
        "--local-timezone=Asia/Shanghai"
    ],
)

async def main():
    async with stdio_client(server_params) as (stdio, write):
        async with ClientSession(stdio, write) as session:
            # 初始化会话
            await session.initialize()
            # 列出所有工具
            response = await session.list_tools()
            print("=============begin list tools =============")
            print(response)
            print("tools number: ", len(response.tools))
            tool_names = [tool.name for tool in response.tools] 
            print("tools names: ", tool_names)
            print("=============end list tools =============")
            # 执行其中一个工具,根据上面的描述,以及它需要的参数
            response = await session.call_tool("get_current_time", {"timezone": "Asia/Shanghai"})
            if not response.isError:
                print(response.content[0].text)
            else:
                print("Error: ", response)

if __name__ == "__main__":
    asyncio.run(main())
  1. 输出结果,可以看到能够准确获取当前时间
=============begin list tools =============
meta=None nextCursor=None tools=[Tool(name='get_current_time', description='Get current time in a specific timezones', inputSchema={'type': 'object', 'properties': {'timezone': {'type': 'string', 'description': "IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use 'Asia/Shanghai' as local timezone if no timezone provided by the user."}}, 'required': ['timezone']}, annotations=None), Tool(name='convert_time', description='Convert time between timezones', inputSchema={'type': 'object', 'properties': {'source_timezone': {'type': 'string', 'description': "Source IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use 'Asia/Shanghai' as local timezone if no source timezone provided by the user."}, 'time': {'type': 'string', 'description': 'Time to convert in 24-hour format (HH:MM)'}, 'target_timezone': {'type': 'string', 'description': "Target IANA timezone name (e.g., 'Asia/Tokyo', 'America/San_Francisco'). Use 'Asia/Shanghai' as local timezone if no target timezone provided by the user."}}, 'required': ['source_timezone', 'time', 'target_timezone']}, annotations=None)]
tools number:  2
tools names:  ['get_current_time', 'convert_time']
=============end list tools =============
{
  "timezone": "Asia/Shanghai",
  "datetime": "2025-05-29T17:53:34+08:00",
  "is_dst": false
}
  1. 不过,上面调用工具这块,不够优雅。只能手动调用固定的tool,这部分应该让AI来完成,就像我们上一节做的那样,我们提供工具和命令,AI执行工具以及获取结果。我们应该将工具和要求传给AI,我们可以回忆一下之前tool_call时,是怎么做的。于是有了下面的代码:
import asyncio
import os
from mcp.client.stdio import stdio_client
from mcp import ClientSession, StdioServerParameters
from openai import OpenAI
from contextlib import AsyncExitStack
import json

# 连接参数,使用stdio模式连接,即在本地执行本地代码
server_params = StdioServerParameters(
    command="python3",
    args=[
        "-m",
        "mcp_server_time",
        "--local-timezone=Asia/Shanghai"
    ],
)

class MCPClient:
    def __init__(self):
        self.session: ClientSession = None
        self.exit_stack = AsyncExitStack()
        self.client = OpenAI(
            api_key=os.environ["DASHSCOPE_API_KEY"], # 设置你自己的api-key
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        )
        self.model = "qwen-max"
    
    async def connect_server(self):
        # 连接服务器,将结果异步入栈
        (stdio, write) = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        # 将mcp会话异步入栈 
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(stdio, write))
        # 初始化会话
        await self.session.initialize()
    
    async def process_query(self, query, system_prompt="You are a helpful assistant."):
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query}
        ]
        # 列出所有工具
        tool_response = await self.session.list_tools()
        # 生成 tool call 的描述信息
        available_tools = [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                # 注意下面的key,qwen要求的是parameters, 可能其它模型不一样
                "parameters": tool.inputSchema
            }
            } for tool in tool_response.tools
        ]
        # 调用模型, 我先试试
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=available_tools
        )
        # 处理返回的内容
        content = response.choices[0]
        if content.finish_reason == "tool_calls":
            # 如果AI决定执行tool_call
             # 如何是需要使用工具,就解析工具
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)
            # 将结果加入上下文
            messages.append({
              "role": "assistant",
              "tool_calls": [
                {
                  "id": tool_call.id,
                  "type": "function",
                  "function": {
                    "name": tool_name ,
                    "arguments": tool_call.function.arguments
                  }
                }
              ]
            })

            # 执行工具
            # print(f"\n\n[Calling tool {tool_name} with args {tool_args}]\n\n")
            result = await self.session.call_tool(tool_name, tool_args)
            # 将工具执行结果丢给AI处理
            messages.append({
                "role": "tool",
                "content": result.content[0].text,
                "tool_call_id": tool_call.id,
            })

            # 将上面的结果再返回给 AI用于生产最终的结果
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
            )
            return response.choices[0].message.content
        else:
            return content.message.content
    
    async def disconnect(self):
        # 退出并关闭栈
        await self.exit_stack.aclose()
        

async def main():
    client = MCPClient()
    # 连接服务
    await client.connect_server()
    # 查询问题
    response = await client.process_query("告诉我当前时间")
    print(response)
    # 退出连接
    await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
  • 运行一下,输出结果为:
根据您提供的信息,当前时间是2025年6月3日16点47分27秒,时区为亚洲/上海(东八区),并且没有实行夏令时。请注意,这个时间是您给定的时间,可能与实际的当前时间有所不同。
  1. 上面输入时写死的,如果还要更加灵活的话,我们可以支持用户自定义输入,并且弄一个循环体,并且利用messages来控制上下文,于是就有了下面的代码。
import asyncio
import os
from mcp.client.stdio import stdio_client
from mcp import ClientSession, StdioServerParameters
from openai import OpenAI
from contextlib import AsyncExitStack
import json

# 连接参数,使用stdio模式连接,即在本地执行本地代码
server_params = StdioServerParameters(
    command="python3",
    args=[
        "-m",
        "mcp_server_time",
        "--local-timezone=Asia/Shanghai"
    ],
)

class MCPClient:
    def __init__(self):
        self.session: ClientSession = None
        self.exit_stack = AsyncExitStack()
        self.client = OpenAI(
            api_key=os.environ["DASHSCOPE_API_KEY"], # 设置你自己的api-key
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        )
        self.model = "qwen-max"
    
    async def connect_server(self):
        # 连接服务器,将结果异步入栈
        (stdio, write) = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        # 将mcp会话异步入栈 
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(stdio, write))
        # 初始化会话
        await self.session.initialize()
    
    async def process_query(self, messages: list):
        
        # 列出所有工具
        tool_response = await self.session.list_tools()
        # 生成 tool call 的描述信息
        available_tools = [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                # 注意下面的key,qwen要求的是parameters, 可能其它模型不一样
                "parameters": tool.inputSchema
            }
            } for tool in tool_response.tools
        ]
        # 调用模型, 我先试试
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=available_tools
        )
        # 处理返回的内容
        content = response.choices[0]
        if content.finish_reason == "tool_calls":
            # 如果AI决定执行tool_call
             # 如何是需要使用工具,就解析工具
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)
            # 将结果加入上下文
            messages.append({
              "role": "assistant",
              "tool_calls": [
                {
                  "id": tool_call.id,
                  "type": "function",
                  "function": {
                    "name": tool_name ,
                    "arguments": tool_call.function.arguments
                  }
                }
              ]
            })

            # 执行工具
            # print(f"\n\n[Calling tool {tool_name} with args {tool_args}]\n\n")
            result = await self.session.call_tool(tool_name, tool_args)
            # 将工具执行结果丢给AI处理
            messages.append({
                "role": "tool",
                "content": result.content[0].text,
                "tool_call_id": tool_call.id,
            })

            # 将上面的结果再返回给 AI 用于生成最终结果
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
            )
            return response.choices[0].message.content
        else:
            return content.message.content
    
    async def disconnect(self):
        # 退出并关闭栈
        await self.exit_stack.aclose()
        

async def main():
    client = MCPClient()
    # 连接服务
    await client.connect_server()
    system_prompt = "You are a helpful assistant."
    messages = [
        {"role": "system", "content": system_prompt},
    ]
    print("=" * 20)
    print("欢迎使用AI聊天机器人,你可以问我当前时间相关信息,输入exit或者quit退出,输入clear清空历史记录")
    print("=" * 20)
    while True:
        # 查询问题
        query = input("用户:")
        if query in ["exit", "quit", "exit()", "quit()"]:
            print("response: 即将退出")
            break
        if query == 'clear':
            messages = messages[:1]
            print("response: 已清理历史对话信息。")
            continue
        messages.append({"role": "user", "content": query})
        response = await client.process_query(messages)
        messages.append({"role": "assistant", "content": query})
        print("response: ", response)
    # 退出连接
    await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
  • 输入输出信息如下:
====================
欢迎使用AI聊天机器人,你可以问我当前时间相关信息,输入exit或者quit退出,输入clear清空历史记录
====================
用户:你好
response:  你好!有什么我能帮助你的吗?
用户:你叫啥?
response:  您可以叫我AI助手。我在这里帮助您解答问题和提供信息。有什么我可以帮您的?
用户:现在几点啦?
response:  根据您提供的信息,现在是北京时间2025年6月3日17点00分53秒。不过请注意,这个时间是基于您给定的数据,实际时间可能有所不同。如果您需要查询实时的时间,可以查看设备上的时间或者使用相应的在线服务。
  • 可以看到,AI能够在需要的时候,正确调用获取当前时间tool。不需要的时候,也可以进入闲聊模式。
  • 那么,动动手吧,试试上面的代码,顺便把上节的playwright操作浏览器这个功能,也用代码实现一下。

版权属于:tlntin
作品采用:本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
更新于: 2025年06月05日 18:19


46 文章数
6 分类数
47 页面数
已在风雨中度过 2年27天9小时19分
目录
来自 《(2)使用代码调用MCP工具(python 篇)》
暗黑模式
暗黑模式
返回顶部
暗黑模式
暗黑模式
返回顶部