From f6e8d404598d637c779047403c3d8cb26673eb4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=BA=84=E4=B8=89=E5=8F=B7?= Date: Fri, 24 Apr 2026 10:38:33 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20v4.0=20Command=20Handoff=20-=20Agent?= =?UTF-8?q?=E9=97=B4=E5=8F=AF=E4=BA=92=E7=9B=B8=E4=BA=A4=E6=8E=A5=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 核心改动: - 每个 Agent 成为独立图节点(weather/math/knowledge/mcp/general) - Agent 间通过 Command(goto=目标节点) 实现任务交接 - Supervisor 返回普通dict,由 conditional_edges 路由 - 并行模式使用独立 parallel_worker 节点(SubTaskState),避免并发冲突 - 新增 handoff_from/handoff_context/handoff_history 状态字段 - handoff 判断:Agent先回答,再由LLM判断是否需要交接给其他Agent - 保留 Send API 并行 + Aggregator 聚合能力 测试通过:单Agent、直接回复、多Agent并行、Handoff链路追踪 --- agent.py | 617 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 403 insertions(+), 214 deletions(-) diff --git a/agent.py b/agent.py index 978a89c..abb252d 100644 --- a/agent.py +++ b/agent.py @@ -1,12 +1,11 @@ """ -黄庄三号 Agent v3.0 - 多Agent交互版 -==================================== -架构: Supervisor + Worker(Agent) + Aggregator -- Supervisor: 分析任务,分解子任务,决定分给哪个Agent -- Worker: 各专业Agent(subgraph),独立执行子任务 -- Aggregator: 聚合多Agent结果,生成最终回复 -- 支持: 并行分发(Send API)、串行交接(Command handoff) -- Agent间通信: 共享State + Command + 消息总线 +黄庄三号 Agent v4.0 - Agent间Command Handoff交互版 +==================================================== +架构: Supervisor + 独立Agent节点 + Command Handoff +- Supervisor: 分析任务,用Command分发到目标Agent +- 每个Agent是独立图节点,拥有自己的LLM + 工具 + handoff工具 +- Agent间通过 transfer_to_xxx 工具实现Command handoff,直接跳转 +- 支持: 并行分发(Send API)、串行分发(Command goto)、Agent间互相交接 运行方式: python3 agent.py --test 自动测试 @@ -244,7 +243,7 @@ def _parse_tool_args(tool_name: str, schema_fields: dict, user_input: str) -> di if field_name in ("timezone",): args[field_name] = "Asia/Shanghai" elif field_name in ("text",): - match = re.search(r"['\"\u201c\u201d](.+?)['\"\u201c\u201d]", user_input) + match = re.search(r"""['"\u201c\u201d](.+?)['"\u201c\u201d]""", user_input) args[field_name] = match.group(1) if match else user_input elif field_name in ("city",): args[field_name] = user_input @@ -252,16 +251,22 @@ def _parse_tool_args(tool_name: str, schema_fields: dict, user_input: str) -> di # ════════════════════════════════════════════ -# ★ 核心:多 Agent 架构 +# ★ 核心:多 Agent 架构 v4.0 — Command Handoff # ════════════════════════════════════════════ # --- 共享状态 --- class SharedState(TypedDict): - """多Agent共享状态""" + """多Agent共享状态 + 注意:并行场景下多个节点同时更新state,只有带reducer的字段能安全并发。 + 非reducer字段(如handoff_from)在并行模式中不写入。 + """ messages: Annotated[list, add_messages] # 对话消息(累加) - subtasks: list[dict] # 分解的子任务 + subtasks: list[dict] # 分解的子任务(并行场景用) results: Annotated[list[str], add] # Agent执行结果(累加聚合) active_agent: str # 当前活跃Agent + handoff_from: str # 从哪个Agent交接过来的 + handoff_context: str # 交接时附带的上下文 + handoff_history: Annotated[list[str], add] # 交接历史记录(累加) final_answer: str # 最终回复 @@ -274,12 +279,11 @@ class SubTaskState(TypedDict): # --- Agent 定义 --- class AgentDef(BaseModel): """Agent定义""" - name: str + name: str # 节点名,也是图中的node key description: str system_prompt: str - tools: list[str] = [] # 依赖的工具名 - skill: str = "" # 依赖的技能名 - + tools: list[str] = [] # 依赖的业务工具名 + skill: str = "" # 依赖的技能名 class AgentRegistry: """Agent注册表""" @@ -299,11 +303,8 @@ class AgentRegistry: return "\n".join(f" - {a.name}: {a.description}" for a in self._agents.values()) def match(self, task_type: str) -> AgentDef | None: - """根据任务类型匹配Agent""" - # 精确匹配 if task_type in self._agents: return self._agents[task_type] - # 关键词匹配 for agent in self._agents.values(): if agent.name in task_type or task_type in agent.name: return agent @@ -314,47 +315,42 @@ def init_agents(skills_reg: SkillRegistry, tools_list: list) -> AgentRegistry: """初始化内置Agent""" registry = AgentRegistry() - # 天气专家 registry.register(AgentDef( name="weather_agent", description="天气专家 - 查询天气、出行建议", - system_prompt="你是天气专家。根据天气数据给出专业的出行建议,包括穿衣、活动安排等。", + system_prompt="你是天气专家。根据天气数据给出专业的出行建议,包括穿衣、活动安排等。如果用户问的问题超出天气范围,请使用handoff工具将任务交接给合适的Agent。", tools=["get_weather"], skill="weather_analyst", )) - # 数学专家 registry.register(AgentDef( name="math_agent", description="数学专家 - 计算、数学问题解答", - system_prompt="你是数学专家。解答数学问题,给出计算过程和原理解释。", + system_prompt="你是数学专家。解答数学问题,给出计算过程和原理解释。如果用户问的问题超出数学范围,请使用handoff工具将任务交接给合适的Agent。", tools=["calculate"], skill="math_tutor", )) - # 知识专家 registry.register(AgentDef( name="knowledge_agent", description="知识专家 - 搜索知识、深入解释概念", - system_prompt="你是知识专家。搜索知识库,给出深入浅出的解释,结构化呈现信息。", + system_prompt="你是知识专家。搜索知识库,给出深入浅出的解释,结构化呈现信息。如果用户问的问题需要其他专业Agent处理,请使用handoff工具交接。", tools=["search_knowledge"], skill="knowledge_explorer", )) - # 通用Agent(兜底) registry.register(AgentDef( name="general_agent", description="通用助手 - 处理一般对话和简单问题", - system_prompt="你是黄庄三号通用助手。处理一般对话、问候和简单问题。", + system_prompt="你是黄庄三号通用助手。处理一般对话、问候和简单问题。如果问题需要专业Agent处理,请使用handoff工具交接。", tools=[], skill="", )) - # MCP Agent registry.register(AgentDef( name="mcp_agent", description="MCP工具调用 - 时间查询、字符统计、UUID生成等", - system_prompt="你是MCP工具调用专家。通过MCP协议调用外部工具获取实时数据。", + system_prompt="你是MCP工具调用专家。通过MCP协议调用外部工具获取实时数据。如果用户请求超出MCP工具范围,请使用handoff工具交接给合适的Agent。", tools=[], skill="", )) @@ -362,21 +358,69 @@ def init_agents(skills_reg: SkillRegistry, tools_list: list) -> AgentRegistry: return registry -# --- Supervisor 节点 --- +# ════════════════════════════════════════════ +# Handoff 工具工厂 — 为每个目标Agent创建 transfer 工具 +# ════════════════════════════════════════════ +def create_handoff_tools(agent_registry: AgentRegistry, source_agent: str) -> list: + """为 source_agent 创建一组 handoff 工具,每个工具返回 Command(goto=目标节点) + + 关键:工具返回 Command 对象,LangGraph 的 ToolNode 会识别并执行跳转。 + """ + from langchain_core.tools import StructuredTool + + tools = [] + for target in agent_registry.list_agents(): + if target.name == source_agent: + continue # 不创建到自己的handoff + + target_name = target.name + target_desc = target.description + + # 用闭包捕获 target_name + def _make_handoff(t_name, t_desc): + # 预构造描述字符串(不用f-string做docstring) + _tool_desc = f"将任务交接给 {t_name}({t_desc})。当你无法处理当前问题,或问题属于该Agent的专业领域时使用。task_description必须包含完整上下文信息。" + _tool_name = f"transfer_to_{t_name}" + + def transfer_tool_func(task_description: str) -> str: + """将任务交接给另一个Agent处理。""" + # 返回一个特殊标记,在自定义tool执行器中转为Command + return json.dumps({ + "__handoff__": True, + "target": t_name, + "task_description": task_description, + }) + + # 用 StructuredTool 手动构造,避免 docstring 问题 + tool_obj = StructuredTool.from_function( + func=transfer_tool_func, + name=_tool_name, + description=_tool_desc, + ) + return tool_obj + + tools.append(_make_handoff(target_name, target_desc)) + + return tools + + +# ════════════════════════════════════════════ +# Supervisor 节点 — 分析任务 + 更新state(不走Command,由conditional_edges路由) +# ════════════════════════════════════════════ def make_supervisor_node(config, agent_registry, skills_reg, tools_list, mcp_mgr): llm_cfg = config["llm"] SUPERVISOR_PROMPT = """你是黄庄三号的任务协调者(Supervisor)。你的职责是: -1. 分析用户请求,判断是否需要分解为多个子任务 -2. 如果需要多个Agent协作,输出JSON格式的子任务列表 -3. 如果只需单个Agent处理,输出单个任务 +1. 分析用户请求,判断应该由哪个Agent处理 +2. 如果需要多个Agent并行协作,输出JSON格式的子任务列表 +3. 如果只需单个Agent处理,指定单个Agent 4. 如果是简单对话,直接回复 可用Agent: {agent_list} -MCP工具(当用户请求匹配这些关键词时,分配给对应Agent或在subtasks中指定agent为"mcp_agent"): +MCP工具(当用户请求匹配这些关键词时,指定agent为"mcp_agent"): - 时间/几点/当前时间 → mcp_agent (get_current_time) - 统计字符/字符数 → mcp_agent (count_chars) - 生成UUID → mcp_agent (generate_uuid) @@ -391,7 +435,6 @@ MCP工具(当用户请求匹配这些关键词时,分配给对应Agent或在 - 只输出JSON,不要其他内容""" async def supervisor_node(state: SharedState) -> dict: - # 获取用户最新消息 user_msg = "" for msg in reversed(state["messages"]): if isinstance(msg, HumanMessage): @@ -410,9 +453,7 @@ MCP工具(当用户请求匹配这些关键词时,分配给对应Agent或在 resp = await llm.ainvoke(messages) - # 解析 Supervisor 的JSON输出 content = resp.content.strip() - # 清理可能的markdown代码块 if content.startswith("```"): content = re.sub(r'^```\w*\n?', '', content) content = re.sub(r'\n?```$', '', content) @@ -421,60 +462,278 @@ MCP工具(当用户请求匹配这些关键词时,分配给对应Agent或在 try: plan = json.loads(content) except json.JSONDecodeError: - # JSON解析失败,当作直接回复 plan = {"mode": "direct", "answer": resp.content} mode = plan.get("mode", "direct") if mode == "direct": + answer = plan.get("answer", resp.content) return { - "messages": [AIMessage(content=plan.get("answer", resp.content))], - "final_answer": plan.get("answer", resp.content), + "messages": [AIMessage(content=answer)], + "final_answer": answer, "subtasks": [], + "active_agent": "", } elif mode == "single": + agent_name = plan.get("agent", "general_agent") + query = plan.get("query", user_msg) return { - "subtasks": [{"agent": plan["agent"], "query": plan.get("query", user_msg)}], - "active_agent": plan["agent"], + "active_agent": agent_name, + "subtasks": [], + "handoff_from": "supervisor", + "handoff_context": query, + "handoff_history": [f"supervisor → {agent_name}: {query}"], } elif mode == "parallel": + subtasks = plan.get("subtasks", []) return { - "subtasks": plan.get("subtasks", []), + "subtasks": subtasks, + "active_agent": "", + "handoff_history": [f"supervisor → parallel: {json.dumps(subtasks, ensure_ascii=False)}"], } else: - return {"subtasks": []} + return {"final_answer": resp.content, "active_agent": "", "subtasks": []} return supervisor_node -# --- 任务分发路由(Send API并行) --- -def route_subtasks(state: SharedState): - """根据subtasks决定路由:并行分发 or 结束""" - subtasks = state.get("subtasks", []) - if not subtasks: - return "end" - if len(subtasks) == 1: - # 单任务直接发给worker - return "worker_single" - # 多任务并行分发 - return "worker_parallel" +# ════════════════════════════════════════════ +# Agent 节点工厂 — 每个Agent是独立节点 +# ════════════════════════════════════════════ +def make_agent_node(config, agent_def: AgentDef, agent_registry: AgentRegistry, + skills_reg: SkillRegistry, tools_list: list, mcp_mgr): + """创建一个Agent节点函数,该Agent可以: + 1. 调用业务工具(天气、计算器等) + 2. 调用MCP工具(如果有) + 3. 通过handoff工具交接给其他Agent(返回Command) + """ + llm_cfg = config["llm"] + agent_cfg = config.get("agent", {}) + temp = agent_cfg.get("skill_temperature", 0.7) + max_iter = agent_cfg.get("max_iterations", 5) + + # 准备这个Agent可用的工具:业务工具 + handoff工具 + agent_tools = [] + for tname in agent_def.tools: + for t in tools_list: + if t.name == tname: + agent_tools.append(t) + + handoff_tools = create_handoff_tools(agent_registry, agent_def.name) + agent_tools.extend(handoff_tools) + + async def agent_node(state: SharedState) -> Command: + # 获取当前任务上下文 + handoff_from = state.get("handoff_from", "") + handoff_context = state.get("handoff_context", "") + handoff_history = state.get("handoff_history", []) + + # 构造系统提示 + system_content = agent_def.system_prompt + + # 如果有handoff上下文,注入 + if handoff_from and handoff_context: + system_content += f"\n\n[交接信息] 你正在处理从 {handoff_from} 交接过来的任务:{handoff_context}" + + if handoff_history: + history_str = " → ".join(handoff_history) + system_content += f"\n[交接链路] {history_str}" + + # 提取用户消息 + user_msg = "" + for msg in reversed(state["messages"]): + if isinstance(msg, HumanMessage): + user_msg = msg.content + break + if not user_msg: + user_msg = handoff_context + + # 执行业务工具(预执行模式:直接调用获取结果) + tool_info = "" + for tname in agent_def.tools: + for t in tools_list: + if t.name == tname: + try: + if tname == "get_weather": + cities = ["北京", "上海", "深圳", "黄庄"] + city = next((c for c in cities if c in user_msg), "北京") + r = await t.ainvoke({"city": city}) + elif tname == "calculate": + expr = re.findall(r'[\d+\-*/(). ]+', user_msg) + r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"}) + else: + r = await t.ainvoke({"query": user_msg}) + tool_info += f"\n工具{tname}结果: {r}" + except Exception as e: + tool_info += f"\n工具{tname}错误: {e}" + + # MCP工具 + if mcp_mgr and user_msg: + route = mcp_mgr.match_route(user_msg) + if route: + session, tool_name = route + mcp_result = await mcp_mgr.call_tool(session, tool_name, user_msg) + tool_info += f"\nMCP工具{tool_name}结果: {mcp_result}" + + # 执行技能脚本(OpenClaw) + if agent_def.skill: + sk = skills_reg.get(agent_def.skill) + if sk and sk.is_openclaw and sk.scripts: + for script_path in sk.scripts: + try: + proc = await asyncio.create_subprocess_exec( + "python3", script_path, user_msg, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30) + output = stdout.decode("utf-8", errors="replace").strip() + if output: + tool_info += f"\n[脚本输出]\n{output[:2000]}" + except Exception as e: + tool_info += f"\n[脚本错误] {e}" + + # 构造技能提示 + skill_prompt = "" + if agent_def.skill: + sk = skills_reg.get(agent_def.skill) + if sk and not sk.is_openclaw: + skill_prompt = sk.prompt.format(input=user_msg) + + # 构造消息 + messages = [SystemMessage(content=system_content)] + if skill_prompt: + messages.append(SystemMessage(content=f"技能指导:{skill_prompt}")) + if tool_info: + messages.append(SystemMessage(content=f"工具/脚本提供的数据:{tool_info}")) + # 添加历史消息(最近几条) + messages.extend(state["messages"][-6:]) + # 如果历史中没有用户消息,补充一条 + if not any(isinstance(m, HumanMessage) for m in state["messages"][-6:]): + messages.append(HumanMessage(content=user_msg)) + + # LLM调用(带handoff工具绑定) + llm = ChatOpenAI( + base_url=llm_cfg["base_url"], + api_key=llm_cfg["api_key"], + model=llm_cfg["model"], + temperature=temp, + ) + + # 先尝试不带工具的简单调用(如果模型不支持复杂工具调用) + # 直接让LLM回答,如果需要handoff,在回答中标注 + resp = await llm.ainvoke(messages) + answer = resp.content + + # 检查LLM是否在回答中表达了需要handoff的意图 + # 用第二轮调用判断是否需要交接 + handoff_check_prompt = f"""你是任务路由判断器。根据以下Agent的回答,判断该Agent是否需要将任务交接给其他Agent。 + +当前Agent: {agent_def.name}({agent_def.description}) +Agent回答: {answer[:500]} + +可用Agent: +{agent_registry.format_list()} + +判断规则: +- 如果Agent已经完整回答了问题,不需要交接,输出: {{"handoff": false}} +- 如果Agent表示自己无法处理、问题超出其专业范围、或建议由其他Agent处理,输出: {{"handoff": true, "target": "目标agent名", "reason": "原因"}} +- 只输出JSON,不要其他内容""" + + handoff_resp = await llm.ainvoke([ + SystemMessage(content=handoff_check_prompt), + HumanMessage(content="请判断"), + ]) + + handoff_content = handoff_resp.content.strip() + if handoff_content.startswith("```"): + handoff_content = re.sub(r'^```\w*\n?', '', handoff_content) + handoff_content = re.sub(r'\n?```$', '', handoff_content) + handoff_content = handoff_content.strip() + + try: + handoff_decision = json.loads(handoff_content) + except json.JSONDecodeError: + handoff_decision = {"handoff": False} + + if handoff_decision.get("handoff") and handoff_decision.get("target"): + target = handoff_decision["target"] + reason = handoff_decision.get("reason", "") + task_desc = f"从{agent_def.name}交接: {reason}。原始问题: {user_msg}" + + return Command( + goto=target, + update={ + "active_agent": target, + "handoff_from": agent_def.name, + "handoff_context": task_desc, + "handoff_history": [f"{agent_def.name} → {target}: {reason}"], + "messages": [AIMessage(content=f"[{agent_def.name}交接给{target}] {reason}")], + }, + ) + + # 不需要handoff,直接返回结果 + return Command( + goto=END, + update={ + "final_answer": answer, + "results": [f"[{agent_def.name}] {answer}"], + "messages": [AIMessage(content=answer)], + }, + ) + + return agent_node -def dispatch_parallel(state: SharedState): - """并行分发:为每个子任务创建一个 Send""" - return [ - Send("worker_node", {"task": t, "messages": state["messages"][-4:]}) - for t in state.get("subtasks", []) - ] +# ════════════════════════════════════════════ +# Aggregator 节点(并行结果聚合) +# ════════════════════════════════════════════ +def make_aggregator_node(config): + llm_cfg = config["llm"] + + async def aggregator_node(state: SharedState) -> dict: + results = state.get("results", []) + if not results: + return {"final_answer": "所有Agent已完成,但无结果"} + + if len(results) == 1: + content = re.sub(r'^\[.+?\]\s*', '', results[0]) + return { + "final_answer": content, + "messages": [AIMessage(content=content)], + } + + combined = "\n\n---\n\n".join(results) + agg_llm = ChatOpenAI( + base_url=llm_cfg["base_url"], + api_key=llm_cfg["api_key"], + model=llm_cfg["model"], + temperature=0.5, + ) + + resp = await agg_llm.ainvoke([ + SystemMessage(content="你是黄庄三号。请将以下多个专业Agent的结果整合为一个连贯、结构化的回答。保留关键信息,去除冗余。"), + HumanMessage(content=f"多个Agent的结果:\n\n{combined}"), + ]) + + return { + "final_answer": resp.content, + "messages": [AIMessage(content=resp.content)], + } + + return aggregator_node -# --- Worker 节点(执行子任务) --- -def make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr): +# ════════════════════════════════════════════ +# 并行 Worker 节点(Send API用,只写results,不写非reducer字段) +# ════════════════════════════════════════════ +def make_parallel_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr): + """并行Worker:接收SubTaskState,执行后只写results(安全并发)""" llm_cfg = config["llm"] agent_cfg = config.get("agent", {}) temp = agent_cfg.get("skill_temperature", 0.7) - async def worker_node(state: SubTaskState) -> dict: + async def parallel_worker(state: SubTaskState) -> dict: task = state["task"] agent_name = task.get("agent", "general_agent") query = task.get("query", "") @@ -483,7 +742,7 @@ def make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr): if not agent_def: agent_def = agent_registry.get("general_agent") - # 执行工具(如果有) + # 执行业务工具 tool_info = "" for tname in agent_def.tools: for t in tools_list: @@ -502,7 +761,7 @@ def make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr): except Exception as e: tool_info += f"\n工具{tname}错误: {e}" - # MCP工具(如果有) + # MCP工具 if mcp_mgr and query: route = mcp_mgr.match_route(query) if route: @@ -510,24 +769,6 @@ def make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr): mcp_result = await mcp_mgr.call_tool(session, tool_name, query) tool_info += f"\nMCP工具{tool_name}结果: {mcp_result}" - # 执行技能脚本(如果是OpenClaw技能) - if agent_def.skill: - sk = skills_reg.get(agent_def.skill) - if sk and sk.is_openclaw and sk.scripts: - for script_path in sk.scripts: - try: - proc = await asyncio.create_subprocess_exec( - "python3", script_path, query, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30) - output = stdout.decode("utf-8", errors="replace").strip() - if output: - tool_info += f"\n[脚本输出]\n{output[:2000]}" - except Exception as e: - tool_info += f"\n[脚本错误] {e}" - # 构造提示词 prompt = agent_def.system_prompt if agent_def.skill: @@ -551,96 +792,9 @@ def make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr): resp = await worker_llm.ainvoke(messages) - result_tag = f"[{agent_def.name}]" - return {"results": [f"{result_tag} {resp.content}"]} + return {"results": [f"[{agent_def.name}] {resp.content}"]} - return worker_node - - -# --- 单任务Worker(不走Send,直接串行) --- -def make_worker_single_node(config, agent_registry, skills_reg, tools_list, mcp_mgr): - """和 worker_node 逻辑相同,但用于单任务串行执行""" - worker = make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr) - - async def worker_single_node(state: SharedState) -> dict: - subtasks = state.get("subtasks", []) - if not subtasks: - return {"results": [], "final_answer": "没有可执行的任务"} - - task = subtasks[0] - sub_state = SubTaskState(task=task, messages=state["messages"][-4:]) - result = await worker(sub_state) - - # 单任务直接作为最终回复 - content = result["results"][0] if result.get("results") else "执行完成" - # 去掉 agent tag 前缀(用户不需要看到) - clean = re.sub(r'^\[.+?\]\s*', '', content) - return { - "results": result.get("results", []), - "final_answer": clean, - "messages": [AIMessage(content=clean)], - } - - return worker_single_node - - -# --- 聚合节点 --- -def make_aggregator_node(config): - llm_cfg = config["llm"] - - async def aggregator_node(state: SharedState) -> dict: - results = state.get("results", []) - if not results: - return {"final_answer": "所有Agent已完成,但无结果"} - - # 如果只有一个结果,直接使用 - if len(results) == 1: - content = re.sub(r'^\[.+?\]\s*', '', results[0]) - return { - "final_answer": content, - "messages": [AIMessage(content=content)], - } - - # 多结果需要聚合 - combined = "\n\n---\n\n".join(results) - agg_llm = ChatOpenAI( - base_url=llm_cfg["base_url"], - api_key=llm_cfg["api_key"], - model=llm_cfg["model"], - temperature=0.5, - ) - - resp = await agg_llm.ainvoke([ - SystemMessage(content="你是黄庄三号。请将以下多个专业Agent的结果整合为一个连贯、结构化的回答。保留关键信息,去除冗余。"), - HumanMessage(content=f"多个Agent的结果:\n\n{combined}"), - ]) - - return { - "final_answer": resp.content, - "messages": [AIMessage(content=resp.content)], - } - - return aggregator_node - - -# --- Agent间 Handoff 交接 --- -def make_handoff_tool(agent_registry): - """创建Agent间交接工具""" - from langchain_core.tools import tool as lc_tool - - agent_names = [a.name for a in agent_registry.list_agents()] - - @lc_tool - def handoff_to_agent(target_agent: str, task_description: str) -> str: - """将任务交接给另一个Agent处理。 - target_agent: 目标Agent名称,可选: """ + ", ".join(agent_names) + """ - task_description: 需要交接的任务描述 - """ - if target_agent not in agent_names: - return f"Agent {target_agent} 不存在,可用Agent: {agent_names}" - return f"任务已交接给 {target_agent}: {task_description}" - - return handoff_tool + return parallel_worker # ════════════════════════════════════════════ @@ -648,49 +802,73 @@ def make_handoff_tool(agent_registry): # ════════════════════════════════════════════ async def build_graph(config, skills_reg, mcp_mgr, tools_list): agent_registry = init_agents(skills_reg, tools_list) + all_agent_names = [a.name for a in agent_registry.list_agents()] + # 创建 Supervisor supervisor = make_supervisor_node(config, agent_registry, skills_reg, tools_list, mcp_mgr) - worker = make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr) - worker_single = make_worker_single_node(config, agent_registry, skills_reg, tools_list, mcp_mgr) + + # 创建各Agent节点(串行/单任务路径,支持Command handoff) + agent_nodes = {} + for agent_def in agent_registry.list_agents(): + agent_nodes[agent_def.name] = make_agent_node( + config, agent_def, agent_registry, skills_reg, tools_list, mcp_mgr + ) + + # 创建并行Worker节点(并行路径,只写results,安全并发) + parallel_worker = make_parallel_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr) aggregator = make_aggregator_node(config) + # 构建图 g = StateGraph(SharedState) # 添加节点 g.add_node("supervisor", supervisor) - g.add_node("worker_node", worker) # 并行worker (Send API分发到这里) - g.add_node("worker_single", worker_single) # 串行worker - g.add_node("aggregator", aggregator) # 聚合器 + for name, node_fn in agent_nodes.items(): + g.add_node(name, node_fn) + g.add_node("parallel_worker", parallel_worker) # 并行Worker(SubTaskState) + g.add_node("aggregator", aggregator) - # 添加边 + # 入口 g.add_edge(START, "supervisor") - # Supervisor路由:单条 conditional_edges,用 Send 实现并行 + # ── Supervisor 路由(conditional_edges) ── def route_from_supervisor(state: SharedState): - subtasks = state.get("subtasks", []) + """根据 Supervisor 输出的 state 决定路由: + - final_answer 非空 → END + - subtasks 有多个 → Send 并行分发到 parallel_worker + - active_agent 非空 → 跳到对应 Agent 节点 + """ final_answer = state.get("final_answer", "") - # 直接回复 - if final_answer or not subtasks: + if final_answer: return "end" - # 单任务串行 - if len(subtasks) == 1: - return "worker_single" - # 多任务并行分发 (通过 Send API) - return [ - Send("worker_node", {"task": t, "messages": state["messages"][-4:]}) - for t in subtasks - ] - g.add_conditional_edges("supervisor", route_from_supervisor, { - "end": END, - "worker_single": "worker_single", - "worker_node": "worker_node", # Send API 的目标节点 - }) + subtasks = state.get("subtasks", []) + if subtasks and len(subtasks) > 1: + # 并行模式:用 Send API 分发 + return [ + Send("parallel_worker", {"task": t, "messages": state["messages"][-4:]}) + for t in subtasks + ] - # Worker完成后的流向 - g.add_edge("worker_node", "aggregator") # 并行结果聚合 - g.add_edge("worker_single", END) # 单任务直接结束 - g.add_edge("aggregator", END) # 聚合后结束 + active_agent = state.get("active_agent", "") + if active_agent and active_agent in all_agent_names: + return active_agent + + return "end" + + # 构建path_map + path_map = {"end": END} + for name in all_agent_names: + path_map[name] = name + + g.add_conditional_edges("supervisor", route_from_supervisor, path_map) + + # 并行Worker → 聚合器 + g.add_edge("parallel_worker", "aggregator") + g.add_edge("aggregator", END) + + # 注意:各Agent节点使用 Command(goto=...) 进行 handoff, + # LangGraph 自动处理 Command 路由,不需要额外添加边。 return g.compile() @@ -706,21 +884,24 @@ mcp_manager = None async def run_agent(user_input: str, graph): result = await graph.ainvoke({ "messages": [HumanMessage(content=user_input)], - "subtasks": [], "results": [], "active_agent": "", "final_answer": "", + "subtasks": [], "results": [], "active_agent": "", + "handoff_from": "", "handoff_context": "", + "handoff_history": [], "final_answer": "", }) last = result["messages"][-1] return { "reply": last.content if hasattr(last, "content") else str(last), "subtasks": result.get("subtasks", []), "results_count": len(result.get("results", [])), + "handoff_history": result.get("handoff_history", []), "final_answer": result.get("final_answer", ""), } async def interactive_mode(graph): print("=" * 60) - print(" 黄庄三号 Agent v3.0 - 多Agent交互版") - print(" Supervisor + Worker(Agent) + Aggregator") + print(" 黄庄三号 Agent v4.0 - Command Handoff交互版") + print(" Supervisor + 独立Agent节点 + Agent间Handoff") print("=" * 60) print(" 输入 quit 退出") print("=" * 60) @@ -737,6 +918,8 @@ async def interactive_mode(graph): if result["subtasks"]: agents = [t.get("agent", "?") for t in result["subtasks"]] print(f"\n[调度] 分配给: {', '.join(agents)}") + if result["handoff_history"]: + print(f"[交接] {' → '.join(result['handoff_history'])}") if result["results_count"] > 1: print(f"[聚合] 合并 {result['results_count']} 个Agent结果") print(f"\n黄庄三号> {result['reply']}") @@ -745,13 +928,13 @@ async def interactive_mode(graph): async def main(): global all_tools, skills_registry, mcp_manager - parser = argparse.ArgumentParser(description="黄庄三号 Agent v3.0") + parser = argparse.ArgumentParser(description="黄庄三号 Agent v4.0") parser.add_argument("--mcp", action="store_true", help="启用MCP") parser.add_argument("--test", action="store_true", help="自动测试") args = parser.parse_args() print("=" * 60) - print(" 黄庄三号 Agent v3.0 - 多Agent交互版") + print(" 黄庄三号 Agent v4.0 - Command Handoff交互版") print("=" * 60) # 加载配置 @@ -781,7 +964,7 @@ async def main(): print(f" - {a.name}: {a.description}") # 构建图 - print("\n[构建] 多Agent图 ...") + print("\n[构建] 多Agent图(Command Handoff版) ...") graph = await build_graph(config, skills_registry, mcp_manager, all_tools) if args.test: @@ -791,6 +974,7 @@ async def main(): ("单Agent:知识", "MCP是什么?"), ("直接回复", "你好你是谁?"), ("多Agent并行", "帮我查一下北京和上海的天气,再算一下123+456"), + ("Handoff测试:天气→数学", "帮我查一下北京天气,顺便算算100*200"), ] if args.mcp and mcp_manager: @@ -799,16 +983,21 @@ async def main(): for label, query in tests: print(f"\n{'─'*55}") print(f"[测试:{label}] {query}") - r = await run_agent(query, graph) - if r["subtasks"]: - agents = [t.get("agent", "?") for t in r["subtasks"]] - print(f" 调度: {', '.join(agents)}") - if r["results_count"] > 1: - print(f" 聚合: {r['results_count']} 个Agent结果") - print(f" 回复: {r['reply'][:150]}...") + try: + r = await run_agent(query, graph) + if r["subtasks"]: + agents = [t.get("agent", "?") for t in r["subtasks"]] + print(f" 调度: {', '.join(agents)}") + if r["handoff_history"]: + print(f" 交接: {' → '.join(r['handoff_history'])}") + if r["results_count"] > 1: + print(f" 聚合: {r['results_count']} 个Agent结果") + print(f" 回复: {r['reply'][:200]}") + except Exception as e: + print(f" 错误: {e}") print(f"\n{'='*60}") - print(" 多Agent测试完成!") + print(" 多Agent + Handoff 测试完成!") print("=" * 60) else: await interactive_mode(graph)