feat: 黄庄三号四能力Agent初始版本

能力1: Function Call - LangGraph ToolNode
能力2: MCP - langchain-mcp-adapters + 确定性路由
能力3: 思考模式 - think_node + CoT推理链
能力4: Skill - 自建SkillRegistry注册机制

模型: GLM-4.5-air (智谱)
This commit is contained in:
2026-04-23 19:18:53 +08:00
commit 451e9a12ed
5 changed files with 744 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@@ -0,0 +1,8 @@
__pycache__/
*.pyc
*.pyo
.env
*.egg-info/
dist/
build/
.venv/

102
README.md Normal file
View File

@@ -0,0 +1,102 @@
# 黄庄三号四能力 Agent
基于 LangGraph 的多能力 AI Agent,集成 FC/MCP/思考模式/Skill 四种核心能力。
## 四项能力
| 能力 | 方案 | 支持度 |
|------|------|--------|
| Function Call | LangGraph 原生 ToolNode | ★★★★★ |
| MCP | langchain-mcp-adapters + 确定性路由 | ★★★★ |
| 思考模式 | 自建 think_node + CoT 推理链 | ★★★★ |
| Skill | 自建 SkillRegistry 注册机制 | ★★★★ |
## 模型
GLM-4.5-air(智谱,OpenAI 兼容接口)
## 快速开始
```bash
cd /path/to/hz3-agent
# 自动测试不带MCP
python3 agent_v3.py --test
# 自动测试带MCP
python3 agent_v3.py --mcp --test
# 交互模式
python3 agent_v3.py --mcp
```
## 文件说明
| 文件 | 说明 |
|------|------|
| `agent_v3.py` | Agent 主程序(四能力完整版) |
| `mcp_server.py` | MCP 服务器(示例工具:时间/字符统计/UUID) |
| `step1_basic_fc.py` | Step1 基础 FC 验证 |
## 代码调用
```python
from agent_v3 import run_agent, build_graph
graph = build_graph()
result = await run_agent("黄庄天气怎么样?", graph)
print(result["reply"]) # 回复
print(result["thinking"]) # 思考过程
print(result["skill"]) # 使用的技能
```
## 架构流程
```
用户输入 → think(思考) → skill_route(路由)
┌──────────┴──────────┐
匹配MCP工具 匹配Skill
直接调用 进入skill_exec
│ │
返回结果 执行工具+提示词模板
│ │
└──────────┬─────────┘
agent(主节点)
│ │
有tool_calls 无tool_calls
│ │
tools节点 END
回到agent
```
## MCP 服务器
`mcp_server.py` 基于 FastMCP 提供 3 个示例工具:
- `get_current_time` - 获取当前时间
- `count_chars` - 统计文本字符数
- `generate_uuid` - 生成随机 UUID
## Skill 系统
已注册 3 个示例技能,扩展只需一行:
```python
skills.register(SkillDef(
name="新技能",
description="技能描述",
prompt="提示词模板,{input}为占位符",
tools=["依赖的工具名"]
))
```
## 关键设计决策
1. **MCP 确定性路由**:关键词匹配后直接 session.call_tool(),绕过模型不调工具的问题
2. **思考结果合并**:与 system prompt 合并为单条 SystemMessage避免干扰工具调用
3. **AsyncExitStack 长连接**MCP session 在 Agent 生命周期内保持,退出时统一关闭
4. **迭代保护**agent 节点迭代超过 5 次强制结束,防止无限循环

514
agent_v3.py Normal file
View File

