#!/usr/bin/env python3 """ 项目服务管理面板 端口: 19013 """ import os import json import subprocess import signal import threading import sys from datetime import datetime from flask import Flask, render_template_string, jsonify, request import urllib.request import urllib.error app = Flask(__name__) BASE_DIR = os.path.dirname(os.path.abspath(__file__)) WORKSPACE_DIR = os.path.dirname(BASE_DIR) # works目录 ROOT_DIR = os.path.dirname(WORKSPACE_DIR) # workspace-coder目录 PROJECTS_FILE = os.path.join(BASE_DIR, 'projects.json') LOG_FILE = os.path.join(BASE_DIR, 'logs', 'app.log') # 进程状态缓存 process_cache = {} def log_message(msg): """写入日志""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') log_line = f"[{timestamp}] {msg}\n" # 同时输出到控制台和日志文件 print(log_line.strip()) try: os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) with open(LOG_FILE, 'a') as f: f.write(log_line) except: pass def signal_handler(signum, frame): """处理信号,记录kill来源""" signal_names = { signal.SIGTERM: 'SIGTERM', signal.SIGINT: 'SIGINT', signal.SIGHUP: 'SIGHUP', } sig_name = signal_names.get(signum, f'SIGNAL_{signum}') # 尝试获取kill来源 try: import psutil proc = psutil.Process() parent = proc.parent() parent_info = f"父进程: {parent.pid} ({parent.name()})" except: parent_info = "无法获取父进程信息" log_message(f"⚠️ 进程收到 {sig_name} 信号,即将退出! {parent_info}") log_message(f"信号来源可能是: 手动kill命令、systemd、OOM killer、或其他进程") sys.exit(0) # 注册信号处理 signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) try: signal.signal(signal.SIGHUP, signal_handler) except: pass # Windows不支持SIGHUP def load_projects(): """加载项目配置""" with open(PROJECTS_FILE, 'r', encoding='utf-8') as f: return json.load(f)['projects'] def save_projects(projects): """保存项目配置""" with open(PROJECTS_FILE, 'w', encoding='utf-8') as f: json.dump({'projects': projects}, f, ensure_ascii=False, indent=2) def check_port(port): """检查端口是否有进程监听""" try: import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) result = sock.connect_ex(('localhost', port)) sock.close() return result == 0 except: return False def check_health(url): """检查健康检查URL""" try: req = urllib.request.Request(url, method='GET') with urllib.request.urlopen(req, timeout=3) as response: return response.status == 200 except: return False def get_process_on_port(port): """获取占用端口的进程信息""" try: result = subprocess.run( f"lsof -i :{port} -t 2>/dev/null | head -1", shell=True, capture_output=True, text=True, timeout=5 ) pid = result.stdout.strip() if pid: # 获取进程信息 cmd_result = subprocess.run( f"ps -p {pid} -o pid,ppid,user,cmd --no-headers 2>/dev/null", shell=True, capture_output=True, text=True ) return { 'pid': int(pid), 'info': cmd_result.stdout.strip() } except: pass return None def get_project_status(project): """获取项目状态""" project_type = project.get('type', 'web') if project_type == 'web': ports = project.get('ports', []) if not ports: return {'status': 'unknown', 'message': '未配置端口'} port_status = {} all_running = True for port in ports: running = check_port(port) port_status[port] = { 'running': running, 'process': get_process_on_port(port) if running else None } if not running: all_running = False # 健康检查 health_ok = None if all_running and project.get('health_url'): health_ok = check_health(project['health_url']) return { 'status': 'running' if all_running else ('partial' if any(p['running'] for p in port_status.values()) else 'stopped'), 'ports': port_status, 'health': health_ok } elif project_type == 'cron': # 检查cron任务是否配置 cron_expr = project.get('cron', '') cron_cmd = project.get('cron_cmd', '') try: result = subprocess.run( f"crontab -l 2>/dev/null | grep -F '{cron_cmd}'", shell=True, capture_output=True, text=True ) cron_configured = bool(result.stdout.strip()) except: cron_configured = False return { 'status': 'configured' if cron_configured else 'not_configured', 'cron': cron_expr, 'cron_configured': cron_configured } elif project_type == 'extension': return {'status': 'completed', 'message': '浏览器插件,手动安装'} elif project_type == 'cli': return {'status': 'ready', 'message': '命令行工具,按需运行'} return {'status': 'unknown'} def run_command(cmd, cwd, project_id, action): """异步执行命令""" def _run(): try: log_file = os.path.join(BASE_DIR, 'logs', f'{project_id}.log') os.makedirs(os.path.dirname(log_file), exist_ok=True) with open(log_file, 'a') as f: f.write(f"\n{'='*50}\n") f.write(f"[{datetime.now().isoformat()}] {action}\n") f.write(f"Command: {cmd}\n") f.write(f"Directory: {cwd}\n") process = subprocess.Popen( cmd, shell=True, cwd=cwd, stdout=open(log_file, 'a'), stderr=subprocess.STDOUT, start_new_session=True ) process_cache[project_id] = process.pid except Exception as e: with open(os.path.join(BASE_DIR, 'logs', 'error.log'), 'a') as f: f.write(f"[{datetime.now().isoformat()}] Error: {e}\n") thread = threading.Thread(target=_run) thread.daemon = True thread.start() def stop_process(port): """停止占用端口的进程""" try: result = subprocess.run( f"lsof -i :{port} -t 2>/dev/null", shell=True, capture_output=True, text=True, timeout=5 ) pids = result.stdout.strip().split('\n') pids = [p for p in pids if p] for pid in pids: try: os.kill(int(pid), signal.SIGTERM) except: pass return len(pids) except: return 0 # HTML模板 HTML_TEMPLATE = ''' 项目服务管理面板

