feat: v2.0 config-driven release - add tools/skills/MCP servers without touching source code

Refactoring:
- agent.py replaces agent_v3.py; all configuration now loads from config.yaml
- tools/ is auto-scanned: drop in a .py file to register a new tool
- skills/ is auto-scanned: drop in a .yaml file to register a new skill
- config.yaml centrally manages model parameters, MCP servers, and routing keywords
- MCP supports multi-server configuration plus deterministic routing keywords
- Removed legacy step1_basic_fc.py and agent_v3.py
2026-04-23 23:16:08 +08:00
parent 451e9a12ed
commit 1c42ba0812
12 changed files with 848 additions and 643 deletions

README.md: 155 lines changed

@@ -1,6 +1,8 @@
# 黄庄三号 Four-Capability Agent
A multi-capability AI agent built on LangGraph, integrating four core capabilities: FC / MCP / thinking mode / Skill.
**v2.0 config-driven release**: add tools, skills, and MCP servers without touching source code; drop in a file or edit the config.
## The Four Capabilities
@@ -11,46 +13,107 @@
| Thinking mode | custom think_node + CoT reasoning chain | ★★★★ |
| Skill | custom SkillRegistry registration mechanism | ★★★★ |
## Directory Structure
```
hz3-agent/
├── config.yaml      ← all configuration in one file (model, MCP, routing keywords)
├── agent.py         ← Agent main program; loads everything from config
├── mcp_server.py    ← MCP server (examples: time / char count / UUID)
├── tools/           ← tool directory; drop in a .py file to register
│ ├── weather.py
│ ├── calculator.py
│ └── knowledge.py
├── skills/          ← skill directory; drop in a .yaml file to register
│ ├── weather_analyst.yaml
│ ├── math_tutor.yaml
│ └── knowledge_explorer.yaml
└── README.md
```
## Quick Start
```bash
cd hz3-agent
# automated tests (without MCP)
python3 agent.py --test
# automated tests (with MCP)
python3 agent.py --mcp --test
# interactive mode (with MCP)
python3 agent.py --mcp
# interactive mode (without MCP)
python3 agent.py
```
## Adding a Tool
Create a .py file under `tools/` that exposes a `TOOLS` list:
```python
# tools/stock.py
from langchain_core.tools import tool

@tool
def get_stock_price(symbol: str) -> str:
    """查询股票价格"""
    return f"{symbol}: 当前价格 123.45"

TOOLS = [get_stock_price]  # required: picked up by the auto-scanner
```
Restart the Agent and the tool loads automatically; no other file needs to change.
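The loader behind this (see `scan_tools()` in agent.py below) boils down to importing each module by path and collecting its `TOOLS` attribute; a condensed, runnable sketch:

```python
# Condensed version of agent.py's scan_tools(): import each tools/*.py
# module by file path and collect whatever it exposes in a TOOLS list.
import importlib.util
from pathlib import Path

def scan_tools(tools_dir: Path) -> list:
    """Collect the TOOLS lists of every non-underscore .py file."""
    found = []
    for py_file in sorted(tools_dir.glob("*.py")):
        if py_file.name.startswith("_"):
            continue  # skip private/helper modules
        spec = importlib.util.spec_from_file_location(f"tools.{py_file.stem}", py_file)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
        found.extend(getattr(mod, "TOOLS", []))  # modules without TOOLS contribute nothing
    return found
```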
## Adding a Skill
Create a .yaml file under `skills/`:
```yaml
# skills/stock_analyst.yaml
name: stock_analyst
description: "股票分析师 - 查询股价并给出分析"
prompt: |
你是股票分析师。根据以下行情给出分析建议:
行情数据:{input}
tools:
- get_stock_price
```
Then add routing keywords under `skill_keywords` in `config.yaml`:
```yaml
skill_keywords:
stock_analyst: ["股价", "股票", "行情"]
```
## Adding an MCP Server
Add an entry to the `mcp_servers` list in `config.yaml`:
```yaml
mcp_servers:
- name: "hz3-tools"        # existing
command: "python3"
args: ["mcp_server.py"]
transport: "stdio"
route_keywords:
get_current_time: ["几点", "时间"]
count_chars: ["统计字符", "字符数"]
generate_uuid: ["UUID"]
- name: "my-new-server"    # newly added
command: "python3"
args: ["path/to/my_server.py"]
transport: "stdio"
route_keywords:
my_tool: ["关键词1", "关键词2"]
```
Start with the `--mcp` flag to connect to every configured MCP server.
## Architecture Flow
```
@@ -73,30 +136,36 @@ print(result["skill"]) # 使用的技能
back to agent
```
## MCP Server
`mcp_server.py` provides 3 example tools via FastMCP:
- `get_current_time` - returns the current time
- `count_chars` - counts the characters in a text
- `generate_uuid` - generates a random UUID
## Calling from Code
```python
from agent import run_agent, build_graph, load_config, scan_tools, scan_skills

config = load_config()
tools = scan_tools()
skills = scan_skills()
graph = await build_graph(config, skills, None, tools)
result = await run_agent("黄庄天气怎么样?", graph)
print(result["reply"])     # reply
print(result["thinking"])  # reasoning trace
print(result["skill"])     # skill that was used
```
## Key Design Decisions
1. **Deterministic MCP routing**: on a keyword match, call session.call_tool() directly, sidestepping the model's occasional refusal to call tools
2. **Merged thinking output**: combined with the system prompt into a single SystemMessage so it does not disrupt tool calling
3. **Long-lived connections via AsyncExitStack**: MCP sessions stay open for the Agent's lifetime
4. **Iteration guard**: the agent node is force-stopped once it exceeds the iteration threshold, preventing infinite loops
5. **Config-driven**: every addition or adjustment happens via config.yaml plus dropped-in files; no source edits
## Dependencies
```
langgraph>=1.1
langchain-openai>=1.2
langchain-mcp-adapters>=0.2
mcp>=1.0
pyyaml>=6.0
```

agent.py: new file, 619 lines

@@ -0,0 +1,619 @@
"""
黄庄三号 Agent v2.0 - config-driven edition
==================================
Tools, skills, MCP servers, and routing keywords are all loaded from config
New: drop files into tools/ or skills/; no source changes needed
Usage:
    python3 agent.py --test          automated tests
    python3 agent.py --mcp --test    tests with MCP
    python3 agent.py --mcp           interactive mode (with MCP)
    python3 agent.py                 interactive mode (without MCP)
"""
import os
import sys
import re
import asyncio
import argparse
import importlib.util
from typing import Annotated
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from contextlib import AsyncExitStack
from pathlib import Path
import yaml
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
# ════════════════════════════════════════════
# Base paths
# ════════════════════════════════════════════
BASE_DIR = Path(__file__).parent.resolve()
CONFIG_PATH = BASE_DIR / "config.yaml"
TOOLS_DIR = BASE_DIR / "tools"
SKILLS_DIR = BASE_DIR / "skills"
# ════════════════════════════════════════════
# Config loading
# ════════════════════════════════════════════
def load_config() -> dict:
"""Load config.yaml"""
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
# ════════════════════════════════════════════
# Automatic tool scanning & registration
# ════════════════════════════════════════════
def scan_tools() -> list:
"""
Scan every .py file under tools/
Each file must expose a TOOLS list
"""
all_tools = []
if not TOOLS_DIR.exists():
print(" [工具] tools/ 目录不存在,跳过")
return all_tools
for py_file in sorted(TOOLS_DIR.glob("*.py")):
if py_file.name.startswith("_"):
continue
try:
spec = importlib.util.spec_from_file_location(
f"tools.{py_file.stem}", str(py_file)
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
if hasattr(mod, "TOOLS"):
tool_list = mod.TOOLS
all_tools.extend(tool_list)
print(f" [工具] {py_file.name}: 加载 {len(tool_list)} 个 -> {[t.name for t in tool_list]}")
else:
print(f" [工具] {py_file.name}: 无 TOOLS 变量,跳过")
except Exception as e:
print(f" [工具] {py_file.name}: 加载失败 -> {e}")
return all_tools
# ════════════════════════════════════════════
# Automatic skill scanning & registration
# ════════════════════════════════════════════
class SkillDef(BaseModel):
name: str
description: str
prompt: str
tools: list[str] = []
class SkillRegistry:
def __init__(self):
self._skills: dict[str, SkillDef] = {}
def register(self, skill: SkillDef):
self._skills[skill.name] = skill
def get(self, name: str) -> SkillDef | None:
return self._skills.get(name)
def list_skills(self) -> list[SkillDef]:
return list(self._skills.values())
def format_list(self) -> str:
return "\n".join(f" - {s.name}: {s.description}" for s in self._skills.values())
def scan_skills() -> SkillRegistry:
"""
Scan every .yaml file under skills/
Each file defines one skill
"""
registry = SkillRegistry()
if not SKILLS_DIR.exists():
print(" [技能] skills/ 目录不存在,跳过")
return registry
for yaml_file in sorted(SKILLS_DIR.glob("*.yaml")):
try:
with open(yaml_file, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
skill = SkillDef(
name=data["name"],
description=data.get("description", ""),
prompt=data.get("prompt", ""),
tools=data.get("tools", []),
)
registry.register(skill)
print(f" [技能] {yaml_file.name}: {skill.name}")
except Exception as e:
print(f" [技能] {yaml_file.name}: 加载失败 -> {e}")
return registry
# ════════════════════════════════════════════
# MCP manager (config-driven, multi-server)
# ════════════════════════════════════════════
class MCPManager:
"""Manage connections to multiple MCP servers"""
def __init__(self):
self.exit_stack = AsyncExitStack()
self.sessions: dict[str, object] = {} # server_name -> session
self.route_map: dict[str, tuple] = {} # keyword -> (session, tool_name)
self.mcp_tools: list = []
async def connect_all(self, mcp_configs: list[dict]):
"""Connect to every MCP server listed in the config"""
global all_tools
from langchain_mcp_adapters.tools import load_mcp_tools
from mcp.client.stdio import stdio_client, StdioServerParameters
from mcp.client.session import ClientSession
for srv in mcp_configs:
name = srv["name"]
try:
server_params = StdioServerParameters(
command=srv["command"],
args=srv.get("args", []),
)
read, write = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
session = await self.exit_stack.enter_async_context(
ClientSession(read, write)
)
await session.initialize()
# Load tools
mcp_tools = await load_mcp_tools(session)
self.mcp_tools.extend(mcp_tools)
self.sessions[name] = session
# Register routing keywords
route_kw = srv.get("route_keywords", {})
for tool_name, keywords in route_kw.items():
for kw in keywords:
self.route_map[kw] = (session, tool_name)
print(f" [MCP] {name}: 已连接,加载 {len(mcp_tools)} 个工具")
for t in mcp_tools:
print(f" - {t.name}: {t.description[:50]}")
if route_kw:
print(f" 路由关键词: {list(route_kw.keys())}")
except Exception as e:
print(f" [MCP] {name}: 连接失败 -> {e}")
# Append MCP tools to the global list so the LLM's bind_tools sees them
all_tools.extend(self.mcp_tools)
def match_route(self, user_input: str) -> tuple | None:
"""Match an MCP route by keyword; returns (session, tool_name) or None"""
for keyword, (session, tool_name) in self.route_map.items():
if keyword in user_input:
return (session, tool_name)
return None
async def call_tool(self, session, tool_name: str, user_input: str) -> str:
"""Invoke a tool through an MCP session"""
try:
# naive argument parsing
args = {}
# look up the tool's schema in the loaded MCP tool list
for t in self.mcp_tools:
if t.name == tool_name:
# an MCP tool's args_schema may be a dict or a Pydantic model
schema = getattr(t, "args_schema", None)
if schema:
if isinstance(schema, dict):
schema_fields = schema.get("properties", {})
elif hasattr(schema, "model_json_schema"):
schema_fields = schema.model_json_schema().get("properties", {})
else:
schema_fields = {}
args = _parse_tool_args(tool_name, schema_fields, user_input)
break
result = await session.call_tool(tool_name, args)
if result.content:
texts = [c.text for c in result.content if hasattr(c, "text")]
return "\n".join(texts) if texts else str(result)
return str(result)
except Exception as e:
return f"[MCP工具{tool_name}调用错误] {e}"
async def close(self):
await self.exit_stack.aclose()
self.sessions.clear()
print(" [MCP] 所有连接已关闭")
def _parse_tool_args(tool_name: str, schema_fields: dict, user_input: str) -> dict:
"""Parse tool arguments out of the user input, guided by the tool's schema"""
args = {}
for field_name, field_info in schema_fields.items():
if field_name in ("timezone",):
args[field_name] = "Asia/Shanghai"
elif field_name in ("text",):
# extract quoted text
match = re.search(r"['\"\u201c\u201d](.+?)['\"\u201c\u201d]", user_input)
args[field_name] = match.group(1) if match else user_input
elif field_name in ("city",):
args[field_name] = user_input
return args
# ════════════════════════════════════════════
# Agent state
# ════════════════════════════════════════════
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
thinking: str
active_skill: str | None
skill_output: str | None
iteration: int
# ════════════════════════════════════════════
# LangGraph nodes
# ════════════════════════════════════════════
# --- Think node ---
async def make_think_node(config, skills_reg, tools_list):
llm_cfg = config["llm"]
agent_cfg = config.get("agent", {})
temp = agent_cfg.get("think_temperature", 0.3)
async def think_node(state: AgentState) -> dict:
iteration = state.get("iteration", 0) + 1
if iteration > 3:
return {"iteration": iteration, "thinking": "(快速模式)"}
conv = []
for msg in state["messages"][-4:]:
role = "用户" if isinstance(msg, HumanMessage) else "AI"
conv.append(f"{role}: {msg.content[:150]}")
tool_names = [t.name for t in tools_list]
think_llm = ChatOpenAI(
base_url=llm_cfg["base_url"],
api_key=llm_cfg["api_key"],
model=llm_cfg["model"],
temperature=temp,
)
resp = await think_llm.ainvoke([
SystemMessage(content="你是思考模块。简洁输出:用户意图、需要的工具/技能、注意事项。不要说没有工具。"),
HumanMessage(content=f"对话:\n{chr(10).join(conv)}\n\n可用技能:\n{skills_reg.format_list()}\n\n可用工具: {', '.join(tool_names)}"),
])
return {"iteration": iteration, "thinking": resp.content}
return think_node
# --- Skill routing node ---
async def make_skill_route_node(config, skills_reg, mcp_mgr):
skill_keywords = config.get("skill_keywords", {})
async def skill_route_node(state: AgentState) -> dict:
user_input = ""
for msg in reversed(state["messages"]):
if isinstance(msg, HumanMessage):
user_input = msg.content
break
# 1. Deterministic MCP routing takes priority
if mcp_mgr:
route = mcp_mgr.match_route(user_input)
if route:
session, tool_name = route
mcp_result = await mcp_mgr.call_tool(session, tool_name, user_input)
return {"active_skill": None, "skill_output": mcp_result}
# 2. Then skill keyword routing
for sname, keywords in skill_keywords.items():
if any(kw in user_input for kw in keywords):
if skills_reg.get(sname):
return {"active_skill": sname, "skill_output": None}
return {"active_skill": None, "skill_output": None}
return skill_route_node
# --- Skill execution node ---
async def make_skill_exec_node(config, skills_reg, tools_list):
llm_cfg = config["llm"]
agent_cfg = config.get("agent", {})
temp = agent_cfg.get("skill_temperature", 0.7)
async def skill_execute_node(state: AgentState) -> dict:
sname = state.get("active_skill")
if not sname:
return {"skill_output": None}
sk = skills_reg.get(sname)
if not sk:
return {"skill_output": None}
user_input = ""
for msg in reversed(state["messages"]):
if isinstance(msg, HumanMessage):
user_input = msg.content
break
# Run the local tools this skill depends on
tool_info = ""
for tname in sk.tools:
for t in tools_list:
if t.name == tname:
try:
if tname == "get_weather":
cities = ["北京", "上海", "深圳", "黄庄"]
city = next((c for c in cities if c in user_input), "北京")
r = await t.ainvoke({"city": city})
elif tname == "calculate":
expr = re.findall(r'[\d+\-*/(). ]+', user_input)
r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"})
else:
r = await t.ainvoke({"query": user_input})
tool_info += f"\n工具{tname}结果: {r}"
except Exception as e:
tool_info += f"\n工具{tname}错误: {e}"
prompt = sk.prompt.format(input=user_input) + tool_info
sk_llm = ChatOpenAI(
base_url=llm_cfg["base_url"],
api_key=llm_cfg["api_key"],
model=llm_cfg["model"],
temperature=temp,
)
resp = await sk_llm.ainvoke([
SystemMessage(content=prompt),
HumanMessage(content="请基于以上信息回答。"),
])
return {"skill_output": resp.content}
return skill_execute_node
# --- Main agent node ---
async def make_agent_node(config, skills_reg, tools_list):
llm_cfg = config["llm"]
agent_cfg = config.get("agent", {})
max_iter = agent_cfg.get("max_iterations", 5)
SYSTEM_PROMPT = """你是黄庄三号严肃、认真、听话、聪明的AI助手。你的名字是"黄庄三号"你不是Claude不是ChatGPT。
你具备四种能力:
1. 工具调用(FC) - 调用内置工具获取信息
2. MCP集成 - 通过MCP协议连接外部服务
3. 思考模式 - 回答前进行深度思考
4. 技能系统(Skill) - 调用注册技能完成复杂任务
可用技能:
{skill_list}
重要规则(必须严格遵守):
- 当被问"你是谁",必须回答"我是黄庄三号"
- 对于工具能提供的数据,必须调用工具获取,不要自己猜测"""
async def agent_node(state: AgentState) -> dict:
iteration = state.get("iteration", 0)
if state.get("skill_output"):
return {"messages": [AIMessage(content=state["skill_output"])]}
system_content = SYSTEM_PROMPT.format(skill_list=skills_reg.format_list())
if state.get("thinking"):
thinking = state["thinking"][:300]
# if the thinking mentions tool names, stress that they must be called
tool_hints = [t.name for t in tools_list if t.name in thinking]
if tool_hints:
thinking += f"\n\n[重要:必须调用 {', '.join(tool_hints)} 工具来回答]"
system_content += f"\n\n[内部思考]\n{thinking}"
messages = [SystemMessage(content=system_content)]
messages.extend(state["messages"])
llm = ChatOpenAI(
base_url=llm_cfg["base_url"],
api_key=llm_cfg["api_key"],
model=llm_cfg["model"],
)
llm_with_tools = llm.bind_tools(tools_list)
resp = await llm_with_tools.ainvoke(messages)
# iteration guard
if iteration > max_iter and hasattr(resp, "tool_calls") and resp.tool_calls:
resp = AIMessage(content=resp.content or "任务完成(已达最大迭代次数)")
return {"messages": [resp], "iteration": iteration}
return agent_node
# --- Routing function ---
def route_from_agent(state: AgentState) -> str:
if state.get("skill_output"):
return "end"
for msg in reversed(state["messages"]):
if isinstance(msg, AIMessage):
if hasattr(msg, "tool_calls") and msg.tool_calls:
return "tools"
break
return "end"
# ════════════════════════════════════════════
# Build the graph
# ════════════════════════════════════════════
async def build_graph(config, skills_reg, mcp_mgr, tools_list):
think_node = await make_think_node(config, skills_reg, tools_list)
skill_route_node = await make_skill_route_node(config, skills_reg, mcp_mgr)
skill_exec_node = await make_skill_exec_node(config, skills_reg, tools_list)
agent_node = await make_agent_node(config, skills_reg, tools_list)
g = StateGraph(AgentState)
g.add_node("think", think_node)
g.add_node("skill_route", skill_route_node)
g.add_node("skill_exec", skill_exec_node)
g.add_node("agent", agent_node)
g.add_node("tools", ToolNode(tools_list))
g.add_edge(START, "think")
g.add_edge("think", "skill_route")
g.add_conditional_edges("skill_route",
lambda s: "skill_exec" if s.get("active_skill") else "agent",
{"skill_exec": "skill_exec", "agent": "agent"})
g.add_edge("skill_exec", "agent")
g.add_conditional_edges("agent", route_from_agent, {"tools": "tools", "end": END})
g.add_edge("tools", "agent")
return g.compile()
# ════════════════════════════════════════════
# Entry points
# ════════════════════════════════════════════
async def run_agent(user_input: str, graph):
result = await graph.ainvoke({
"messages": [HumanMessage(content=user_input)],
"thinking": "", "active_skill": None, "skill_output": None, "iteration": 0,
})
last = result["messages"][-1]
return {
"reply": last.content if hasattr(last, "content") else str(last),
"thinking": result.get("thinking", ""),
"skill": result.get("active_skill"),
}
async def interactive_mode(graph):
print("=" * 60)
print(" 黄庄三号 Agent v2.0 - 配置驱动版")
print(" FC | MCP | 思考模式 | Skill")
print("=" * 60)
print(" 技能:", [s.name for s in skills_registry.list_skills()])
print(" 工具:", [t.name for t in all_tools])
print(" 输入 quit 退出")
print("=" * 60)
while True:
try:
user_input = input("\n你> ").strip()
except (EOFError, KeyboardInterrupt):
break
if not user_input:
continue
if user_input.lower() in ("quit", "exit", "q"):
break
result = await run_agent(user_input, graph)
if result["thinking"]:
print(f"\n[思考] {result['thinking'][:150]}...")
if result["skill"]:
print(f"[技能] {result['skill']}")
print(f"\n黄庄三号> {result['reply']}")
# ════════════════════════════════════════════
# Globals (initialized by main)
# ════════════════════════════════════════════
all_tools = []
skills_registry = SkillRegistry()
mcp_manager = None
async def main():
global all_tools, skills_registry, mcp_manager
parser = argparse.ArgumentParser(description="黄庄三号 Agent v2.0")
parser.add_argument("--mcp", action="store_true", help="启用MCP")
parser.add_argument("--test", action="store_true", help="自动测试")
args = parser.parse_args()
print("=" * 60)
print(" 黄庄三号 Agent v2.0 - 配置驱动版")
print("=" * 60)
# ── Load config ──
print("\n[配置] 加载 config.yaml ...")
config = load_config()
print(f" 模型: {config['llm']['model']}")
print(f" MCP服务器: {len(config.get('mcp_servers', []))}")
print(f" 技能关键词: {len(config.get('skill_keywords', {}))}")
# ── Scan tools ──
print("\n[工具] 扫描 tools/ 目录 ...")
all_tools = scan_tools()
print(f" 工具总数: {len(all_tools)}")
# ── Scan skills ──
print("\n[技能] 扫描 skills/ 目录 ...")
skills_registry = scan_skills()
print(f" 技能总数: {len(skills_registry.list_skills())}")
# ── Connect MCP ──
if args.mcp and config.get("mcp_servers"):
print("\n[MCP] 连接服务器 ...")
mcp_manager = MCPManager()
await mcp_manager.connect_all(config["mcp_servers"])
print(f" MCP工具总数: {len(mcp_manager.mcp_tools)}")
print(f"\n 全部工具总数: {len(all_tools)}")
# ── Build graph ──
graph = await build_graph(config, skills_registry, mcp_manager, all_tools)
if args.test:
# automated tests
tests = [
("FC+思考+Skill", "黄庄天气怎么样?"),
("FC+Skill", "算一下 99*88+77"),
("知识搜索", "MCP是什么"),
("身份", "你好你是谁?"),
]
if args.mcp and mcp_manager:
tests.extend([
("MCP:时间", "现在几点了?"),
("MCP:字符统计", "统计'黄庄三号是AI助手'的字符数"),
("MCP:UUID", "生成一个UUID"),
])
for label, query in tests:
print(f"\n{'─'*55}")
print(f"[测试:{label}] {query}")
r = await run_agent(query, graph)
print(f" 思考: {r['thinking'][:80]}...")
print(f" 技能: {r['skill']}")
print(f" 回复: {r['reply'][:150]}...")
print(f"\n{'='*60}")
print(" 验证完成!")
caps = ["FC", "思考", "Skill"]
if args.mcp:
caps.append("MCP")
print(" " + " ✅ | ".join(caps) + " ✅")
print("=" * 60)
else:
await interactive_mode(graph)
# ── Cleanup ──
if mcp_manager:
await mcp_manager.close()
if __name__ == "__main__":
asyncio.run(main())

agent_v3.py: deleted file

@@ -1,514 +0,0 @@
"""
黄庄三号 Agent - four-capability complete edition v3
==================================
Capability 1: Function Call - LangGraph tool nodes
Capability 2: MCP - langchain-mcp-adapters + long-lived connection
Capability 3: Thinking mode - reflection node + CoT reasoning chain
Capability 4: Skill - custom skill registry
Key fix: the MCP session stays connected for the Agent's entire lifetime
"""
import os
import sys
import json
import asyncio
import argparse
from typing import Annotated
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from contextlib import AsyncExitStack
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
# ════════════════════════════════════════════
# 模型配置
# ════════════════════════════════════════════
LLM_CONFIG = {
"base_url": "https://open.bigmodel.cn/api/paas/v4",
"api_key": "2259e33a1357460abe17919aaf81e73d.K44a8LPQTmFM5PKm",
"model": "glm-4.5-air",
}
llm = ChatOpenAI(**LLM_CONFIG)
# ════════════════════════════════════════════
# 能力1: FC 本地工具
# ════════════════════════════════════════════
@tool
def get_weather(city: str) -> str:
"""查询指定城市的天气信息"""
weather_data = {
"北京": "晴天气温22°C北风3级",
"上海": "多云气温25°C东风2级",
"深圳": "阵雨气温28°C南风4级",
"黄庄": "晴转多云气温23°C微风",
}
return weather_data.get(city, f"暂无{city}的天气数据")
@tool
def calculate(expression: str) -> str:
"""计算数学表达式,输入如 '2+3*4'"""
try:
result = eval(expression, {"__builtins__": {}}, {})
return f"计算结果: {expression} = {result}"
except Exception as e:
return f"计算错误: {e}"
@tool
def search_knowledge(query: str) -> str:
"""搜索知识库(模拟)"""
kb = {
"黄庄三号": "黄庄三号是AI助手严肃认真听话聪明",
"LangGraph": "LangGraph是Agent框架支持状态图、循环、持久化",
"MCP": "MCP是Model Context ProtocolAI工具互操作标准协议",
}
for key, val in kb.items():
if key in query:
return val
return f"未找到关于'{query}'的信息"
local_tools = [get_weather, calculate, search_knowledge]
# ════════════════════════════════════════════
# 能力4: Skill 技能系统
# ════════════════════════════════════════════
class SkillDef(BaseModel):
name: str
description: str
prompt: str
tools: list[str] = []
class SkillRegistry:
def __init__(self):
self._skills: dict[str, SkillDef] = {}
def register(self, skill: SkillDef):
self._skills[skill.name] = skill
def get(self, name: str) -> SkillDef | None:
return self._skills.get(name)
def list_skills(self) -> list[SkillDef]:
return list(self._skills.values())
def format_list(self) -> str:
return "\n".join(f" - {s.name}: {s.description}" for s in self._skills.values())
skills = SkillRegistry()
skills.register(SkillDef(name="weather_analyst", description="天气分析师-查询天气给建议",
prompt="你是天气分析师。根据天气信息给出出行建议。\n天气:{input}", tools=["get_weather"]))
skills.register(SkillDef(name="math_tutor", description="数学辅导-计算并解释",
prompt="你是数学老师。解答并解释:{input}", tools=["calculate"]))
skills.register(SkillDef(name="knowledge_explorer", description="知识探索-搜索解释知识",
prompt="你是知识探索者。搜索并深入解释:{input}", tools=["search_knowledge"]))
# ════════════════════════════════════════════
# Agent 状态
# ════════════════════════════════════════════
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
thinking: str
active_skill: str | None
skill_output: str | None
iteration: int
# ════════════════════════════════════════════
# 能力3: 思考节点
# ════════════════════════════════════════════
async def think_node(state: AgentState) -> dict:
iteration = state.get("iteration", 0) + 1
if iteration > 3:
return {"iteration": iteration, "thinking": "(快速模式)"}
conv = []
for msg in state["messages"][-4:]:
role = "用户" if isinstance(msg, HumanMessage) else "AI"
conv.append(f"{role}: {msg.content[:150]}")
tool_names = [t.name for t in all_tools]
think_llm = ChatOpenAI(**LLM_CONFIG, temperature=0.3)
resp = await think_llm.ainvoke([
SystemMessage(content="你是思考模块。简洁输出:用户意图、需要的工具/技能、注意事项。不要说没有工具。"),
HumanMessage(content=f"对话:\n{chr(10).join(conv)}\n\n可用技能:\n{skills.format_list()}\n\n可用工具: {', '.join(tool_names)}"),
])
return {"iteration": iteration, "thinking": resp.content}
# ════════════════════════════════════════════
# 技能路由 & 执行
# ════════════════════════════════════════════
async def skill_route_node(state: AgentState) -> dict:
user_input = ""
for msg in reversed(state["messages"]):
if isinstance(msg, HumanMessage):
user_input = msg.content
break
# 1. 先匹配MCP工具确定性路由不依赖模型决策
mcp_keywords = {
"get_current_time": ["几点", "时间", "现在几点", "当前时间"],
"count_chars": ["统计字符", "字符数", "字数统计", "统计文本"],
"generate_uuid": ["生成uuid", "UUID", "uuid"],
}
for mcp_tool_name, kws in mcp_keywords.items():
if any(kw in user_input for kw in kws):
# 直接执行MCP工具并返回结果
mcp_result = await _call_mcp_tool(mcp_tool_name, user_input)
return {"active_skill": None, "skill_output": mcp_result}
# 2. 再匹配本地技能
kw_map = {
"weather_analyst": ["天气", "出行", "穿什么"],
"math_tutor": ["计算", "", "数学"],
"knowledge_explorer": ["是什么", "解释", "了解"],
}
for sname, kws in kw_map.items():
if any(kw in user_input for kw in kws):
return {"active_skill": sname, "skill_output": None}
return {"active_skill": None, "skill_output": None}
async def _call_mcp_tool(tool_name: str, user_input: str) -> str:
"""直接调用MCP工具确定性路由"""
global _mcp_session
if not _mcp_session:
return f"[错误] MCP未连接无法调用{tool_name}"
try:
# 解析参数
args = {}
if tool_name == "get_current_time":
if "上海" in user_input or "北京时间" in user_input or "几点" in user_input:
args = {"timezone": "Asia/Shanghai"}
elif tool_name == "count_chars":
import re
# 提取引号内的文本
match = re.search(r"['\"\u201c\u201d](.+?)['\"\u201c\u201d]", user_input)
text = match.group(1) if match else user_input
args = {"text": text}
elif tool_name == "generate_uuid":
args = {}
result = await _mcp_session.call_tool(tool_name, args)
# 提取结果文本
if result.content:
texts = [c.text for c in result.content if hasattr(c, 'text')]
return "\n".join(texts) if texts else str(result)
return str(result)
except Exception as e:
return f"[MCP工具{tool_name}调用错误] {e}"
async def skill_execute_node(state: AgentState) -> dict:
sname = state.get("active_skill")
if not sname:
return {"skill_output": None}
sk = skills.get(sname)
if not sk:
return {"skill_output": None}
user_input = ""
for msg in reversed(state["messages"]):
if isinstance(msg, HumanMessage):
user_input = msg.content
break
# 执行工具
tool_info = ""
for tname in sk.tools:
for t in local_tools:
if t.name == tname:
try:
if tname == "get_weather":
cities = ["北京", "上海", "深圳", "黄庄"]
city = next((c for c in cities if c in user_input), "北京")
r = await t.ainvoke({"city": city})
elif tname == "calculate":
import re
expr = re.findall(r'[\d+\-*/(). ]+', user_input)
r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"})
else:
r = await t.ainvoke({"query": user_input})
tool_info += f"\n工具{tname}结果: {r}"
except Exception as e:
tool_info += f"\n工具{tname}错误: {e}"
prompt = sk.prompt.format(input=user_input) + tool_info
sk_llm = ChatOpenAI(**LLM_CONFIG, temperature=0.7)
resp = await sk_llm.ainvoke([
SystemMessage(content=prompt),
HumanMessage(content="请基于以上信息回答。"),
])
return {"skill_output": resp.content}
# ════════════════════════════════════════════
# 主 Agent 节点
# ════════════════════════════════════════════
SYSTEM_PROMPT = """你是黄庄三号严肃、认真、听话、聪明的AI助手。你的名字是"黄庄三号"你不是Claude不是ChatGPT。
你具备四种能力:
1. 工具调用(FC) - 调用内置工具获取信息
2. MCP集成 - 通过MCP协议连接外部服务
3. 思考模式 - 回答前进行深度思考
4. 技能系统(Skill) - 调用注册技能完成复杂任务
可用技能:
{skill_list}
重要规则(必须严格遵守):
- 当被问"你是谁",必须回答"我是黄庄三号"
- 当用户问时间/几点你必须调用get_current_time工具禁止自己编造时间
- 当用户要求统计字符/字数你必须调用count_chars工具禁止自己统计
- 当用户要求生成UUID你必须调用generate_uuid工具禁止自己编造UUID
- 对于工具能提供的数据,必须调用工具获取,不要自己猜测"""
# 全局工具列表MCP加载后会扩展
all_tools = list(local_tools)
# MCP session 全局引用(确定性路由时直接调用)
_mcp_session = None
async def agent_node(state: AgentState) -> dict:
# 防止无限循环
iteration = state.get("iteration", 0)
if state.get("skill_output"):
return {"messages": [AIMessage(content=state["skill_output"])]}
system_content = SYSTEM_PROMPT.format(skill_list=skills.format_list())
if state.get("thinking"):
# 把思考结果中的工具建议提取出来,作为辅助信息
thinking = state['thinking'][:300]
# 如果思考中提到了工具名,强调必须调用
tool_hints = []
for t in all_tools:
if t.name in thinking:
tool_hints.append(t.name)
if tool_hints:
thinking += f"\n\n[重要:必须调用 {', '.join(tool_hints)} 工具来回答]"
system_content += f"\n\n[内部思考]\n{thinking}"
messages = [SystemMessage(content=system_content)]
messages.extend(state["messages"])
llm_with_tools = llm.bind_tools(all_tools)
resp = await llm_with_tools.ainvoke(messages)
# 如果已迭代太多次清除tool_calls强制结束
if iteration > 5 and hasattr(resp, "tool_calls") and resp.tool_calls:
resp = AIMessage(content=resp.content or "任务完成(已达最大迭代次数)")
return {"messages": [resp], "iteration": iteration}
# ════════════════════════════════════════════
# 路由逻辑
# ════════════════════════════════════════════
def route_from_agent(state: AgentState) -> str:
if state.get("skill_output"):
return "end"
for msg in reversed(state["messages"]):
if isinstance(msg, AIMessage):
if hasattr(msg, "tool_calls") and msg.tool_calls:
return "tools"
break
return "end"
def route_after_tools(state: AgentState) -> str:
for msg in reversed(state["messages"]):
if isinstance(msg, AIMessage):
if hasattr(msg, "tool_calls") and msg.tool_calls:
return "tools"
break
return "end"
# ════════════════════════════════════════════
# 构建图
# ════════════════════════════════════════════
def build_graph():
g = StateGraph(AgentState)
g.add_node("think", think_node)
g.add_node("skill_route", skill_route_node)
g.add_node("skill_exec", skill_execute_node)
g.add_node("agent", agent_node)
g.add_node("tools", ToolNode(all_tools))
g.add_edge(START, "think")
g.add_edge("think", "skill_route")
g.add_conditional_edges("skill_route",
lambda s: "skill_exec" if s.get("active_skill") else "agent",
{"skill_exec": "skill_exec", "agent": "agent"})
g.add_edge("skill_exec", "agent")
g.add_conditional_edges("agent", route_from_agent, {"tools": "tools", "end": END})
# 工具执行后回到agent处理结果agent再决定是否继续调工具
g.add_edge("tools", "agent")
return g.compile()
# ════════════════════════════════════════════
# 能力2: MCP 集成(长连接版)
# ════════════════════════════════════════════
class MCPManager:
"""管理MCP连接的整个生命周期"""
def __init__(self):
self.exit_stack = AsyncExitStack()
self.session = None
self.tools_loaded = False
async def connect(self, server_script: str):
"""连接MCP服务器并加载工具"""
global all_tools, _mcp_session
from langchain_mcp_adapters.tools import load_mcp_tools
from mcp.client.stdio import stdio_client, StdioServerParameters
from mcp.client.session import ClientSession
server_params = StdioServerParameters(
command="python3",
args=[server_script],
)
# 使用 AsyncExitStack 保持连接
read, write = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.session = await self.exit_stack.enter_async_context(ClientSession(read, write))
await self.session.initialize()
# 设置全局session供确定性路由使用
_mcp_session = self.session
# 加载工具
mcp_tools = await load_mcp_tools(self.session)
all_tools.extend(mcp_tools)
self.tools_loaded = True
print(f" [MCP] 已连接,加载 {len(mcp_tools)} 个工具:")
for t in mcp_tools:
print(f" - {t.name}: {t.description[:50]}")
return mcp_tools
async def close(self):
"""关闭MCP连接"""
await self.exit_stack.aclose()
self.session = None
print(" [MCP] 连接已关闭")
# ════════════════════════════════════════════
# 运行
# ════════════════════════════════════════════
async def run_agent(user_input: str, graph=None):
if graph is None:
graph = build_graph()
result = await graph.ainvoke({
"messages": [HumanMessage(content=user_input)],
"thinking": "", "active_skill": None, "skill_output": None, "iteration": 0,
})
last = result["messages"][-1]
return {
"reply": last.content if hasattr(last, "content") else str(last),
"thinking": result.get("thinking", ""),
"skill": result.get("active_skill"),
}
async def interactive_mode():
print("=" * 60)
print(" 黄庄三号 Agent - 四能力完整版 v3")
print(" FC | MCP | 思考模式 | Skill")
print("=" * 60)
print(" 技能:", [s.name for s in skills.list_skills()])
print(" 工具:", [t.name for t in all_tools])
print(" 输入 quit 退出")
print("=" * 60)
graph = build_graph()
while True:
try:
user_input = input("\n你> ").strip()
except (EOFError, KeyboardInterrupt):
break
if not user_input:
continue
if user_input.lower() in ("quit", "exit", "q"):
break
result = await run_agent(user_input, graph)
if result["thinking"]:
print(f"\n[思考] {result['thinking'][:150]}...")
if result["skill"]:
print(f"[技能] {result['skill']}")
print(f"\n黄庄三号> {result['reply']}")
async def main():
parser = argparse.ArgumentParser()
parser.add_argument("--mcp", action="store_true", help="启用MCP")
parser.add_argument("--test", action="store_true", help="自动测试")
args = parser.parse_args()
mcp_mgr = None
if args.mcp:
print("\n[MCP] 连接服务器...")
mcp_mgr = MCPManager()
mcp_script = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mcp_server.py")
await mcp_mgr.connect(mcp_script)
print(f"\n 工具总数: {len(all_tools)}")
print(f" 技能总数: {len(skills.list_skills())}")
graph = build_graph()
if args.test:
tests = [
("FC+思考+Skill", "黄庄天气怎么样?"),
("FC+Skill", "算一下 99*88+77"),
("知识搜索", "MCP是什么"),
("身份", "你好你是谁?"),
]
# MCP-specific tests
if args.mcp:
tests.extend([
("MCP:时间", "现在几点了?"),
("MCP:字符统计", "统计'黄庄三号是AI助手'的字符数"),
("MCP:UUID", "生成一个UUID"),
])
for label, query in tests:
print(f"\n{''*55}")
print(f"[测试:{label}] {query}")
r = await run_agent(query, graph)
print(f" 思考: {r['thinking'][:80]}...")
print(f" 技能: {r['skill']}")
print(f" 回复: {r['reply'][:150]}...")
print(f"\n{'='*60}")
print(" 验证完成!")
cap = ["FC ✅", "思考 ✅", "Skill ✅"]
if args.mcp:
cap.append("MCP ✅")
print(" " + " | ".join(cap))
print("=" * 60)
else:
await interactive_mode()
# Cleanup
if mcp_mgr:
await mcp_mgr.close()
if __name__ == "__main__":
asyncio.run(main())
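The commit message says dropping a `.py` file into `tools/` registers it automatically, but the scanning code itself is not shown in this diff. A minimal sketch of how such a scan could work — `scan_tools` and every implementation detail here are assumptions, not the repo's actual code:

```python
# Hypothetical sketch of the tools/ auto-scan (not the repo's implementation):
# import every .py file in the directory and collect its TOOLS list.
import importlib.util
from pathlib import Path

def scan_tools(tools_dir: str) -> list:
    """Import each module in tools_dir and gather the TOOLS it exposes."""
    collected = []
    for path in sorted(Path(tools_dir).glob("*.py")):
        if path.name == "__init__.py":
            continue  # skip the package marker
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        # Modules without a TOOLS list are silently ignored
        collected.extend(getattr(module, "TOOLS", []))
    return collected
```

Loading by file path rather than by package import is what makes "just drop a file in" work without touching `tools/__init__.py`.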

config.yaml Normal file

@@ -0,0 +1,35 @@
# Configuration for the 黄庄三号 four-capability agent
# ================================
# Add MCP servers, adjust routing keywords, or change model parameters here
# Model configuration
llm:
base_url: "https://open.bigmodel.cn/api/paas/v4"
api_key: "2259e33a1357460abe17919aaf81e73d.K44a8LPQTmFM5PKm"
model: "glm-4.5-air"
# Agent behavior parameters
agent:
max_iterations: 5        # iteration cap (prevents runaway loops)
think_temperature: 0.3   # temperature for the think node
skill_temperature: 0.7   # temperature for the skill node
# MCP server configuration
# To add an MCP server, append an entry to this list
mcp_servers:
- name: "hz3-tools"
command: "python3"
args: ["mcp_server.py"]
transport: "stdio"
# Routing keywords: on a match, the MCP tool is called directly, bypassing model routing
route_keywords:
get_current_time: ["几点", "时间", "现在几点", "当前时间"]
count_chars: ["统计字符", "字符数", "字数统计", "统计文本"]
generate_uuid: ["生成uuid", "UUID", "uuid", "生成一个UUID"]
# Skill routing keywords
# Skills themselves live in skills/*.yaml; only their routing keywords are configured here
skill_keywords:
weather_analyst: ["天气", "出行", "穿什么"]
math_tutor: ["计算", "算", "数学", "等于多少"]
knowledge_explorer: ["是什么", "解释", "了解"]

skills/knowledge_explorer.yaml Normal file

@@ -0,0 +1,10 @@
# Knowledge exploration skill
name: knowledge_explorer
description: "知识探索 - 搜索并深入解释知识"
prompt: |
你是一个知识探索者。请搜索以下主题,然后给出深入浅出的解释:
主题:{input}
要求:先搜索,然后综合信息给出结构化的解释。
tools:
- search_knowledge

skills/math_tutor.yaml Normal file

@@ -0,0 +1,8 @@
# Math tutoring skill
name: math_tutor
description: "数学辅导 - 计算并解释数学问题"
prompt: |
你是一个耐心的数学老师。请解答以下问题,先给出计算过程,再解释原理:
问题:{input}
tools:
- calculate

skills/weather_analyst.yaml Normal file

@@ -0,0 +1,10 @@
# Weather analyst skill
name: weather_analyst
description: "天气分析师 - 查询天气并给出出行建议"
prompt: |
你是一个专业的天气分析师。请根据以下天气信息给出详细的出行建议:
天气信息:{input}
请从穿衣、出行、活动安排等方面给出建议。
tools:
- get_weather
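The three skill files share one schema: `name`, `description`, `prompt` (with an `{input}` slot), and a `tools` list. A sketch of how a registry could load them, assuming PyYAML is available; `load_skills` and the dict-based registry are assumptions, not the repo's `SkillRegistry`:

```python
# Hypothetical loader for skills/*.yaml (assumes PyYAML is installed).
import yaml
from pathlib import Path

def load_skills(skills_dir: str) -> dict:
    """Parse every .yaml file into a {skill_name: definition} registry."""
    registry = {}
    for path in sorted(Path(skills_dir).glob("*.yaml")):
        with open(path, encoding="utf-8") as f:
            skill = yaml.safe_load(f)
        # The file's own `name` field, not the filename, is the registry key
        registry[skill["name"]] = skill
    return registry
```

Keying on the `name` field keeps filenames free-form while `skill["prompt"].format(input=...)` would fill the prompt slot at dispatch time.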

step1_basic_fc.py Deleted file

@@ -1,86 +0,0 @@
"""
Step 1: the simplest LangGraph agent + GLM-4.5-air + tool calling
Verifies only the core capability: Function Call
"""
import os
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
# ── Model configuration ──
llm = ChatOpenAI(
base_url="https://open.bigmodel.cn/api/paas/v4",
api_key="2259e33a1357460abe17919aaf81e73d.K44a8LPQTmFM5PKm",
model="glm-4.5-air",
)
# ── Tool definitions ──
@tool
def get_weather(city: str) -> str:
"""查询指定城市的天气信息"""
# 模拟天气数据
weather_data = {
"北京": "晴天气温22°C北风3级",
"上海": "多云气温25°C东风2级",
"深圳": "阵雨气温28°C南风4级",
"黄庄": "晴转多云气温23°C微风",
}
return weather_data.get(city, f"暂无{city}的天气数据")
@tool
def calculate(expression: str) -> str:
"""计算数学表达式,输入如 '2+3*4'"""
try:
result = eval(expression, {"__builtins__": {}}, {})
return f"计算结果: {expression} = {result}"
except Exception as e:
return f"计算错误: {e}"
@tool
def search_knowledge(query: str) -> str:
"""搜索知识库(模拟)"""
kb = {
"黄庄三号": "黄庄三号是AI助手定位为严肃、认真、听话、聪明的AI助手",
"LangGraph": "LangGraph是LangChain团队推出的Agent框架支持状态图、循环、持久化",
"MCP": "MCP是Model Context ProtocolAI工具互操作标准协议",
}
for key, val in kb.items():
if key in query:
return val
return f"知识库中未找到关于'{query}'的信息"
# ── Create the agent ──
tools = [get_weather, calculate, search_knowledge]
agent = create_react_agent(llm, tools)
# ── Run tests ──
if __name__ == "__main__":
import asyncio
async def test():
print("=" * 50)
print("Step 1: LangGraph + GLM-4.5-air + FC 工具调用")
print("=" * 50)
# Test 1: weather lookup
print("\n[测试1] 天气查询")
result = await agent.ainvoke({"messages": [("user", "黄庄今天天气怎么样?")]})
last_msg = result["messages"][-1]
print(f"回复: {last_msg.content}")
# Test 2: math calculation
print("\n[测试2] 数学计算")
result = await agent.ainvoke({"messages": [("user", "帮我算一下 123 * 456 + 789")]})
last_msg = result["messages"][-1]
print(f"回复: {last_msg.content}")
# Test 3: knowledge search
print("\n[测试3] 知识搜索")
result = await agent.ainvoke({"messages": [("user", "LangGraph是什么")]})
last_msg = result["messages"][-1]
print(f"回复: {last_msg.content}")
print("\n" + "=" * 50)
print("Step 1 完成FC 工具调用正常工作")
asyncio.run(test())

tools/__init__.py Normal file

@@ -0,0 +1,2 @@
# __init__.py for the tools/ directory
# All tool modules placed here are auto-scanned and registered

tools/calculator.py Normal file

@@ -0,0 +1,15 @@
"""计算工具"""
from langchain_core.tools import tool
@tool
def calculate(expression: str) -> str:
"""计算数学表达式,输入如 '2+3*4'"""
try:
result = eval(expression, {"__builtins__": {}}, {})
return f"计算结果: {expression} = {result}"
except Exception as e:
return f"计算错误: {e}"
TOOLS = [calculate]
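Note that `eval` with an emptied `__builtins__` is a convenience, not a real sandbox — attribute-access tricks can still escape it. For illustration only, a stricter whitelist-based evaluator could look like this (a sketch, not the repo's code; `safe_calc` is an assumed name):

```python
# Hypothetical AST-whitelist alternative to eval() for arithmetic only.
import ast
import operator

# Only these node/operator pairs are permitted
_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
    ast.Mod: operator.mod, ast.Pow: operator.pow,
    ast.USub: operator.neg,
}

def safe_calc(expression: str):
    """Evaluate +, -, *, /, %, ** over numeric literals; reject all else."""
    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"unsupported expression: {expression!r}")
    return _eval(ast.parse(expression, mode="eval"))
```

Anything outside the whitelist — names, calls, attribute access — raises instead of executing.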

tools/knowledge.py Normal file

@@ -0,0 +1,19 @@
"""知识库搜索工具"""
from langchain_core.tools import tool
@tool
def search_knowledge(query: str) -> str:
"""搜索知识库(模拟)"""
kb = {
"黄庄三号": "黄庄三号是AI助手严肃认真听话聪明",
"LangGraph": "LangGraph是Agent框架支持状态图、循环、持久化",
"MCP": "MCP是Model Context ProtocolAI工具互操作标准协议",
}
for key, val in kb.items():
if key in query:
return val
return f"未找到关于'{query}'的信息"
TOOLS = [search_knowledge]

tools/weather.py Normal file

@@ -0,0 +1,18 @@
"""天气查询工具"""
from langchain_core.tools import tool
@tool
def get_weather(city: str) -> str:
"""查询指定城市的天气信息"""
weather_data = {
"北京": "晴天气温22°C北风3级",
"上海": "多云气温25°C东风2级",
"深圳": "阵雨气温28°C南风4级",
"黄庄": "晴转多云气温23°C微风",
}
return weather_data.get(city, f"暂无{city}的天气数据")
# Expose the tool list for auto-scanning
TOOLS = [get_weather]