@@ -0,0 +1,514 @@
"""
黄庄三号 Agent - 四能力完整版 v3
==================================
能力1: Function Call - LangGraph 工具节点
能力2: MCP - langchain-mcp-adapters + 长连接
能力3: 思考模式 - 反思节点 + CoT 推理链
能力4: Skill - 自建技能注册机制
关键修正MCP session 在 Agent 整个生命周期内保持连接
"""
import os
import sys
import json
import asyncio
import argparse
from typing import Annotated
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from contextlib import AsyncExitStack
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
# ════════════════════════════════════════════
# Model configuration
# ════════════════════════════════════════════
# SECURITY FIX: the API key is now read from the environment. The inline
# fallback keeps existing deployments working, but a key that has been
# committed to source control must be considered leaked — revoke and rotate
# it, then rely solely on the ZHIPU_API_KEY environment variable.
LLM_CONFIG = {
    "base_url": "https://open.bigmodel.cn/api/paas/v4",
    "api_key": os.environ.get(
        "ZHIPU_API_KEY",
        "2259e33a1357460abe17919aaf81e73d.K44a8LPQTmFM5PKm",
    ),
    "model": "glm-4.5-air",
}
# Shared default client; per-node clients with other temperatures are created
# where needed (think_node, skill_execute_node).
llm = ChatOpenAI(**LLM_CONFIG)
# ════════════════════════════════════════════
# 能力1: FC 本地工具
# ════════════════════════════════════════════
@tool
def get_weather(city: str) -> str:
    """查询指定城市的天气信息"""
    # NOTE: the Chinese docstring above is the tool description surfaced to
    # the LLM at runtime, so it is kept verbatim.
    # Canned demo data — this is a mock, not a live weather service.
    forecasts = {
        "北京": "晴天气温22°C北风3级",
        "上海": "多云气温25°C东风2级",
        "深圳": "阵雨气温28°C南风4级",
        "黄庄": "晴转多云气温23°C微风",
    }
    if city in forecasts:
        return forecasts[city]
    # Unknown city: report that no data is available.
    return f"暂无{city}的天气数据"
@tool
def calculate(expression: str) -> str:
    """计算数学表达式,输入如 '2+3*4'"""
    # SECURITY FIX: the original used eval() on model-supplied text. Even
    # with empty __builtins__, eval can be abused (e.g. memory-exhausting
    # expressions). This restricted AST walker only permits numeric literals
    # and the arithmetic operators + - * / // % ** with unary +/-.
    import ast
    import operator as op

    ops = {
        ast.Add: op.add, ast.Sub: op.sub, ast.Mult: op.mul,
        ast.Div: op.truediv, ast.FloorDiv: op.floordiv,
        ast.Mod: op.mod, ast.Pow: op.pow,
        ast.UAdd: op.pos, ast.USub: op.neg,
    }

    def _eval(node):
        # Recursively evaluate a whitelisted arithmetic AST node.
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in ops:
            left, right = _eval(node.left), _eval(node.right)
            # Guard against exponent bombs like (10**10)**(10**10).
            if isinstance(node.op, ast.Pow) and abs(right) > 1000:
                raise ValueError("指数过大")
            return ops[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.operand))
        raise ValueError(f"不支持的表达式: {type(node).__name__}")

    try:
        result = _eval(ast.parse(expression, mode="eval"))
        return f"计算结果: {expression} = {result}"
    except Exception as e:
        # Same error-message format as before.
        return f"计算错误: {e}"
@tool
def search_knowledge(query: str) -> str:
    """搜索知识库(模拟)"""
    # NOTE: the Chinese docstring is the runtime tool description — kept as-is.
    # In-memory mock knowledge base; a topic matches when its key appears
    # anywhere inside the query string.
    knowledge_base = {
        "黄庄三号": "黄庄三号是AI助手严肃认真听话聪明",
        "LangGraph": "LangGraph是Agent框架支持状态图、循环、持久化",
        "MCP": "MCP是Model Context ProtocolAI工具互操作标准协议",
    }
    hit = next((text for topic, text in knowledge_base.items() if topic in query), None)
    if hit is not None:
        return hit
    return f"未找到关于'{query}'的信息"

# All locally defined FC tools; MCP tools are appended to all_tools later.
local_tools = [get_weather, calculate, search_knowledge]
# ════════════════════════════════════════════
# 能力4: Skill 技能系统
# ════════════════════════════════════════════
class SkillDef(BaseModel):
    """Declarative description of one registered skill (capability 4).

    Fields:
        name: unique key used by SkillRegistry and the keyword router.
        description: one-line summary shown to the thinking module.
        prompt: template string; "{input}" is replaced with the raw user text.
        tools: names of local FC tools the skill runs before prompting.
    """
    name: str
    description: str
    prompt: str
    # FIX: use an explicit default_factory instead of a mutable `= []`
    # default — unambiguous and consistent with Python best practice
    # (pydantic copies defaults, but the factory form avoids the pitfall).
    tools: list[str] = Field(default_factory=list)
class SkillRegistry:
    """Name-keyed store of SkillDef objects (capability 4 backbone)."""

    def __init__(self):
        # Insertion-ordered mapping: skill name -> definition.
        self._skills: dict[str, SkillDef] = {}

    def register(self, skill: SkillDef):
        # Last registration wins on duplicate names.
        self._skills[skill.name] = skill

    def get(self, name: str) -> SkillDef | None:
        # Missing names return None rather than raising.
        return self._skills.get(name)

    def list_skills(self) -> list[SkillDef]:
        # Snapshot copy so callers cannot mutate internal state.
        return [*self._skills.values()]

    def format_list(self) -> str:
        # Bullet list used inside prompts (think_node / SYSTEM_PROMPT).
        lines = [f" - {s.name}: {s.description}" for s in self._skills.values()]
        return "\n".join(lines)
