Files
hz3-agent/agent.py
黄庄三号 f6e8d40459 feat: v4.0 Command Handoff - Agent间可互相交接任务
核心改动:
- 每个 Agent 成为独立图节点(weather/math/knowledge/mcp/general)
- Agent 间通过 Command(goto=目标节点) 实现任务交接
- Supervisor 返回普通dict,由 conditional_edges 路由
- 并行模式使用独立 parallel_worker 节点(SubTaskState),避免并发冲突
- 新增 handoff_from/handoff_context/handoff_history 状态字段
- handoff 判断:Agent先回答,再由LLM判断是否需要交接给其他Agent
- 保留 Send API 并行 + Aggregator 聚合能力

测试通过:单Agent、直接回复、多Agent并行、Handoff链路追踪
2026-04-24 10:38:33 +08:00

1011 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
黄庄三号 Agent v4.0 - Agent间Command Handoff交互版
====================================================
架构: Supervisor + 独立Agent节点 + Command Handoff
- Supervisor: 分析任务用Command分发到目标Agent
- 每个Agent是独立图节点拥有自己的LLM + 工具 + handoff工具
- Agent间通过 transfer_to_xxx 工具实现Command handoff直接跳转
- 支持: 并行分发(Send API)、串行分发(Command goto)、Agent间互相交接
运行方式:
python3 agent.py --test 自动测试
python3 agent.py --mcp --test 带MCP测试
python3 agent.py --mcp 交互模式(带MCP)
python3 agent.py 交互模式(不带MCP)
"""
import os
import re
import json
import asyncio
import argparse
import importlib.util
from typing import Annotated, Literal
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from contextlib import AsyncExitStack
from pathlib import Path
from operator import add
import yaml
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langgraph.types import Send, Command
# ════════════════════════════════════════════
# 基础路径
# ════════════════════════════════════════════
BASE_DIR = Path(__file__).parent.resolve()
CONFIG_PATH = BASE_DIR / "config.yaml"
TOOLS_DIR = BASE_DIR / "tools"
SKILLS_DIR = BASE_DIR / "skills"
AGENTS_DIR = BASE_DIR / "agents"
# ════════════════════════════════════════════
# 配置加载
# ════════════════════════════════════════════
def load_config() -> dict:
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
# ════════════════════════════════════════════
# 工具自动扫描注册
# ════════════════════════════════════════════
def scan_tools() -> list:
all_tools = []
if not TOOLS_DIR.exists():
return all_tools
for py_file in sorted(TOOLS_DIR.glob("*.py")):
if py_file.name.startswith("_"):
continue
try:
spec = importlib.util.spec_from_file_location(f"tools.{py_file.stem}", str(py_file))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
if hasattr(mod, "TOOLS"):
tool_list = mod.TOOLS
all_tools.extend(tool_list)
print(f" [工具] {py_file.name}: {[t.name for t in tool_list]}")
except Exception as e:
print(f" [工具] {py_file.name}: 加载失败 -> {e}")
return all_tools
# ════════════════════════════════════════════
# 技能自动扫描注册(兼容 OpenClaw
# ════════════════════════════════════════════
class SkillDef(BaseModel):
name: str
description: str
prompt: str
tools: list[str] = []
skill_dir: str = ""
scripts: list[str] = []
is_openclaw: bool = False
class SkillRegistry:
def __init__(self):
self._skills: dict[str, SkillDef] = {}
def register(self, skill: SkillDef):
self._skills[skill.name] = skill
def get(self, name: str) -> SkillDef | None:
return self._skills.get(name)
def list_skills(self) -> list[SkillDef]:
return list(self._skills.values())
def format_list(self) -> str:
lines = []
for s in self._skills.values():
tag = " [OpenClaw]" if s.is_openclaw else ""
lines.append(f" - {s.name}: {s.description}{tag}")
return "\n".join(lines)
def _parse_skill_md_frontmatter(content: str) -> dict:
if not content.startswith("---"):
return {}
end = content.find("---", 3)
if end == -1:
return {}
return yaml.safe_load(content[3:end].strip()) or {}
def scan_skills() -> SkillRegistry:
registry = SkillRegistry()
if not SKILLS_DIR.exists():
return registry
for yaml_file in sorted(SKILLS_DIR.glob("*.yaml")):
try:
with open(yaml_file, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
registry.register(SkillDef(
name=data["name"], description=data.get("description", ""),
prompt=data.get("prompt", ""), tools=data.get("tools", []),
))
print(f" [技能] {yaml_file.name}: {data['name']}")
except Exception as e:
print(f" [技能] {yaml_file.name}: 加载失败 -> {e}")
for skill_dir in sorted(SKILLS_DIR.iterdir()):
if not skill_dir.is_dir():
continue
skill_md = skill_dir / "SKILL.md"
if not skill_md.exists():
continue
try:
with open(skill_md, "r", encoding="utf-8") as f:
content = f.read()
fm = _parse_skill_md_frontmatter(content)
if not fm:
continue
name = fm.get("name", skill_dir.name)
description = fm.get("description", "")
body_start = content.find("---", 3)
prompt = content[body_start + 3:].strip() if body_start != -1 else ""
scripts_dir = skill_dir / "scripts"
scripts, tools = [], []
if scripts_dir.exists():
for sf in sorted(scripts_dir.glob("*.py")):
if not sf.name.startswith("_"):
scripts.append(str(sf))
tools.append(sf.stem)
registry.register(SkillDef(
name=name, description=description, prompt=prompt, tools=tools,
skill_dir=str(skill_dir), scripts=scripts, is_openclaw=True,
))
print(f" [技能] {skill_dir.name}/ (OpenClaw): {name}")
except Exception as e:
print(f" [技能] {skill_dir.name}/SKILL.md: 加载失败 -> {e}")
return registry
# ════════════════════════════════════════════
# MCP 管理器
# ════════════════════════════════════════════
class MCPManager:
def __init__(self):
self.exit_stack = AsyncExitStack()
self.sessions: dict[str, object] = {}
self.route_map: dict[str, tuple] = {}
self.mcp_tools: list = []
async def connect_all(self, mcp_configs: list[dict]):
from langchain_mcp_adapters.tools import load_mcp_tools
from mcp.client.stdio import stdio_client, StdioServerParameters
from mcp.client.session import ClientSession
for srv in mcp_configs:
name = srv["name"]
try:
sp = StdioServerParameters(command=srv["command"], args=srv.get("args", []))
r, w = await self.exit_stack.enter_async_context(stdio_client(sp))
session = await self.exit_stack.enter_async_context(ClientSession(r, w))
await session.initialize()
mcp_tools = await load_mcp_tools(session)
self.mcp_tools.extend(mcp_tools)
self.sessions[name] = session
route_kw = srv.get("route_keywords", {})
for tool_name, keywords in route_kw.items():
for kw in keywords:
self.route_map[kw] = (session, tool_name)
print(f" [MCP] {name}: 已连接,{len(mcp_tools)} 个工具")
except Exception as e:
print(f" [MCP] {name}: 连接失败 -> {e}")
def match_route(self, user_input: str) -> tuple | None:
for keyword, (session, tool_name) in self.route_map.items():
if keyword in user_input:
return (session, tool_name)
return None
async def call_tool(self, session, tool_name: str, user_input: str) -> str:
try:
args = {}
for t in self.mcp_tools:
if t.name == tool_name:
schema = getattr(t, "args_schema", None)
if schema:
if isinstance(schema, dict):
sf = schema.get("properties", {})
elif hasattr(schema, "model_json_schema"):
sf = schema.model_json_schema().get("properties", {})
else:
sf = {}
args = _parse_tool_args(tool_name, sf, user_input)
break
result = await session.call_tool(tool_name, args)
if result.content:
texts = [c.text for c in result.content if hasattr(c, "text")]
return "\n".join(texts) if texts else str(result)
return str(result)
except Exception as e:
return f"[MCP工具{tool_name}错误] {e}"
async def close(self):
await self.exit_stack.aclose()
self.sessions.clear()
def _parse_tool_args(tool_name: str, schema_fields: dict, user_input: str) -> dict:
args = {}
for field_name, field_info in schema_fields.items():
if field_name in ("timezone",):
args[field_name] = "Asia/Shanghai"
elif field_name in ("text",):
match = re.search(r"""['"\u201c\u201d](.+?)['"\u201c\u201d]""", user_input)
args[field_name] = match.group(1) if match else user_input
elif field_name in ("city",):
args[field_name] = user_input
return args
# ════════════════════════════════════════════
# ★ 核心:多 Agent 架构 v4.0 — Command Handoff
# ════════════════════════════════════════════
# --- 共享状态 ---
class SharedState(TypedDict):
"""多Agent共享状态
注意并行场景下多个节点同时更新state只有带reducer的字段能安全并发。
非reducer字段如handoff_from在并行模式中不写入。
"""
messages: Annotated[list, add_messages] # 对话消息(累加)
subtasks: list[dict] # 分解的子任务(并行场景用)
results: Annotated[list[str], add] # Agent执行结果累加聚合
active_agent: str # 当前活跃Agent
handoff_from: str # 从哪个Agent交接过来的
handoff_context: str # 交接时附带的上下文
handoff_history: Annotated[list[str], add] # 交接历史记录(累加)
final_answer: str # 最终回复
class SubTaskState(TypedDict):
"""子任务状态Send API用"""
task: dict
messages: Annotated[list, add_messages]
# --- Agent 定义 ---
class AgentDef(BaseModel):
"""Agent定义"""
name: str # 节点名也是图中的node key
description: str
system_prompt: str
tools: list[str] = [] # 依赖的业务工具名
skill: str = "" # 依赖的技能名
class AgentRegistry:
"""Agent注册表"""
def __init__(self):
self._agents: dict[str, AgentDef] = {}
def register(self, agent: AgentDef):
self._agents[agent.name] = agent
def get(self, name: str) -> AgentDef | None:
return self._agents.get(name)
def list_agents(self) -> list[AgentDef]:
return list(self._agents.values())
def format_list(self) -> str:
return "\n".join(f" - {a.name}: {a.description}" for a in self._agents.values())
def match(self, task_type: str) -> AgentDef | None:
if task_type in self._agents:
return self._agents[task_type]
for agent in self._agents.values():
if agent.name in task_type or task_type in agent.name:
return agent
return None
def init_agents(skills_reg: SkillRegistry, tools_list: list) -> AgentRegistry:
"""初始化内置Agent"""
registry = AgentRegistry()
registry.register(AgentDef(
name="weather_agent",
description="天气专家 - 查询天气、出行建议",
system_prompt="你是天气专家。根据天气数据给出专业的出行建议包括穿衣、活动安排等。如果用户问的问题超出天气范围请使用handoff工具将任务交接给合适的Agent。",
tools=["get_weather"],
skill="weather_analyst",
))
registry.register(AgentDef(
name="math_agent",
description="数学专家 - 计算、数学问题解答",
system_prompt="你是数学专家。解答数学问题给出计算过程和原理解释。如果用户问的问题超出数学范围请使用handoff工具将任务交接给合适的Agent。",
tools=["calculate"],
skill="math_tutor",
))
registry.register(AgentDef(
name="knowledge_agent",
description="知识专家 - 搜索知识、深入解释概念",
system_prompt="你是知识专家。搜索知识库给出深入浅出的解释结构化呈现信息。如果用户问的问题需要其他专业Agent处理请使用handoff工具交接。",
tools=["search_knowledge"],
skill="knowledge_explorer",
))
registry.register(AgentDef(
name="general_agent",
description="通用助手 - 处理一般对话和简单问题",
system_prompt="你是黄庄三号通用助手。处理一般对话、问候和简单问题。如果问题需要专业Agent处理请使用handoff工具交接。",
tools=[],
skill="",
))
registry.register(AgentDef(
name="mcp_agent",
description="MCP工具调用 - 时间查询、字符统计、UUID生成等",
system_prompt="你是MCP工具调用专家。通过MCP协议调用外部工具获取实时数据。如果用户请求超出MCP工具范围请使用handoff工具交接给合适的Agent。",
tools=[],
skill="",
))
return registry
# ════════════════════════════════════════════
# Handoff 工具工厂 — 为每个目标Agent创建 transfer 工具
# ════════════════════════════════════════════
def create_handoff_tools(agent_registry: AgentRegistry, source_agent: str) -> list:
"""为 source_agent 创建一组 handoff 工具,每个工具返回 Command(goto=目标节点)
关键:工具返回 Command 对象LangGraph 的 ToolNode 会识别并执行跳转。
"""
from langchain_core.tools import StructuredTool
tools = []
for target in agent_registry.list_agents():
if target.name == source_agent:
continue # 不创建到自己的handoff
target_name = target.name
target_desc = target.description
# 用闭包捕获 target_name
def _make_handoff(t_name, t_desc):
# 预构造描述字符串不用f-string做docstring
_tool_desc = f"将任务交接给 {t_name}{t_desc}。当你无法处理当前问题或问题属于该Agent的专业领域时使用。task_description必须包含完整上下文信息。"
_tool_name = f"transfer_to_{t_name}"
def transfer_tool_func(task_description: str) -> str:
"""将任务交接给另一个Agent处理。"""
# 返回一个特殊标记在自定义tool执行器中转为Command
return json.dumps({
"__handoff__": True,
"target": t_name,
"task_description": task_description,
})
# 用 StructuredTool 手动构造,避免 docstring 问题
tool_obj = StructuredTool.from_function(
func=transfer_tool_func,
name=_tool_name,
description=_tool_desc,
)
return tool_obj
tools.append(_make_handoff(target_name, target_desc))
return tools
# ════════════════════════════════════════════
# Supervisor 节点 — 分析任务 + 更新state不走Command由conditional_edges路由
# ════════════════════════════════════════════
def make_supervisor_node(config, agent_registry, skills_reg, tools_list, mcp_mgr):
llm_cfg = config["llm"]
SUPERVISOR_PROMPT = """你是黄庄三号的任务协调者(Supervisor)。你的职责是:
1. 分析用户请求判断应该由哪个Agent处理
2. 如果需要多个Agent并行协作输出JSON格式的子任务列表
3. 如果只需单个Agent处理指定单个Agent
4. 如果是简单对话,直接回复
可用Agent:
{agent_list}
MCP工具当用户请求匹配这些关键词时指定agent为"mcp_agent":
- 时间/几点/当前时间 → mcp_agent (get_current_time)
- 统计字符/字符数 → mcp_agent (count_chars)
- 生成UUID → mcp_agent (generate_uuid)
重要规则:
- 你的名字是"黄庄三号"
- 对于简单问候,直接回复,不要分配任务
- 输出格式必须是严格的JSON
- 多任务: {{"mode": "parallel", "subtasks": [{{"agent": "agent名", "query": "具体任务"}}]}}
- 单任务: {{"mode": "single", "agent": "agent名", "query": "具体任务"}}
- 直接回复: {{"mode": "direct", "answer": "你的回复"}}
- 只输出JSON不要其他内容"""
async def supervisor_node(state: SharedState) -> dict:
user_msg = ""
for msg in reversed(state["messages"]):
if isinstance(msg, HumanMessage):
user_msg = msg.content
break
llm = ChatOpenAI(
base_url=llm_cfg["base_url"],
api_key=llm_cfg["api_key"],
model=llm_cfg["model"],
temperature=0.1,
)
prompt = SUPERVISOR_PROMPT.format(agent_list=agent_registry.format_list())
messages = [SystemMessage(content=prompt), *state["messages"][-6:]]
resp = await llm.ainvoke(messages)
content = resp.content.strip()
if content.startswith("```"):
content = re.sub(r'^```\w*\n?', '', content)
content = re.sub(r'\n?```$', '', content)
content = content.strip()
try:
plan = json.loads(content)
except json.JSONDecodeError:
plan = {"mode": "direct", "answer": resp.content}
mode = plan.get("mode", "direct")
if mode == "direct":
answer = plan.get("answer", resp.content)
return {
"messages": [AIMessage(content=answer)],
"final_answer": answer,
"subtasks": [],
"active_agent": "",
}
elif mode == "single":
agent_name = plan.get("agent", "general_agent")
query = plan.get("query", user_msg)
return {
"active_agent": agent_name,
"subtasks": [],
"handoff_from": "supervisor",
"handoff_context": query,
"handoff_history": [f"supervisor → {agent_name}: {query}"],
}
elif mode == "parallel":
subtasks = plan.get("subtasks", [])
return {
"subtasks": subtasks,
"active_agent": "",
"handoff_history": [f"supervisor → parallel: {json.dumps(subtasks, ensure_ascii=False)}"],
}
else:
return {"final_answer": resp.content, "active_agent": "", "subtasks": []}
return supervisor_node
# ════════════════════════════════════════════
# Agent 节点工厂 — 每个Agent是独立节点
# ════════════════════════════════════════════
def make_agent_node(config, agent_def: AgentDef, agent_registry: AgentRegistry,
skills_reg: SkillRegistry, tools_list: list, mcp_mgr):
"""创建一个Agent节点函数该Agent可以
1. 调用业务工具(天气、计算器等)
2. 调用MCP工具如果有
3. 通过handoff工具交接给其他Agent返回Command
"""
llm_cfg = config["llm"]
agent_cfg = config.get("agent", {})
temp = agent_cfg.get("skill_temperature", 0.7)
max_iter = agent_cfg.get("max_iterations", 5)
# 准备这个Agent可用的工具业务工具 + handoff工具
agent_tools = []
for tname in agent_def.tools:
for t in tools_list:
if t.name == tname:
agent_tools.append(t)
handoff_tools = create_handoff_tools(agent_registry, agent_def.name)
agent_tools.extend(handoff_tools)
async def agent_node(state: SharedState) -> Command:
# 获取当前任务上下文
handoff_from = state.get("handoff_from", "")
handoff_context = state.get("handoff_context", "")
handoff_history = state.get("handoff_history", [])
# 构造系统提示
system_content = agent_def.system_prompt
# 如果有handoff上下文注入
if handoff_from and handoff_context:
system_content += f"\n\n[交接信息] 你正在处理从 {handoff_from} 交接过来的任务:{handoff_context}"
if handoff_history:
history_str = "".join(handoff_history)
system_content += f"\n[交接链路] {history_str}"
# 提取用户消息
user_msg = ""
for msg in reversed(state["messages"]):
if isinstance(msg, HumanMessage):
user_msg = msg.content
break
if not user_msg:
user_msg = handoff_context
# 执行业务工具(预执行模式:直接调用获取结果)
tool_info = ""
for tname in agent_def.tools:
for t in tools_list:
if t.name == tname:
try:
if tname == "get_weather":
cities = ["北京", "上海", "深圳", "黄庄"]
city = next((c for c in cities if c in user_msg), "北京")
r = await t.ainvoke({"city": city})
elif tname == "calculate":
expr = re.findall(r'[\d+\-*/(). ]+', user_msg)
r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"})
else:
r = await t.ainvoke({"query": user_msg})
tool_info += f"\n工具{tname}结果: {r}"
except Exception as e:
tool_info += f"\n工具{tname}错误: {e}"
# MCP工具
if mcp_mgr and user_msg:
route = mcp_mgr.match_route(user_msg)
if route:
session, tool_name = route
mcp_result = await mcp_mgr.call_tool(session, tool_name, user_msg)
tool_info += f"\nMCP工具{tool_name}结果: {mcp_result}"
# 执行技能脚本OpenClaw
if agent_def.skill:
sk = skills_reg.get(agent_def.skill)
if sk and sk.is_openclaw and sk.scripts:
for script_path in sk.scripts:
try:
proc = await asyncio.create_subprocess_exec(
"python3", script_path, user_msg,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
output = stdout.decode("utf-8", errors="replace").strip()
if output:
tool_info += f"\n[脚本输出]\n{output[:2000]}"
except Exception as e:
tool_info += f"\n[脚本错误] {e}"
# 构造技能提示
skill_prompt = ""
if agent_def.skill:
sk = skills_reg.get(agent_def.skill)
if sk and not sk.is_openclaw:
skill_prompt = sk.prompt.format(input=user_msg)
# 构造消息
messages = [SystemMessage(content=system_content)]
if skill_prompt:
messages.append(SystemMessage(content=f"技能指导:{skill_prompt}"))
if tool_info:
messages.append(SystemMessage(content=f"工具/脚本提供的数据:{tool_info}"))
# 添加历史消息(最近几条)
messages.extend(state["messages"][-6:])
# 如果历史中没有用户消息,补充一条
if not any(isinstance(m, HumanMessage) for m in state["messages"][-6:]):
messages.append(HumanMessage(content=user_msg))
# LLM调用带handoff工具绑定
llm = ChatOpenAI(
base_url=llm_cfg["base_url"],
api_key=llm_cfg["api_key"],
model=llm_cfg["model"],
temperature=temp,
)
# 先尝试不带工具的简单调用(如果模型不支持复杂工具调用)
# 直接让LLM回答如果需要handoff在回答中标注
resp = await llm.ainvoke(messages)
answer = resp.content
# 检查LLM是否在回答中表达了需要handoff的意图
# 用第二轮调用判断是否需要交接
handoff_check_prompt = f"""你是任务路由判断器。根据以下Agent的回答判断该Agent是否需要将任务交接给其他Agent。
当前Agent: {agent_def.name}{agent_def.description}
Agent回答: {answer[:500]}
可用Agent:
{agent_registry.format_list()}
判断规则:
- 如果Agent已经完整回答了问题不需要交接输出: {{"handoff": false}}
- 如果Agent表示自己无法处理、问题超出其专业范围、或建议由其他Agent处理输出: {{"handoff": true, "target": "目标agent名", "reason": "原因"}}
- 只输出JSON不要其他内容"""
handoff_resp = await llm.ainvoke([
SystemMessage(content=handoff_check_prompt),
HumanMessage(content="请判断"),
])
handoff_content = handoff_resp.content.strip()
if handoff_content.startswith("```"):
handoff_content = re.sub(r'^```\w*\n?', '', handoff_content)
handoff_content = re.sub(r'\n?```$', '', handoff_content)
handoff_content = handoff_content.strip()
try:
handoff_decision = json.loads(handoff_content)
except json.JSONDecodeError:
handoff_decision = {"handoff": False}
if handoff_decision.get("handoff") and handoff_decision.get("target"):
target = handoff_decision["target"]
reason = handoff_decision.get("reason", "")
task_desc = f"{agent_def.name}交接: {reason}。原始问题: {user_msg}"
return Command(
goto=target,
update={
"active_agent": target,
"handoff_from": agent_def.name,
"handoff_context": task_desc,
"handoff_history": [f"{agent_def.name}{target}: {reason}"],
"messages": [AIMessage(content=f"[{agent_def.name}交接给{target}] {reason}")],
},
)
# 不需要handoff直接返回结果
return Command(
goto=END,
update={
"final_answer": answer,
"results": [f"[{agent_def.name}] {answer}"],
"messages": [AIMessage(content=answer)],
},
)
return agent_node
# ════════════════════════════════════════════
# Aggregator 节点(并行结果聚合)
# ════════════════════════════════════════════
def make_aggregator_node(config):
llm_cfg = config["llm"]
async def aggregator_node(state: SharedState) -> dict:
results = state.get("results", [])
if not results:
return {"final_answer": "所有Agent已完成但无结果"}
if len(results) == 1:
content = re.sub(r'^\[.+?\]\s*', '', results[0])
return {
"final_answer": content,
"messages": [AIMessage(content=content)],
}
combined = "\n\n---\n\n".join(results)
agg_llm = ChatOpenAI(
base_url=llm_cfg["base_url"],
api_key=llm_cfg["api_key"],
model=llm_cfg["model"],
temperature=0.5,
)
resp = await agg_llm.ainvoke([
SystemMessage(content="你是黄庄三号。请将以下多个专业Agent的结果整合为一个连贯、结构化的回答。保留关键信息去除冗余。"),
HumanMessage(content=f"多个Agent的结果\n\n{combined}"),
])
return {
"final_answer": resp.content,
"messages": [AIMessage(content=resp.content)],
}
return aggregator_node
# ════════════════════════════════════════════
# 并行 Worker 节点Send API用只写results不写非reducer字段
# ════════════════════════════════════════════
def make_parallel_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr):
"""并行Worker接收SubTaskState执行后只写results安全并发"""
llm_cfg = config["llm"]
agent_cfg = config.get("agent", {})
temp = agent_cfg.get("skill_temperature", 0.7)
async def parallel_worker(state: SubTaskState) -> dict:
task = state["task"]
agent_name = task.get("agent", "general_agent")
query = task.get("query", "")
agent_def = agent_registry.get(agent_name)
if not agent_def:
agent_def = agent_registry.get("general_agent")
# 执行业务工具
tool_info = ""
for tname in agent_def.tools:
for t in tools_list:
if t.name == tname:
try:
if tname == "get_weather":
cities = ["北京", "上海", "深圳", "黄庄"]
city = next((c for c in cities if c in query), "北京")
r = await t.ainvoke({"city": city})
elif tname == "calculate":
expr = re.findall(r'[\d+\-*/(). ]+', query)
r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"})
else:
r = await t.ainvoke({"query": query})
tool_info += f"\n工具{tname}结果: {r}"
except Exception as e:
tool_info += f"\n工具{tname}错误: {e}"
# MCP工具
if mcp_mgr and query:
route = mcp_mgr.match_route(query)
if route:
session, tool_name = route
mcp_result = await mcp_mgr.call_tool(session, tool_name, query)
tool_info += f"\nMCP工具{tool_name}结果: {mcp_result}"
# 构造提示词
prompt = agent_def.system_prompt
if agent_def.skill:
sk = skills_reg.get(agent_def.skill)
if sk and not sk.is_openclaw:
prompt = sk.prompt.format(input=query)
worker_llm = ChatOpenAI(
base_url=llm_cfg["base_url"],
api_key=llm_cfg["api_key"],
model=llm_cfg["model"],
temperature=temp,
)
messages = [
SystemMessage(content=prompt),
HumanMessage(content=query),
]
if tool_info:
messages.append(SystemMessage(content=f"工具/脚本提供的数据:{tool_info}"))
resp = await worker_llm.ainvoke(messages)
return {"results": [f"[{agent_def.name}] {resp.content}"]}
return parallel_worker
# ════════════════════════════════════════════
# 构建多Agent图
# ════════════════════════════════════════════
async def build_graph(config, skills_reg, mcp_mgr, tools_list):
agent_registry = init_agents(skills_reg, tools_list)
all_agent_names = [a.name for a in agent_registry.list_agents()]
# 创建 Supervisor
supervisor = make_supervisor_node(config, agent_registry, skills_reg, tools_list, mcp_mgr)
# 创建各Agent节点串行/单任务路径支持Command handoff
agent_nodes = {}
for agent_def in agent_registry.list_agents():
agent_nodes[agent_def.name] = make_agent_node(
config, agent_def, agent_registry, skills_reg, tools_list, mcp_mgr
)
# 创建并行Worker节点并行路径只写results安全并发
parallel_worker = make_parallel_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr)
aggregator = make_aggregator_node(config)
# 构建图
g = StateGraph(SharedState)
# 添加节点
g.add_node("supervisor", supervisor)
for name, node_fn in agent_nodes.items():
g.add_node(name, node_fn)
g.add_node("parallel_worker", parallel_worker) # 并行WorkerSubTaskState
g.add_node("aggregator", aggregator)
# 入口
g.add_edge(START, "supervisor")
# ── Supervisor 路由conditional_edges ──
def route_from_supervisor(state: SharedState):
"""根据 Supervisor 输出的 state 决定路由:
- final_answer 非空 → END
- subtasks 有多个 → Send 并行分发到 parallel_worker
- active_agent 非空 → 跳到对应 Agent 节点
"""
final_answer = state.get("final_answer", "")
if final_answer:
return "end"
subtasks = state.get("subtasks", [])
if subtasks and len(subtasks) > 1:
# 并行模式:用 Send API 分发
return [
Send("parallel_worker", {"task": t, "messages": state["messages"][-4:]})
for t in subtasks
]
active_agent = state.get("active_agent", "")
if active_agent and active_agent in all_agent_names:
return active_agent
return "end"
# 构建path_map
path_map = {"end": END}
for name in all_agent_names:
path_map[name] = name
g.add_conditional_edges("supervisor", route_from_supervisor, path_map)
# 并行Worker → 聚合器
g.add_edge("parallel_worker", "aggregator")
g.add_edge("aggregator", END)
# 注意各Agent节点使用 Command(goto=...) 进行 handoff
# LangGraph 自动处理 Command 路由,不需要额外添加边。
return g.compile()
# ════════════════════════════════════════════
# 运行入口
# ════════════════════════════════════════════
all_tools = []
skills_registry = SkillRegistry()
mcp_manager = None
async def run_agent(user_input: str, graph):
result = await graph.ainvoke({
"messages": [HumanMessage(content=user_input)],
"subtasks": [], "results": [], "active_agent": "",
"handoff_from": "", "handoff_context": "",
"handoff_history": [], "final_answer": "",
})
last = result["messages"][-1]
return {
"reply": last.content if hasattr(last, "content") else str(last),
"subtasks": result.get("subtasks", []),
"results_count": len(result.get("results", [])),
"handoff_history": result.get("handoff_history", []),
"final_answer": result.get("final_answer", ""),
}
async def interactive_mode(graph):
print("=" * 60)
print(" 黄庄三号 Agent v4.0 - Command Handoff交互版")
print(" Supervisor + 独立Agent节点 + Agent间Handoff")
print("=" * 60)
print(" 输入 quit 退出")
print("=" * 60)
while True:
try:
user_input = input("\n你> ").strip()
except (EOFError, KeyboardInterrupt):
break
if not user_input or user_input.lower() in ("quit", "exit", "q"):
break
result = await run_agent(user_input, graph)
if result["subtasks"]:
agents = [t.get("agent", "?") for t in result["subtasks"]]
print(f"\n[调度] 分配给: {', '.join(agents)}")
if result["handoff_history"]:
print(f"[交接] {''.join(result['handoff_history'])}")
if result["results_count"] > 1:
print(f"[聚合] 合并 {result['results_count']} 个Agent结果")
print(f"\n黄庄三号> {result['reply']}")
async def main():
global all_tools, skills_registry, mcp_manager
parser = argparse.ArgumentParser(description="黄庄三号 Agent v4.0")
parser.add_argument("--mcp", action="store_true", help="启用MCP")
parser.add_argument("--test", action="store_true", help="自动测试")
args = parser.parse_args()
print("=" * 60)
print(" 黄庄三号 Agent v4.0 - Command Handoff交互版")
print("=" * 60)
# 加载配置
print("\n[配置] 加载 config.yaml ...")
config = load_config()
# 扫描工具
print("\n[工具] 扫描 tools/ ...")
all_tools = scan_tools()
print(f" 工具总数: {len(all_tools)}")
# 扫描技能
print("\n[技能] 扫描 skills/ ...")
skills_registry = scan_skills()
print(f" 技能总数: {len(skills_registry.list_skills())}")
# 连接MCP
if args.mcp and config.get("mcp_servers"):
print("\n[MCP] 连接服务器 ...")
mcp_manager = MCPManager()
await mcp_manager.connect_all(config["mcp_servers"])
# 初始化Agent
agent_registry = init_agents(skills_registry, all_tools)
print(f"\n[Agent] 已注册 {len(agent_registry.list_agents())} 个Agent:")
for a in agent_registry.list_agents():
print(f" - {a.name}: {a.description}")
# 构建图
print("\n[构建] 多Agent图Command Handoff版 ...")
graph = await build_graph(config, skills_registry, mcp_manager, all_tools)
if args.test:
tests = [
("单Agent:天气", "黄庄天气怎么样?"),
("单Agent:数学", "算一下 99*88+77"),
("单Agent:知识", "MCP是什么"),
("直接回复", "你好你是谁?"),
("多Agent并行", "帮我查一下北京和上海的天气再算一下123+456"),
("Handoff测试:天气→数学", "帮我查一下北京天气顺便算算100*200"),
]
if args.mcp and mcp_manager:
tests.append(("MCP:时间", "现在几点了?"))
for label, query in tests:
print(f"\n{''*55}")
print(f"[测试:{label}] {query}")
try:
r = await run_agent(query, graph)
if r["subtasks"]:
agents = [t.get("agent", "?") for t in r["subtasks"]]
print(f" 调度: {', '.join(agents)}")
if r["handoff_history"]:
print(f" 交接: {''.join(r['handoff_history'])}")
if r["results_count"] > 1:
print(f" 聚合: {r['results_count']} 个Agent结果")
print(f" 回复: {r['reply'][:200]}")
except Exception as e:
print(f" 错误: {e}")
print(f"\n{'='*60}")
print(" 多Agent + Handoff 测试完成!")
print("=" * 60)
else:
await interactive_mode(graph)
if mcp_manager:
await mcp_manager.close()
if __name__ == "__main__":
asyncio.run(main())