项目服务管理面板

统一管理所有项目和服务

对外IP:
间隔: s
已连接

Web服务

-

运行中

-

Cron任务

-

CLI工具

-

加载中...

主机 Cron 列表 -

加载中...

加载中...

''' @app.route('/') def index(): return render_template_string(HTML_TEMPLATE) @app.route('/api/projects') def api_projects(): projects = load_projects() for project in projects: project['status'] = get_project_status(project) return jsonify({'projects': projects}) @app.route('/api/projects/') def api_project(project_id): projects = load_projects() project = next((p for p in projects if p['id'] == project_id), None) if not project: return jsonify({'error': '项目不存在'}), 404 project['status'] = get_project_status(project) return jsonify(project) @app.route('/api/projects//start', methods=['POST']) def api_start(project_id): projects = load_projects() project = next((p for p in projects if p['id'] == project_id), None) if not project: return jsonify({'error': '项目不存在'}), 404 if project['type'] != 'web': return jsonify({'error': '只有Web服务支持启动'}), 400 directory = project.get('directory', '') if directory.startswith('/'): cwd = directory else: cwd = os.path.join(ROOT_DIR, directory) if not os.path.exists(cwd): return jsonify({'error': f'目录不存在: {cwd}'}), 400 # 检查是否有多个启动命令 if project.get('start_cmds'): for name, info in project['start_cmds'].items(): cmd = info['cmd'] run_command(cmd, cwd, project_id, f'start-{name}') return jsonify({'message': '服务启动中...'}) else: cmd = project.get('start_cmd', 'python3 app.py') run_command(cmd, cwd, project_id, 'start') return jsonify({'message': '服务启动中...'}) @app.route('/api/projects//stop', methods=['POST']) def api_stop(project_id): projects = load_projects() project = next((p for p in projects if p['id'] == project_id), None) if not project: return jsonify({'error': '项目不存在'}), 404 if project['type'] != 'web': return jsonify({'error': '只有Web服务支持停止'}), 400 ports = project.get('ports', []) stopped = 0 for port in ports: stopped += stop_process(port) return jsonify({'message': f'已停止 {stopped} 个进程'}) @app.route('/api/projects//run', methods=['POST']) def api_run(project_id): projects = load_projects() project = next((p for p in projects if p['id'] == project_id), None) if not project: return jsonify({'error': '项目不存在'}), 404 if project['type'] != 'cli': return jsonify({'error': '只有CLI工具支持运行'}), 400 directory = project.get('directory', '') if directory.startswith('/'): cwd = directory else: cwd = os.path.join(ROOT_DIR, directory) cmd = project.get('run_cmd', '') if not cmd: return jsonify({'error': '未配置运行命令'}), 400 run_command(cmd, cwd, project_id, 'run') return jsonify({'message': 'CLI工具已开始运行...'}) @app.route('/api/projects//log') def api_log(project_id): log_file = os.path.join(BASE_DIR, 'logs', f'{project_id}.log') if os.path.exists(log_file): with open(log_file, 'r', encoding='utf-8', errors='ignore') as f: # 最后1000行 lines = f.readlines()[-1000:] return jsonify({'log': ''.join(lines)}) return jsonify({'log': '暂无日志'}) @app.route('/api/projects/add', methods=['POST']) def api_add_project(): data = request.json if not data.get('name') or not data.get('type'): return jsonify({'error': '缺少必要字段'}), 400 projects = load_projects() # 生成ID import re project_id = re.sub(r'[^a-z0-9-]', '-', data['name'].lower()) # 检查ID是否重复 if any(p['id'] == project_id for p in projects): return jsonify({'error': '项目ID已存在'}), 400 new_project = { 'id': project_id, 'name': data['name'], 'type': data['type'], 'description': data.get('description', ''), 'directory': data.get('directory', ''), 'version': data.get('version', 'v1.0.0'), 'git_repo': data.get('git_repo'), } if data['type'] == 'web': new_project['ports'] = data.get('ports', []) new_project['start_cmd'] = data.get('start_cmd', 'python3 app.py') new_project['health_url'] = data.get('health_url') new_project['admin_url'] = data.get('admin_url') elif data['type'] == 'cron': new_project['cron'] = data.get('cron') new_project['cron_cmd'] = data.get('cron_cmd') elif data['type'] == 'cli': new_project['run_cmd'] = data.get('run_cmd') projects.append(new_project) save_projects(projects) return jsonify({'message': '项目添加成功', 'project': new_project}) @app.route('/api/projects/', methods=['PUT']) def api_update_project(project_id): projects = load_projects() project = next((p for p in projects if p['id'] == project_id), None) if not project: return jsonify({'error': '项目不存在'}), 404 data = request.json for key, value in data.items(): if key != 'id': # 不允许修改ID project[key] = value save_projects(projects) return jsonify({'message': '更新成功', 'project': project}) @app.route('/api/projects/', methods=['DELETE']) def api_delete_project(project_id): projects = load_projects() project = next((p for p in projects if p['id'] == project_id), None) if not project: return jsonify({'error': '项目不存在'}), 404 projects = [p for p in projects if p['id'] != project_id] save_projects(projects) return jsonify({'message': '删除成功'}) def get_system_crons(): """获取系统所有 cron 任务""" crons = [] # 获取当前用户的 crontab try: result = subprocess.run( "crontab -l 2>/dev/null", shell=True, capture_output=True, text=True, timeout=5 ) user_crontab = result.stdout.strip() if user_crontab: for line in user_crontab.split('\n'): line = line.strip() if line and not line.startswith('#'): # 解析 cron 行 parts = line.split() if len(parts) >= 6: cron_expr = ' '.join(parts[:5]) command = ' '.join(parts[5:]) crons.append({ 'source': 'user', 'expression': cron_expr, 'command': command, 'line': line }) except: pass # 获取系统级 cron (/etc/crontab) try: if os.path.exists('/etc/crontab'): with open('/etc/crontab', 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#'): parts = line.split() if len(parts) >= 7: # 系统 crontab 有额外的 user 字段 cron_expr = ' '.join(parts[:5]) user = parts[5] command = ' '.join(parts[6:]) crons.append({ 'source': 'system', 'expression': cron_expr, 'user': user, 'command': command, 'line': line }) except: pass # 获取 /etc/cron.d/ 目录下的任务 try: cron_d_dir = '/etc/cron.d' if os.path.exists(cron_d_dir): for filename in os.listdir(cron_d_dir): filepath = os.path.join(cron_d_dir, filename) if os.path.isfile(filepath): with open(filepath, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#'): parts = line.split() if len(parts) >= 7: cron_expr = ' '.join(parts[:5]) user = parts[5] command = ' '.join(parts[6:]) crons.append({ 'source': 'cron.d', 'file': filename, 'expression': cron_expr, 'user': user, 'command': command, 'line': line }) except: pass return crons def parse_cron_expression(expr): """解析 cron 表达式为人类可读格式""" parts = expr.split() if len(parts) != 5: return expr minute, hour, day, month, weekday = parts desc = [] # 分钟 if minute == '*': desc.append('每分钟') elif minute.startswith('*/'): desc.append(f'每{minute[2:]}分钟') else: desc.append(f'{minute}分') # 小时 if hour != '*': if minute == '*': desc = [f'{hour}点每分钟'] else: desc = [f'{hour}点{minute}分'] elif minute.startswith('*/'): # 保持 "每X分钟" pass # 日期 if day != '*': desc.append(f'{day}号') # 月份 if month != '*': months = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12'] try: desc.append(f'{months[int(month)-1]}月') except: desc.append(f'{month}月') # 星期 if weekday != '*': weekdays = ['周日', '周一', '周二', '周三', '周四', '周五', '周六'] try: # 0=周日, 1=周一... desc.append(weekdays[int(weekday)]) except: desc.append(f'周{weekday}') return ' '.join(desc) if desc else expr @app.route('/api/crons') def api_crons(): """获取系统 cron 列表""" crons = get_system_crons() for cron in crons: cron['description'] = parse_cron_expression(cron['expression']) return jsonify({'crons': crons}) if __name__ == '__main__': os.makedirs(os.path.join(BASE_DIR, 'logs'), exist_ok=True) log_message("=" * 50) log_message("项目服务管理面板启动") log_message("访问地址: http://localhost:19013") log_message(f"进程PID: {os.getpid()}") log_message("=" * 50) app.run(host='0.0.0.0', port=19013, debug=False)