# Global skill registry with three demo skills. Extending the agent is a
# single skills.register(SkillDef(...)) call; keyword routing to these
# skills happens in skill_route_node.
skills = SkillRegistry()
skills.register(SkillDef(name="weather_analyst", description="天气分析师-查询天气给建议",
    prompt="你是天气分析师。根据天气信息给出出行建议。\n天气:{input}", tools=["get_weather"]))
skills.register(SkillDef(name="math_tutor", description="数学辅导-计算并解释",
    prompt="你是数学老师。解答并解释:{input}", tools=["calculate"]))
skills.register(SkillDef(name="knowledge_explorer", description="知识探索-搜索解释知识",
    prompt="你是知识探索者。搜索并深入解释:{input}", tools=["search_knowledge"]))
# ════════════════════════════════════════════
# Agent 状态
# ════════════════════════════════════════════
class AgentState(TypedDict):
    """Shared graph state threaded through every node."""
    # Conversation history; add_messages appends instead of replacing.
    messages: Annotated[list, add_messages]
    # Output of think_node; merged into the system prompt by agent_node.
    thinking: str
    # Skill chosen by skill_route_node, or None when no skill matched.
    active_skill: str | None
    # Final text from skill_exec or the deterministic MCP route, or None.
    skill_output: str | None
    # Per-invocation counter used by the runaway-loop guards.
    iteration: int
# ════════════════════════════════════════════
# 能力3: 思考节点
# ════════════════════════════════════════════
async def think_node(state: AgentState) -> dict:
    """Capability 3 (thinking mode): cheap CoT pass before acting.

    Summarizes the recent conversation plus the available skills/tools and
    asks a low-temperature model for intent and tool/skill suggestions.
    The result lands in state["thinking"] and is later merged into the
    system prompt by agent_node. Also bumps the iteration counter.
    """
    iteration = state.get("iteration", 0) + 1
    # Past 3 iterations, skip the extra LLM round-trip ("fast mode").
    if iteration > 3:
        return {"iteration": iteration, "thinking": "(快速模式)"}
    # Compress the last 4 messages (150 chars each) into a plain-text log.
    conv = []
    for msg in state["messages"][-4:]:
        role = "用户" if isinstance(msg, HumanMessage) else "AI"
        conv.append(f"{role}: {msg.content[:150]}")
    tool_names = [t.name for t in all_tools]
    # Dedicated low-temperature client so the reflection stays focused.
    think_llm = ChatOpenAI(**LLM_CONFIG, temperature=0.3)
    resp = await think_llm.ainvoke([
        SystemMessage(content="你是思考模块。简洁输出:用户意图、需要的工具/技能、注意事项。不要说没有工具。"),
        HumanMessage(content=f"对话:\n{chr(10).join(conv)}\n\n可用技能:\n{skills.format_list()}\n\n可用工具: {', '.join(tool_names)}"),
    ])
    return {"iteration": iteration, "thinking": resp.content}
# ════════════════════════════════════════════
# 技能路由 & 执行
# ════════════════════════════════════════════
async def skill_route_node(state: AgentState) -> dict:
    """Deterministic router that runs right after think_node.

    Precedence:
      1. MCP tool keywords — call the MCP tool directly and return its
         output, bypassing the model entirely.
      2. Local skill keywords — mark the skill active for skill_exec.
      3. No match — fall through to the plain agent node.
    """
    user_input = ""
    for msg in reversed(state["messages"]):
        if isinstance(msg, HumanMessage):
            user_input = msg.content
            break
    # 1. MCP tools first: deterministic routing that does not rely on the
    #    model choosing to emit a tool call.
    mcp_keywords = {
        "get_current_time": ["几点", "时间", "现在几点", "当前时间"],
        "count_chars": ["统计字符", "字符数", "字数统计", "统计文本"],
        "generate_uuid": ["生成uuid", "UUID", "uuid"],
    }
    for mcp_tool_name, kws in mcp_keywords.items():
        if any(kw in user_input for kw in kws):
            # Execute the MCP tool immediately and short-circuit the graph.
            mcp_result = await _call_mcp_tool(mcp_tool_name, user_input)
            return {"active_skill": None, "skill_output": mcp_result}
    # 2. Local skill keywords.
    # BUG FIX: math_tutor's keyword list previously contained an empty
    # string, and `"" in s` is always True — so math_tutor captured every
    # request reaching this point and knowledge_explorer was unreachable.
    # The empty string is replaced with the intended keyword "算".
    kw_map = {
        "weather_analyst": ["天气", "出行", "穿什么"],
        "math_tutor": ["计算", "算", "数学"],
        "knowledge_explorer": ["是什么", "解释", "了解"],
    }
    for sname, kws in kw_map.items():
        if any(kw in user_input for kw in kws):
            return {"active_skill": sname, "skill_output": None}
    # 3. Nothing matched — the agent node handles the request directly.
    return {"active_skill": None, "skill_output": None}
