#!/usr/bin/env python3
"""
Service monitor script.

Probes every configured web service over HTTP and sends an e-mail alert
listing the ones that are down.  Intended to be run periodically
(every 20 minutes).
"""

import html
import http.client
import json
import subprocess
import urllib.error
import urllib.request
from datetime import datetime
from pathlib import Path

# Configuration: one entry per monitored service.
#   name      - display name used in logs and in the alert mail
#   port      - localhost port the service listens on
#   path      - URL path probed by check_service()
#   start_cmd - shell command shown in the alert so the service can be restarted
SERVICES = [
    {
        "name": "PDF翻译助手 V2",
        "port": 19000,
        "path": "/api/health",
        "start_cmd": "cd ~/.openclaw/workspace-coder/works/pdf-translate-web-v2 && nohup python3 app.py > /tmp/pdf-v2.log 2>&1 &",
    },
    {
        "name": "LLM Index RAG",
        "port": 19001,
        "path": "/api/stats",
        "start_cmd": "cd ~/.openclaw/workspace-coder/works/llm-index-rag && nohup python3 app.py > /tmp/llm-index.log 2>&1 &",
    },
    {
        "name": "碎片信息记录",
        "port": 19009,
        "path": "/api/notes",
        "start_cmd": "cd ~/.openclaw/workspace-coder/works/snippet-notes && nohup python3 app.py > /tmp/snippet.log 2>&1 &",
    },
    {
        "name": "ParamHub Python",
        "port": 19010,
        "path": "/api/stats",
        "start_cmd": "cd ~/.openclaw/workspace-coder/works/param-hub-python && nohup python3 app.py > /tmp/paramhub.log 2>&1 &",
    },
    {
        "name": "产品参数爬取 API",
        "port": 19011,
        "path": "/api/products",
        "start_cmd": "cd ~/.openclaw/common/projects/product-crawler && nohup python3 app.py > /tmp/crawler-api.log 2>&1 &",
    },
    {
        "name": "产品参数爬取 后台",
        "port": 19012,
        "path": "/",
        "start_cmd": "cd ~/.openclaw/common/projects/product-crawler/admin && nohup python3 app.py > /tmp/crawler-admin.log 2>&1 &",
    },
    {
        "name": "LLM Proxy API",
        "port": 19007,
        "path": "/health",
        "start_cmd": "cd ~/.openclaw/common/projects/llm-proxy && nohup python3 app.py > /tmp/llm-proxy.log 2>&1 &",
    },
    {
        "name": "LLM Proxy 后台",
        "port": 19008,
        "path": "/api/stats",
        "start_cmd": "cd ~/.openclaw/common/projects/llm-proxy/admin && nohup python3 app.py > /tmp/llm-proxy-admin.log 2>&1 &",
    },
    {
        "name": "项目服务管理面板",
        "port": 19013,
        "path": "/",
        "start_cmd": "cd ~/.openclaw/workspace-coder/works/project-panel && nohup python3 app.py > logs/app.log 2>&1 &",
    },
]

NOTIFY_EMAIL = "zuitoushang@tphai.com"
LOG_FILE = Path(__file__).parent / "monitor.log"
STATUS_FILE = Path(__file__).parent / "status.json"


def check_service(port: int, path: str = "/", timeout: int = 5) -> bool:
    """Return True if ``http://localhost:<port><path>`` answers with HTTP 200.

    Args:
        port: Local TCP port of the service.
        path: URL path to request.
        timeout: Socket timeout in seconds.

    Returns:
        True only for a 200 response; False for any other status or for any
        connection/protocol failure.
    """
    url = f"http://localhost:{port}{path}"
    try:
        req = urllib.request.Request(url, method='GET')
        with urllib.request.urlopen(req, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, http.client.HTTPException, OSError):
        # URLError/HTTPError: refused connection or non-2xx status.
        # HTTPException: malformed or truncated HTTP response.
        # OSError: raw socket timeout/reset raised while reading the reply.
        # Any of these means "not healthy"; one bad service must not abort
        # the check of the remaining ones.
        return False


