#!/usr/bin/env python3
"""
Project service management panel v2.2.0.

Port: 19013
New: custom link entry ("+" button) on web-service cards.
New: ability to register custom external web services ("add service" button).
"""

import json
import os
import re
import shutil
import signal
import socket
import sqlite3
import subprocess
import sys
import tempfile
import threading
import urllib.error
import urllib.request
from datetime import datetime

from flask import Flask, render_template_string, jsonify, request, g

app = Flask(__name__)

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
WORKSPACE_DIR = os.path.dirname(BASE_DIR)  # "works" directory
ROOT_DIR = os.path.dirname(WORKSPACE_DIR)  # "workspace-coder" directory
PROJECTS_FILE = os.path.join(BASE_DIR, 'projects.json')
LOG_FILE = os.path.join(BASE_DIR, 'logs', 'app.log')
DB_FILE = os.path.join(BASE_DIR, 'cron_manager.db')
CRON_BACKUP_DIR = os.path.join(BASE_DIR, 'cron_backups')

# Maps project id -> PID of the most recently spawned process.
process_cache = {}


# ==================== Database helpers ====================

def get_db():
    """Return the request-scoped SQLite connection, opening it on first use."""
    if 'db' not in g:
        g.db = sqlite3.connect(DB_FILE)
        g.db.row_factory = sqlite3.Row
    return g.db


@app.teardown_appcontext
def close_db(error):
    """Close the request-scoped SQLite connection, if one was opened."""
    db = g.pop('db', None)
    if db is not None:
        db.close()


def init_db():
    """Create the cron tables and indexes if they do not exist yet."""
    conn = sqlite3.connect(DB_FILE)
    try:
        c = conn.cursor()

        # Cron task table
        c.execute('''CREATE TABLE IF NOT EXISTS cron_tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            expression TEXT NOT NULL,
            command TEXT NOT NULL,
            description TEXT,
            category TEXT DEFAULT 'other',
            enabled INTEGER DEFAULT 1,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP
        )''')

        # Cron version-history table
        c.execute('''CREATE TABLE IF NOT EXISTS cron_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id INTEGER NOT NULL,
            version INTEGER NOT NULL,
            name TEXT,
            expression TEXT,
            command TEXT,
            description TEXT,
            category TEXT,
            enabled INTEGER,
            operation TEXT,
            operator TEXT DEFAULT 'system',
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (task_id) REFERENCES cron_tasks(id)
        )''')

        # Cron execution-log table
        c.execute('''CREATE TABLE IF NOT EXISTS cron_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id INTEGER,
            execution_time TEXT DEFAULT CURRENT_TIMESTAMP,
            status TEXT,
            output TEXT,
            FOREIGN KEY (task_id) REFERENCES cron_tasks(id)
        )''')

        # Indexes
        c.execute('CREATE INDEX IF NOT EXISTS idx_cron_history_task ON cron_history(task_id)')
        c.execute('CREATE INDEX IF NOT EXISTS idx_cron_logs_task ON cron_logs(task_id)')

        conn.commit()
    finally:
        # Close even when a statement fails so the handle is not leaked.
        conn.close()


# ==================== Logging and signals ====================