async def _call_mcp_tool(tool_name: str, user_input: str) -> str:
    """Call an MCP tool directly (deterministic routing, bypassing the LLM).

    Arguments are heuristically extracted from the raw user text per tool.
    Returns the tool's text output, or an error string on any failure.
    """
    global _mcp_session
    if not _mcp_session:
        return f"[错误] MCP未连接无法调用{tool_name}"
    try:
        # Derive tool arguments from the user's utterance.
        args = {}
        if tool_name == "get_current_time":
            # assumes Asia/Shanghai is the intended zone for these phrasings
            # — TODO confirm; otherwise the server-side default applies.
            if "上海" in user_input or "北京时间" in user_input or "几点" in user_input:
                args = {"timezone": "Asia/Shanghai"}
        elif tool_name == "count_chars":
            import re
            # Prefer text inside ASCII or full-width quotes; fall back to
            # counting the whole utterance.
            match = re.search(r"['\"\u201c\u201d](.+?)['\"\u201c\u201d]", user_input)
            text = match.group(1) if match else user_input
            args = {"text": text}
        elif tool_name == "generate_uuid":
            args = {}
        result = await _mcp_session.call_tool(tool_name, args)
        # Flatten the MCP content items that carry text into one string.
        if result.content:
            texts = [c.text for c in result.content if hasattr(c, 'text')]
            return "\n".join(texts) if texts else str(result)
        return str(result)
    except Exception as e:
        return f"[MCP工具{tool_name}调用错误] {e}"
async def skill_execute_node(state: AgentState) -> dict:
    """Capability 4 (Skill): run the routed skill's tools + prompt template.

    Executes every local tool the skill declares (with crude argument
    extraction from the user text), appends the tool output to the filled
    prompt template, then asks the model for the skill's final answer.
    """
    sname = state.get("active_skill")
    if not sname:
        return {"skill_output": None}
    sk = skills.get(sname)
    if not sk:
        return {"skill_output": None}
    user_input = ""
    for msg in reversed(state["messages"]):
        if isinstance(msg, HumanMessage):
            user_input = msg.content
            break
    # Run the skill's declared tools and collect their results.
    tool_info = ""
    for tname in sk.tools:
        for t in local_tools:
            if t.name == tname:
                try:
                    if tname == "get_weather":
                        # Pick the first known city mentioned; default 北京.
                        cities = ["北京", "上海", "深圳", "黄庄"]
                        city = next((c for c in cities if c in user_input), "北京")
                        r = await t.ainvoke({"city": city})
                    elif tname == "calculate":
                        import re
                        # Grab the first arithmetic-looking substring.
                        expr = re.findall(r'[\d+\-*/(). ]+', user_input)
                        r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"})
                    else:
                        r = await t.ainvoke({"query": user_input})
                    tool_info += f"\n工具{tname}结果: {r}"
                except Exception as e:
                    # Tool failures are reported inline, not fatal.
                    tool_info += f"\n工具{tname}错误: {e}"
    # Fill the prompt template and ask the model for the skill's answer.
    prompt = sk.prompt.format(input=user_input) + tool_info
    sk_llm = ChatOpenAI(**LLM_CONFIG, temperature=0.7)
    resp = await sk_llm.ainvoke([
        SystemMessage(content=prompt),
        HumanMessage(content="请基于以上信息回答。"),
    ])
    return {"skill_output": resp.content}
