feat: 支持 OpenClaw skill 格式 (SKILL.md + scripts/)

- scan_skills() 支持两种格式: .yaml 文件 和 目录/SKILL.md
- 解析 SKILL.md 的 YAML frontmatter 提取 name/description
- 自动扫描 scripts/ 子目录的 .py 脚本
- skill_exec 支持 asyncio 子进程执行脚本
- 新增示例 OpenClaw 技能: skills/time-tool/
This commit is contained in:
2026-04-23 23:53:45 +08:00
parent 1c42ba0812
commit 28da16829f
5 changed files with 215 additions and 7 deletions

View File

@@ -69,6 +69,10 @@ TOOLS = [get_stock_price] # 必须!供自动扫描
## 新增技能 ## 新增技能
支持两种格式:
### 格式1: .yaml 文件(简洁定义)
`skills/` 目录下创建 .yaml 文件: `skills/` 目录下创建 .yaml 文件:
```yaml ```yaml
@@ -82,11 +86,41 @@ tools:
- get_stock_price - get_stock_price
``` ```
然后在 `config.yaml``skill_keywords` 中添加路由关键词: ### 格式2: OpenClaw 格式(目录 + SKILL.md + scripts/
`skills/` 目录下创建目录,放入 `SKILL.md``scripts/`
```
skills/
└── my-skill/
├── SKILL.md ← OpenClaw 标准格式
└── scripts/
└── do_something.py
```
SKILL.md 格式:
```markdown
---
name: my-skill
description: "我的自定义技能"
---
# 我的自定义技能
技能的详细说明和使用方法...
```
Agent 会自动扫描 SKILL.md 的 frontmatter 和 scripts/ 目录,执行脚本并将输出传给 LLM。
### 路由关键词
无论哪种格式,都需要在 `config.yaml``skill_keywords` 中添加路由:
```yaml ```yaml
skill_keywords: skill_keywords:
stock_analyst: ["股价", "股票", "行情"] stock_analyst: ["股价", "股票", "行情"]
my-skill: ["关键词1", "关键词2"]
``` ```
## 新增 MCP 服务器 ## 新增 MCP 服务器

132
agent.py
View File