def get_status() -> dict:
    """Read the status saved by the previous run.

    Returns:
        The parsed content of STATUS_FILE, or a default status (no previous
        check, no stopped services) when the file is missing, unreadable or
        does not contain valid JSON.
    """
    try:
        with open(STATUS_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers
        # json.JSONDecodeError and undecodable bytes.
        return {"last_check": None, "stopped_services": []}


def save_status(status: dict):
    """Write *status* to STATUS_FILE as indented UTF-8 JSON."""
    with open(STATUS_FILE, 'w', encoding='utf-8') as f:
        json.dump(status, f, ensure_ascii=False, indent=2)


def log(message: str):
    """Append a timestamped *message* to LOG_FILE and echo it to stdout."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}\n"
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(log_entry)
    print(log_entry.strip())


def build_alert_html(stopped: list, checked_at: datetime) -> str:
    """Render the HTML body of the alert e-mail.

    Args:
        stopped: Service entries (dicts with ``name``, ``port`` and
            ``start_cmd``) that failed the health check.
        checked_at: Time of the check, shown in the mail.

    Returns:
        The HTML document fragment as a single newline-joined string.
    """
    # NOTE(review): the tag names/attributes below were restored from a copy
    # of this file whose markup had been stripped; confirm them against the
    # intended mail template.
    body_lines = [
        "<h2>🚨 服务停止通知</h2>",
        f"<p>检测时间: {checked_at.strftime('%Y-%m-%d %H:%M:%S')}</p>",
        f"<p>以下 {len(stopped)} 个服务已停止运行:</p>",
        "<table border='1' cellpadding='6' cellspacing='0'>",
        "<tr><th>服务名称</th><th>端口</th><th>启动命令</th></tr>",
    ]

    # Start commands contain '>' and '&' (shell redirections), so every
    # interpolated value must be HTML-escaped to keep the markup valid.
    for svc in stopped:
        name = html.escape(str(svc['name']))
        port = html.escape(str(svc['port']))
        start_cmd = html.escape(str(svc['start_cmd']))
        body_lines.append("<tr>")
        body_lines.append(f"<td>{name}</td>")
        body_lines.append(f"<td>{port}</td>")
        body_lines.append(f"<td><code>{start_cmd}</code></td>")
        body_lines.append("</tr>")

    body_lines.append("</table>")
    body_lines.append("<hr>")
    body_lines.append("<h3>📝 启动方式</h3>")
    body_lines.append("<pre>")
    for svc in stopped:
        body_lines.append(f"# {html.escape(str(svc['name']))}")
        body_lines.append(html.escape(str(svc['start_cmd'])))
        body_lines.append("")
    body_lines.append("</pre>")

    return "\n".join(body_lines)


def send_notification(stopped: list):
    """Send an alert e-mail listing the stopped services.

    Does nothing when *stopped* is empty.  The mail is sent by invoking the
    external ``send_email.py`` script; success, failure (non-zero exit) and
    exceptions are all written to the log rather than raised.

    Args:
        stopped: Service entries that failed the health check.
    """
    if not stopped:
        return

    # The mail is delivered through the shared e-mail sender skill script.
    email_script = Path(__file__).parent.parent.parent / "skills/email-sender/scripts/send_email.py"

    subject = f"【服务监控警报】{len(stopped)}个服务已停止"
    body = build_alert_html(stopped, datetime.now())

    # Argument list (no shell) so subject/body need no shell quoting.
    cmd = [
        "python3", str(email_script), "send",
        "--to", NOTIFY_EMAIL,
        "--subject", subject,
        "--body", body,
        "--html",
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except Exception as e:
        # Best-effort boundary: a failed alert must not crash the monitor.
        log(f"邮件发送异常: {e}")
    else:
        if result.returncode == 0:
            log(f"邮件发送成功: {NOTIFY_EMAIL}")
        else:
            log(f"邮件发送失败: {result.stderr}")


def main() -> int:
    """Check every service, persist the result and alert on failures.

    Returns:
        The number of services found stopped.
    """
    log("=" * 50)
    log("开始检查服务状态...")

    stopped_services = []

    for svc in SERVICES:
        is_running = check_service(svc['port'], svc['path'])
        status = "✅ 运行中" if is_running else "❌ 已停止"
        log(f" {svc['name']} (端口 {svc['port']}): {status}")

        if not is_running:
            stopped_services.append(svc)

    # Persist the outcome of this run.
    save_status({
        "last_check": datetime.now().isoformat(),
        "stopped_services": [s['name'] for s in stopped_services]
    })

    # Notify only when something is down.
    if stopped_services:
        log(f"发现 {len(stopped_services)} 个服务停止,发送邮件通知...")
        send_notification(stopped_services)
    else:
        log("所有服务运行正常 ✓")

    log("检查完成")
    return len(stopped_services)


if __name__ == "__main__":
    main()