# ════════════════════════════════════════════
# 主 Agent 节点
# ════════════════════════════════════════════
# Persona + hard rules for the main agent node. {skill_list} is filled from
# the skill registry at call time. (Runtime prompt text kept verbatim.)
SYSTEM_PROMPT = """你是黄庄三号严肃、认真、听话、聪明的AI助手。你的名字是"黄庄三号"你不是Claude不是ChatGPT。
你具备四种能力:
1. 工具调用(FC) - 调用内置工具获取信息
2. MCP集成 - 通过MCP协议连接外部服务
3. 思考模式 - 回答前进行深度思考
4. 技能系统(Skill) - 调用注册技能完成复杂任务
可用技能:
{skill_list}
重要规则(必须严格遵守):
- 当被问"你是谁",必须回答"我是黄庄三号"
- 当用户问时间/几点你必须调用get_current_time工具禁止自己编造时间
- 当用户要求统计字符/字数你必须调用count_chars工具禁止自己统计
- 当用户要求生成UUID你必须调用generate_uuid工具禁止自己编造UUID
- 对于工具能提供的数据,必须调用工具获取,不要自己猜测"""
# Global tool list; extended in place once MCP tools are loaded.
all_tools = list(local_tools)
# Global MCP session handle used by the deterministic router.
_mcp_session = None
async def agent_node(state: AgentState) -> dict:
    """Main agent node: compose the system prompt and call the tool-bound LLM.

    If a skill/MCP route already produced output, emit it as the final
    AIMessage. Otherwise merge the (truncated) thinking result into the
    system prompt — stressing any tools it mentioned — and invoke the model.

    BUG FIX: the iteration counter is now incremented here. Previously only
    think_node incremented it, but the tools -> agent loop re-enters this
    node without passing through think, so `iteration > 5` could never
    become true during a tool loop and the runaway guard was dead code.
    """
    iteration = state.get("iteration", 0) + 1
    # A deterministic route already answered — return it verbatim.
    if state.get("skill_output"):
        return {"messages": [AIMessage(content=state["skill_output"])], "iteration": iteration}
    system_content = SYSTEM_PROMPT.format(skill_list=skills.format_list())
    if state.get("thinking"):
        # Merge the thinking result (capped at 300 chars) as auxiliary info.
        thinking = state['thinking'][:300]
        # If the thinking mentions concrete tool names, insist they be used.
        tool_hints = [t.name for t in all_tools if t.name in thinking]
        if tool_hints:
            thinking += f"\n\n[重要:必须调用 {', '.join(tool_hints)} 工具来回答]"
        # Single merged SystemMessage so the extra text does not interfere
        # with the model's tool-calling behavior.
        system_content += f"\n\n[内部思考]\n{thinking}"
    messages = [SystemMessage(content=system_content)]
    messages.extend(state["messages"])
    llm_with_tools = llm.bind_tools(all_tools)
    resp = await llm_with_tools.ainvoke(messages)
    # Iteration guard: strip tool_calls to force the graph to terminate.
    if iteration > 5 and hasattr(resp, "tool_calls") and resp.tool_calls:
        resp = AIMessage(content=resp.content or "任务完成(已达最大迭代次数)")
    return {"messages": [resp], "iteration": iteration}
# ════════════════════════════════════════════
# 路由逻辑
# ════════════════════════════════════════════
def route_from_agent(state: AgentState) -> str:
    """Edge selector after the agent node.

    Returns "tools" when the most recent AI message carries tool calls,
    otherwise "end". A completed skill output always ends the graph.
    """
    if state.get("skill_output"):
        return "end"
    # Only the newest AIMessage matters; older ones are already handled.
    latest_ai = next(
        (m for m in reversed(state["messages"]) if isinstance(m, AIMessage)),
        None,
    )
    if latest_ai is not None and getattr(latest_ai, "tool_calls", None):
        return "tools"
    return "end"
def route_after_tools(state: AgentState) -> str:
    """Post-tools edge selector: "tools" to keep looping, else "end".

    NOTE(review): build_graph wires `tools` straight back to `agent` with a
    plain edge, so this router is currently unused — kept for compatibility.
    """
    for msg in reversed(state["messages"]):
        if not isinstance(msg, AIMessage):
            continue
        # Decide based on the newest AIMessage only.
        return "tools" if getattr(msg, "tool_calls", None) else "end"
    return "end"