@@ -85,13 +85,17 @@ def scan_tools() -> list:
# ════════════════════════════════════════════ # ════════════════════════════════════════════
# 技能自动扫描注册 # 技能自动扫描注册(兼容 OpenClaw 格式)
# ════════════════════════════════════════════ # ════════════════════════════════════════════
class SkillDef(BaseModel): class SkillDef(BaseModel):
name: str name: str
description: str description: str
prompt: str prompt: str
tools: list[str] = [] tools: list[str] = []
# OpenClaw 扩展字段
skill_dir: str = "" # OpenClaw skill 目录路径
scripts: list[str] = [] # scripts/ 下的脚本列表
is_openclaw: bool = False # 是否 OpenClaw 格式
class SkillRegistry: class SkillRegistry:
def __init__(self): def __init__(self):
@@ -107,19 +111,37 @@ class SkillRegistry:
return list(self._skills.values()) return list(self._skills.values())
def format_list(self) -> str: def format_list(self) -> str:
return "\n".join(f" - {s.name}: {s.description}" for s in self._skills.values()) lines = []
for s in self._skills.values():
tag = " [OpenClaw]" if s.is_openclaw else ""
scripts_info = f", scripts: {s.scripts}" if s.scripts else ""
lines.append(f" - {s.name}: {s.description}{tag}{scripts_info}")
return "\n".join(lines)
def _parse_skill_md_frontmatter(content: str) -> dict:
"""解析 SKILL.md 的 YAML frontmatter"""
if not content.startswith("---"):
return {}
end = content.find("---", 3)
if end == -1:
return {}
fm = content[3:end].strip()
return yaml.safe_load(fm) or {}
def scan_skills() -> SkillRegistry: def scan_skills() -> SkillRegistry:
""" """
扫描 skills/ 目录下所有 .yaml 文件 扫描 skills/ 目录,支持两种格式:
每个文件定义一个技能 1. .yaml 文件 - 简洁技能定义
2. 目录/SKILL.md - OpenClaw 格式(含 scripts/ 子目录)
""" """
registry = SkillRegistry() registry = SkillRegistry()
if not SKILLS_DIR.exists(): if not SKILLS_DIR.exists():
print(" [技能] skills/ 目录不存在,跳过") print(" [技能] skills/ 目录不存在,跳过")
return registry return registry
# 1) 扫描 .yaml 文件
for yaml_file in sorted(SKILLS_DIR.glob("*.yaml")): for yaml_file in sorted(SKILLS_DIR.glob("*.yaml")):
try: try:
with open(yaml_file, "r", encoding="utf-8") as f: with open(yaml_file, "r", encoding="utf-8") as f:
@@ -136,6 +158,61 @@ def scan_skills() -> SkillRegistry:
except Exception as e: except Exception as e:
print(f" [技能] {yaml_file.name}: 加载失败 -> {e}") print(f" [技能] {yaml_file.name}: 加载失败 -> {e}")
# 2) 扫描 OpenClaw 格式(目录/SKILL.md
for skill_dir in sorted(SKILLS_DIR.iterdir()):
if not skill_dir.is_dir():
continue
skill_md = skill_dir / "SKILL.md"
if not skill_md.exists():
continue
try:
with open(skill_md, "r", encoding="utf-8") as f:
content = f.read()
fm = _parse_skill_md_frontmatter(content)
if not fm:
print(f" [技能] {skill_dir.name}/SKILL.md: 无 frontmatter跳过")
continue
name = fm.get("name", skill_dir.name)
description = fm.get("description", "")
# 把 SKILL.md 的 markdown body 作为 prompt
body_start = content.find("---", 3)
prompt = content[body_start + 3:].strip() if body_start != -1 else ""
# 扫描 scripts/ 子目录
scripts_dir = skill_dir / "scripts"
scripts = []
if scripts_dir.exists():
for script_file in sorted(scripts_dir.glob("*.py")):
if not script_file.name.startswith("_"):
scripts.append(str(script_file))
# 从 prompt 中提取 tools 引用(### scripts/xxx.py 段落)
# OpenClaw skill 通常在 body 中提到脚本名
tools = []
for script_path in scripts:
# 脚本文件名(不含扩展名)作为工具名
tool_name = Path(script_path).stem
tools.append(tool_name)
skill = SkillDef(
name=name,
description=description,
prompt=prompt,
tools=tools,
skill_dir=str(skill_dir),
scripts=scripts,
is_openclaw=True,
)
registry.register(skill)
scripts_str = f", scripts: {len(scripts)}" if scripts else ""
print(f" [技能] {skill_dir.name}/ (OpenClaw): {name}{scripts_str}")
except Exception as e:
print(f" [技能] {skill_dir.name}/SKILL.md: 加载失败 -> {e}")
return registry return registry
@@ -352,9 +429,35 @@ async def make_skill_exec_node(config, skills_reg, tools_list):
user_input = msg.content user_input = msg.content
break break
# 执行依赖的本地工具
tool_info = "" tool_info = ""
# ---- OpenClaw 技能:执行 scripts/ 下的脚本 ----
if sk.is_openclaw and sk.scripts:
for script_path in sk.scripts:
try:
proc = await asyncio.create_subprocess_exec(
"python3", script_path, user_input,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
output = stdout.decode("utf-8", errors="replace").strip()
err = stderr.decode("utf-8", errors="replace").strip()
script_name = Path(script_path).name
if output:
tool_info += f"\n[脚本{script_name}输出]\n{output[:2000]}"
if err and "warning" not in err.lower():
tool_info += f"\n[脚本{script_name}错误]\n{err[:500]}"
except asyncio.TimeoutError:
tool_info += f"\n[脚本{Path(script_path).name}] 执行超时"
except Exception as e:
tool_info += f"\n[脚本{Path(script_path).name}] 执行错误: {e}"
# ---- 通用技能:执行依赖的本地工具 ----
for tname in sk.tools: for tname in sk.tools:
# 跳过 OpenClaw 已通过脚本执行的工具
if sk.is_openclaw:
continue
for t in tools_list: for t in tools_list:
if t.name == tname: if t.name == tname:
try: try:
@@ -371,7 +474,24 @@ async def make_skill_exec_node(config, skills_reg, tools_list):
except Exception as e: except Exception as e:
tool_info += f"\n工具{tname}错误: {e}" tool_info += f"\n工具{tname}错误: {e}"
prompt = sk.prompt.format(input=user_input) + tool_info # 构造提示词
if sk.is_openclaw:
# OpenClaw 技能:用 SKILL.md body 作为指导 + 脚本输出
prompt = f"""你是技能"{sk.name}"的执行者。
技能说明:
{sk.prompt[:2000]}
脚本执行结果:
{tool_info if tool_info else "(无脚本输出)"}
用户请求:{user_input}
请基于技能说明和脚本输出回答用户。"""
else:
# 通用技能:用 prompt 模板
prompt = sk.prompt.format(input=user_input) + tool_info
sk_llm = ChatOpenAI( sk_llm = ChatOpenAI(
base_url=llm_cfg["base_url"], base_url=llm_cfg["base_url"],
api_key=llm_cfg["api_key"], api_key=llm_cfg["api_key"],

View File

@@ -33,3 +33,4 @@ skill_keywords:
weather_analyst: ["天气", "出行", "穿什么"] weather_analyst: ["天气", "出行", "穿什么"]
math_tutor: ["计算", "算", "数学", "等于多少"] math_tutor: ["计算", "算", "数学", "等于多少"]
knowledge_explorer: ["是什么", "解释", "了解"] knowledge_explorer: ["是什么", "解释", "了解"]
time-tool: ["时间工具", "日期", "星期几"]

19
skills/time-tool/SKILL.md Normal file
View File

@@ -0,0 +1,19 @@
---
name: time-tool
description: "时间工具 - 获取当前时间和日期信息"
---
# 时间工具
获取当前时间、日期信息,以及进行简单的日期计算。
## 使用方法
```bash
python3 scripts/time_info.py [时区]
```
## 功能
- 获取当前时间(支持指定时区)
- 获取当前日期和星期
- 输出格式化的时间信息

View File

@@ -0,0 +1,34 @@
#!/usr/bin/env python3
"""时间信息脚本 - OpenClaw skill 示例"""
import sys
from datetime import datetime
try:
import pytz
HAS_PYTZ = True
except ImportError:
HAS_PYTZ = False
def main():
tz_name = sys.argv[1] if len(sys.argv) > 1 else "Asia/Shanghai"
if HAS_PYTZ:
try:
tz = pytz.timezone(tz_name)
now = datetime.now(tz)
except Exception:
now = datetime.now()
tz_name = "本地时间"
else:
now = datetime.now()
tz_name = "本地时间"
weekdays = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
print(f"时区: {tz_name}")
print(f"日期: {now.strftime('%Y年%m月%d')}")
print(f"星期: {weekdays[now.weekday()]}")
print(f"时间: {now.strftime('%H:%M:%S')}")
if __name__ == "__main__":
main()