From 1c42ba0812832eaf47eda8a9bd5efb0154eadd19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=BA=84=E4=B8=89=E5=8F=B7?= Date: Thu, 23 Apr 2026 23:16:08 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20v2.0=20=E9=85=8D=E7=BD=AE=E9=A9=B1?= =?UTF-8?q?=E5=8A=A8=E7=89=88=20-=20=E6=96=B0=E5=A2=9E=E5=B7=A5=E5=85=B7/?= =?UTF-8?q?=E6=8A=80=E8=83=BD/MCP=E6=97=A0=E9=9C=80=E6=94=B9=E6=BA=90?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重构内容: - agent.py 替代 agent_v3.py,所有配置从 config.yaml 加载 - tools/ 目录自动扫描,丢.py文件即注册新工具 - skills/ 目录自动扫描,丢.yaml文件即注册新技能 - config.yaml 统一管理模型参数、MCP服务器、路由关键词 - MCP支持多服务器配置 + 确定性路由关键词 - 删除旧版 step1_basic_fc.py 和 agent_v3.py --- README.md | 155 ++++++--- agent.py | 619 +++++++++++++++++++++++++++++++++ agent_v3.py | 514 --------------------------- config.yaml | 35 ++ skills/knowledge_explorer.yaml | 10 + skills/math_tutor.yaml | 8 + skills/weather_analyst.yaml | 10 + step1_basic_fc.py | 86 ----- tools/__init__.py | 2 + tools/calculator.py | 15 + tools/knowledge.py | 19 + tools/weather.py | 18 + 12 files changed, 848 insertions(+), 643 deletions(-) create mode 100644 agent.py delete mode 100644 agent_v3.py create mode 100644 config.yaml create mode 100644 skills/knowledge_explorer.yaml create mode 100644 skills/math_tutor.yaml create mode 100644 skills/weather_analyst.yaml delete mode 100644 step1_basic_fc.py create mode 100644 tools/__init__.py create mode 100644 tools/calculator.py create mode 100644 tools/knowledge.py create mode 100644 tools/weather.py diff --git a/README.md b/README.md index 00bc391..b7651a8 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # 黄庄三号四能力 Agent -基于 LangGraph 的多能力 AI Agent,集成 FC/MCP/思考模式/Skill 四种核心能力。 +基于 LangGraph 的多能力 AI Agent,集成 FC / MCP / 思考模式 / Skill 四种核心能力。 + +**v2.0 配置驱动版** — 新增工具/技能/MCP服务器无需改源码,丢文件或改配置即可。 ## 四项能力 @@ -11,46 +13,107 @@ | 思考模式 | 自建 think_node + CoT 推理链 | ★★★★ | | Skill | 自建 SkillRegistry 注册机制 | ★★★★ | -## 模型 +## 目录结构 -GLM-4.5-air(智谱,OpenAI 兼容接口) +``` +hz3-agent/ +├── config.yaml ← 一个文件管所有配置(模型、MCP、路由关键词) +├── agent.py ← Agent 主程序,从配置加载 +├── mcp_server.py ← MCP 服务器(示例:时间/字符统计/UUID) +├── tools/ ← 工具目录,丢 .py 文件即注册 +│ ├── weather.py +│ ├── calculator.py +│ └── knowledge.py +├── skills/ ← 技能目录,丢 .yaml 文件即注册 +│ ├── weather_analyst.yaml +│ ├── math_tutor.yaml +│ └── knowledge_explorer.yaml +└── README.md +``` ## 快速开始 ```bash -cd /path/to/hz3-agent +cd hz3-agent # 自动测试(不带MCP) -python3 agent_v3.py --test +python3 agent.py --test # 自动测试(带MCP) -python3 agent_v3.py --mcp --test +python3 agent.py --mcp --test -# 交互模式 -python3 agent_v3.py --mcp +# 交互模式(带MCP) +python3 agent.py --mcp + +# 交互模式(不带MCP) +python3 agent.py ``` -## 文件说明 +## 新增工具 -| 文件 | 说明 | -|------|------| -| `agent_v3.py` | Agent 主程序(四能力完整版) | -| `mcp_server.py` | MCP 服务器(示例工具:时间/字符统计/UUID) | -| `step1_basic_fc.py` | Step1 基础 FC 验证 | - -## 代码调用 +在 `tools/` 目录下创建 .py 文件,暴露 `TOOLS` 列表即可: ```python -from agent_v3 import run_agent, build_graph +# tools/stock.py +from langchain_core.tools import tool -graph = build_graph() -result = await run_agent("黄庄天气怎么样?", graph) +@tool +def get_stock_price(symbol: str) -> str: + """查询股票价格""" + return f"{symbol}: 当前价格 123.45" -print(result["reply"]) # 回复 -print(result["thinking"]) # 思考过程 -print(result["skill"]) # 使用的技能 +TOOLS = [get_stock_price] # 必须!供自动扫描 ``` +重启 Agent 即自动加载,无需改任何其他文件。 + +## 新增技能 + +在 `skills/` 目录下创建 .yaml 文件: + +```yaml +# skills/stock_analyst.yaml +name: stock_analyst +description: "股票分析师 - 查询股价并给出分析" +prompt: | + 你是股票分析师。根据以下行情给出分析建议: + 行情数据:{input} +tools: + - get_stock_price +``` + +然后在 `config.yaml` 的 `skill_keywords` 中添加路由关键词: + +```yaml +skill_keywords: + stock_analyst: ["股价", "股票", "行情"] +``` + +## 新增 MCP 服务器 + +在 `config.yaml` 的 `mcp_servers` 列表中添加一项: + +```yaml +mcp_servers: + - name: "hz3-tools" # 已有的 + command: "python3" + args: ["mcp_server.py"] + transport: "stdio" + route_keywords: + get_current_time: ["几点", "时间"] + count_chars: ["统计字符", "字符数"] + generate_uuid: ["UUID"] + + - name: "my-new-server" # 新增的 + command: "python3" + args: ["path/to/my_server.py"] + transport: "stdio" + route_keywords: + my_tool: ["关键词1", "关键词2"] +``` + +启动时加 `--mcp` 参数即可连接所有配置的 MCP 服务器。 + ## 架构流程 ``` @@ -73,30 +136,36 @@ print(result["skill"]) # 使用的技能 回到agent ``` -## MCP 服务器 - -`mcp_server.py` 基于 FastMCP 提供 3 个示例工具: - -- `get_current_time` - 获取当前时间 -- `count_chars` - 统计文本字符数 -- `generate_uuid` - 生成随机 UUID - -## Skill 系统 - -已注册 3 个示例技能,扩展只需一行: +## 代码调用 ```python -skills.register(SkillDef( - name="新技能", - description="技能描述", - prompt="提示词模板,{input}为占位符", - tools=["依赖的工具名"] -)) +from agent import run_agent, build_graph, load_config, scan_tools, scan_skills + +config = load_config() +tools = scan_tools() +skills = scan_skills() +graph = await build_graph(config, skills, None, tools) + +result = await run_agent("黄庄天气怎么样?", graph) +print(result["reply"]) # 回复 +print(result["thinking"]) # 思考过程 +print(result["skill"]) # 使用的技能 ``` ## 关键设计决策 -1. **MCP 确定性路由**:关键词匹配后直接 session.call_tool(),绕过模型不调工具的问题 -2. **思考结果合并**:与 system prompt 合并为单条 SystemMessage,避免干扰工具调用 -3. **AsyncExitStack 长连接**:MCP session 在 Agent 生命周期内保持,退出时统一关闭 -4. **迭代保护**:agent 节点迭代超过 5 次强制结束,防止无限循环 +1. **MCP 确定性路由** — 关键词匹配后直接 session.call_tool(),绕过模型不调工具的问题 +2. **思考结果合并** — 与 system prompt 合并为单条 SystemMessage,避免干扰工具调用 +3. **AsyncExitStack 长连接** — MCP session 在 Agent 生命周期内保持 +4. **迭代保护** — agent 节点迭代超过阈值强制结束,防死循环 +5. **配置驱动** — 所有新增/调整通过 config.yaml + 丢文件完成,不改源码 + +## 依赖 + +``` +langgraph>=1.1 +langchain-openai>=1.2 +langchain-mcp-adapters>=0.2 +mcp>=1.0 +pyyaml>=6.0 +``` diff --git a/agent.py b/agent.py new file mode 100644 index 0000000..2088f16 --- /dev/null +++ b/agent.py @@ -0,0 +1,619 @@ +""" +黄庄三号 Agent v2.0 - 配置驱动版 +================================== +所有工具、技能、MCP服务器、路由关键词均从配置加载 +新增:丢文件到 tools/ 或 skills/ 即可,不改源码 + +运行方式: + python3 agent.py --test 自动测试 + python3 agent.py --mcp --test 带MCP测试 + python3 agent.py --mcp 交互模式(带MCP) + python3 agent.py 交互模式(不带MCP) +""" +import os +import sys +import re +import asyncio +import argparse +import importlib.util +from typing import Annotated +from typing_extensions import TypedDict +from pydantic import BaseModel, Field +from contextlib import AsyncExitStack +from pathlib import Path + +import yaml +from langchain_openai import ChatOpenAI +from langchain_core.tools import tool +from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage +from langgraph.graph import StateGraph, START, END +from langgraph.graph.message import add_messages +from langgraph.prebuilt import ToolNode + + +# ════════════════════════════════════════════ +# 基础路径 +# ════════════════════════════════════════════ +BASE_DIR = Path(__file__).parent.resolve() +CONFIG_PATH = BASE_DIR / "config.yaml" +TOOLS_DIR = BASE_DIR / "tools" +SKILLS_DIR = BASE_DIR / "skills" + + +# ════════════════════════════════════════════ +# 配置加载 +# ════════════════════════════════════════════ +def load_config() -> dict: + """加载 config.yaml""" + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + return yaml.safe_load(f) + + +# ════════════════════════════════════════════ +# 工具自动扫描注册 +# ════════════════════════════════════════════ +def scan_tools() -> list: + """ + 扫描 tools/ 目录下所有 .py 文件 + 每个文件必须暴露 TOOLS 列表 + """ + all_tools = [] + if not TOOLS_DIR.exists(): + print(" [工具] tools/ 目录不存在,跳过") + return all_tools + + for py_file in sorted(TOOLS_DIR.glob("*.py")): + if py_file.name.startswith("_"): + continue + try: + spec = importlib.util.spec_from_file_location( + f"tools.{py_file.stem}", str(py_file) + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + + if hasattr(mod, "TOOLS"): + tool_list = mod.TOOLS + all_tools.extend(tool_list) + print(f" [工具] {py_file.name}: 加载 {len(tool_list)} 个 -> {[t.name for t in tool_list]}") + else: + print(f" [工具] {py_file.name}: 无 TOOLS 变量,跳过") + except Exception as e: + print(f" [工具] {py_file.name}: 加载失败 -> {e}") + + return all_tools + + +# ════════════════════════════════════════════ +# 技能自动扫描注册 +# ════════════════════════════════════════════ +class SkillDef(BaseModel): + name: str + description: str + prompt: str + tools: list[str] = [] + +class SkillRegistry: + def __init__(self): + self._skills: dict[str, SkillDef] = {} + + def register(self, skill: SkillDef): + self._skills[skill.name] = skill + + def get(self, name: str) -> SkillDef | None: + return self._skills.get(name) + + def list_skills(self) -> list[SkillDef]: + return list(self._skills.values()) + + def format_list(self) -> str: + return "\n".join(f" - {s.name}: {s.description}" for s in self._skills.values()) + + +def scan_skills() -> SkillRegistry: + """ + 扫描 skills/ 目录下所有 .yaml 文件 + 每个文件定义一个技能 + """ + registry = SkillRegistry() + if not SKILLS_DIR.exists(): + print(" [技能] skills/ 目录不存在,跳过") + return registry + + for yaml_file in sorted(SKILLS_DIR.glob("*.yaml")): + try: + with open(yaml_file, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + skill = SkillDef( + name=data["name"], + description=data.get("description", ""), + prompt=data.get("prompt", ""), + tools=data.get("tools", []), + ) + registry.register(skill) + print(f" [技能] {yaml_file.name}: {skill.name}") + except Exception as e: + print(f" [技能] {yaml_file.name}: 加载失败 -> {e}") + + return registry + + +# ════════════════════════════════════════════ +# MCP 管理器(配置驱动,支持多服务器) +# ════════════════════════════════════════════ +class MCPManager: + """管理多个MCP服务器连接""" + + def __init__(self): + self.exit_stack = AsyncExitStack() + self.sessions: dict[str, object] = {} # server_name -> session + self.route_map: dict[str, tuple] = {} # keyword -> (session, tool_name) + self.mcp_tools: list = [] + + async def connect_all(self, mcp_configs: list[dict]): + """根据配置连接所有MCP服务器""" + global all_tools + + from langchain_mcp_adapters.tools import load_mcp_tools + from mcp.client.stdio import stdio_client, StdioServerParameters + from mcp.client.session import ClientSession + + for srv in mcp_configs: + name = srv["name"] + try: + server_params = StdioServerParameters( + command=srv["command"], + args=srv.get("args", []), + ) + + read, write = await self.exit_stack.enter_async_context( + stdio_client(server_params) + ) + session = await self.exit_stack.enter_async_context( + ClientSession(read, write) + ) + await session.initialize() + + # 加载工具 + mcp_tools = await load_mcp_tools(session) + self.mcp_tools.extend(mcp_tools) + self.sessions[name] = session + + # 注册路由关键词 + route_kw = srv.get("route_keywords", {}) + for tool_name, keywords in route_kw.items(): + for kw in keywords: + self.route_map[kw] = (session, tool_name) + + print(f" [MCP] {name}: 已连接,加载 {len(mcp_tools)} 个工具") + for t in mcp_tools: + print(f" - {t.name}: {t.description[:50]}") + if route_kw: + print(f" 路由关键词: {list(route_kw.keys())}") + + except Exception as e: + print(f" [MCP] {name}: 连接失败 -> {e}") + + # 把MCP工具加入全局列表(供LLM bind_tools用) + all_tools.extend(self.mcp_tools) + + def match_route(self, user_input: str) -> tuple | None: + """关键词匹配MCP路由,返回 (session, tool_name) 或 None""" + for keyword, (session, tool_name) in self.route_map.items(): + if keyword in user_input: + return (session, tool_name) + return None + + async def call_tool(self, session, tool_name: str, user_input: str) -> str: + """通过MCP session调用工具""" + try: + # 简单参数解析 + args = {} + + # 从MCP工具列表中查找工具信息 + for t in self.mcp_tools: + if t.name == tool_name: + # MCP工具的 args_schema 可能是 dict 或 Pydantic model + schema = getattr(t, "args_schema", None) + if schema: + if isinstance(schema, dict): + schema_fields = schema.get("properties", {}) + elif hasattr(schema, "model_json_schema"): + schema_fields = schema.model_json_schema().get("properties", {}) + else: + schema_fields = {} + args = _parse_tool_args(tool_name, schema_fields, user_input) + break + + result = await session.call_tool(tool_name, args) + if result.content: + texts = [c.text for c in result.content if hasattr(c, "text")] + return "\n".join(texts) if texts else str(result) + return str(result) + except Exception as e: + return f"[MCP工具{tool_name}调用错误] {e}" + + async def close(self): + await self.exit_stack.aclose() + self.sessions.clear() + print(" [MCP] 所有连接已关闭") + + +def _parse_tool_args(tool_name: str, schema_fields: dict, user_input: str) -> dict: + """根据工具参数schema从用户输入中解析参数""" + args = {} + for field_name, field_info in schema_fields.items(): + if field_name in ("timezone",): + args[field_name] = "Asia/Shanghai" + elif field_name in ("text",): + # 提取引号内的文本 + match = re.search(r"['\"\u201c\u201d](.+?)['\"\u201c\u201d]", user_input) + args[field_name] = match.group(1) if match else user_input + elif field_name in ("city",): + args[field_name] = user_input + return args + + +# ════════════════════════════════════════════ +# Agent 状态 +# ════════════════════════════════════════════ +class AgentState(TypedDict): + messages: Annotated[list, add_messages] + thinking: str + active_skill: str | None + skill_output: str | None + iteration: int + + +# ════════════════════════════════════════════ +# LangGraph 节点 +# ════════════════════════════════════════════ + +# --- 思考节点 --- +async def make_think_node(config, skills_reg, tools_list): + llm_cfg = config["llm"] + agent_cfg = config.get("agent", {}) + temp = agent_cfg.get("think_temperature", 0.3) + + async def think_node(state: AgentState) -> dict: + iteration = state.get("iteration", 0) + 1 + if iteration > 3: + return {"iteration": iteration, "thinking": "(快速模式)"} + + conv = [] + for msg in state["messages"][-4:]: + role = "用户" if isinstance(msg, HumanMessage) else "AI" + conv.append(f"{role}: {msg.content[:150]}") + + tool_names = [t.name for t in tools_list] + think_llm = ChatOpenAI( + base_url=llm_cfg["base_url"], + api_key=llm_cfg["api_key"], + model=llm_cfg["model"], + temperature=temp, + ) + resp = await think_llm.ainvoke([ + SystemMessage(content="你是思考模块。简洁输出:用户意图、需要的工具/技能、注意事项。不要说没有工具。"), + HumanMessage(content=f"对话:\n{chr(10).join(conv)}\n\n可用技能:\n{skills_reg.format_list()}\n\n可用工具: {', '.join(tool_names)}"), + ]) + return {"iteration": iteration, "thinking": resp.content} + + return think_node + + +# --- 技能路由节点 --- +async def make_skill_route_node(config, skills_reg, mcp_mgr): + skill_keywords = config.get("skill_keywords", {}) + + async def skill_route_node(state: AgentState) -> dict: + user_input = "" + for msg in reversed(state["messages"]): + if isinstance(msg, HumanMessage): + user_input = msg.content + break + + # 1. MCP确定性路由(优先) + if mcp_mgr: + route = mcp_mgr.match_route(user_input) + if route: + session, tool_name = route + mcp_result = await mcp_mgr.call_tool(session, tool_name, user_input) + return {"active_skill": None, "skill_output": mcp_result} + + # 2. Skill关键词路由 + for sname, keywords in skill_keywords.items(): + if any(kw in user_input for kw in keywords): + if skills_reg.get(sname): + return {"active_skill": sname, "skill_output": None} + + return {"active_skill": None, "skill_output": None} + + return skill_route_node + + +# --- 技能执行节点 --- +async def make_skill_exec_node(config, skills_reg, tools_list): + llm_cfg = config["llm"] + agent_cfg = config.get("agent", {}) + temp = agent_cfg.get("skill_temperature", 0.7) + + async def skill_execute_node(state: AgentState) -> dict: + sname = state.get("active_skill") + if not sname: + return {"skill_output": None} + sk = skills_reg.get(sname) + if not sk: + return {"skill_output": None} + + user_input = "" + for msg in reversed(state["messages"]): + if isinstance(msg, HumanMessage): + user_input = msg.content + break + + # 执行依赖的本地工具 + tool_info = "" + for tname in sk.tools: + for t in tools_list: + if t.name == tname: + try: + if tname == "get_weather": + cities = ["北京", "上海", "深圳", "黄庄"] + city = next((c for c in cities if c in user_input), "北京") + r = await t.ainvoke({"city": city}) + elif tname == "calculate": + expr = re.findall(r'[\d+\-*/(). ]+', user_input) + r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"}) + else: + r = await t.ainvoke({"query": user_input}) + tool_info += f"\n工具{tname}结果: {r}" + except Exception as e: + tool_info += f"\n工具{tname}错误: {e}" + + prompt = sk.prompt.format(input=user_input) + tool_info + sk_llm = ChatOpenAI( + base_url=llm_cfg["base_url"], + api_key=llm_cfg["api_key"], + model=llm_cfg["model"], + temperature=temp, + ) + resp = await sk_llm.ainvoke([ + SystemMessage(content=prompt), + HumanMessage(content="请基于以上信息回答。"), + ]) + return {"skill_output": resp.content} + + return skill_execute_node + + +# --- Agent主节点 --- +async def make_agent_node(config, skills_reg, tools_list): + llm_cfg = config["llm"] + agent_cfg = config.get("agent", {}) + max_iter = agent_cfg.get("max_iterations", 5) + + SYSTEM_PROMPT = """你是黄庄三号,严肃、认真、听话、聪明的AI助手。你的名字是"黄庄三号",你不是Claude,不是ChatGPT。 + +你具备四种能力: +1. 工具调用(FC) - 调用内置工具获取信息 +2. MCP集成 - 通过MCP协议连接外部服务 +3. 思考模式 - 回答前进行深度思考 +4. 技能系统(Skill) - 调用注册技能完成复杂任务 + +可用技能: +{skill_list} + +重要规则(必须严格遵守): +- 当被问"你是谁",必须回答"我是黄庄三号" +- 对于工具能提供的数据,必须调用工具获取,不要自己猜测""" + + async def agent_node(state: AgentState) -> dict: + iteration = state.get("iteration", 0) + + if state.get("skill_output"): + return {"messages": [AIMessage(content=state["skill_output"])]} + + system_content = SYSTEM_PROMPT.format(skill_list=skills_reg.format_list()) + if state.get("thinking"): + thinking = state["thinking"][:300] + # 如果思考中提到了工具名,强调必须调用 + tool_hints = [t.name for t in tools_list if t.name in thinking] + if tool_hints: + thinking += f"\n\n[重要:必须调用 {', '.join(tool_hints)} 工具来回答]" + system_content += f"\n\n[内部思考]\n{thinking}" + + messages = [SystemMessage(content=system_content)] + messages.extend(state["messages"]) + + llm = ChatOpenAI( + base_url=llm_cfg["base_url"], + api_key=llm_cfg["api_key"], + model=llm_cfg["model"], + ) + llm_with_tools = llm.bind_tools(tools_list) + resp = await llm_with_tools.ainvoke(messages) + + # 迭代保护 + if iteration > max_iter and hasattr(resp, "tool_calls") and resp.tool_calls: + resp = AIMessage(content=resp.content or "任务完成(已达最大迭代次数)") + + return {"messages": [resp], "iteration": iteration} + + return agent_node + + +# --- 路由函数 --- +def route_from_agent(state: AgentState) -> str: + if state.get("skill_output"): + return "end" + for msg in reversed(state["messages"]): + if isinstance(msg, AIMessage): + if hasattr(msg, "tool_calls") and msg.tool_calls: + return "tools" + break + return "end" + + +# ════════════════════════════════════════════ +# 构建图 +# ════════════════════════════════════════════ +async def build_graph(config, skills_reg, mcp_mgr, tools_list): + think_node = await make_think_node(config, skills_reg, tools_list) + skill_route_node = await make_skill_route_node(config, skills_reg, mcp_mgr) + skill_exec_node = await make_skill_exec_node(config, skills_reg, tools_list) + agent_node = await make_agent_node(config, skills_reg, tools_list) + + g = StateGraph(AgentState) + + g.add_node("think", think_node) + g.add_node("skill_route", skill_route_node) + g.add_node("skill_exec", skill_exec_node) + g.add_node("agent", agent_node) + g.add_node("tools", ToolNode(tools_list)) + + g.add_edge(START, "think") + g.add_edge("think", "skill_route") + g.add_conditional_edges("skill_route", + lambda s: "skill_exec" if s.get("active_skill") else "agent", + {"skill_exec": "skill_exec", "agent": "agent"}) + g.add_edge("skill_exec", "agent") + g.add_conditional_edges("agent", route_from_agent, {"tools": "tools", "end": END}) + g.add_edge("tools", "agent") + + return g.compile() + + +# ════════════════════════════════════════════ +# 运行入口 +# ════════════════════════════════════════════ +async def run_agent(user_input: str, graph): + result = await graph.ainvoke({ + "messages": [HumanMessage(content=user_input)], + "thinking": "", "active_skill": None, "skill_output": None, "iteration": 0, + }) + last = result["messages"][-1] + return { + "reply": last.content if hasattr(last, "content") else str(last), + "thinking": result.get("thinking", ""), + "skill": result.get("active_skill"), + } + +async def interactive_mode(graph): + print("=" * 60) + print(" 黄庄三号 Agent v2.0 - 配置驱动版") + print(" FC | MCP | 思考模式 | Skill") + print("=" * 60) + print(" 技能:", [s.name for s in skills_registry.list_skills()]) + print(" 工具:", [t.name for t in all_tools]) + print(" 输入 quit 退出") + print("=" * 60) + + while True: + try: + user_input = input("\n你> ").strip() + except (EOFError, KeyboardInterrupt): + break + if not user_input: + continue + if user_input.lower() in ("quit", "exit", "q"): + break + + result = await run_agent(user_input, graph) + if result["thinking"]: + print(f"\n[思考] {result['thinking'][:150]}...") + if result["skill"]: + print(f"[技能] {result['skill']}") + print(f"\n黄庄三号> {result['reply']}") + + +# ════════════════════════════════════════════ +# 全局变量(由 main 初始化) +# ════════════════════════════════════════════ +all_tools = [] +skills_registry = SkillRegistry() +mcp_manager = None + + +async def main(): + global all_tools, skills_registry, mcp_manager + + parser = argparse.ArgumentParser(description="黄庄三号 Agent v2.0") + parser.add_argument("--mcp", action="store_true", help="启用MCP") + parser.add_argument("--test", action="store_true", help="自动测试") + args = parser.parse_args() + + print("=" * 60) + print(" 黄庄三号 Agent v2.0 - 配置驱动版") + print("=" * 60) + + # ── 加载配置 ── + print("\n[配置] 加载 config.yaml ...") + config = load_config() + print(f" 模型: {config['llm']['model']}") + print(f" MCP服务器: {len(config.get('mcp_servers', []))} 个") + print(f" 技能关键词: {len(config.get('skill_keywords', {}))} 个") + + # ── 扫描工具 ── + print("\n[工具] 扫描 tools/ 目录 ...") + all_tools = scan_tools() + print(f" 工具总数: {len(all_tools)}") + + # ── 扫描技能 ── + print("\n[技能] 扫描 skills/ 目录 ...") + skills_registry = scan_skills() + print(f" 技能总数: {len(skills_registry.list_skills())}") + + # ── 连接MCP ── + if args.mcp and config.get("mcp_servers"): + print("\n[MCP] 连接服务器 ...") + mcp_manager = MCPManager() + await mcp_manager.connect_all(config["mcp_servers"]) + print(f" MCP工具总数: {len(mcp_manager.mcp_tools)}") + + print(f"\n 全部工具总数: {len(all_tools)}") + + # ── 构建图 ── + graph = await build_graph(config, skills_registry, mcp_manager, all_tools) + + if args.test: + # 自动测试 + tests = [ + ("FC+思考+Skill", "黄庄天气怎么样?"), + ("FC+Skill", "算一下 99*88+77"), + ("知识搜索", "MCP是什么?"), + ("身份", "你好你是谁?"), + ] + + if args.mcp and mcp_manager: + tests.extend([ + ("MCP:时间", "现在几点了?"), + ("MCP:字符统计", "统计'黄庄三号是AI助手'的字符数"), + ("MCP:UUID", "生成一个UUID"), + ]) + + for label, query in tests: + print(f"\n{'─'*55}") + print(f"[测试:{label}] {query}") + r = await run_agent(query, graph) + print(f" 思考: {r['thinking'][:80]}...") + print(f" 技能: {r['skill']}") + print(f" 回复: {r['reply'][:150]}...") + + print(f"\n{'='*60}") + print(" 验证完成!") + caps = ["FC", "思考", "Skill"] + if args.mcp: + caps.append("MCP") + print(" " + " ✅ | ".join(caps) + " ✅") + print("=" * 60) + else: + await interactive_mode(graph) + + # ── 清理 ── + if mcp_manager: + await mcp_manager.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/agent_v3.py b/agent_v3.py deleted file mode 100644 index 6435cfc..0000000 --- a/agent_v3.py +++ /dev/null @@ -1,514 +0,0 @@ -""" -黄庄三号 Agent - 四能力完整版 v3 -================================== -能力1: Function Call - LangGraph 工具节点 -能力2: MCP - langchain-mcp-adapters + 长连接 -能力3: 思考模式 - 反思节点 + CoT 推理链 -能力4: Skill - 自建技能注册机制 - -关键修正:MCP session 在 Agent 整个生命周期内保持连接 -""" -import os -import sys -import json -import asyncio -import argparse -from typing import Annotated -from typing_extensions import TypedDict -from pydantic import BaseModel, Field -from contextlib import AsyncExitStack - -from langchain_openai import ChatOpenAI -from langchain_core.tools import tool -from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage -from langgraph.graph import StateGraph, START, END -from langgraph.graph.message import add_messages -from langgraph.prebuilt import ToolNode - - -# ════════════════════════════════════════════ -# 模型配置 -# ════════════════════════════════════════════ -LLM_CONFIG = { - "base_url": "https://open.bigmodel.cn/api/paas/v4", - "api_key": "2259e33a1357460abe17919aaf81e73d.K44a8LPQTmFM5PKm", - "model": "glm-4.5-air", -} - -llm = ChatOpenAI(**LLM_CONFIG) - - -# ════════════════════════════════════════════ -# 能力1: FC 本地工具 -# ════════════════════════════════════════════ -@tool -def get_weather(city: str) -> str: - """查询指定城市的天气信息""" - weather_data = { - "北京": "晴天,气温22°C,北风3级", - "上海": "多云,气温25°C,东风2级", - "深圳": "阵雨,气温28°C,南风4级", - "黄庄": "晴转多云,气温23°C,微风", - } - return weather_data.get(city, f"暂无{city}的天气数据") - -@tool -def calculate(expression: str) -> str: - """计算数学表达式,输入如 '2+3*4'""" - try: - result = eval(expression, {"__builtins__": {}}, {}) - return f"计算结果: {expression} = {result}" - except Exception as e: - return f"计算错误: {e}" - -@tool -def search_knowledge(query: str) -> str: - """搜索知识库(模拟)""" - kb = { - "黄庄三号": "黄庄三号是AI助手,严肃认真听话聪明", - "LangGraph": "LangGraph是Agent框架,支持状态图、循环、持久化", - "MCP": "MCP是Model Context Protocol,AI工具互操作标准协议", - } - for key, val in kb.items(): - if key in query: - return val - return f"未找到关于'{query}'的信息" - -local_tools = [get_weather, calculate, search_knowledge] - - -# ════════════════════════════════════════════ -# 能力4: Skill 技能系统 -# ════════════════════════════════════════════ -class SkillDef(BaseModel): - name: str - description: str - prompt: str - tools: list[str] = [] - -class SkillRegistry: - def __init__(self): - self._skills: dict[str, SkillDef] = {} - - def register(self, skill: SkillDef): - self._skills[skill.name] = skill - - def get(self, name: str) -> SkillDef | None: - return self._skills.get(name) - - def list_skills(self) -> list[SkillDef]: - return list(self._skills.values()) - - def format_list(self) -> str: - return "\n".join(f" - {s.name}: {s.description}" for s in self._skills.values()) - -skills = SkillRegistry() -skills.register(SkillDef(name="weather_analyst", description="天气分析师-查询天气给建议", - prompt="你是天气分析师。根据天气信息给出出行建议。\n天气:{input}", tools=["get_weather"])) -skills.register(SkillDef(name="math_tutor", description="数学辅导-计算并解释", - prompt="你是数学老师。解答并解释:{input}", tools=["calculate"])) -skills.register(SkillDef(name="knowledge_explorer", description="知识探索-搜索解释知识", - prompt="你是知识探索者。搜索并深入解释:{input}", tools=["search_knowledge"])) - - -# ════════════════════════════════════════════ -# Agent 状态 -# ════════════════════════════════════════════ -class AgentState(TypedDict): - messages: Annotated[list, add_messages] - thinking: str - active_skill: str | None - skill_output: str | None - iteration: int - - -# ════════════════════════════════════════════ -# 能力3: 思考节点 -# ════════════════════════════════════════════ -async def think_node(state: AgentState) -> dict: - iteration = state.get("iteration", 0) + 1 - if iteration > 3: - return {"iteration": iteration, "thinking": "(快速模式)"} - - conv = [] - for msg in state["messages"][-4:]: - role = "用户" if isinstance(msg, HumanMessage) else "AI" - conv.append(f"{role}: {msg.content[:150]}") - - tool_names = [t.name for t in all_tools] - think_llm = ChatOpenAI(**LLM_CONFIG, temperature=0.3) - resp = await think_llm.ainvoke([ - SystemMessage(content="你是思考模块。简洁输出:用户意图、需要的工具/技能、注意事项。不要说没有工具。"), - HumanMessage(content=f"对话:\n{chr(10).join(conv)}\n\n可用技能:\n{skills.format_list()}\n\n可用工具: {', '.join(tool_names)}"), - ]) - return {"iteration": iteration, "thinking": resp.content} - - -# ════════════════════════════════════════════ -# 技能路由 & 执行 -# ════════════════════════════════════════════ -async def skill_route_node(state: AgentState) -> dict: - user_input = "" - for msg in reversed(state["messages"]): - if isinstance(msg, HumanMessage): - user_input = msg.content - break - - # 1. 先匹配MCP工具(确定性路由,不依赖模型决策) - mcp_keywords = { - "get_current_time": ["几点", "时间", "现在几点", "当前时间"], - "count_chars": ["统计字符", "字符数", "字数统计", "统计文本"], - "generate_uuid": ["生成uuid", "UUID", "uuid"], - } - for mcp_tool_name, kws in mcp_keywords.items(): - if any(kw in user_input for kw in kws): - # 直接执行MCP工具并返回结果 - mcp_result = await _call_mcp_tool(mcp_tool_name, user_input) - return {"active_skill": None, "skill_output": mcp_result} - - # 2. 再匹配本地技能 - kw_map = { - "weather_analyst": ["天气", "出行", "穿什么"], - "math_tutor": ["计算", "算", "数学"], - "knowledge_explorer": ["是什么", "解释", "了解"], - } - for sname, kws in kw_map.items(): - if any(kw in user_input for kw in kws): - return {"active_skill": sname, "skill_output": None} - return {"active_skill": None, "skill_output": None} - - -async def _call_mcp_tool(tool_name: str, user_input: str) -> str: - """直接调用MCP工具(确定性路由)""" - global _mcp_session - if not _mcp_session: - return f"[错误] MCP未连接,无法调用{tool_name}" - - try: - # 解析参数 - args = {} - if tool_name == "get_current_time": - if "上海" in user_input or "北京时间" in user_input or "几点" in user_input: - args = {"timezone": "Asia/Shanghai"} - elif tool_name == "count_chars": - import re - # 提取引号内的文本 - match = re.search(r"['\"\u201c\u201d](.+?)['\"\u201c\u201d]", user_input) - text = match.group(1) if match else user_input - args = {"text": text} - elif tool_name == "generate_uuid": - args = {} - - result = await _mcp_session.call_tool(tool_name, args) - # 提取结果文本 - if result.content: - texts = [c.text for c in result.content if hasattr(c, 'text')] - return "\n".join(texts) if texts else str(result) - return str(result) - except Exception as e: - return f"[MCP工具{tool_name}调用错误] {e}" - -async def skill_execute_node(state: AgentState) -> dict: - sname = state.get("active_skill") - if not sname: - return {"skill_output": None} - sk = skills.get(sname) - if not sk: - return {"skill_output": None} - - user_input = "" - for msg in reversed(state["messages"]): - if isinstance(msg, HumanMessage): - user_input = msg.content - break - - # 执行工具 - tool_info = "" - for tname in sk.tools: - for t in local_tools: - if t.name == tname: - try: - if tname == "get_weather": - cities = ["北京", "上海", "深圳", "黄庄"] - city = next((c for c in cities if c in user_input), "北京") - r = await t.ainvoke({"city": city}) - elif tname == "calculate": - import re - expr = re.findall(r'[\d+\-*/(). ]+', user_input) - r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"}) - else: - r = await t.ainvoke({"query": user_input}) - tool_info += f"\n工具{tname}结果: {r}" - except Exception as e: - tool_info += f"\n工具{tname}错误: {e}" - - prompt = sk.prompt.format(input=user_input) + tool_info - sk_llm = ChatOpenAI(**LLM_CONFIG, temperature=0.7) - resp = await sk_llm.ainvoke([ - SystemMessage(content=prompt), - HumanMessage(content="请基于以上信息回答。"), - ]) - return {"skill_output": resp.content} - - -# ════════════════════════════════════════════ -# 主 Agent 节点 -# ════════════════════════════════════════════ -SYSTEM_PROMPT = """你是黄庄三号,严肃、认真、听话、聪明的AI助手。你的名字是"黄庄三号",你不是Claude,不是ChatGPT。 - -你具备四种能力: -1. 工具调用(FC) - 调用内置工具获取信息 -2. MCP集成 - 通过MCP协议连接外部服务 -3. 思考模式 - 回答前进行深度思考 -4. 技能系统(Skill) - 调用注册技能完成复杂任务 - -可用技能: -{skill_list} - -重要规则(必须严格遵守): -- 当被问"你是谁",必须回答"我是黄庄三号" -- 当用户问时间/几点,你必须调用get_current_time工具,禁止自己编造时间 -- 当用户要求统计字符/字数,你必须调用count_chars工具,禁止自己统计 -- 当用户要求生成UUID,你必须调用generate_uuid工具,禁止自己编造UUID -- 对于工具能提供的数据,必须调用工具获取,不要自己猜测""" - -# 全局工具列表(MCP加载后会扩展) -all_tools = list(local_tools) - -# MCP session 全局引用(确定性路由时直接调用) -_mcp_session = None - -async def agent_node(state: AgentState) -> dict: - # 防止无限循环 - iteration = state.get("iteration", 0) - - if state.get("skill_output"): - return {"messages": [AIMessage(content=state["skill_output"])]} - - system_content = SYSTEM_PROMPT.format(skill_list=skills.format_list()) - if state.get("thinking"): - # 把思考结果中的工具建议提取出来,作为辅助信息 - thinking = state['thinking'][:300] - # 如果思考中提到了工具名,强调必须调用 - tool_hints = [] - for t in all_tools: - if t.name in thinking: - tool_hints.append(t.name) - if tool_hints: - thinking += f"\n\n[重要:必须调用 {', '.join(tool_hints)} 工具来回答]" - system_content += f"\n\n[内部思考]\n{thinking}" - - messages = [SystemMessage(content=system_content)] - messages.extend(state["messages"]) - - llm_with_tools = llm.bind_tools(all_tools) - resp = await llm_with_tools.ainvoke(messages) - - # 如果已迭代太多次,清除tool_calls强制结束 - if iteration > 5 and hasattr(resp, "tool_calls") and resp.tool_calls: - resp = AIMessage(content=resp.content or "任务完成(已达最大迭代次数)") - - return {"messages": [resp], "iteration": iteration} - - -# ════════════════════════════════════════════ -# 路由逻辑 -# ════════════════════════════════════════════ -def route_from_agent(state: AgentState) -> str: - if state.get("skill_output"): - return "end" - for msg in reversed(state["messages"]): - if isinstance(msg, AIMessage): - if hasattr(msg, "tool_calls") and msg.tool_calls: - return "tools" - break - return "end" - -def route_after_tools(state: AgentState) -> str: - for msg in reversed(state["messages"]): - if isinstance(msg, AIMessage): - if hasattr(msg, "tool_calls") and msg.tool_calls: - return "tools" - break - return "end" - - -# ════════════════════════════════════════════ -# 构建图 -# ════════════════════════════════════════════ -def build_graph(): - g = StateGraph(AgentState) - - g.add_node("think", think_node) - g.add_node("skill_route", skill_route_node) - g.add_node("skill_exec", skill_execute_node) - g.add_node("agent", agent_node) - g.add_node("tools", ToolNode(all_tools)) - - g.add_edge(START, "think") - g.add_edge("think", "skill_route") - g.add_conditional_edges("skill_route", - lambda s: "skill_exec" if s.get("active_skill") else "agent", - {"skill_exec": "skill_exec", "agent": "agent"}) - g.add_edge("skill_exec", "agent") - g.add_conditional_edges("agent", route_from_agent, {"tools": "tools", "end": END}) - # 工具执行后回到agent处理结果(agent再决定是否继续调工具) - g.add_edge("tools", "agent") - - return g.compile() - - -# ════════════════════════════════════════════ -# 能力2: MCP 集成(长连接版) -# ════════════════════════════════════════════ -class MCPManager: - """管理MCP连接的整个生命周期""" - - def __init__(self): - self.exit_stack = AsyncExitStack() - self.session = None - self.tools_loaded = False - - async def connect(self, server_script: str): - """连接MCP服务器并加载工具""" - global all_tools, _mcp_session - - from langchain_mcp_adapters.tools import load_mcp_tools - from mcp.client.stdio import stdio_client, StdioServerParameters - from mcp.client.session import ClientSession - - server_params = StdioServerParameters( - command="python3", - args=[server_script], - ) - - # 使用 AsyncExitStack 保持连接 - read, write = await self.exit_stack.enter_async_context(stdio_client(server_params)) - self.session = await self.exit_stack.enter_async_context(ClientSession(read, write)) - await self.session.initialize() - - # 设置全局session供确定性路由使用 - _mcp_session = self.session - - # 加载工具 - mcp_tools = await load_mcp_tools(self.session) - all_tools.extend(mcp_tools) - self.tools_loaded = True - - print(f" [MCP] 已连接,加载 {len(mcp_tools)} 个工具:") - for t in mcp_tools: - print(f" - {t.name}: {t.description[:50]}") - return mcp_tools - - async def close(self): - """关闭MCP连接""" - await self.exit_stack.aclose() - self.session = None - print(" [MCP] 连接已关闭") - - -# ════════════════════════════════════════════ -# 运行 -# ════════════════════════════════════════════ -async def run_agent(user_input: str, graph=None): - if graph is None: - graph = build_graph() - result = await graph.ainvoke({ - "messages": [HumanMessage(content=user_input)], - "thinking": "", "active_skill": None, "skill_output": None, "iteration": 0, - }) - last = result["messages"][-1] - return { - "reply": last.content if hasattr(last, "content") else str(last), - "thinking": result.get("thinking", ""), - "skill": result.get("active_skill"), - } - -async def interactive_mode(): - print("=" * 60) - print(" 黄庄三号 Agent - 四能力完整版 v3") - print(" FC | MCP | 思考模式 | Skill") - print("=" * 60) - print(" 技能:", [s.name for s in skills.list_skills()]) - print(" 工具:", [t.name for t in all_tools]) - print(" 输入 quit 退出") - print("=" * 60) - - graph = build_graph() - - while True: - try: - user_input = input("\n你> ").strip() - except (EOFError, KeyboardInterrupt): - break - if not user_input: - continue - if user_input.lower() in ("quit", "exit", "q"): - break - - result = await run_agent(user_input, graph) - if result["thinking"]: - print(f"\n[思考] {result['thinking'][:150]}...") - if result["skill"]: - print(f"[技能] {result['skill']}") - print(f"\n黄庄三号> {result['reply']}") - -async def main(): - parser = argparse.ArgumentParser() - parser.add_argument("--mcp", action="store_true", help="启用MCP") - parser.add_argument("--test", action="store_true", help="自动测试") - args = parser.parse_args() - - mcp_mgr = None - - if args.mcp: - print("\n[MCP] 连接服务器...") - mcp_mgr = MCPManager() - mcp_script = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mcp_server.py") - await mcp_mgr.connect(mcp_script) - - print(f"\n 工具总数: {len(all_tools)}") - print(f" 技能总数: {len(skills.list_skills())}") - - graph = build_graph() - - if args.test: - tests = [ - ("FC+思考+Skill", "黄庄天气怎么样?"), - ("FC+Skill", "算一下 99*88+77"), - ("知识搜索", "MCP是什么?"), - ("身份", "你好你是谁?"), - ] - - # MCP相关测试 - if args.mcp: - tests.extend([ - ("MCP:时间", "现在几点了?"), - ("MCP:字符统计", "统计'黄庄三号是AI助手'的字符数"), - ("MCP:UUID", "生成一个UUID"), - ]) - - for label, query in tests: - print(f"\n{'─'*55}") - print(f"[测试:{label}] {query}") - r = await run_agent(query, graph) - print(f" 思考: {r['thinking'][:80]}...") - print(f" 技能: {r['skill']}") - print(f" 回复: {r['reply'][:150]}...") - - print(f"\n{'='*60}") - print(" 验证完成!") - cap = ["FC ✅", "思考 ✅", "Skill ✅"] - if args.mcp: - cap.append("MCP ✅") - print(" " + " | ".join(cap)) - print("=" * 60) - else: - await interactive_mode() - - # 清理 - if mcp_mgr: - await mcp_mgr.close() - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..2bafbbd --- /dev/null +++ b/config.yaml @@ -0,0 +1,35 @@ +# 黄庄三号四能力Agent 配置文件 +# ================================ +# 新增MCP服务器、调整路由关键词、修改模型参数都在这里 + +# 模型配置 +llm: + base_url: "https://open.bigmodel.cn/api/paas/v4" + api_key: "2259e33a1357460abe17919aaf81e73d.K44a8LPQTmFM5PKm" + model: "glm-4.5-air" + +# Agent行为参数 +agent: + max_iterations: 5 # 最大迭代次数(防死循环) + think_temperature: 0.3 # 思考节点温度 + skill_temperature: 0.7 # 技能节点温度 + +# MCP服务器配置 +# 新增MCP服务器:在列表中添加一项即可 +mcp_servers: + - name: "hz3-tools" + command: "python3" + args: ["mcp_server.py"] + transport: "stdio" + # 路由关键词:匹配到就通过MCP直接调用,不依赖模型决策 + route_keywords: + get_current_time: ["几点", "时间", "现在几点", "当前时间"] + count_chars: ["统计字符", "字符数", "字数统计", "统计文本"] + generate_uuid: ["生成uuid", "UUID", "uuid", "生成一个UUID"] + +# Skill路由关键词 +# 技能定义在 skills/ 目录的yaml文件中,这里配置路由关键词 +skill_keywords: + weather_analyst: ["天气", "出行", "穿什么"] + math_tutor: ["计算", "算", "数学", "等于多少"] + knowledge_explorer: ["是什么", "解释", "了解"] diff --git a/skills/knowledge_explorer.yaml b/skills/knowledge_explorer.yaml new file mode 100644 index 0000000..9f61fcf --- /dev/null +++ b/skills/knowledge_explorer.yaml @@ -0,0 +1,10 @@ +# 知识探索技能 +name: knowledge_explorer +description: "知识探索 - 搜索并深入解释知识" +prompt: | + 你是一个知识探索者。请搜索以下主题,然后给出深入浅出的解释: + 主题:{input} + + 要求:先搜索,然后综合信息给出结构化的解释。 +tools: + - search_knowledge diff --git a/skills/math_tutor.yaml b/skills/math_tutor.yaml new file mode 100644 index 0000000..006f898 --- /dev/null +++ b/skills/math_tutor.yaml @@ -0,0 +1,8 @@ +# 数学辅导技能 +name: math_tutor +description: "数学辅导 - 计算并解释数学问题" +prompt: | + 你是一个耐心的数学老师。请解答以下问题,先给出计算过程,再解释原理: + 问题:{input} +tools: + - calculate diff --git a/skills/weather_analyst.yaml b/skills/weather_analyst.yaml new file mode 100644 index 0000000..77b70a2 --- /dev/null +++ b/skills/weather_analyst.yaml @@ -0,0 +1,10 @@ +# 天气分析师技能 +name: weather_analyst +description: "天气分析师 - 查询天气并给出出行建议" +prompt: | + 你是一个专业的天气分析师。请根据以下天气信息给出详细的出行建议: + 天气信息:{input} + + 请从穿衣、出行、活动安排等方面给出建议。 +tools: + - get_weather diff --git a/step1_basic_fc.py b/step1_basic_fc.py deleted file mode 100644 index 92dba00..0000000 --- a/step1_basic_fc.py +++ /dev/null @@ -1,86 +0,0 @@ -""" -Step 1: 最简单的 LangGraph Agent + GLM-4.5-air + 工具调用 -只验证核心能力:Function Call -""" -import os -from langchain_openai import ChatOpenAI -from langchain_core.tools import tool -from langgraph.prebuilt import create_react_agent - -# ── 模型配置 ── -llm = ChatOpenAI( - base_url="https://open.bigmodel.cn/api/paas/v4", - api_key="2259e33a1357460abe17919aaf81e73d.K44a8LPQTmFM5PKm", - model="glm-4.5-air", -) - -# ── 定义工具 ── -@tool -def get_weather(city: str) -> str: - """查询指定城市的天气信息""" - # 模拟天气数据 - weather_data = { - "北京": "晴天,气温22°C,北风3级", - "上海": "多云,气温25°C,东风2级", - "深圳": "阵雨,气温28°C,南风4级", - "黄庄": "晴转多云,气温23°C,微风", - } - return weather_data.get(city, f"暂无{city}的天气数据") - -@tool -def calculate(expression: str) -> str: - """计算数学表达式,输入如 '2+3*4'""" - try: - result = eval(expression, {"__builtins__": {}}, {}) - return f"计算结果: {expression} = {result}" - except Exception as e: - return f"计算错误: {e}" - -@tool -def search_knowledge(query: str) -> str: - """搜索知识库(模拟)""" - kb = { - "黄庄三号": "黄庄三号是AI助手,定位为严肃、认真、听话、聪明的AI助手", - "LangGraph": "LangGraph是LangChain团队推出的Agent框架,支持状态图、循环、持久化", - "MCP": "MCP是Model Context Protocol,AI工具互操作标准协议", - } - for key, val in kb.items(): - if key in query: - return val - return f"知识库中未找到关于'{query}'的信息" - -# ── 创建 Agent ── -tools = [get_weather, calculate, search_knowledge] -agent = create_react_agent(llm, tools) - -# ── 运行测试 ── -if __name__ == "__main__": - import asyncio - - async def test(): - print("=" * 50) - print("Step 1: LangGraph + GLM-4.5-air + FC 工具调用") - print("=" * 50) - - # 测试1: 天气查询 - print("\n[测试1] 天气查询") - result = await agent.ainvoke({"messages": [("user", "黄庄今天天气怎么样?")]}) - last_msg = result["messages"][-1] - print(f"回复: {last_msg.content}") - - # 测试2: 数学计算 - print("\n[测试2] 数学计算") - result = await agent.ainvoke({"messages": [("user", "帮我算一下 123 * 456 + 789")]}) - last_msg = result["messages"][-1] - print(f"回复: {last_msg.content}") - - # 测试3: 知识搜索 - print("\n[测试3] 知识搜索") - result = await agent.ainvoke({"messages": [("user", "LangGraph是什么?")]}) - last_msg = result["messages"][-1] - print(f"回复: {last_msg.content}") - - print("\n" + "=" * 50) - print("Step 1 完成!FC 工具调用正常工作") - - asyncio.run(test()) diff --git a/tools/__init__.py b/tools/__init__.py new file mode 100644 index 0000000..811adfa --- /dev/null +++ b/tools/__init__.py @@ -0,0 +1,2 @@ +# tools/ 目录的 __init__.py +# 自动扫描注册所有工具模块 diff --git a/tools/calculator.py b/tools/calculator.py new file mode 100644 index 0000000..cb7739d --- /dev/null +++ b/tools/calculator.py @@ -0,0 +1,15 @@ +"""计算工具""" +from langchain_core.tools import tool + + +@tool +def calculate(expression: str) -> str: + """计算数学表达式,输入如 '2+3*4'""" + try: + result = eval(expression, {"__builtins__": {}}, {}) + return f"计算结果: {expression} = {result}" + except Exception as e: + return f"计算错误: {e}" + + +TOOLS = [calculate] diff --git a/tools/knowledge.py b/tools/knowledge.py new file mode 100644 index 0000000..4775376 --- /dev/null +++ b/tools/knowledge.py @@ -0,0 +1,19 @@ +"""知识库搜索工具""" +from langchain_core.tools import tool + + +@tool +def search_knowledge(query: str) -> str: + """搜索知识库(模拟)""" + kb = { + "黄庄三号": "黄庄三号是AI助手,严肃认真听话聪明", + "LangGraph": "LangGraph是Agent框架,支持状态图、循环、持久化", + "MCP": "MCP是Model Context Protocol,AI工具互操作标准协议", + } + for key, val in kb.items(): + if key in query: + return val + return f"未找到关于'{query}'的信息" + + +TOOLS = [search_knowledge] diff --git a/tools/weather.py b/tools/weather.py new file mode 100644 index 0000000..e43f149 --- /dev/null +++ b/tools/weather.py @@ -0,0 +1,18 @@ +"""天气查询工具""" +from langchain_core.tools import tool + + +@tool +def get_weather(city: str) -> str: + """查询指定城市的天气信息""" + weather_data = { + "北京": "晴天,气温22°C,北风3级", + "上海": "多云,气温25°C,东风2级", + "深圳": "阵雨,气温28°C,南风4级", + "黄庄": "晴转多云,气温23°C,微风", + } + return weather_data.get(city, f"暂无{city}的天气数据") + + +# 暴露工具列表供自动扫描 +TOOLS = [get_weather]