# ════════════════════════════════════════════
# 构建图
# ════════════════════════════════════════════
def build_graph():
    """Assemble and compile the LangGraph state machine.

    Flow: START -> think -> skill_route -> (skill_exec ->)? agent, then the
    agent loops with the tools node until no tool_calls remain -> END.
    """
    g = StateGraph(AgentState)
    g.add_node("think", think_node)
    g.add_node("skill_route", skill_route_node)
    g.add_node("skill_exec", skill_execute_node)
    g.add_node("agent", agent_node)
    g.add_node("tools", ToolNode(all_tools))
    g.add_edge(START, "think")
    g.add_edge("think", "skill_route")
    # Go through skill_exec only when the router picked a skill.
    g.add_conditional_edges("skill_route",
        lambda s: "skill_exec" if s.get("active_skill") else "agent",
        {"skill_exec": "skill_exec", "agent": "agent"})
    g.add_edge("skill_exec", "agent")
    g.add_conditional_edges("agent", route_from_agent, {"tools": "tools", "end": END})
    # After tools run, return to agent so it can read the results and decide
    # whether to keep calling tools.
    g.add_edge("tools", "agent")
    return g.compile()
# ════════════════════════════════════════════
# 能力2: MCP 集成(长连接版)
# ════════════════════════════════════════════
class MCPManager:
    """Owns the MCP connection for the agent's whole lifetime (capability 2)."""
    def __init__(self):
        # AsyncExitStack keeps the stdio_client/ClientSession contexts open
        # until close() is called — a long-lived connection, not per-request.
        self.exit_stack = AsyncExitStack()
        self.session = None
        self.tools_loaded = False
    async def connect(self, server_script: str):
        """Spawn the MCP stdio server, initialize a session, load its tools.

        Side effects: publishes the session to the module-global
        _mcp_session and extends the global all_tools list in place.
        """
        global all_tools, _mcp_session
        from langchain_mcp_adapters.tools import load_mcp_tools
        from mcp.client.stdio import stdio_client, StdioServerParameters
        from mcp.client.session import ClientSession
        server_params = StdioServerParameters(
            command="python3",
            args=[server_script],
        )
        # Enter the contexts on the exit stack so the connection survives
        # after this coroutine returns.
        read, write = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.session = await self.exit_stack.enter_async_context(ClientSession(read, write))
        await self.session.initialize()
        # Expose the session globally for the deterministic router.
        _mcp_session = self.session
        # Wrap MCP tools as LangChain tools and register them globally.
        mcp_tools = await load_mcp_tools(self.session)
        all_tools.extend(mcp_tools)
        self.tools_loaded = True
        print(f" [MCP] 已连接,加载 {len(mcp_tools)} 个工具:")
        for t in mcp_tools:
            print(f" - {t.name}: {t.description[:50]}")
        return mcp_tools
    async def close(self):
        """Tear down the MCP connection and drop the session reference."""
        await self.exit_stack.aclose()
        self.session = None
        print(" [MCP] 连接已关闭")
# ════════════════════════════════════════════
# 运行
# ════════════════════════════════════════════
async def run_agent(user_input: str, graph=None):
    """Run one user turn through the graph.

    Args:
        user_input: raw user text for this turn.
        graph: a compiled graph to reuse; built fresh when None.

    Returns:
        dict with "reply" (final answer text), "thinking" (think_node
        output) and "skill" (name of the skill used, or None).
    """
    if graph is None:
        graph = build_graph()
    # Fresh state per call: no cross-turn memory, iteration restarts at 0.
    result = await graph.ainvoke({
        "messages": [HumanMessage(content=user_input)],
        "thinking": "", "active_skill": None, "skill_output": None, "iteration": 0,
    })
    # The last message in the final state is the agent's answer.
    last = result["messages"][-1]
    return {
        "reply": last.content if hasattr(last, "content") else str(last),
        "thinking": result.get("thinking", ""),
        "skill": result.get("active_skill"),
    }
