feat: v4.0 Command Handoff - Agent间可互相交接任务

核心改动:
- 每个 Agent 成为独立图节点(weather/math/knowledge/mcp/general)
- Agent 间通过 Command(goto=目标节点) 实现任务交接
- Supervisor 返回普通dict,由 conditional_edges 路由
- 并行模式使用独立 parallel_worker 节点(SubTaskState),避免并发冲突
- 新增 handoff_from/handoff_context/handoff_history 状态字段
- handoff 判断:Agent先回答,再由LLM判断是否需要交接给其他Agent
- 保留 Send API 并行 + Aggregator 聚合能力

测试通过:单Agent、直接回复、多Agent并行、Handoff链路追踪
This commit is contained in:
2026-04-24 10:38:33 +08:00
parent 2c55213d39
commit f6e8d40459

617
agent.py
View File

@@ -1,12 +1,11 @@
"""
黄庄三号 Agent v3.0 - Agent交互版
====================================
架构: Supervisor + Worker(Agent) + Aggregator
- Supervisor: 分析任务,分解子任务,决定分给哪个Agent
- Worker: 各专业Agent(subgraph),独立执行子任务
- Aggregator: 聚合多Agent结果生成最终回复
- 支持: 并行分发(Send API)、串行交接(Command handoff)
- Agent间通信: 共享State + Command + 消息总线
黄庄三号 Agent v4.0 - Agent间Command Handoff交互版
====================================================
架构: Supervisor + 独立Agent节点 + Command Handoff
- Supervisor: 分析任务,用Command分发到目标Agent
- 每个Agent是独立图节点拥有自己的LLM + 工具 + handoff工具
- Agent间通过 transfer_to_xxx 工具实现Command handoff直接跳转
- 支持: 并行分发(Send API)、串行分发(Command goto)、Agent间互相交接
运行方式:
python3 agent.py --test 自动测试
@@ -244,7 +243,7 @@ def _parse_tool_args(tool_name: str, schema_fields: dict, user_input: str) -> di
if field_name in ("timezone",):
args[field_name] = "Asia/Shanghai"
elif field_name in ("text",):
match = re.search(r"['\"\u201c\u201d](.+?)['\"\u201c\u201d]", user_input)
match = re.search(r"""['"\u201c\u201d](.+?)['"\u201c\u201d]""", user_input)
args[field_name] = match.group(1) if match else user_input
elif field_name in ("city",):
args[field_name] = user_input
@@ -252,16 +251,22 @@ def _parse_tool_args(tool_name: str, schema_fields: dict, user_input: str) -> di
# ════════════════════════════════════════════
# ★ 核心:多 Agent 架构
# ★ 核心:多 Agent 架构 v4.0 — Command Handoff
# ════════════════════════════════════════════
# --- 共享状态 ---
class SharedState(TypedDict):
"""多Agent共享状态"""
"""多Agent共享状态
注意并行场景下多个节点同时更新state只有带reducer的字段能安全并发。
非reducer字段如handoff_from在并行模式中不写入。
"""
messages: Annotated[list, add_messages] # 对话消息(累加)
subtasks: list[dict] # 分解的子任务
subtasks: list[dict] # 分解的子任务(并行场景用)
results: Annotated[list[str], add] # Agent执行结果累加聚合
active_agent: str # 当前活跃Agent
handoff_from: str # 从哪个Agent交接过来的
handoff_context: str # 交接时附带的上下文
handoff_history: Annotated[list[str], add] # 交接历史记录(累加)
final_answer: str # 最终回复
@@ -274,12 +279,11 @@ class SubTaskState(TypedDict):
# --- Agent 定义 ---
class AgentDef(BaseModel):
"""Agent定义"""
name: str
name: str # 节点名也是图中的node key
description: str
system_prompt: str
tools: list[str] = [] # 依赖的工具名
skill: str = "" # 依赖的技能名
tools: list[str] = [] # 依赖的业务工具名
skill: str = "" # 依赖的技能名
class AgentRegistry:
"""Agent注册表"""
@@ -299,11 +303,8 @@ class AgentRegistry:
return "\n".join(f" - {a.name}: {a.description}" for a in self._agents.values())
def match(self, task_type: str) -> AgentDef | None:
"""根据任务类型匹配Agent"""
# 精确匹配
if task_type in self._agents:
return self._agents[task_type]
# 关键词匹配
for agent in self._agents.values():
if agent.name in task_type or task_type in agent.name:
return agent
@@ -314,47 +315,42 @@ def init_agents(skills_reg: SkillRegistry, tools_list: list) -> AgentRegistry:
"""初始化内置Agent"""
registry = AgentRegistry()
# 天气专家
registry.register(AgentDef(
name="weather_agent",
description="天气专家 - 查询天气、出行建议",
system_prompt="你是天气专家。根据天气数据给出专业的出行建议,包括穿衣、活动安排等。",
system_prompt="你是天气专家。根据天气数据给出专业的出行建议,包括穿衣、活动安排等。如果用户问的问题超出天气范围请使用handoff工具将任务交接给合适的Agent。",
tools=["get_weather"],
skill="weather_analyst",
))
# 数学专家
registry.register(AgentDef(
name="math_agent",
description="数学专家 - 计算、数学问题解答",
system_prompt="你是数学专家。解答数学问题,给出计算过程和原理解释。",
system_prompt="你是数学专家。解答数学问题,给出计算过程和原理解释。如果用户问的问题超出数学范围请使用handoff工具将任务交接给合适的Agent。",
tools=["calculate"],
skill="math_tutor",
))
# 知识专家
registry.register(AgentDef(
name="knowledge_agent",
description="知识专家 - 搜索知识、深入解释概念",
system_prompt="你是知识专家。搜索知识库,给出深入浅出的解释,结构化呈现信息。",
system_prompt="你是知识专家。搜索知识库,给出深入浅出的解释,结构化呈现信息。如果用户问的问题需要其他专业Agent处理请使用handoff工具交接。",
tools=["search_knowledge"],
skill="knowledge_explorer",
))
# 通用Agent兜底
registry.register(AgentDef(
name="general_agent",
description="通用助手 - 处理一般对话和简单问题",
system_prompt="你是黄庄三号通用助手。处理一般对话、问候和简单问题。",
system_prompt="你是黄庄三号通用助手。处理一般对话、问候和简单问题。如果问题需要专业Agent处理请使用handoff工具交接。",
tools=[],
skill="",
))
# MCP Agent
registry.register(AgentDef(
name="mcp_agent",
description="MCP工具调用 - 时间查询、字符统计、UUID生成等",
system_prompt="你是MCP工具调用专家。通过MCP协议调用外部工具获取实时数据。",
system_prompt="你是MCP工具调用专家。通过MCP协议调用外部工具获取实时数据。如果用户请求超出MCP工具范围请使用handoff工具交接给合适的Agent。",
tools=[],
skill="",
))
@@ -362,21 +358,69 @@ def init_agents(skills_reg: SkillRegistry, tools_list: list) -> AgentRegistry:
return registry
# --- Supervisor 节点 ---
# ════════════════════════════════════════════
# Handoff 工具工厂 — 为每个目标Agent创建 transfer 工具
# ════════════════════════════════════════════
def create_handoff_tools(agent_registry: AgentRegistry, source_agent: str) -> list:
"""为 source_agent 创建一组 handoff 工具,每个工具返回 Command(goto=目标节点)
关键:工具返回 Command 对象LangGraph 的 ToolNode 会识别并执行跳转。
"""
from langchain_core.tools import StructuredTool
tools = []
for target in agent_registry.list_agents():
if target.name == source_agent:
continue # 不创建到自己的handoff
target_name = target.name
target_desc = target.description
# 用闭包捕获 target_name
def _make_handoff(t_name, t_desc):
# 预构造描述字符串不用f-string做docstring
_tool_desc = f"将任务交接给 {t_name}{t_desc}。当你无法处理当前问题或问题属于该Agent的专业领域时使用。task_description必须包含完整上下文信息。"
_tool_name = f"transfer_to_{t_name}"
def transfer_tool_func(task_description: str) -> str:
"""将任务交接给另一个Agent处理。"""
# 返回一个特殊标记在自定义tool执行器中转为Command
return json.dumps({
"__handoff__": True,
"target": t_name,
"task_description": task_description,
})
# 用 StructuredTool 手动构造,避免 docstring 问题
tool_obj = StructuredTool.from_function(
func=transfer_tool_func,
name=_tool_name,
description=_tool_desc,
)
return tool_obj
tools.append(_make_handoff(target_name, target_desc))
return tools
# ════════════════════════════════════════════
# Supervisor 节点 — 分析任务 + 更新state不走Command由conditional_edges路由
# ════════════════════════════════════════════
def make_supervisor_node(config, agent_registry, skills_reg, tools_list, mcp_mgr):
llm_cfg = config["llm"]
SUPERVISOR_PROMPT = """你是黄庄三号的任务协调者(Supervisor)。你的职责是:
1. 分析用户请求,判断是否需要分解为多个子任务
2. 如果需要多个Agent协作输出JSON格式的子任务列表
3. 如果只需单个Agent处理输出单个任务
1. 分析用户请求,判断应该由哪个Agent处理
2. 如果需要多个Agent并行协作输出JSON格式的子任务列表
3. 如果只需单个Agent处理指定单个Agent
4. 如果是简单对话,直接回复
可用Agent:
{agent_list}
MCP工具当用户请求匹配这些关键词时分配给对应Agent或在subtasks中指定agent为"mcp_agent":
MCP工具当用户请求匹配这些关键词时指定agent为"mcp_agent":
- 时间/几点/当前时间 → mcp_agent (get_current_time)
- 统计字符/字符数 → mcp_agent (count_chars)
- 生成UUID → mcp_agent (generate_uuid)
@@ -391,7 +435,6 @@ MCP工具当用户请求匹配这些关键词时分配给对应Agent或在
- 只输出JSON不要其他内容"""
async def supervisor_node(state: SharedState) -> dict:
# 获取用户最新消息
user_msg = ""
for msg in reversed(state["messages"]):
if isinstance(msg, HumanMessage):
@@ -410,9 +453,7 @@ MCP工具当用户请求匹配这些关键词时分配给对应Agent或在
resp = await llm.ainvoke(messages)
# 解析 Supervisor 的JSON输出
content = resp.content.strip()
# 清理可能的markdown代码块
if content.startswith("```"):
content = re.sub(r'^```\w*\n?', '', content)
content = re.sub(r'\n?```$', '', content)
@@ -421,60 +462,278 @@ MCP工具当用户请求匹配这些关键词时分配给对应Agent或在
try:
plan = json.loads(content)
except json.JSONDecodeError:
# JSON解析失败当作直接回复
plan = {"mode": "direct", "answer": resp.content}
mode = plan.get("mode", "direct")
if mode == "direct":
answer = plan.get("answer", resp.content)
return {
"messages": [AIMessage(content=plan.get("answer", resp.content))],
"final_answer": plan.get("answer", resp.content),
"messages": [AIMessage(content=answer)],
"final_answer": answer,
"subtasks": [],
"active_agent": "",
}
elif mode == "single":
agent_name = plan.get("agent", "general_agent")
query = plan.get("query", user_msg)
return {
"subtasks": [{"agent": plan["agent"], "query": plan.get("query", user_msg)}],
"active_agent": plan["agent"],
"active_agent": agent_name,
"subtasks": [],
"handoff_from": "supervisor",
"handoff_context": query,
"handoff_history": [f"supervisor → {agent_name}: {query}"],
}
elif mode == "parallel":
subtasks = plan.get("subtasks", [])
return {
"subtasks": plan.get("subtasks", []),
"subtasks": subtasks,
"active_agent": "",
"handoff_history": [f"supervisor → parallel: {json.dumps(subtasks, ensure_ascii=False)}"],
}
else:
return {"subtasks": []}
return {"final_answer": resp.content, "active_agent": "", "subtasks": []}
return supervisor_node
# --- 任务分发路由Send API并行 ---
def route_subtasks(state: SharedState):
"""根据subtasks决定路由并行分发 or 结束"""
subtasks = state.get("subtasks", [])
if not subtasks:
return "end"
if len(subtasks) == 1:
# 单任务直接发给worker
return "worker_single"
# 多任务并行分发
return "worker_parallel"
# ════════════════════════════════════════════
# Agent 节点工厂 — 每个Agent是独立节点
# ════════════════════════════════════════════
def make_agent_node(config, agent_def: AgentDef, agent_registry: AgentRegistry,
skills_reg: SkillRegistry, tools_list: list, mcp_mgr):
"""创建一个Agent节点函数该Agent可以
1. 调用业务工具(天气、计算器等)
2. 调用MCP工具如果有
3. 通过handoff工具交接给其他Agent返回Command
"""
llm_cfg = config["llm"]
agent_cfg = config.get("agent", {})
temp = agent_cfg.get("skill_temperature", 0.7)
max_iter = agent_cfg.get("max_iterations", 5)
# 准备这个Agent可用的工具业务工具 + handoff工具
agent_tools = []
for tname in agent_def.tools:
for t in tools_list:
if t.name == tname:
agent_tools.append(t)
handoff_tools = create_handoff_tools(agent_registry, agent_def.name)
agent_tools.extend(handoff_tools)
async def agent_node(state: SharedState) -> Command:
# 获取当前任务上下文
handoff_from = state.get("handoff_from", "")
handoff_context = state.get("handoff_context", "")
handoff_history = state.get("handoff_history", [])
# 构造系统提示
system_content = agent_def.system_prompt
# 如果有handoff上下文注入
if handoff_from and handoff_context:
system_content += f"\n\n[交接信息] 你正在处理从 {handoff_from} 交接过来的任务:{handoff_context}"
if handoff_history:
history_str = "".join(handoff_history)
system_content += f"\n[交接链路] {history_str}"
# 提取用户消息
user_msg = ""
for msg in reversed(state["messages"]):
if isinstance(msg, HumanMessage):
user_msg = msg.content
break
if not user_msg:
user_msg = handoff_context
# 执行业务工具(预执行模式:直接调用获取结果)
tool_info = ""
for tname in agent_def.tools:
for t in tools_list:
if t.name == tname:
try:
if tname == "get_weather":
cities = ["北京", "上海", "深圳", "黄庄"]
city = next((c for c in cities if c in user_msg), "北京")
r = await t.ainvoke({"city": city})
elif tname == "calculate":
expr = re.findall(r'[\d+\-*/(). ]+', user_msg)
r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"})
else:
r = await t.ainvoke({"query": user_msg})
tool_info += f"\n工具{tname}结果: {r}"
except Exception as e:
tool_info += f"\n工具{tname}错误: {e}"
# MCP工具
if mcp_mgr and user_msg:
route = mcp_mgr.match_route(user_msg)
if route:
session, tool_name = route
mcp_result = await mcp_mgr.call_tool(session, tool_name, user_msg)
tool_info += f"\nMCP工具{tool_name}结果: {mcp_result}"
# 执行技能脚本OpenClaw
if agent_def.skill:
sk = skills_reg.get(agent_def.skill)
if sk and sk.is_openclaw and sk.scripts:
for script_path in sk.scripts:
try:
proc = await asyncio.create_subprocess_exec(
"python3", script_path, user_msg,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
output = stdout.decode("utf-8", errors="replace").strip()
if output:
tool_info += f"\n[脚本输出]\n{output[:2000]}"
except Exception as e:
tool_info += f"\n[脚本错误] {e}"
# 构造技能提示
skill_prompt = ""
if agent_def.skill:
sk = skills_reg.get(agent_def.skill)
if sk and not sk.is_openclaw:
skill_prompt = sk.prompt.format(input=user_msg)
# 构造消息
messages = [SystemMessage(content=system_content)]
if skill_prompt:
messages.append(SystemMessage(content=f"技能指导:{skill_prompt}"))
if tool_info:
messages.append(SystemMessage(content=f"工具/脚本提供的数据:{tool_info}"))
# 添加历史消息(最近几条)
messages.extend(state["messages"][-6:])
# 如果历史中没有用户消息,补充一条
if not any(isinstance(m, HumanMessage) for m in state["messages"][-6:]):
messages.append(HumanMessage(content=user_msg))
# LLM调用带handoff工具绑定
llm = ChatOpenAI(
base_url=llm_cfg["base_url"],
api_key=llm_cfg["api_key"],
model=llm_cfg["model"],
temperature=temp,
)
# 先尝试不带工具的简单调用(如果模型不支持复杂工具调用)
# 直接让LLM回答如果需要handoff在回答中标注
resp = await llm.ainvoke(messages)
answer = resp.content
# 检查LLM是否在回答中表达了需要handoff的意图
# 用第二轮调用判断是否需要交接
handoff_check_prompt = f"""你是任务路由判断器。根据以下Agent的回答判断该Agent是否需要将任务交接给其他Agent。
当前Agent: {agent_def.name}{agent_def.description}
Agent回答: {answer[:500]}
可用Agent:
{agent_registry.format_list()}
判断规则:
- 如果Agent已经完整回答了问题不需要交接输出: {{"handoff": false}}
- 如果Agent表示自己无法处理、问题超出其专业范围、或建议由其他Agent处理输出: {{"handoff": true, "target": "目标agent名", "reason": "原因"}}
- 只输出JSON不要其他内容"""
handoff_resp = await llm.ainvoke([
SystemMessage(content=handoff_check_prompt),
HumanMessage(content="请判断"),
])
handoff_content = handoff_resp.content.strip()
if handoff_content.startswith("```"):
handoff_content = re.sub(r'^```\w*\n?', '', handoff_content)
handoff_content = re.sub(r'\n?```$', '', handoff_content)
handoff_content = handoff_content.strip()
try:
handoff_decision = json.loads(handoff_content)
except json.JSONDecodeError:
handoff_decision = {"handoff": False}
if handoff_decision.get("handoff") and handoff_decision.get("target"):
target = handoff_decision["target"]
reason = handoff_decision.get("reason", "")
task_desc = f"{agent_def.name}交接: {reason}。原始问题: {user_msg}"
return Command(
goto=target,
update={
"active_agent": target,
"handoff_from": agent_def.name,
"handoff_context": task_desc,
"handoff_history": [f"{agent_def.name}{target}: {reason}"],
"messages": [AIMessage(content=f"[{agent_def.name}交接给{target}] {reason}")],
},
)
# 不需要handoff直接返回结果
return Command(
goto=END,
update={
"final_answer": answer,
"results": [f"[{agent_def.name}] {answer}"],
"messages": [AIMessage(content=answer)],
},
)
return agent_node
def dispatch_parallel(state: SharedState):
"""并行分发:为每个子任务创建一个 Send"""
return [
Send("worker_node", {"task": t, "messages": state["messages"][-4:]})
for t in state.get("subtasks", [])
]
# ════════════════════════════════════════════
# Aggregator 节点(并行结果聚合)
# ════════════════════════════════════════════
def make_aggregator_node(config):
llm_cfg = config["llm"]
async def aggregator_node(state: SharedState) -> dict:
results = state.get("results", [])
if not results:
return {"final_answer": "所有Agent已完成但无结果"}
if len(results) == 1:
content = re.sub(r'^\[.+?\]\s*', '', results[0])
return {
"final_answer": content,
"messages": [AIMessage(content=content)],
}
combined = "\n\n---\n\n".join(results)
agg_llm = ChatOpenAI(
base_url=llm_cfg["base_url"],
api_key=llm_cfg["api_key"],
model=llm_cfg["model"],
temperature=0.5,
)
resp = await agg_llm.ainvoke([
SystemMessage(content="你是黄庄三号。请将以下多个专业Agent的结果整合为一个连贯、结构化的回答。保留关键信息去除冗余。"),
HumanMessage(content=f"多个Agent的结果\n\n{combined}"),
])
return {
"final_answer": resp.content,
"messages": [AIMessage(content=resp.content)],
}
return aggregator_node
# --- Worker 节点(执行子任务) ---
def make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr):
# ════════════════════════════════════════════
# 并行 Worker 节点Send API用只写results不写非reducer字段
# ════════════════════════════════════════════
def make_parallel_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr):
"""并行Worker接收SubTaskState执行后只写results安全并发"""
llm_cfg = config["llm"]
agent_cfg = config.get("agent", {})
temp = agent_cfg.get("skill_temperature", 0.7)
async def worker_node(state: SubTaskState) -> dict:
async def parallel_worker(state: SubTaskState) -> dict:
task = state["task"]
agent_name = task.get("agent", "general_agent")
query = task.get("query", "")
@@ -483,7 +742,7 @@ def make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr):
if not agent_def:
agent_def = agent_registry.get("general_agent")
# 执行工具(如果有)
# 执行业务工具
tool_info = ""
for tname in agent_def.tools:
for t in tools_list:
@@ -502,7 +761,7 @@ def make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr):
except Exception as e:
tool_info += f"\n工具{tname}错误: {e}"
# MCP工具(如果有)
# MCP工具
if mcp_mgr and query:
route = mcp_mgr.match_route(query)
if route:
@@ -510,24 +769,6 @@ def make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr):
mcp_result = await mcp_mgr.call_tool(session, tool_name, query)
tool_info += f"\nMCP工具{tool_name}结果: {mcp_result}"
# 执行技能脚本如果是OpenClaw技能
if agent_def.skill:
sk = skills_reg.get(agent_def.skill)
if sk and sk.is_openclaw and sk.scripts:
for script_path in sk.scripts:
try:
proc = await asyncio.create_subprocess_exec(
"python3", script_path, query,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
output = stdout.decode("utf-8", errors="replace").strip()
if output:
tool_info += f"\n[脚本输出]\n{output[:2000]}"
except Exception as e:
tool_info += f"\n[脚本错误] {e}"
# 构造提示词
prompt = agent_def.system_prompt
if agent_def.skill:
@@ -551,96 +792,9 @@ def make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr):
resp = await worker_llm.ainvoke(messages)
result_tag = f"[{agent_def.name}]"
return {"results": [f"{result_tag} {resp.content}"]}
return {"results": [f"[{agent_def.name}] {resp.content}"]}
return worker_node
# --- 单任务Worker不走Send直接串行 ---
def make_worker_single_node(config, agent_registry, skills_reg, tools_list, mcp_mgr):
"""和 worker_node 逻辑相同,但用于单任务串行执行"""
worker = make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr)
async def worker_single_node(state: SharedState) -> dict:
subtasks = state.get("subtasks", [])
if not subtasks:
return {"results": [], "final_answer": "没有可执行的任务"}
task = subtasks[0]
sub_state = SubTaskState(task=task, messages=state["messages"][-4:])
result = await worker(sub_state)
# 单任务直接作为最终回复
content = result["results"][0] if result.get("results") else "执行完成"
# 去掉 agent tag 前缀(用户不需要看到)
clean = re.sub(r'^\[.+?\]\s*', '', content)
return {
"results": result.get("results", []),
"final_answer": clean,
"messages": [AIMessage(content=clean)],
}
return worker_single_node
# --- 聚合节点 ---
def make_aggregator_node(config):
llm_cfg = config["llm"]
async def aggregator_node(state: SharedState) -> dict:
results = state.get("results", [])
if not results:
return {"final_answer": "所有Agent已完成但无结果"}
# 如果只有一个结果,直接使用
if len(results) == 1:
content = re.sub(r'^\[.+?\]\s*', '', results[0])
return {
"final_answer": content,
"messages": [AIMessage(content=content)],
}
# 多结果需要聚合
combined = "\n\n---\n\n".join(results)
agg_llm = ChatOpenAI(
base_url=llm_cfg["base_url"],
api_key=llm_cfg["api_key"],
model=llm_cfg["model"],
temperature=0.5,
)
resp = await agg_llm.ainvoke([
SystemMessage(content="你是黄庄三号。请将以下多个专业Agent的结果整合为一个连贯、结构化的回答。保留关键信息去除冗余。"),
HumanMessage(content=f"多个Agent的结果\n\n{combined}"),
])
return {
"final_answer": resp.content,
"messages": [AIMessage(content=resp.content)],
}
return aggregator_node
# --- Agent间 Handoff 交接 ---
def make_handoff_tool(agent_registry):
"""创建Agent间交接工具"""
from langchain_core.tools import tool as lc_tool
agent_names = [a.name for a in agent_registry.list_agents()]
@lc_tool
def handoff_to_agent(target_agent: str, task_description: str) -> str:
"""将任务交接给另一个Agent处理。
target_agent: 目标Agent名称可选: """ + ", ".join(agent_names) + """
task_description: 需要交接的任务描述
"""
if target_agent not in agent_names:
return f"Agent {target_agent} 不存在可用Agent: {agent_names}"
return f"任务已交接给 {target_agent}: {task_description}"
return handoff_tool
return parallel_worker
# ════════════════════════════════════════════
@@ -648,49 +802,73 @@ def make_handoff_tool(agent_registry):
# ════════════════════════════════════════════
async def build_graph(config, skills_reg, mcp_mgr, tools_list):
agent_registry = init_agents(skills_reg, tools_list)
all_agent_names = [a.name for a in agent_registry.list_agents()]
# 创建 Supervisor
supervisor = make_supervisor_node(config, agent_registry, skills_reg, tools_list, mcp_mgr)
worker = make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr)
worker_single = make_worker_single_node(config, agent_registry, skills_reg, tools_list, mcp_mgr)
# 创建各Agent节点串行/单任务路径支持Command handoff
agent_nodes = {}
for agent_def in agent_registry.list_agents():
agent_nodes[agent_def.name] = make_agent_node(
config, agent_def, agent_registry, skills_reg, tools_list, mcp_mgr
)
# 创建并行Worker节点并行路径只写results安全并发
parallel_worker = make_parallel_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr)
aggregator = make_aggregator_node(config)
# 构建图
g = StateGraph(SharedState)
# 添加节点
g.add_node("supervisor", supervisor)
g.add_node("worker_node", worker) # 并行worker (Send API分发到这里)
g.add_node("worker_single", worker_single) # 串行worker
g.add_node("aggregator", aggregator) # 聚合器
for name, node_fn in agent_nodes.items():
g.add_node(name, node_fn)
g.add_node("parallel_worker", parallel_worker) # 并行WorkerSubTaskState
g.add_node("aggregator", aggregator)
# 添加边
# 入口
g.add_edge(START, "supervisor")
# Supervisor路由:单条 conditional_edges,用 Send 实现并行
# ── Supervisor 路由conditional_edges ──
def route_from_supervisor(state: SharedState):
subtasks = state.get("subtasks", [])
"""根据 Supervisor 输出的 state 决定路由:
- final_answer 非空 → END
- subtasks 有多个 → Send 并行分发到 parallel_worker
- active_agent 非空 → 跳到对应 Agent 节点
"""
final_answer = state.get("final_answer", "")
# 直接回复
if final_answer or not subtasks:
if final_answer:
return "end"
# 单任务串行
if len(subtasks) == 1:
return "worker_single"
# 多任务并行分发 (通过 Send API)
return [
Send("worker_node", {"task": t, "messages": state["messages"][-4:]})
for t in subtasks
]
g.add_conditional_edges("supervisor", route_from_supervisor, {
"end": END,
"worker_single": "worker_single",
"worker_node": "worker_node", # Send API 的目标节点
})
subtasks = state.get("subtasks", [])
if subtasks and len(subtasks) > 1:
# 并行模式:用 Send API 分发
return [
Send("parallel_worker", {"task": t, "messages": state["messages"][-4:]})
for t in subtasks
]
# Worker完成后的流向
g.add_edge("worker_node", "aggregator") # 并行结果聚合
g.add_edge("worker_single", END) # 单任务直接结束
g.add_edge("aggregator", END) # 聚合后结束
active_agent = state.get("active_agent", "")
if active_agent and active_agent in all_agent_names:
return active_agent
return "end"
# 构建path_map
path_map = {"end": END}
for name in all_agent_names:
path_map[name] = name
g.add_conditional_edges("supervisor", route_from_supervisor, path_map)
# 并行Worker → 聚合器
g.add_edge("parallel_worker", "aggregator")
g.add_edge("aggregator", END)
# 注意各Agent节点使用 Command(goto=...) 进行 handoff
# LangGraph 自动处理 Command 路由,不需要额外添加边。
return g.compile()
@@ -706,21 +884,24 @@ mcp_manager = None
async def run_agent(user_input: str, graph):
result = await graph.ainvoke({
"messages": [HumanMessage(content=user_input)],
"subtasks": [], "results": [], "active_agent": "", "final_answer": "",
"subtasks": [], "results": [], "active_agent": "",
"handoff_from": "", "handoff_context": "",
"handoff_history": [], "final_answer": "",
})
last = result["messages"][-1]
return {
"reply": last.content if hasattr(last, "content") else str(last),
"subtasks": result.get("subtasks", []),
"results_count": len(result.get("results", [])),
"handoff_history": result.get("handoff_history", []),
"final_answer": result.get("final_answer", ""),
}
async def interactive_mode(graph):
print("=" * 60)
print(" 黄庄三号 Agent v3.0 - 多Agent交互版")
print(" Supervisor + Worker(Agent) + Aggregator")
print(" 黄庄三号 Agent v4.0 - Command Handoff交互版")
print(" Supervisor + 独立Agent节点 + Agent间Handoff")
print("=" * 60)
print(" 输入 quit 退出")
print("=" * 60)
@@ -737,6 +918,8 @@ async def interactive_mode(graph):
if result["subtasks"]:
agents = [t.get("agent", "?") for t in result["subtasks"]]
print(f"\n[调度] 分配给: {', '.join(agents)}")
if result["handoff_history"]:
print(f"[交接] {''.join(result['handoff_history'])}")
if result["results_count"] > 1:
print(f"[聚合] 合并 {result['results_count']} 个Agent结果")
print(f"\n黄庄三号> {result['reply']}")
@@ -745,13 +928,13 @@ async def interactive_mode(graph):
async def main():
global all_tools, skills_registry, mcp_manager
parser = argparse.ArgumentParser(description="黄庄三号 Agent v3.0")
parser = argparse.ArgumentParser(description="黄庄三号 Agent v4.0")
parser.add_argument("--mcp", action="store_true", help="启用MCP")
parser.add_argument("--test", action="store_true", help="自动测试")
args = parser.parse_args()
print("=" * 60)
print(" 黄庄三号 Agent v3.0 - 多Agent交互版")
print(" 黄庄三号 Agent v4.0 - Command Handoff交互版")
print("=" * 60)
# 加载配置
@@ -781,7 +964,7 @@ async def main():
print(f" - {a.name}: {a.description}")
# 构建图
print("\n[构建] 多Agent图 ...")
print("\n[构建] 多Agent图Command Handoff版 ...")
graph = await build_graph(config, skills_registry, mcp_manager, all_tools)
if args.test:
@@ -791,6 +974,7 @@ async def main():
("单Agent:知识", "MCP是什么"),
("直接回复", "你好你是谁?"),
("多Agent并行", "帮我查一下北京和上海的天气再算一下123+456"),
("Handoff测试:天气→数学", "帮我查一下北京天气顺便算算100*200"),
]
if args.mcp and mcp_manager:
@@ -799,16 +983,21 @@ async def main():
for label, query in tests:
print(f"\n{''*55}")
print(f"[测试:{label}] {query}")
r = await run_agent(query, graph)
if r["subtasks"]:
agents = [t.get("agent", "?") for t in r["subtasks"]]
print(f" 调度: {', '.join(agents)}")
if r["results_count"] > 1:
print(f" 聚合: {r['results_count']} 个Agent结果")
print(f" 回复: {r['reply'][:150]}...")
try:
r = await run_agent(query, graph)
if r["subtasks"]:
agents = [t.get("agent", "?") for t in r["subtasks"]]
print(f" 调度: {', '.join(agents)}")
if r["handoff_history"]:
print(f" 交接: {''.join(r['handoff_history'])}")
if r["results_count"] > 1:
print(f" 聚合: {r['results_count']} 个Agent结果")
print(f" 回复: {r['reply'][:200]}")
except Exception as e:
print(f" 错误: {e}")
print(f"\n{'='*60}")
print(" 多Agent测试完成")
print(" 多Agent + Handoff 测试完成!")
print("=" * 60)
else:
await interactive_mode(graph)