def log_message(msg):
    """Print *msg* with a timestamp and append it to LOG_FILE (best effort)."""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_line = f"[{timestamp}] {msg}\n"
    print(log_line.strip())
    try:
        os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
        # Messages contain non-ASCII text, so do not rely on the locale encoding.
        with open(LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(log_line)
    except Exception:
        # File logging is best effort; a failure here must not break the caller.
        pass


def signal_handler(signum, frame):
    """Log the received signal and exit the process with status 0."""
    try:
        sig_name = signal.Signals(signum).name
    except ValueError:
        sig_name = f'SIGNAL_{signum}'
    log_message(f"⚠️ 进程收到 {sig_name} 信号,即将退出!")
    sys.exit(0)


signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
try:
    signal.signal(signal.SIGHUP, signal_handler)
except (AttributeError, ValueError, OSError):
    # SIGHUP is not available on every platform.
    pass


# ==================== Project configuration ====================

def load_projects():
    """Return the list stored under the 'projects' key of PROJECTS_FILE."""
    with open(PROJECTS_FILE, 'r', encoding='utf-8') as f:
        return json.load(f)['projects']


def save_projects(projects):
    """Write *projects* to PROJECTS_FILE under the 'projects' key."""
    with open(PROJECTS_FILE, 'w', encoding='utf-8') as f:
        json.dump({'projects': projects}, f, ensure_ascii=False, indent=2)


def _find_project(projects, project_id):
    """Return the project dict whose 'id' equals *project_id*, or None."""
    return next((p for p in projects if p['id'] == project_id), None)


def _project_cwd(project):
    """Resolve the project's 'directory' to a path (relative ones under ROOT_DIR)."""
    directory = project.get('directory', '')
    if directory.startswith('/'):
        return directory
    return os.path.join(ROOT_DIR, directory)


def _json_body():
    """Return the request's JSON object, or {} when the body is absent or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


# ==================== Status probes ====================

def check_port(port):
    """Return True if a TCP connection to localhost:*port* succeeds within 1s."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            return sock.connect_ex(('localhost', port)) == 0
    except Exception:
        # Invalid port values or resolver errors count as "not listening".
        return False


def check_health(url):
    """Return True if a GET to *url* answers with HTTP 200 within 3s."""
    try:
        req = urllib.request.Request(url, method='GET')
        with urllib.request.urlopen(req, timeout=3) as response:
            return response.status == 200
    except Exception:
        return False


def get_process_on_port(port):
    """Return {'pid', 'info'} for the first process lsof reports on *port*, else None."""
    try:
        # Argument lists (no shell) so a malformed port value cannot inject commands.
        result = subprocess.run(
            ['lsof', '-i', f':{port}', '-t'],
            capture_output=True, text=True, timeout=5
        )
        pids = result.stdout.split()
        if pids:
            pid = pids[0]
            cmd_result = subprocess.run(
                ['ps', '-p', pid, '-o', 'pid,ppid,user,cmd', '--no-headers'],
                capture_output=True, text=True, timeout=5
            )
            return {'pid': int(pid), 'info': cmd_result.stdout.strip()}
    except Exception:
        # Missing tools, timeouts or unparsable output all mean "unknown".
        pass
    return None


def get_project_status(project):
    """Return a status dict for *project* according to its 'type'.

    'web' projects are probed per configured port (plus an optional health
    URL), 'cron' projects are looked up in the user crontab, and 'extension'
    and 'cli' projects return a fixed status.
    """
    project_type = project.get('type', 'web')

    if project_type == 'web':
        ports = project.get('ports', [])
        if not ports:
            return {'status': 'unknown', 'message': '未配置端口'}

        port_status = {}
        all_running = True
        for port in ports:
            running = check_port(port)
            port_status[port] = {
                'running': running,
                'process': get_process_on_port(port) if running else None,
            }
            if not running:
                all_running = False

        health_ok = None
        if all_running and project.get('health_url'):
            health_ok = check_health(project['health_url'])

        if all_running:
            status = 'running'
        elif any(p['running'] for p in port_status.values()):
            status = 'partial'
        else:
            status = 'stopped'
        return {'status': status, 'ports': port_status, 'health': health_ok}

    elif project_type == 'cron':
        cron_expr = project.get('cron', '')
        cron_cmd = project.get('cron_cmd', '')
        try:
            result = subprocess.run(
                ['crontab', '-l'], capture_output=True, text=True, timeout=5
            )
            # Fixed-string match done in Python instead of a shell `grep -F`
            # pipeline; an empty/missing command is reported as not configured.
            cron_configured = bool(cron_cmd) and any(
                cron_cmd in line for line in result.stdout.splitlines()
            )
        except Exception:
            cron_configured = False
        return {
            'status': 'configured' if cron_configured else 'not_configured',
            'cron': cron_expr,
            'cron_configured': cron_configured,
        }

    elif project_type == 'extension':
        return {'status': 'completed', 'message': '浏览器插件,手动安装'}

    elif project_type == 'cli':
        return {'status': 'ready', 'message': '命令行工具,按需运行'}

    return {'status': 'unknown'}


# ==================== Process control ====================

def run_command(cmd, cwd, project_id, action):
    """Start *cmd* in a background thread, appending output to logs/<project_id>.log.

    The command runs through the shell in *cwd* in a new session. Its PID is
    stored in ``process_cache[project_id]``. Failures are appended to
    logs/error.log.
    """
    def _run():
        try:
            log_file = os.path.join(BASE_DIR, 'logs', f'{project_id}.log')
            os.makedirs(os.path.dirname(log_file), exist_ok=True)
            with open(log_file, 'a', encoding='utf-8') as f:
                f.write(f"\n{'='*50}\n[{datetime.now().isoformat()}] {action}\nCommand: {cmd}\nDirectory: {cwd}\n")
                # Flush so the header precedes anything the child writes.
                f.flush()
                # The child inherits the descriptor; our copy is closed when
                # the `with` block exits instead of being leaked.
                process = subprocess.Popen(
                    cmd, shell=True, cwd=cwd,
                    stdout=f, stderr=subprocess.STDOUT,
                    start_new_session=True
                )
            process_cache[project_id] = process.pid
        except Exception as e:
            with open(os.path.join(BASE_DIR, 'logs', 'error.log'), 'a', encoding='utf-8') as f:
                f.write(f"[{datetime.now().isoformat()}] Error: {e}\n")

    thread = threading.Thread(target=_run)
    thread.daemon = True
    thread.start()


def stop_process(port):
    """Send SIGTERM to every PID lsof reports on *port*; return how many were found."""
    try:
        result = subprocess.run(
            ['lsof', '-i', f':{port}', '-t'],
            capture_output=True, text=True, timeout=5
        )
        pids = result.stdout.split()
        for pid in pids:
            try:
                os.kill(int(pid), signal.SIGTERM)
            except (ValueError, OSError):
                # Process already gone or not ours to signal; keep going.
                pass
        return len(pids)
    except Exception:
        return 0


# ==================== Cron management API ====================

def backup_crontab():
    """Save the output of `crontab -l` to a timestamped file; return its path or None."""
    os.makedirs(CRON_BACKUP_DIR, exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_file = os.path.join(CRON_BACKUP_DIR, f'crontab_{timestamp}.txt')
    try:
        result = subprocess.run(['crontab', '-l'], capture_output=True, text=True)
        with open(backup_file, 'w') as f:
            f.write(result.stdout)
        return backup_file
    except Exception:
        return None


def sync_crontab_from_db():
    """Install the enabled tasks from the database as the user's crontab.

    The installed crontab consists solely of the enabled ``cron_tasks`` rows.
    A failed install is logged rather than raised.
    """
    db = get_db()
    c = db.cursor()
    c.execute("SELECT expression, command, enabled FROM cron_tasks WHERE enabled = 1")
    lines = [f"{task['expression']} {task['command']}" for task in c.fetchall()]

    # A unique temp file avoids clashes (and symlink tricks) on a fixed /tmp path.
    fd, tmp_file = tempfile.mkstemp(prefix='crontab_sync_', suffix='.tmp')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines) + '\n')
        try:
            result = subprocess.run(['crontab', tmp_file], capture_output=True, text=True)
        except OSError as exc:
            log_message(f"同步系统 crontab 失败: {exc}")
        else:
            if result.returncode != 0:
                log_message(f"同步系统 crontab 失败: {result.stderr.strip()}")
    finally:
        os.remove(tmp_file)

    # NOTE(review): the backup is taken after the install, so it captures the
    # new crontab rather than the one that was replaced — confirm intent.
    backup_crontab()