async def interactive_mode():
    """Simple REPL: read user input, run it through the agent, print results."""
    print("=" * 60)
    print(" 黄庄三号 Agent - 四能力完整版 v3")
    print(" FC | MCP | 思考模式 | Skill")
    print("=" * 60)
    print(" 技能:", [s.name for s in skills.list_skills()])
    print(" 工具:", [t.name for t in all_tools])
    print(" 输入 quit 退出")
    print("=" * 60)
    # Build the graph once and reuse it across turns.
    graph = build_graph()
    while True:
        try:
            user_input = input("\n你> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C exits cleanly.
            break
        if not user_input:
            continue
        if user_input.lower() in ("quit", "exit", "q"):
            break
        result = await run_agent(user_input, graph)
        # Show truncated thinking and the routed skill before the reply.
        if result["thinking"]:
            print(f"\n[思考] {result['thinking'][:150]}...")
        if result["skill"]:
            print(f"[技能] {result['skill']}")
        print(f"\n黄庄三号> {result['reply']}")
async def main():
    """CLI entry point: parse flags, optionally attach MCP, run tests or REPL.

    FIXES:
    - The MCP connection is now closed in a `finally` block, so a crash in
      the test run or REPL no longer leaks the subprocess/session.
    - The per-test separator previously printed `''*55` (an empty string
      repeated — i.e. nothing, almost certainly a garbled character);
      restored the intended '─' rule.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--mcp", action="store_true", help="启用MCP")
    parser.add_argument("--test", action="store_true", help="自动测试")
    args = parser.parse_args()
    mcp_mgr = None
    try:
        if args.mcp:
            print("\n[MCP] 连接服务器...")
            mcp_mgr = MCPManager()
            # mcp_server.py lives next to this script.
            mcp_script = os.path.join(os.path.dirname(os.path.abspath(__file__)), "mcp_server.py")
            await mcp_mgr.connect(mcp_script)
        print(f"\n 工具总数: {len(all_tools)}")
        print(f" 技能总数: {len(skills.list_skills())}")
        graph = build_graph()
        if args.test:
            tests = [
                ("FC+思考+Skill", "黄庄天气怎么样?"),
                ("FC+Skill", "算一下 99*88+77"),
                ("知识搜索", "MCP是什么"),
                ("身份", "你好你是谁?"),
            ]
            # MCP-dependent tests only run when the server is attached.
            if args.mcp:
                tests.extend([
                    ("MCP:时间", "现在几点了?"),
                    ("MCP:字符统计", "统计'黄庄三号是AI助手'的字符数"),
                    ("MCP:UUID", "生成一个UUID"),
                ])
            for label, query in tests:
                print(f"\n{'─'*55}")
                print(f"[测试:{label}] {query}")
                r = await run_agent(query, graph)
                print(f" 思考: {r['thinking'][:80]}...")
                print(f" 技能: {r['skill']}")
                print(f" 回复: {r['reply'][:150]}...")
            print(f"\n{'='*60}")
            print(" 验证完成!")
            cap = ["FC ✅", "思考 ✅", "Skill ✅"]
            if args.mcp:
                cap.append("MCP ✅")
            print(" " + " | ".join(cap))
            print("=" * 60)
        else:
            await interactive_mode()
    finally:
        # Always release the MCP subprocess/session, even on error.
        if mcp_mgr:
            await mcp_mgr.close()

if __name__ == "__main__":
    asyncio.run(main())

34
mcp_server.py Normal file
View File

@@ -0,0 +1,34 @@
"""简单的 MCP 服务器 - 提供时间查询和文本处理工具"""
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("hz3-tools")
@mcp.tool()
def get_current_time(timezone: str = "Asia/Shanghai") -> str:
    """获取当前时间"""
    # NOTE: the Chinese docstring is the tool description sent to MCP
    # clients — kept verbatim.
    # IMPROVEMENT: use the stdlib zoneinfo (Python 3.9+) instead of the
    # third-party pytz package — one fewer dependency, same output format.
    from datetime import datetime
    from zoneinfo import ZoneInfo
    try:
        now = datetime.now(ZoneInfo(timezone))
        return f"当前时间({timezone}: {now.strftime('%Y-%m-%d %H:%M:%S %Z')}"
    except Exception as e:
        # Unknown/invalid zone names are reported, not raised.
        return f"时区错误: {e}"
@mcp.tool()
def count_chars(text: str) -> str:
    """统计文本的字符数、词数等"""
    # NOTE: the Chinese docstring is the tool description — kept verbatim.
    # CJK ideographs are detected via the Unicode "CJK Unified Ideographs"
    # block U+4E00..U+9FFF.
    totals = {
        "chars": len(text),
        "words": len(text.split()),
        "lines": text.count('\n') + 1,
        "cjk": sum('\u4e00' <= ch <= '\u9fff' for ch in text),
    }
    return (
        f"字符数: {totals['chars']}, 词数: {totals['words']}, "
        f"行数: {totals['lines']}, 中文字符: {totals['cjk']}"
    )
@mcp.tool()
def generate_uuid() -> str:
    """生成一个随机UUID"""
    # uuid4 = cryptographically random UUID; lazy import keeps startup lean.
    import uuid
    return str(uuid.uuid4())

if __name__ == "__main__":
    # stdio transport: the agent spawns this script as a subprocess and
    # talks to it over stdin/stdout.
    mcp.run(transport="stdio")

86
step1_basic_fc.py Normal file
View File

@@ -0,0 +1,86 @@
"""
Step 1: 最简单的 LangGraph Agent + GLM-4.5-air + 工具调用
只验证核心能力Function Call
"""
import os
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
# ── Model configuration ──
# SECURITY FIX: read the API key from the environment. The inline fallback
# keeps the demo runnable, but a key committed to git must be considered
# leaked — revoke and rotate it.
llm = ChatOpenAI(
    base_url="https://open.bigmodel.cn/api/paas/v4",
    api_key=os.environ.get(
        "ZHIPU_API_KEY",
        "2259e33a1357460abe17919aaf81e73d.K44a8LPQTmFM5PKm",
    ),
    model="glm-4.5-air",
)
# ── 定义工具 ──
@tool
def get_weather(city: str) -> str:
    """查询指定城市的天气信息"""
    # NOTE: the Chinese docstring is the runtime tool description — verbatim.
    # Mock weather data keyed by city name.
    forecasts = {
        "北京": "晴天气温22°C北风3级",
        "上海": "多云气温25°C东风2级",
        "深圳": "阵雨气温28°C南风4级",
        "黄庄": "晴转多云气温23°C微风",
    }
    if city in forecasts:
        return forecasts[city]
    return f"暂无{city}的天气数据"
@tool
def calculate(expression: str) -> str:
    """计算数学表达式,输入如 '2+3*4'"""
    # SECURITY FIX: the original used eval() on model-supplied text, which
    # is unsafe even with empty __builtins__. This restricted AST walker
    # permits only numeric literals and arithmetic operators.
    import ast
    import operator as op

    ops = {
        ast.Add: op.add, ast.Sub: op.sub, ast.Mult: op.mul,
        ast.Div: op.truediv, ast.FloorDiv: op.floordiv,
        ast.Mod: op.mod, ast.Pow: op.pow,
        ast.UAdd: op.pos, ast.USub: op.neg,
    }

    def _eval(node):
        # Recursively evaluate a whitelisted arithmetic AST node.
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in ops:
            left, right = _eval(node.left), _eval(node.right)
            # Guard against exponent bombs like (10**10)**(10**10).
            if isinstance(node.op, ast.Pow) and abs(right) > 1000:
                raise ValueError("指数过大")
            return ops[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in ops:
            return ops[type(node.op)](_eval(node.operand))
        raise ValueError(f"不支持的表达式: {type(node).__name__}")

    try:
        result = _eval(ast.parse(expression, mode="eval"))
        return f"计算结果: {expression} = {result}"
    except Exception as e:
        return f"计算错误: {e}"
@tool
def search_knowledge(query: str) -> str:
    """搜索知识库(模拟)"""
    # NOTE: the Chinese docstring is the runtime tool description — verbatim.
    # In-memory mock KB; a topic matches when its key occurs in the query.
    knowledge_base = {
        "黄庄三号": "黄庄三号是AI助手定位为严肃、认真、听话、聪明的AI助手",
        "LangGraph": "LangGraph是LangChain团队推出的Agent框架支持状态图、循环、持久化",
        "MCP": "MCP是Model Context ProtocolAI工具互操作标准协议",
    }
    hit = next((text for topic, text in knowledge_base.items() if topic in query), None)
    if hit is not None:
        return hit
    return f"知识库中未找到关于'{query}'的信息"
# ── Create the agent ──
# create_react_agent wires the LLM and tools into a prebuilt ReAct loop.
tools = [get_weather, calculate, search_knowledge]
agent = create_react_agent(llm, tools)
# ── 运行测试 ──
if __name__ == "__main__":
import asyncio
async def test():
print("=" * 50)
print("Step 1: LangGraph + GLM-4.5-air + FC 工具调用")
print("=" * 50)
# 测试1: 天气查询
print("\n[测试1] 天气查询")
result = await agent.ainvoke({"messages": [("user", "黄庄今天天气怎么样?")]})
last_msg = result["messages"][-1]
print(f"回复: {last_msg.content}")
# 测试2: 数学计算
print("\n[测试2] 数学计算")
result = await agent.ainvoke({"messages": [("user", "帮我算一下 123 * 456 + 789")]})
last_msg = result["messages"][-1]
print(f"回复: {last_msg.content}")
# 测试3: 知识搜索
print("\n[测试3] 知识搜索")
result = await agent.ainvoke({"messages": [("user", "LangGraph是什么")]})
last_msg = result["messages"][-1]
print(f"回复: {last_msg.content}")
print("\n" + "=" * 50)
print("Step 1 完成FC 工具调用正常工作")
asyncio.run(test())