From 2c55213d3905544e10a713170d912ee74fbbb762 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=BA=84=E4=B8=89=E5=8F=B7?= Date: Fri, 24 Apr 2026 00:50:26 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20v3.0=20=E5=A4=9AAgent=E4=BA=A4=E4=BA=92?= =?UTF-8?q?=20-=20Supervisor=20+=20Worker=20+=20Aggregator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 架构级重构: - Supervisor节点:分析任务、分解子任务、智能调度Agent - Worker节点:各专业Agent(subgraph)独立执行子任务 - Aggregator节点:并行结果自动聚合 - Send API并行:多Agent同时处理不同子任务 - Agent注册表:AgentRegistry管理5个Agent - weather_agent: 天气专家 - math_agent: 数学专家 - knowledge_agent: 知识专家 - mcp_agent: MCP工具调用 - general_agent: 通用助手(兜底) - 共享State:messages/subtasks/results/final_answer - Supervisor输出JSON格式任务计划(parallel/single/direct) --- README.md | 27 +- agent.py | 912 +++++++++++++++++++++++++++++------------------------- 2 files changed, 522 insertions(+), 417 deletions(-) diff --git a/README.md b/README.md index 40ad3d3..0d0e059 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,31 @@ # 黄庄三号四能力 Agent -基于 LangGraph 的多能力 AI Agent,集成 FC / MCP / 思考模式 / Skill 四种核心能力。 +基于 LangGraph 的多能力 AI Agent,集成 FC / MCP / 思考模式 / Skill 四种核心能力,**v3.0 支持多Agent交互**。 -**v2.0 配置驱动版** — 新增工具/技能/MCP服务器无需改源码,丢文件或改配置即可。 +## 核心架构:Supervisor + Worker + Aggregator + +``` +用户输入 → Supervisor(分析/分解/调度) + │ + ┌──────────┼──────────┬──────────┐ + ▼ ▼ ▼ ▼ +┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ +│天气 │ │数学 │ │知识 │ │MCP │ ← 各Worker Agent +│专家 │ │专家 │ │专家 │ │Agent │ +└──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ + │ │ │ │ + └─────────┴─────────┴─────────┘ + │ + Aggregator(聚合) + │ + 最终回复 +``` + +**关键能力:** +- **并行执行**:多个Agent同时处理不同子任务(Send API) +- **串行执行**:单Agent直接处理简单任务 +- **结果聚合**:多Agent结果自动合并为连贯回复 +- **智能调度**:Supervisor自动分析任务、选择Agent ## 四项能力 diff --git a/agent.py b/agent.py index 6f8dd42..978a89c 100644 --- a/agent.py +++ b/agent.py @@ -1,8 +1,12 @@ """ -黄庄三号 Agent v2.0 - 配置驱动版 -================================== -所有工具、技能、MCP服务器、路由关键词均从配置加载 -新增:丢文件到 tools/ 或 skills/ 即可,不改源码 +黄庄三号 Agent v3.0 
- 多Agent交互版 +==================================== +架构: Supervisor + Worker(Agent) + Aggregator +- Supervisor: 分析任务,分解子任务,决定分给哪个Agent +- Worker: 各专业Agent(subgraph),独立执行子任务 +- Aggregator: 聚合多Agent结果,生成最终回复 +- 支持: 并行分发(Send API)、串行交接(Command handoff) +- Agent间通信: 共享State + Command + 消息总线 运行方式: python3 agent.py --test 自动测试 @@ -11,16 +15,17 @@ python3 agent.py 交互模式(不带MCP) """ import os -import sys import re +import json import asyncio import argparse import importlib.util -from typing import Annotated +from typing import Annotated, Literal from typing_extensions import TypedDict from pydantic import BaseModel, Field from contextlib import AsyncExitStack from pathlib import Path +from operator import add import yaml from langchain_openai import ChatOpenAI @@ -29,6 +34,7 @@ from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, Tool from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages from langgraph.prebuilt import ToolNode +from langgraph.types import Send, Command # ════════════════════════════════════════════ @@ -38,13 +44,13 @@ BASE_DIR = Path(__file__).parent.resolve() CONFIG_PATH = BASE_DIR / "config.yaml" TOOLS_DIR = BASE_DIR / "tools" SKILLS_DIR = BASE_DIR / "skills" +AGENTS_DIR = BASE_DIR / "agents" # ════════════════════════════════════════════ # 配置加载 # ════════════════════════════════════════════ def load_config() -> dict: - """加载 config.yaml""" with open(CONFIG_PATH, "r", encoding="utf-8") as f: return yaml.safe_load(f) @@ -53,49 +59,36 @@ def load_config() -> dict: # 工具自动扫描注册 # ════════════════════════════════════════════ def scan_tools() -> list: - """ - 扫描 tools/ 目录下所有 .py 文件 - 每个文件必须暴露 TOOLS 列表 - """ all_tools = [] if not TOOLS_DIR.exists(): - print(" [工具] tools/ 目录不存在,跳过") return all_tools - for py_file in sorted(TOOLS_DIR.glob("*.py")): if py_file.name.startswith("_"): continue try: - spec = importlib.util.spec_from_file_location( - f"tools.{py_file.stem}", str(py_file) - ) + spec = 
importlib.util.spec_from_file_location(f"tools.{py_file.stem}", str(py_file)) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) - if hasattr(mod, "TOOLS"): tool_list = mod.TOOLS all_tools.extend(tool_list) - print(f" [工具] {py_file.name}: 加载 {len(tool_list)} 个 -> {[t.name for t in tool_list]}") - else: - print(f" [工具] {py_file.name}: 无 TOOLS 变量,跳过") + print(f" [工具] {py_file.name}: {[t.name for t in tool_list]}") except Exception as e: print(f" [工具] {py_file.name}: 加载失败 -> {e}") - return all_tools # ════════════════════════════════════════════ -# 技能自动扫描注册(兼容 OpenClaw 格式) +# 技能自动扫描注册(兼容 OpenClaw) # ════════════════════════════════════════════ class SkillDef(BaseModel): name: str description: str prompt: str tools: list[str] = [] - # OpenClaw 扩展字段 - skill_dir: str = "" # OpenClaw skill 目录路径 - scripts: list[str] = [] # scripts/ 下的脚本列表 - is_openclaw: bool = False # 是否 OpenClaw 格式 + skill_dir: str = "" + scripts: list[str] = [] + is_openclaw: bool = False class SkillRegistry: def __init__(self): @@ -114,51 +107,36 @@ class SkillRegistry: lines = [] for s in self._skills.values(): tag = " [OpenClaw]" if s.is_openclaw else "" - scripts_info = f", scripts: {s.scripts}" if s.scripts else "" - lines.append(f" - {s.name}: {s.description}{tag}{scripts_info}") + lines.append(f" - {s.name}: {s.description}{tag}") return "\n".join(lines) def _parse_skill_md_frontmatter(content: str) -> dict: - """解析 SKILL.md 的 YAML frontmatter""" if not content.startswith("---"): return {} end = content.find("---", 3) if end == -1: return {} - fm = content[3:end].strip() - return yaml.safe_load(fm) or {} + return yaml.safe_load(content[3:end].strip()) or {} def scan_skills() -> SkillRegistry: - """ - 扫描 skills/ 目录,支持两种格式: - 1. .yaml 文件 - 简洁技能定义 - 2. 
目录/SKILL.md - OpenClaw 格式(含 scripts/ 子目录) - """ registry = SkillRegistry() if not SKILLS_DIR.exists(): - print(" [技能] skills/ 目录不存在,跳过") return registry - # 1) 扫描 .yaml 文件 for yaml_file in sorted(SKILLS_DIR.glob("*.yaml")): try: with open(yaml_file, "r", encoding="utf-8") as f: data = yaml.safe_load(f) - - skill = SkillDef( - name=data["name"], - description=data.get("description", ""), - prompt=data.get("prompt", ""), - tools=data.get("tools", []), - ) - registry.register(skill) - print(f" [技能] {yaml_file.name}: {skill.name}") + registry.register(SkillDef( + name=data["name"], description=data.get("description", ""), + prompt=data.get("prompt", ""), tools=data.get("tools", []), + )) + print(f" [技能] {yaml_file.name}: {data['name']}") except Exception as e: print(f" [技能] {yaml_file.name}: 加载失败 -> {e}") - # 2) 扫描 OpenClaw 格式(目录/SKILL.md) for skill_dir in sorted(SKILLS_DIR.iterdir()): if not skill_dir.is_dir(): continue @@ -168,48 +146,25 @@ def scan_skills() -> SkillRegistry: try: with open(skill_md, "r", encoding="utf-8") as f: content = f.read() - fm = _parse_skill_md_frontmatter(content) if not fm: - print(f" [技能] {skill_dir.name}/SKILL.md: 无 frontmatter,跳过") continue - name = fm.get("name", skill_dir.name) description = fm.get("description", "") - - # 把 SKILL.md 的 markdown body 作为 prompt body_start = content.find("---", 3) prompt = content[body_start + 3:].strip() if body_start != -1 else "" - - # 扫描 scripts/ 子目录 scripts_dir = skill_dir / "scripts" - scripts = [] + scripts, tools = [], [] if scripts_dir.exists(): - for script_file in sorted(scripts_dir.glob("*.py")): - if not script_file.name.startswith("_"): - scripts.append(str(script_file)) - - # 从 prompt 中提取 tools 引用(### scripts/xxx.py 段落) - # OpenClaw skill 通常在 body 中提到脚本名 - tools = [] - for script_path in scripts: - # 脚本文件名(不含扩展名)作为工具名 - tool_name = Path(script_path).stem - tools.append(tool_name) - - skill = SkillDef( - name=name, - description=description, - prompt=prompt, - tools=tools, - 
skill_dir=str(skill_dir), - scripts=scripts, - is_openclaw=True, - ) - registry.register(skill) - scripts_str = f", scripts: {len(scripts)}" if scripts else "" - print(f" [技能] {skill_dir.name}/ (OpenClaw): {name}{scripts_str}") - + for sf in sorted(scripts_dir.glob("*.py")): + if not sf.name.startswith("_"): + scripts.append(str(sf)) + tools.append(sf.stem) + registry.register(SkillDef( + name=name, description=description, prompt=prompt, tools=tools, + skill_dir=str(skill_dir), scripts=scripts, is_openclaw=True, + )) + print(f" [技能] {skill_dir.name}/ (OpenClaw): {name}") except Exception as e: print(f" [技能] {skill_dir.name}/SKILL.md: 加载失败 -> {e}") @@ -217,21 +172,16 @@ def scan_skills() -> SkillRegistry: # ════════════════════════════════════════════ -# MCP 管理器(配置驱动,支持多服务器) +# MCP 管理器 # ════════════════════════════════════════════ class MCPManager: - """管理多个MCP服务器连接""" - def __init__(self): self.exit_stack = AsyncExitStack() - self.sessions: dict[str, object] = {} # server_name -> session - self.route_map: dict[str, tuple] = {} # keyword -> (session, tool_name) + self.sessions: dict[str, object] = {} + self.route_map: dict[str, tuple] = {} self.mcp_tools: list = [] async def connect_all(self, mcp_configs: list[dict]): - """根据配置连接所有MCP服务器""" - global all_tools - from langchain_mcp_adapters.tools import load_mcp_tools from mcp.client.stdio import stdio_client, StdioServerParameters from mcp.client.session import ClientSession @@ -239,92 +189,61 @@ class MCPManager: for srv in mcp_configs: name = srv["name"] try: - server_params = StdioServerParameters( - command=srv["command"], - args=srv.get("args", []), - ) - - read, write = await self.exit_stack.enter_async_context( - stdio_client(server_params) - ) - session = await self.exit_stack.enter_async_context( - ClientSession(read, write) - ) + sp = StdioServerParameters(command=srv["command"], args=srv.get("args", [])) + r, w = await self.exit_stack.enter_async_context(stdio_client(sp)) + session = await 
self.exit_stack.enter_async_context(ClientSession(r, w)) await session.initialize() - - # 加载工具 mcp_tools = await load_mcp_tools(session) self.mcp_tools.extend(mcp_tools) self.sessions[name] = session - - # 注册路由关键词 route_kw = srv.get("route_keywords", {}) for tool_name, keywords in route_kw.items(): for kw in keywords: self.route_map[kw] = (session, tool_name) - - print(f" [MCP] {name}: 已连接,加载 {len(mcp_tools)} 个工具") - for t in mcp_tools: - print(f" - {t.name}: {t.description[:50]}") - if route_kw: - print(f" 路由关键词: {list(route_kw.keys())}") - + print(f" [MCP] {name}: 已连接,{len(mcp_tools)} 个工具") except Exception as e: print(f" [MCP] {name}: 连接失败 -> {e}") - # 把MCP工具加入全局列表(供LLM bind_tools用) - all_tools.extend(self.mcp_tools) - def match_route(self, user_input: str) -> tuple | None: - """关键词匹配MCP路由,返回 (session, tool_name) 或 None""" for keyword, (session, tool_name) in self.route_map.items(): if keyword in user_input: return (session, tool_name) return None async def call_tool(self, session, tool_name: str, user_input: str) -> str: - """通过MCP session调用工具""" try: - # 简单参数解析 args = {} - - # 从MCP工具列表中查找工具信息 for t in self.mcp_tools: if t.name == tool_name: - # MCP工具的 args_schema 可能是 dict 或 Pydantic model schema = getattr(t, "args_schema", None) if schema: if isinstance(schema, dict): - schema_fields = schema.get("properties", {}) + sf = schema.get("properties", {}) elif hasattr(schema, "model_json_schema"): - schema_fields = schema.model_json_schema().get("properties", {}) + sf = schema.model_json_schema().get("properties", {}) else: - schema_fields = {} - args = _parse_tool_args(tool_name, schema_fields, user_input) + sf = {} + args = _parse_tool_args(tool_name, sf, user_input) break - result = await session.call_tool(tool_name, args) if result.content: texts = [c.text for c in result.content if hasattr(c, "text")] return "\n".join(texts) if texts else str(result) return str(result) except Exception as e: - return f"[MCP工具{tool_name}调用错误] {e}" + return f"[MCP工具{tool_name}错误] 
{e}" async def close(self): await self.exit_stack.aclose() self.sessions.clear() - print(" [MCP] 所有连接已关闭") def _parse_tool_args(tool_name: str, schema_fields: dict, user_input: str) -> dict: - """根据工具参数schema从用户输入中解析参数""" args = {} for field_name, field_info in schema_fields.items(): if field_name in ("timezone",): args[field_name] = "Asia/Shanghai" elif field_name in ("text",): - # 提取引号内的文本 match = re.search(r"['\"\u201c\u201d](.+?)['\"\u201c\u201d]", user_input) args[field_name] = match.group(1) if match else user_input elif field_name in ("city",): @@ -333,273 +252,445 @@ def _parse_tool_args(tool_name: str, schema_fields: dict, user_input: str) -> di # ════════════════════════════════════════════ -# Agent 状态 +# ★ 核心:多 Agent 架构 # ════════════════════════════════════════════ -class AgentState(TypedDict): + +# --- 共享状态 --- +class SharedState(TypedDict): + """多Agent共享状态""" + messages: Annotated[list, add_messages] # 对话消息(累加) + subtasks: list[dict] # 分解的子任务 + results: Annotated[list[str], add] # Agent执行结果(累加聚合) + active_agent: str # 当前活跃Agent + final_answer: str # 最终回复 + + +class SubTaskState(TypedDict): + """子任务状态(Send API用)""" + task: dict messages: Annotated[list, add_messages] - thinking: str - active_skill: str | None - skill_output: str | None - iteration: int -# ════════════════════════════════════════════ -# LangGraph 节点 -# ════════════════════════════════════════════ +# --- Agent 定义 --- +class AgentDef(BaseModel): + """Agent定义""" + name: str + description: str + system_prompt: str + tools: list[str] = [] # 依赖的工具名 + skill: str = "" # 依赖的技能名 -# --- 思考节点 --- -async def make_think_node(config, skills_reg, tools_list): + +class AgentRegistry: + """Agent注册表""" + def __init__(self): + self._agents: dict[str, AgentDef] = {} + + def register(self, agent: AgentDef): + self._agents[agent.name] = agent + + def get(self, name: str) -> AgentDef | None: + return self._agents.get(name) + + def list_agents(self) -> list[AgentDef]: + return list(self._agents.values()) + + 
def format_list(self) -> str: + return "\n".join(f" - {a.name}: {a.description}" for a in self._agents.values()) + + def match(self, task_type: str) -> AgentDef | None: + """根据任务类型匹配Agent""" + # 精确匹配 + if task_type in self._agents: + return self._agents[task_type] + # 关键词匹配 + for agent in self._agents.values(): + if agent.name in task_type or task_type in agent.name: + return agent + return None + + +def init_agents(skills_reg: SkillRegistry, tools_list: list) -> AgentRegistry: + """初始化内置Agent""" + registry = AgentRegistry() + + # 天气专家 + registry.register(AgentDef( + name="weather_agent", + description="天气专家 - 查询天气、出行建议", + system_prompt="你是天气专家。根据天气数据给出专业的出行建议,包括穿衣、活动安排等。", + tools=["get_weather"], + skill="weather_analyst", + )) + + # 数学专家 + registry.register(AgentDef( + name="math_agent", + description="数学专家 - 计算、数学问题解答", + system_prompt="你是数学专家。解答数学问题,给出计算过程和原理解释。", + tools=["calculate"], + skill="math_tutor", + )) + + # 知识专家 + registry.register(AgentDef( + name="knowledge_agent", + description="知识专家 - 搜索知识、深入解释概念", + system_prompt="你是知识专家。搜索知识库,给出深入浅出的解释,结构化呈现信息。", + tools=["search_knowledge"], + skill="knowledge_explorer", + )) + + # 通用Agent(兜底) + registry.register(AgentDef( + name="general_agent", + description="通用助手 - 处理一般对话和简单问题", + system_prompt="你是黄庄三号通用助手。处理一般对话、问候和简单问题。", + tools=[], + skill="", + )) + + # MCP Agent + registry.register(AgentDef( + name="mcp_agent", + description="MCP工具调用 - 时间查询、字符统计、UUID生成等", + system_prompt="你是MCP工具调用专家。通过MCP协议调用外部工具获取实时数据。", + tools=[], + skill="", + )) + + return registry + + +# --- Supervisor 节点 --- +def make_supervisor_node(config, agent_registry, skills_reg, tools_list, mcp_mgr): llm_cfg = config["llm"] - agent_cfg = config.get("agent", {}) - temp = agent_cfg.get("think_temperature", 0.3) - async def think_node(state: AgentState) -> dict: - iteration = state.get("iteration", 0) + 1 - if iteration > 3: - return {"iteration": iteration, "thinking": "(快速模式)"} + SUPERVISOR_PROMPT = """你是黄庄三号的任务协调者(Supervisor)。你的职责是: 
- conv = [] - for msg in state["messages"][-4:]: - role = "用户" if isinstance(msg, HumanMessage) else "AI" - conv.append(f"{role}: {msg.content[:150]}") +1. 分析用户请求,判断是否需要分解为多个子任务 +2. 如果需要多个Agent协作,输出JSON格式的子任务列表 +3. 如果只需单个Agent处理,输出单个任务 +4. 如果是简单对话,直接回复 - tool_names = [t.name for t in tools_list] - think_llm = ChatOpenAI( - base_url=llm_cfg["base_url"], - api_key=llm_cfg["api_key"], - model=llm_cfg["model"], - temperature=temp, - ) - resp = await think_llm.ainvoke([ - SystemMessage(content="你是思考模块。简洁输出:用户意图、需要的工具/技能、注意事项。不要说没有工具。"), - HumanMessage(content=f"对话:\n{chr(10).join(conv)}\n\n可用技能:\n{skills_reg.format_list()}\n\n可用工具: {', '.join(tool_names)}"), - ]) - return {"iteration": iteration, "thinking": resp.content} +可用Agent: +{agent_list} - return think_node +MCP工具(当用户请求匹配这些关键词时,分配给对应Agent或在subtasks中指定agent为"mcp_agent"): +- 时间/几点/当前时间 → mcp_agent (get_current_time) +- 统计字符/字符数 → mcp_agent (count_chars) +- 生成UUID → mcp_agent (generate_uuid) +重要规则: +- 你的名字是"黄庄三号" +- 对于简单问候,直接回复,不要分配任务 +- 输出格式必须是严格的JSON: + - 多任务: {{"mode": "parallel", "subtasks": [{{"agent": "agent名", "query": "具体任务"}}]}} + - 单任务: {{"mode": "single", "agent": "agent名", "query": "具体任务"}} + - 直接回复: {{"mode": "direct", "answer": "你的回复"}} +- 只输出JSON,不要其他内容""" -# --- 技能路由节点 --- -async def make_skill_route_node(config, skills_reg, mcp_mgr): - skill_keywords = config.get("skill_keywords", {}) - - async def skill_route_node(state: AgentState) -> dict: - user_input = "" + async def supervisor_node(state: SharedState) -> dict: + # 获取用户最新消息 + user_msg = "" for msg in reversed(state["messages"]): if isinstance(msg, HumanMessage): - user_input = msg.content + user_msg = msg.content break - # 1. MCP确定性路由(优先) - if mcp_mgr: - route = mcp_mgr.match_route(user_input) - if route: - session, tool_name = route - mcp_result = await mcp_mgr.call_tool(session, tool_name, user_input) - return {"active_skill": None, "skill_output": mcp_result} - - # 2. 
Skill关键词路由 - for sname, keywords in skill_keywords.items(): - if any(kw in user_input for kw in keywords): - if skills_reg.get(sname): - return {"active_skill": sname, "skill_output": None} - - return {"active_skill": None, "skill_output": None} - - return skill_route_node - - -# --- 技能执行节点 --- -async def make_skill_exec_node(config, skills_reg, tools_list): - llm_cfg = config["llm"] - agent_cfg = config.get("agent", {}) - temp = agent_cfg.get("skill_temperature", 0.7) - - async def skill_execute_node(state: AgentState) -> dict: - sname = state.get("active_skill") - if not sname: - return {"skill_output": None} - sk = skills_reg.get(sname) - if not sk: - return {"skill_output": None} - - user_input = "" - for msg in reversed(state["messages"]): - if isinstance(msg, HumanMessage): - user_input = msg.content - break - - tool_info = "" - - # ---- OpenClaw 技能:执行 scripts/ 下的脚本 ---- - if sk.is_openclaw and sk.scripts: - for script_path in sk.scripts: - try: - proc = await asyncio.create_subprocess_exec( - "python3", script_path, user_input, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30) - output = stdout.decode("utf-8", errors="replace").strip() - err = stderr.decode("utf-8", errors="replace").strip() - script_name = Path(script_path).name - if output: - tool_info += f"\n[脚本{script_name}输出]\n{output[:2000]}" - if err and "warning" not in err.lower(): - tool_info += f"\n[脚本{script_name}错误]\n{err[:500]}" - except asyncio.TimeoutError: - tool_info += f"\n[脚本{Path(script_path).name}] 执行超时" - except Exception as e: - tool_info += f"\n[脚本{Path(script_path).name}] 执行错误: {e}" - - # ---- 通用技能:执行依赖的本地工具 ---- - for tname in sk.tools: - # 跳过 OpenClaw 已通过脚本执行的工具 - if sk.is_openclaw: - continue - for t in tools_list: - if t.name == tname: - try: - if tname == "get_weather": - cities = ["北京", "上海", "深圳", "黄庄"] - city = next((c for c in cities if c in user_input), "北京") - r = await 
t.ainvoke({"city": city}) - elif tname == "calculate": - expr = re.findall(r'[\d+\-*/(). ]+', user_input) - r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"}) - else: - r = await t.ainvoke({"query": user_input}) - tool_info += f"\n工具{tname}结果: {r}" - except Exception as e: - tool_info += f"\n工具{tname}错误: {e}" - - # 构造提示词 - if sk.is_openclaw: - # OpenClaw 技能:用 SKILL.md body 作为指导 + 脚本输出 - prompt = f"""你是技能"{sk.name}"的执行者。 - -技能说明: -{sk.prompt[:2000]} - -脚本执行结果: -{tool_info if tool_info else "(无脚本输出)"} - -用户请求:{user_input} - -请基于技能说明和脚本输出回答用户。""" - else: - # 通用技能:用 prompt 模板 - prompt = sk.prompt.format(input=user_input) + tool_info - - sk_llm = ChatOpenAI( - base_url=llm_cfg["base_url"], - api_key=llm_cfg["api_key"], - model=llm_cfg["model"], - temperature=temp, - ) - resp = await sk_llm.ainvoke([ - SystemMessage(content=prompt), - HumanMessage(content="请基于以上信息回答。"), - ]) - return {"skill_output": resp.content} - - return skill_execute_node - - -# --- Agent主节点 --- -async def make_agent_node(config, skills_reg, tools_list): - llm_cfg = config["llm"] - agent_cfg = config.get("agent", {}) - max_iter = agent_cfg.get("max_iterations", 5) - - SYSTEM_PROMPT = """你是黄庄三号,严肃、认真、听话、聪明的AI助手。你的名字是"黄庄三号",你不是Claude,不是ChatGPT。 - -你具备四种能力: -1. 工具调用(FC) - 调用内置工具获取信息 -2. MCP集成 - 通过MCP协议连接外部服务 -3. 思考模式 - 回答前进行深度思考 -4. 
技能系统(Skill) - 调用注册技能完成复杂任务 - -可用技能: -{skill_list} - -重要规则(必须严格遵守): -- 当被问"你是谁",必须回答"我是黄庄三号" -- 对于工具能提供的数据,必须调用工具获取,不要自己猜测""" - - async def agent_node(state: AgentState) -> dict: - iteration = state.get("iteration", 0) - - if state.get("skill_output"): - return {"messages": [AIMessage(content=state["skill_output"])]} - - system_content = SYSTEM_PROMPT.format(skill_list=skills_reg.format_list()) - if state.get("thinking"): - thinking = state["thinking"][:300] - # 如果思考中提到了工具名,强调必须调用 - tool_hints = [t.name for t in tools_list if t.name in thinking] - if tool_hints: - thinking += f"\n\n[重要:必须调用 {', '.join(tool_hints)} 工具来回答]" - system_content += f"\n\n[内部思考]\n{thinking}" - - messages = [SystemMessage(content=system_content)] - messages.extend(state["messages"]) - llm = ChatOpenAI( base_url=llm_cfg["base_url"], api_key=llm_cfg["api_key"], model=llm_cfg["model"], + temperature=0.1, ) - llm_with_tools = llm.bind_tools(tools_list) - resp = await llm_with_tools.ainvoke(messages) - # 迭代保护 - if iteration > max_iter and hasattr(resp, "tool_calls") and resp.tool_calls: - resp = AIMessage(content=resp.content or "任务完成(已达最大迭代次数)") + prompt = SUPERVISOR_PROMPT.format(agent_list=agent_registry.format_list()) + messages = [SystemMessage(content=prompt), *state["messages"][-6:]] - return {"messages": [resp], "iteration": iteration} + resp = await llm.ainvoke(messages) - return agent_node + # 解析 Supervisor 的JSON输出 + content = resp.content.strip() + # 清理可能的markdown代码块 + if content.startswith("```"): + content = re.sub(r'^```\w*\n?', '', content) + content = re.sub(r'\n?```$', '', content) + content = content.strip() + + try: + plan = json.loads(content) + except json.JSONDecodeError: + # JSON解析失败,当作直接回复 + plan = {"mode": "direct", "answer": resp.content} + + mode = plan.get("mode", "direct") + + if mode == "direct": + return { + "messages": [AIMessage(content=plan.get("answer", resp.content))], + "final_answer": plan.get("answer", resp.content), + "subtasks": [], + } + elif mode == 
"single": + return { + "subtasks": [{"agent": plan["agent"], "query": plan.get("query", user_msg)}], + "active_agent": plan["agent"], + } + elif mode == "parallel": + return { + "subtasks": plan.get("subtasks", []), + } + else: + return {"subtasks": []} + + return supervisor_node -# --- 路由函数 --- -def route_from_agent(state: AgentState) -> str: - if state.get("skill_output"): +# --- 任务分发路由(Send API并行) --- +def route_subtasks(state: SharedState): + """根据subtasks决定路由:并行分发 or 结束""" + subtasks = state.get("subtasks", []) + if not subtasks: return "end" - for msg in reversed(state["messages"]): - if isinstance(msg, AIMessage): - if hasattr(msg, "tool_calls") and msg.tool_calls: - return "tools" - break - return "end" + if len(subtasks) == 1: + # 单任务直接发给worker + return "worker_single" + # 多任务并行分发 + return "worker_parallel" + + +def dispatch_parallel(state: SharedState): + """并行分发:为每个子任务创建一个 Send""" + return [ + Send("worker_node", {"task": t, "messages": state["messages"][-4:]}) + for t in state.get("subtasks", []) + ] + + +# --- Worker 节点(执行子任务) --- +def make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr): + llm_cfg = config["llm"] + agent_cfg = config.get("agent", {}) + temp = agent_cfg.get("skill_temperature", 0.7) + + async def worker_node(state: SubTaskState) -> dict: + task = state["task"] + agent_name = task.get("agent", "general_agent") + query = task.get("query", "") + + agent_def = agent_registry.get(agent_name) + if not agent_def: + agent_def = agent_registry.get("general_agent") + + # 执行工具(如果有) + tool_info = "" + for tname in agent_def.tools: + for t in tools_list: + if t.name == tname: + try: + if tname == "get_weather": + cities = ["北京", "上海", "深圳", "黄庄"] + city = next((c for c in cities if c in query), "北京") + r = await t.ainvoke({"city": city}) + elif tname == "calculate": + expr = re.findall(r'[\d+\-*/(). 
]+', query) + r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"}) + else: + r = await t.ainvoke({"query": query}) + tool_info += f"\n工具{tname}结果: {r}" + except Exception as e: + tool_info += f"\n工具{tname}错误: {e}" + + # MCP工具(如果有) + if mcp_mgr and query: + route = mcp_mgr.match_route(query) + if route: + session, tool_name = route + mcp_result = await mcp_mgr.call_tool(session, tool_name, query) + tool_info += f"\nMCP工具{tool_name}结果: {mcp_result}" + + # 执行技能脚本(如果是OpenClaw技能) + if agent_def.skill: + sk = skills_reg.get(agent_def.skill) + if sk and sk.is_openclaw and sk.scripts: + for script_path in sk.scripts: + try: + proc = await asyncio.create_subprocess_exec( + "python3", script_path, query, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30) + output = stdout.decode("utf-8", errors="replace").strip() + if output: + tool_info += f"\n[脚本输出]\n{output[:2000]}" + except Exception as e: + tool_info += f"\n[脚本错误] {e}" + + # 构造提示词 + prompt = agent_def.system_prompt + if agent_def.skill: + sk = skills_reg.get(agent_def.skill) + if sk and not sk.is_openclaw: + prompt = sk.prompt.format(input=query) + + worker_llm = ChatOpenAI( + base_url=llm_cfg["base_url"], + api_key=llm_cfg["api_key"], + model=llm_cfg["model"], + temperature=temp, + ) + + messages = [ + SystemMessage(content=prompt), + HumanMessage(content=query), + ] + if tool_info: + messages.append(SystemMessage(content=f"工具/脚本提供的数据:{tool_info}")) + + resp = await worker_llm.ainvoke(messages) + + result_tag = f"[{agent_def.name}]" + return {"results": [f"{result_tag} {resp.content}"]} + + return worker_node + + +# --- 单任务Worker(不走Send,直接串行) --- +def make_worker_single_node(config, agent_registry, skills_reg, tools_list, mcp_mgr): + """和 worker_node 逻辑相同,但用于单任务串行执行""" + worker = make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr) + + async def worker_single_node(state: SharedState) -> 
dict: + subtasks = state.get("subtasks", []) + if not subtasks: + return {"results": [], "final_answer": "没有可执行的任务"} + + task = subtasks[0] + sub_state = SubTaskState(task=task, messages=state["messages"][-4:]) + result = await worker(sub_state) + + # 单任务直接作为最终回复 + content = result["results"][0] if result.get("results") else "执行完成" + # 去掉 agent tag 前缀(用户不需要看到) + clean = re.sub(r'^\[.+?\]\s*', '', content) + return { + "results": result.get("results", []), + "final_answer": clean, + "messages": [AIMessage(content=clean)], + } + + return worker_single_node + + +# --- 聚合节点 --- +def make_aggregator_node(config): + llm_cfg = config["llm"] + + async def aggregator_node(state: SharedState) -> dict: + results = state.get("results", []) + if not results: + return {"final_answer": "所有Agent已完成,但无结果"} + + # 如果只有一个结果,直接使用 + if len(results) == 1: + content = re.sub(r'^\[.+?\]\s*', '', results[0]) + return { + "final_answer": content, + "messages": [AIMessage(content=content)], + } + + # 多结果需要聚合 + combined = "\n\n---\n\n".join(results) + agg_llm = ChatOpenAI( + base_url=llm_cfg["base_url"], + api_key=llm_cfg["api_key"], + model=llm_cfg["model"], + temperature=0.5, + ) + + resp = await agg_llm.ainvoke([ + SystemMessage(content="你是黄庄三号。请将以下多个专业Agent的结果整合为一个连贯、结构化的回答。保留关键信息,去除冗余。"), + HumanMessage(content=f"多个Agent的结果:\n\n{combined}"), + ]) + + return { + "final_answer": resp.content, + "messages": [AIMessage(content=resp.content)], + } + + return aggregator_node + + +# --- Agent间 Handoff 交接 --- +def make_handoff_tool(agent_registry): + """创建Agent间交接工具""" + from langchain_core.tools import tool as lc_tool + + agent_names = [a.name for a in agent_registry.list_agents()] + + @lc_tool + def handoff_to_agent(target_agent: str, task_description: str) -> str: + """将任务交接给另一个Agent处理。 + target_agent: 目标Agent名称,可选: """ + ", ".join(agent_names) + """ + task_description: 需要交接的任务描述 + """ + if target_agent not in agent_names: + return f"Agent {target_agent} 不存在,可用Agent: {agent_names}" + return 
f"任务已交接给 {target_agent}: {task_description}"
+
+    return handoff_to_agent
 
 
 # ════════════════════════════════════════════
-# 构建图
+# 构建多Agent图
 # ════════════════════════════════════════════
 async def build_graph(config, skills_reg, mcp_mgr, tools_list):
-    think_node = await make_think_node(config, skills_reg, tools_list)
-    skill_route_node = await make_skill_route_node(config, skills_reg, mcp_mgr)
-    skill_exec_node = await make_skill_exec_node(config, skills_reg, tools_list)
-    agent_node = await make_agent_node(config, skills_reg, tools_list)
+    agent_registry = init_agents(skills_reg, tools_list)
 
-    g = StateGraph(AgentState)
+    supervisor = make_supervisor_node(config, agent_registry, skills_reg, tools_list, mcp_mgr)
+    worker = make_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr)
+    worker_single = make_worker_single_node(config, agent_registry, skills_reg, tools_list, mcp_mgr)
+    aggregator = make_aggregator_node(config)
 
-    g.add_node("think", think_node)
-    g.add_node("skill_route", skill_route_node)
-    g.add_node("skill_exec", skill_exec_node)
-    g.add_node("agent", agent_node)
-    g.add_node("tools", ToolNode(tools_list))
+    g = StateGraph(SharedState)
 
-    g.add_edge(START, "think")
-    g.add_edge("think", "skill_route")
-    g.add_conditional_edges("skill_route",
-        lambda s: "skill_exec" if s.get("active_skill") else "agent",
-        {"skill_exec": "skill_exec", "agent": "agent"})
-    g.add_edge("skill_exec", "agent")
-    g.add_conditional_edges("agent", route_from_agent, {"tools": "tools", "end": END})
-    g.add_edge("tools", "agent")
+    # 添加节点
+    g.add_node("supervisor", supervisor)
+    g.add_node("worker_node", worker)  # 并行worker (Send API分发到这里)
+    g.add_node("worker_single", worker_single)  # 串行worker
+    g.add_node("aggregator", aggregator)  # 聚合器
+
+    # 添加边
+    g.add_edge(START, "supervisor")
+
+    # Supervisor路由:单条 conditional_edges,用 Send 实现并行
+    def route_from_supervisor(state: SharedState):
+        subtasks = state.get("subtasks", [])
+        final_answer = state.get("final_answer", "")
+        # 
直接回复 + if final_answer or not subtasks: + return "end" + # 单任务串行 + if len(subtasks) == 1: + return "worker_single" + # 多任务并行分发 (通过 Send API) + return [ + Send("worker_node", {"task": t, "messages": state["messages"][-4:]}) + for t in subtasks + ] + + g.add_conditional_edges("supervisor", route_from_supervisor, { + "end": END, + "worker_single": "worker_single", + "worker_node": "worker_node", # Send API 的目标节点 + }) + + # Worker完成后的流向 + g.add_edge("worker_node", "aggregator") # 并行结果聚合 + g.add_edge("worker_single", END) # 单任务直接结束 + g.add_edge("aggregator", END) # 聚合后结束 return g.compile() @@ -607,25 +698,30 @@ async def build_graph(config, skills_reg, mcp_mgr, tools_list): # ════════════════════════════════════════════ # 运行入口 # ════════════════════════════════════════════ +all_tools = [] +skills_registry = SkillRegistry() +mcp_manager = None + + async def run_agent(user_input: str, graph): result = await graph.ainvoke({ "messages": [HumanMessage(content=user_input)], - "thinking": "", "active_skill": None, "skill_output": None, "iteration": 0, + "subtasks": [], "results": [], "active_agent": "", "final_answer": "", }) last = result["messages"][-1] return { "reply": last.content if hasattr(last, "content") else str(last), - "thinking": result.get("thinking", ""), - "skill": result.get("active_skill"), + "subtasks": result.get("subtasks", []), + "results_count": len(result.get("results", [])), + "final_answer": result.get("final_answer", ""), } + async def interactive_mode(graph): print("=" * 60) - print(" 黄庄三号 Agent v2.0 - 配置驱动版") - print(" FC | MCP | 思考模式 | Skill") + print(" 黄庄三号 Agent v3.0 - 多Agent交互版") + print(" Supervisor + Worker(Agent) + Aggregator") print("=" * 60) - print(" 技能:", [s.name for s in skills_registry.list_skills()]) - print(" 工具:", [t.name for t in all_tools]) print(" 输入 quit 退出") print("=" * 60) @@ -634,103 +730,89 @@ async def interactive_mode(graph): user_input = input("\n你> ").strip() except (EOFError, KeyboardInterrupt): break - if not 
user_input: - continue - if user_input.lower() in ("quit", "exit", "q"): + if not user_input or user_input.lower() in ("quit", "exit", "q"): break result = await run_agent(user_input, graph) - if result["thinking"]: - print(f"\n[思考] {result['thinking'][:150]}...") - if result["skill"]: - print(f"[技能] {result['skill']}") + if result["subtasks"]: + agents = [t.get("agent", "?") for t in result["subtasks"]] + print(f"\n[调度] 分配给: {', '.join(agents)}") + if result["results_count"] > 1: + print(f"[聚合] 合并 {result['results_count']} 个Agent结果") print(f"\n黄庄三号> {result['reply']}") -# ════════════════════════════════════════════ -# 全局变量(由 main 初始化) -# ════════════════════════════════════════════ -all_tools = [] -skills_registry = SkillRegistry() -mcp_manager = None - - async def main(): global all_tools, skills_registry, mcp_manager - parser = argparse.ArgumentParser(description="黄庄三号 Agent v2.0") + parser = argparse.ArgumentParser(description="黄庄三号 Agent v3.0") parser.add_argument("--mcp", action="store_true", help="启用MCP") parser.add_argument("--test", action="store_true", help="自动测试") args = parser.parse_args() print("=" * 60) - print(" 黄庄三号 Agent v2.0 - 配置驱动版") + print(" 黄庄三号 Agent v3.0 - 多Agent交互版") print("=" * 60) - # ── 加载配置 ── + # 加载配置 print("\n[配置] 加载 config.yaml ...") config = load_config() - print(f" 模型: {config['llm']['model']}") - print(f" MCP服务器: {len(config.get('mcp_servers', []))} 个") - print(f" 技能关键词: {len(config.get('skill_keywords', {}))} 个") - # ── 扫描工具 ── - print("\n[工具] 扫描 tools/ 目录 ...") + # 扫描工具 + print("\n[工具] 扫描 tools/ ...") all_tools = scan_tools() print(f" 工具总数: {len(all_tools)}") - # ── 扫描技能 ── - print("\n[技能] 扫描 skills/ 目录 ...") + # 扫描技能 + print("\n[技能] 扫描 skills/ ...") skills_registry = scan_skills() print(f" 技能总数: {len(skills_registry.list_skills())}") - # ── 连接MCP ── + # 连接MCP if args.mcp and config.get("mcp_servers"): print("\n[MCP] 连接服务器 ...") mcp_manager = MCPManager() await mcp_manager.connect_all(config["mcp_servers"]) - print(f" MCP工具总数: 
{len(mcp_manager.mcp_tools)}") - print(f"\n 全部工具总数: {len(all_tools)}") + # 初始化Agent + agent_registry = init_agents(skills_registry, all_tools) + print(f"\n[Agent] 已注册 {len(agent_registry.list_agents())} 个Agent:") + for a in agent_registry.list_agents(): + print(f" - {a.name}: {a.description}") - # ── 构建图 ── + # 构建图 + print("\n[构建] 多Agent图 ...") graph = await build_graph(config, skills_registry, mcp_manager, all_tools) if args.test: - # 自动测试 tests = [ - ("FC+思考+Skill", "黄庄天气怎么样?"), - ("FC+Skill", "算一下 99*88+77"), - ("知识搜索", "MCP是什么?"), - ("身份", "你好你是谁?"), + ("单Agent:天气", "黄庄天气怎么样?"), + ("单Agent:数学", "算一下 99*88+77"), + ("单Agent:知识", "MCP是什么?"), + ("直接回复", "你好你是谁?"), + ("多Agent并行", "帮我查一下北京和上海的天气,再算一下123+456"), ] if args.mcp and mcp_manager: - tests.extend([ - ("MCP:时间", "现在几点了?"), - ("MCP:字符统计", "统计'黄庄三号是AI助手'的字符数"), - ("MCP:UUID", "生成一个UUID"), - ]) + tests.append(("MCP:时间", "现在几点了?")) for label, query in tests: print(f"\n{'─'*55}") print(f"[测试:{label}] {query}") r = await run_agent(query, graph) - print(f" 思考: {r['thinking'][:80]}...") - print(f" 技能: {r['skill']}") + if r["subtasks"]: + agents = [t.get("agent", "?") for t in r["subtasks"]] + print(f" 调度: {', '.join(agents)}") + if r["results_count"] > 1: + print(f" 聚合: {r['results_count']} 个Agent结果") print(f" 回复: {r['reply'][:150]}...") print(f"\n{'='*60}") - print(" 验证完成!") - caps = ["FC", "思考", "Skill"] - if args.mcp: - caps.append("MCP") - print(" " + " ✅ | ".join(caps) + " ✅") + print(" 多Agent测试完成!") print("=" * 60) else: await interactive_mode(graph) - # ── 清理 ── if mcp_manager: await mcp_manager.close()