def _parse_cron_line(line, source, has_user=False, extra=None):
    """Parse one crontab line into an entry dict, or return None to skip it.

    Blank lines, comments and lines with too few fields are skipped. With
    *has_user* the sixth field is the user column (system-crontab format).
    """
    line = line.strip()
    if not line or line.startswith('#'):
        return None
    parts = line.split()
    if len(parts) < (7 if has_user else 6):
        return None

    entry = {'source': source}
    if extra:
        entry.update(extra)
    entry['expression'] = ' '.join(parts[:5])
    if has_user:
        entry['user'] = parts[5]
        entry['command'] = ' '.join(parts[6:])
    else:
        entry['command'] = ' '.join(parts[5:])
    entry['line'] = line
    return entry


def get_system_crons():
    """Collect cron entries from the user crontab, /etc/crontab and /etc/cron.d."""
    crons = []

    try:
        result = subprocess.run(
            ['crontab', '-l'], capture_output=True, text=True, timeout=5
        )
        for line in result.stdout.splitlines():
            entry = _parse_cron_line(line, 'user')
            if entry:
                crons.append(entry)
    except Exception:
        pass

    try:
        if os.path.exists('/etc/crontab'):
            with open('/etc/crontab', 'r') as f:
                for line in f:
                    entry = _parse_cron_line(line, 'system', has_user=True)
                    if entry:
                        crons.append(entry)
    except (OSError, UnicodeError):
        pass

    try:
        filenames = os.listdir('/etc/cron.d') if os.path.exists('/etc/cron.d') else []
    except OSError:
        filenames = []
    for filename in filenames:
        filepath = os.path.join('/etc/cron.d', filename)
        # Handle errors per file so one unreadable file does not hide the rest.
        try:
            if not os.path.isfile(filepath):
                continue
            with open(filepath, 'r') as f:
                for line in f:
                    entry = _parse_cron_line(
                        line, 'cron.d', has_user=True, extra={'file': filename}
                    )
                    if entry:
                        crons.append(entry)
        except (OSError, UnicodeError):
            continue

    return crons


def parse_cron_expression(expr):
    """Render a five-field cron expression as a short description.

    Expressions that do not have exactly five fields are returned unchanged.
    """
    parts = expr.split()
    if len(parts) != 5:
        return expr

    minute, hour, day, month, weekday = parts
    desc = []

    if minute == '*':
        desc.append('每分钟')
    elif minute.startswith('*/'):
        desc.append(f'每{minute[2:]}分钟')
    else:
        desc.append(f'{minute}分')

    if hour != '*':
        # A fixed hour replaces the minute-only description.
        if minute == '*':
            desc = [f'{hour}点每分钟']
        else:
            desc = [f'{hour}点{minute}分']

    if day != '*':
        desc.append(f'{day}号')
    if month != '*':
        desc.append(f'{month}月')
    if weekday != '*':
        weekdays = ['周日', '周一', '周二', '周三', '周四', '周五', '周六']
        try:
            desc.append(weekdays[int(weekday)])
        except (ValueError, IndexError):
            # Ranges, lists or out-of-range values are shown verbatim.
            desc.append(f'周{weekday}')

    return ' '.join(desc) if desc else expr


# Exact-match expressions that have a hand-written description.
_CRON_PATTERNS = {
    '0 4 * * *': '每天凌晨4点',
    '0 0 * * *': '每天午夜0点',
    '*/5 * * * *': '每5分钟',
    '*/10 * * * *': '每10分钟',
    '*/20 * * * *': '每20分钟',
    '*/30 * * * *': '每30分钟',
    '0 */1 * * *': '每小时',
    '0 */2 * * *': '每2小时',
    '0 9 * * 1': '每周一9点',
    '0 9 * * 1-5': '每周一到五9点',
}


def cron_expression_to_human(expr):
    """Return a friendlier description of *expr*, using known patterns first."""
    if len(expr.split()) != 5:
        return expr
    known = _CRON_PATTERNS.get(expr)
    if known is not None:
        return known
    # Fall back to the generic field-by-field description.
    return parse_cron_expression(expr)


@app.route('/api/cron/tasks')
def api_cron_tasks():
    """List the cron tasks stored in the database, newest first."""
    db = get_db()
    c = db.cursor()
    c.execute("SELECT * FROM cron_tasks ORDER BY created_at DESC")
    tasks = [dict(row) for row in c.fetchall()]

    # Add the readable schedule and the next run time.
    for task in tasks:
        task['human_time'] = cron_expression_to_human(task['expression'])
        task['next_run'] = get_next_run_time(task['expression'])

    return jsonify({'tasks': tasks})


@app.route('/api/cron/tasks', methods=['POST'])
def api_cron_add():
    """Create a cron task from the JSON body, record version 1 and sync crontab."""
    data = _json_body()

    if not data.get('name') or not data.get('expression') or not data.get('command'):
        return jsonify({'error': '缺少必要字段'}), 400

    db = get_db()
    c = db.cursor()

    snapshot = (data['name'], data['expression'], data['command'],
                data.get('description', ''), data.get('category', 'other'), 1)

    # Insert the task
    c.execute('''INSERT INTO cron_tasks (name, expression, command, description, category, enabled)
                 VALUES (?, ?, ?, ?, ?, ?)''', snapshot)
    task_id = c.lastrowid

    # Record history (version 1)
    c.execute('''INSERT INTO cron_history
                 (task_id, version, name, expression, command, description, category, enabled, operation)
                 VALUES (?, 1, ?, ?, ?, ?, ?, ?, 'create')''', (task_id,) + snapshot)
    # One commit so the task and its history row are stored together.
    db.commit()

    # Sync to the system crontab
    sync_crontab_from_db()

    log_message(f"添加 Cron 任务: {data['name']} ({data['expression']})")
    return jsonify({'message': '任务添加成功', 'task_id': task_id})


@app.route('/api/cron/tasks/<int:task_id>', methods=['PUT'])
def api_cron_update(task_id):
    """Update the given fields of a cron task and record a new history version."""
    data = _json_body()

    db = get_db()
    c = db.cursor()

    # Fetch the current task
    c.execute("SELECT * FROM cron_tasks WHERE id = ?", (task_id,))
    old_task = c.fetchone()
    if not old_task:
        return jsonify({'error': '任务不存在'}), 404

    # Determine the next version number
    c.execute("SELECT MAX(version) FROM cron_history WHERE task_id = ?", (task_id,))
    max_version = c.fetchone()[0] or 0
    new_version = max_version + 1

    # Update only the fields present in the body (column names are whitelisted)
    fields = ['name', 'expression', 'command', 'description', 'category', 'enabled']
    update_fields = [f"{field} = ?" for field in fields if field in data]
    update_values = [data[field] for field in fields if field in data]

    if update_fields:
        update_fields.append("updated_at = CURRENT_TIMESTAMP")
        c.execute(f"UPDATE cron_tasks SET {','.join(update_fields)} WHERE id = ?",
                  update_values + [task_id])

    # Record history
    c.execute('''INSERT INTO cron_history
                 (task_id, version, name, expression, command, description, category, enabled, operation)
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'update')''',
              (task_id, new_version) + tuple(data.get(field, old_task[field]) for field in fields))
    db.commit()

    # Sync to the system crontab
    sync_crontab_from_db()

    log_message(f"更新 Cron 任务 ID={task_id}: 版本 {new_version}")
    return jsonify({'message': '任务更新成功', 'version': new_version})


@app.route('/api/cron/tasks/<int:task_id>', methods=['DELETE'])
def api_cron_delete(task_id):
    """Delete a cron task after recording a 'delete' history row."""
    db = get_db()
    c = db.cursor()

    c.execute("SELECT * FROM cron_tasks WHERE id = ?", (task_id,))
    task = c.fetchone()
    if not task:
        return jsonify({'error': '任务不存在'}), 404

    # Record the deletion in history
    c.execute("SELECT MAX(version) FROM cron_history WHERE task_id = ?", (task_id,))
    max_version = c.fetchone()[0] or 0

    c.execute('''INSERT INTO cron_history
                 (task_id, version, name, expression, command, description, category, enabled, operation)
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'delete')''',
              (task_id, max_version + 1, task['name'], task['expression'], task['command'],
               task['description'], task['category'], task['enabled']))

    # Delete the task
    c.execute("DELETE FROM cron_tasks WHERE id = ?", (task_id,))
    db.commit()

    # Sync to the system crontab
    sync_crontab_from_db()

    log_message(f"删除 Cron 任务 ID={task_id}: {task['name']}")
    return jsonify({'message': '任务删除成功'})


@app.route('/api/cron/tasks/<int:task_id>/toggle', methods=['POST'])
def api_cron_toggle(task_id):
    """Flip a cron task's enabled flag and record a 'toggle' history row."""
    db = get_db()
    c = db.cursor()

    c.execute("SELECT * FROM cron_tasks WHERE id = ?", (task_id,))
    task = c.fetchone()
    if not task:
        return jsonify({'error': '任务不存在'}), 404

    new_enabled = 0 if task['enabled'] == 1 else 1
    c.execute("UPDATE cron_tasks SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?",
              (new_enabled, task_id))

    # Record history with a full snapshot so this version can be rolled back to.
    c.execute("SELECT MAX(version) FROM cron_history WHERE task_id = ?", (task_id,))
    max_version = c.fetchone()[0] or 0
    c.execute('''INSERT INTO cron_history
                 (task_id, version, name, expression, command, description, category, enabled, operation)
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''',
              (task_id, max_version + 1, task['name'], task['expression'], task['command'],
               task['description'], task['category'], new_enabled, 'toggle'))
    db.commit()

    # Sync to the system crontab
    sync_crontab_from_db()

    log_message(f"切换 Cron 任务 ID={task_id} 状态: {new_enabled}")
    return jsonify({'message': '状态已切换', 'enabled': new_enabled})


@app.route('/api/cron/tasks/<int:task_id>/history')
def api_cron_history(task_id):
    """Return a task's history rows, newest version first."""
    db = get_db()
    c = db.cursor()

    c.execute("SELECT * FROM cron_history WHERE task_id = ? ORDER BY version DESC", (task_id,))
    history = [dict(row) for row in c.fetchall()]

    for h in history:
        if h['expression']:
            h['human_time'] = cron_expression_to_human(h['expression'])

    return jsonify({'history': history})


@app.route('/api/cron/tasks/<int:task_id>/rollback/<int:version>', methods=['POST'])
def api_cron_rollback(task_id, version):
    """Restore a task to a stored history version and record the rollback."""
    db = get_db()
    c = db.cursor()

    c.execute("SELECT * FROM cron_history WHERE task_id = ? AND version = ?", (task_id, version))
    history = c.fetchone()
    if not history:
        return jsonify({'error': '版本不存在'}), 404

    # History rows without a full snapshot cannot satisfy the NOT NULL columns.
    if history['name'] is None or history['expression'] is None or history['command'] is None:
        return jsonify({'error': '该版本缺少完整快照,无法回退'}), 400

    # Restore the task to the historical values
    c.execute('''UPDATE cron_tasks SET name = ?, expression = ?, command = ?,
                 description = ?, category = ?, enabled = ?, updated_at = CURRENT_TIMESTAMP
                 WHERE id = ?''',
              (history['name'], history['expression'], history['command'],
               history['description'], history['category'], history['enabled'], task_id))
    if c.rowcount == 0:
        # The task row no longer exists, so there is nothing to roll back.
        return jsonify({'error': '任务不存在'}), 404

    # Record the rollback operation
    c.execute("SELECT MAX(version) FROM cron_history WHERE task_id = ?", (task_id,))
    max_version = c.fetchone()[0] or 0
    c.execute('''INSERT INTO cron_history
                 (task_id, version, name, expression, command, description, category, enabled, operation)
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''',
              (task_id, max_version + 1, history['name'], history['expression'], history['command'],
               history['description'], history['category'], history['enabled'],
               f'rollback to v{version}'))
    db.commit()

    # Sync to the system crontab
    sync_crontab_from_db()

    log_message(f"回退 Cron 任务 ID={task_id} 到版本 {version}")
    return jsonify({'message': f'已回退到版本 {version}'})


@app.route('/api/cron/tasks/<int:task_id>/logs')
def api_cron_logs(task_id):
    """Return up to 100 most recent execution-log rows for a task.

    Passing ``?enable=`` with any value other than 'true' returns an empty list.
    """
    enable_logs = request.args.get('enable', 'true') == 'true'
    if not enable_logs:
        return jsonify({'logs': [], 'message': '日志功能已关闭'})

    db = get_db()
    c = db.cursor()
    c.execute("SELECT * FROM cron_logs WHERE task_id = ? ORDER BY execution_time DESC LIMIT 100",
              (task_id,))
    logs = [dict(row) for row in c.fetchall()]

    return jsonify({'logs': logs})


@app.route('/api/cron/categories')
def api_cron_categories():
    """Return the fixed list of task categories."""
    categories = [
        {'id': 'monitor', 'name': '系统监控', 'icon': 'ri-heart-pulse-line', 'color': 'blue'},
        {'id': 'backup', 'name': '数据备份', 'icon': 'ri-archive-line', 'color': 'green'},
        {'id': 'cleanup', 'name': '清理任务', 'icon': 'ri-delete-bin-line', 'color': 'yellow'},
        {'id': 'report', 'name': '报表生成', 'icon': 'ri-file-chart-line', 'color': 'purple'},
        {'id': 'sync', 'name': '数据同步', 'icon': 'ri-refresh-line', 'color': 'cyan'},
        {'id': 'notification', 'name': '通知提醒', 'icon': 'ri-notification-line', 'color': 'orange'},
        {'id': 'other', 'name': '其他任务', 'icon': 'ri-more-line', 'color': 'gray'},
    ]
    return jsonify({'categories': categories})


@app.route('/api/cron/sync', methods=['POST'])
def api_cron_sync():
    """Import user-crontab entries that are not yet in the database."""
    user_crons = [entry for entry in get_system_crons() if entry['source'] == 'user']

    db = get_db()
    c = db.cursor()

    synced_count = 0
    for cron in user_crons:
        # Skip entries that already exist
        c.execute("SELECT id FROM cron_tasks WHERE expression = ? AND command = ?",
                  (cron['expression'], cron['command']))
        if c.fetchone():
            continue

        # Guess a name and category from the command
        name = guess_cron_name(cron['command'])
        category = guess_cron_category(cron['command'])

        c.execute('''INSERT INTO cron_tasks (name, expression, command, description, category, enabled)
                     VALUES (?, ?, ?, ?, ?, 1)''',
                  (name, cron['expression'], cron['command'], '', category))
        synced_count += 1

    db.commit()
    log_message(f"从系统 crontab 同步了 {synced_count} 个任务")
    return jsonify({'message': f'同步完成,新增 {synced_count} 个任务'})


def guess_cron_name(command):
    """Guess a task name from *command* via known keywords or a .py file name."""
    keywords = {
        'service-monitor': '服务监控',
        'board_monitor': 'A股板块监控',
        'daily-summary': '每日总结',
        'backup': '数据备份',
        'cleanup': '清理任务',
        'xian-favor': 'Xian Favor 备份',
    }

    for keyword, name in keywords.items():
        if keyword in command:
            return name

    # Fall back to the script file name
    match = re.search(r'/([^/]+\.py)', command)
    if match:
        return match.group(1).replace('.py', '').replace('_', ' ')

    return '未命名任务'


def guess_cron_category(command):
    """Guess a category id from substrings of *command*; the first match wins."""
    rules = (
        ('monitor', ('monitor', 'check')),
        ('backup', ('backup',)),
        ('cleanup', ('cleanup', 'clean')),
        ('report', ('report', 'summary')),
        ('sync', ('sync',)),
        ('notification', ('notify', 'mail')),
    )
    for category, needles in rules:
        if any(needle in command for needle in needles):
            return category
    return 'other'


def get_next_run_time(expression):
    """Return the next run time of *expression* as text, or '无法计算' on failure.

    Uses the optional ``croniter`` package, imported lazily.
    """
    try:
        from croniter import croniter
        base = datetime.now()
        cron = croniter(expression, base)
        next_time = cron.get_next(datetime)
        return next_time.strftime('%Y-%m-%d %H:%M:%S')
    except Exception:
        # Covers a missing croniter package as well as invalid expressions.
        return '无法计算'


@app.route('/api/crons')
def api_crons():
    """List system cron entries (kept for compatibility with the old endpoint)."""
    crons = get_system_crons()
    for cron in crons:
        cron['description'] = parse_cron_expression(cron['expression'])
    return jsonify({'crons': crons})


# ==================== Project management API ====================

@app.route('/')
def index():
    """Serve the panel page."""
    return render_template_string(HTML_TEMPLATE)


@app.route('/api/projects')
def api_projects():
    """Return every project together with its current status."""
    projects = load_projects()
    for project in projects:
        project['status'] = get_project_status(project)
    return jsonify({'projects': projects})


@app.route('/api/projects/<project_id>')
def api_project(project_id):
    """Return a single project together with its current status."""
    project = _find_project(load_projects(), project_id)
    if not project:
        return jsonify({'error': '项目不存在'}), 404
    project['status'] = get_project_status(project)
    return jsonify(project)


@app.route('/api/projects/<project_id>/start', methods=['POST'])
def api_start(project_id):
    """Start a web project's command(s) in the background."""
    project = _find_project(load_projects(), project_id)
    if not project:
        return jsonify({'error': '项目不存在'}), 404

    # Same default as get_project_status, so a missing 'type' is not a KeyError.
    if project.get('type', 'web') != 'web':
        return jsonify({'error': '只有Web服务支持启动'}), 400

    cwd = _project_cwd(project)
    if not os.path.exists(cwd):
        return jsonify({'error': f'目录不存在: {cwd}'}), 400

    if project.get('start_cmds'):
        for name, info in project['start_cmds'].items():
            run_command(info['cmd'], cwd, project_id, f'start-{name}')
    else:
        cmd = project.get('start_cmd', 'python3 app.py')
        run_command(cmd, cwd, project_id, 'start')

    return jsonify({'message': '服务启动中...'})


@app.route('/api/projects/<project_id>/stop', methods=['POST'])
def api_stop(project_id):
    """Stop the processes listening on a web project's ports."""
    project = _find_project(load_projects(), project_id)
    if not project:
        return jsonify({'error': '项目不存在'}), 404

    if project.get('type', 'web') != 'web':
        return jsonify({'error': '只有Web服务支持停止'}), 400

    stopped = sum(stop_process(port) for port in project.get('ports', []))
    return jsonify({'message': f'已停止 {stopped} 个进程'})


@app.route('/api/projects/<project_id>/run', methods=['POST'])
def api_run(project_id):
    """Run a CLI project's configured command in the background."""
    project = _find_project(load_projects(), project_id)
    if not project:
        return jsonify({'error': '项目不存在'}), 404

    if project.get('type', 'web') != 'cli':
        return jsonify({'error': '只有CLI工具支持运行'}), 400

    cwd = _project_cwd(project)
    cmd = project.get('run_cmd', '')
    if not cmd:
        return jsonify({'error': '未配置运行命令'}), 400

    run_command(cmd, cwd, project_id, 'run')
    return jsonify({'message': 'CLI工具已开始运行...'})


@app.route('/api/projects/<project_id>/log')
def api_log(project_id):
    """Return the last 1000 lines of a project's log file."""
    log_file = os.path.join(BASE_DIR, 'logs', f'{project_id}.log')
    if os.path.exists(log_file):
        with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
            lines = f.readlines()[-1000:]
        return jsonify({'log': ''.join(lines)})
    return jsonify({'log': '暂无日志'})


@app.route('/api/projects/add', methods=['POST'])
def api_add_project():
    """Add a project described by the JSON body to PROJECTS_FILE."""
    data = _json_body()

    if not data.get('name') or not data.get('type'):
        return jsonify({'error': '缺少必要字段'}), 400

    projects = load_projects()

    # The id is the lower-cased name with every other character replaced by '-'.
    project_id = re.sub(r'[^a-z0-9-]', '-', data['name'].lower())
    if any(p['id'] == project_id for p in projects):
        return jsonify({'error': '项目ID已存在'}), 400

    new_project = {
        'id': project_id,
        'name': data['name'],
        'type': data['type'],
        'description': data.get('description', ''),
        'directory': data.get('directory', ''),
        'version': data.get('version', 'v1.0.0'),
        'git_repo': data.get('git_repo'),
    }

    if data['type'] == 'web':
        new_project['ports'] = data.get('ports', [])
        new_project['start_cmd'] = data.get('start_cmd', 'python3 app.py')
        new_project['health_url'] = data.get('health_url')
        new_project['admin_url'] = data.get('admin_url')
    elif data['type'] == 'cron':
        new_project['cron'] = data.get('cron')
        new_project['cron_cmd'] = data.get('cron_cmd')
    elif data['type'] == 'cli':
        new_project['run_cmd'] = data.get('run_cmd')

    projects.append(new_project)
    save_projects(projects)

    return jsonify({'message': '项目添加成功', 'project': new_project})


@app.route('/api/projects/<project_id>', methods=['PUT'])
def api_update_project(project_id):
    """Overwrite a project's fields (except 'id') with the JSON body."""
    projects = load_projects()
    project = _find_project(projects, project_id)
    if not project:
        return jsonify({'error': '项目不存在'}), 404

    for key, value in _json_body().items():
        if key != 'id':
            project[key] = value

    save_projects(projects)
    return jsonify({'message': '更新成功', 'project': project})


@app.route('/api/projects/<project_id>', methods=['DELETE'])
def api_delete_project(project_id):
    """Remove a project from PROJECTS_FILE."""
    projects = load_projects()
    if not _find_project(projects, project_id):
        return jsonify({'error': '项目不存在'}), 404

    save_projects([p for p in projects if p['id'] != project_id])
    return jsonify({'message': '删除成功'})


# ==================== HTML template ====================

# NOTE(review): this template contains only text and no markup, and its two
# version labels (v2.3 / v2.1) differ from the module docstring — it looks
# like the original HTML was lost; restore it from the source of truth.
HTML_TEMPLATE = ''' 项目服务管理面板 v2.3

项目服务管理面板 v2.1

对外IP:
间隔: s
已连接

Web服务

-

运行中

-

Cron任务

-

CLI工具

-

加载中...

'''


if __name__ == '__main__':
    os.makedirs(os.path.join(BASE_DIR, 'logs'), exist_ok=True)
    os.makedirs(CRON_BACKUP_DIR, exist_ok=True)
    init_db()
    log_message("=" * 50)
    log_message("项目服务管理面板 v2.0.0 启动")
    log_message("访问地址: http://localhost:19013")
    log_message(f"进程PID: {os.getpid()}")
    log_message("=" * 50)
    # NOTE(review): binds to all interfaces and the endpoints above have no
    # authentication while they can start shell commands — confirm exposure.
    app.run(host='0.0.0.0', port=19013, debug=False)