#!/usr/bin/env python3
"""
项目服务管理面板
端口: 19013
"""
import os
import json
import subprocess
import signal
import threading
from datetime import datetime
from flask import Flask, render_template_string, jsonify, request
import urllib.request
import urllib.error
# Flask application object that all route decorators in this file attach to.
app = Flask(__name__)
# Directory containing this file; projects.json and logs/ live under it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
WORKSPACE_DIR = os.path.dirname(BASE_DIR) # the "works" directory (parent of BASE_DIR)
ROOT_DIR = os.path.dirname(WORKSPACE_DIR) # the "workspace-coder" directory (parent of WORKSPACE_DIR)
PROJECTS_FILE = os.path.join(BASE_DIR, 'projects.json')
# Process state cache: run_command() stores project_id -> PID of the last process it spawned.
process_cache = {}
def load_projects():
    """Read the project list from the JSON configuration file.

    Returns:
        The value stored under the top-level ``projects`` key of
        ``PROJECTS_FILE``.
    """
    with open(PROJECTS_FILE, 'r', encoding='utf-8') as config_file:
        config = json.load(config_file)
    return config['projects']
def save_projects(projects):
    """Write the project list back to the JSON configuration file.

    Args:
        projects: The project list to store under the top-level
            ``projects`` key of ``PROJECTS_FILE``.
    """
    payload = {'projects': projects}
    with open(PROJECTS_FILE, 'w', encoding='utf-8') as config_file:
        # ensure_ascii=False keeps non-ASCII text readable in the file.
        json.dump(payload, config_file, ensure_ascii=False, indent=2)
def check_port(port):
    """Report whether something accepts TCP connections on a local port.

    Args:
        port: TCP port number to probe on ``localhost``.

    Returns:
        True if a TCP connection to ``localhost:port`` succeeds within one
        second; False otherwise, including when ``port`` is not a usable
        port number.
    """
    import socket

    try:
        # The context manager closes the socket even when connect_ex raises
        # (for example on a name-resolution failure or an out-of-range port).
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            return sock.connect_ex(('localhost', port)) == 0
    except (OSError, OverflowError, TypeError, ValueError):
        return False
def check_health(url):
    """Probe a health-check URL with an HTTP GET.

    Args:
        url: The URL to request.

    Returns:
        True only if the request completes within three seconds and the
        response status is 200. Any connection error, HTTP error status,
        malformed URL or protocol error yields False.
    """
    import http.client

    try:
        req = urllib.request.Request(url, method='GET')
        with urllib.request.urlopen(req, timeout=3) as response:
            return response.status == 200
    # URLError/HTTPError and socket timeouts are OSError subclasses;
    # ValueError covers malformed URLs; HTTPException covers broken replies.
    except (OSError, ValueError, http.client.HTTPException):
        return False
def get_process_on_port(port):
    """Look up the first process that ``lsof`` reports for a port.

    Args:
        port: Port number (anything ``int()`` accepts).

    Returns:
        A dict ``{'pid': int, 'info': str}`` where ``info`` is the
        ``ps -o pid,ppid,user,cmd`` line for that PID, or None when the port
        is invalid, no process is reported, or the tools fail or time out.
    """
    # Ports come from editable project configuration; forcing an int and
    # passing argv lists (no shell) prevents command injection.
    try:
        port_number = int(port)
    except (TypeError, ValueError):
        return None

    try:
        lsof = subprocess.run(
            ['lsof', '-i', f':{port_number}', '-t'],
            capture_output=True, text=True, timeout=5,
        )
        pids = lsof.stdout.split()
        if not pids:
            return None
        pid = int(pids[0])  # only the first reported PID is used
        ps = subprocess.run(
            ['ps', '-p', str(pid), '-o', 'pid,ppid,user,cmd', '--no-headers'],
            capture_output=True, text=True, timeout=5,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # Missing lsof/ps, a timeout, or unparsable output: report "unknown".
        return None

    return {'pid': pid, 'info': ps.stdout.strip()}
def _cron_is_configured(cron_cmd):
    """Return True if ``cron_cmd`` occurs in a line of the user's crontab."""
    if not cron_cmd:
        # An empty pattern would match every crontab line.
        return False
    needle = str(cron_cmd)
    try:
        # No shell: the configured command is only compared as plain text.
        result = subprocess.run(
            ['crontab', '-l'], capture_output=True, text=True, timeout=5
        )
    except (OSError, subprocess.SubprocessError):
        return False
    return any(needle in line for line in result.stdout.splitlines())


def _web_project_status(project):
    """Build the status dict for a project of type ``web``."""
    ports = project.get('ports', [])
    if not ports:
        return {'status': 'unknown', 'message': '未配置端口'}

    port_status = {}
    all_running = True
    for port in ports:
        running = check_port(port)
        port_status[port] = {
            'running': running,
            'process': get_process_on_port(port) if running else None,
        }
        if not running:
            all_running = False

    if all_running:
        status = 'running'
    elif any(entry['running'] for entry in port_status.values()):
        status = 'partial'
    else:
        status = 'stopped'

    # The health URL is only probed when every port is up.
    health_ok = None
    if all_running and project.get('health_url'):
        health_ok = check_health(project['health_url'])

    return {'status': status, 'ports': port_status, 'health': health_ok}


def get_project_status(project):
    """Compute the current status of a project.

    Args:
        project: Project configuration dict. ``type`` defaults to ``'web'``.

    Returns:
        A dict whose shape depends on the project type:

        * ``web``: ``status`` is ``running`` / ``partial`` / ``stopped``,
          plus per-port details under ``ports`` and ``health`` (None when
          not probed); ``unknown`` if no ports are configured.
        * ``cron``: ``status`` is ``configured`` / ``not_configured``
          depending on whether ``cron_cmd`` appears in the user's crontab.
        * ``extension`` and ``cli``: a fixed status and message.
        * anything else: ``{'status': 'unknown'}``.
    """
    project_type = project.get('type', 'web')

    if project_type == 'web':
        return _web_project_status(project)

    if project_type == 'cron':
        cron_configured = _cron_is_configured(project.get('cron_cmd', ''))
        return {
            'status': 'configured' if cron_configured else 'not_configured',
            'cron': project.get('cron', ''),
            'cron_configured': cron_configured,
        }

    if project_type == 'extension':
        return {'status': 'completed', 'message': '浏览器插件,手动安装'}

    if project_type == 'cli':
        return {'status': 'ready', 'message': '命令行工具,按需运行'}

    return {'status': 'unknown'}
def run_command(cmd, cwd, project_id, action):
    """Launch a shell command in a background thread and log its output.

    A header (timestamp, action, command, directory) is appended to
    ``logs/<project_id>.log`` under ``BASE_DIR``; the child's stdout and
    stderr are appended to the same file. The child's PID is stored in
    ``process_cache[project_id]``. Failures are appended to
    ``logs/error.log`` instead of being raised.

    Args:
        cmd: Shell command line to execute.
        cwd: Working directory for the child process.
        project_id: Project identifier; selects the log file and cache key.
        action: Label written to the log header (e.g. ``'start'``).
    """
    def _run():
        logs_dir = os.path.join(BASE_DIR, 'logs')
        try:
            os.makedirs(logs_dir, exist_ok=True)
            log_path = os.path.join(logs_dir, f'{project_id}.log')
            # UTF-8 matches how api_log reads the file back.
            with open(log_path, 'a', encoding='utf-8') as log:
                log.write(f"\n{'='*50}\n")
                log.write(f"[{datetime.now().isoformat()}] {action}\n")
                log.write(f"Command: {cmd}\n")
                log.write(f"Directory: {cwd}\n")
                # Flush so the header precedes anything the child writes.
                log.flush()
                # NOTE: cmd comes from project configuration and is run
                # through the shell on purpose; treat that file as trusted.
                process = subprocess.Popen(
                    cmd,
                    shell=True,
                    cwd=cwd,
                    stdout=log,
                    stderr=subprocess.STDOUT,
                    start_new_session=True,
                )
            # The child keeps its own duplicate of the descriptor; leaving
            # the with-block closes the parent's handle.
            process_cache[project_id] = process.pid
        except Exception as e:
            with open(os.path.join(logs_dir, 'error.log'), 'a',
                      encoding='utf-8') as f:
                f.write(f"[{datetime.now().isoformat()}] Error: {e}\n")

    thread = threading.Thread(target=_run)
    thread.daemon = True
    thread.start()
def stop_process(port):
    """Send SIGTERM to every process that ``lsof`` reports for a port.

    Args:
        port: Port number (anything ``int()`` accepts).

    Returns:
        The number of processes that were successfully signalled; 0 when
        the port is invalid or ``lsof`` fails or times out.
    """
    # Force an int and avoid the shell so a crafted port value cannot
    # inject commands.
    try:
        port_number = int(port)
    except (TypeError, ValueError):
        return 0

    try:
        result = subprocess.run(
            ['lsof', '-i', f':{port_number}', '-t'],
            capture_output=True, text=True, timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        return 0

    signalled = 0
    for pid in result.stdout.split():
        try:
            os.kill(int(pid), signal.SIGTERM)
        except (OSError, ValueError):
            # Already gone, not permitted, or not a PID: do not count it.
            continue
        signalled += 1
    return signalled
# HTML template rendered by index() for the panel's root page.
HTML_TEMPLATE = '''
项目服务管理面板
'''
@app.route('/')
def index():
    """Serve the panel's root page, rendered from ``HTML_TEMPLATE``."""
    page = render_template_string(HTML_TEMPLATE)
    return page
@app.route('/api/projects')
def api_projects():
    """Return all configured projects, each annotated with its status."""
    entries = load_projects()
    for entry in entries:
        entry.update(status=get_project_status(entry))
    return jsonify({'projects': entries})
# The URL rule must declare <project_id>; without it Flask calls the view
# with no arguments and the request fails.
@app.route('/api/projects/<project_id>')
def api_project(project_id):
    """Return one project, annotated with its current status.

    Args:
        project_id: The ``id`` of the project, taken from the URL.

    Returns:
        The project as JSON, or a 404 JSON error if no project has that id.
    """
    projects = load_projects()
    project = next((p for p in projects if p['id'] == project_id), None)
    if project is None:
        return jsonify({'error': '项目不存在'}), 404
    project['status'] = get_project_status(project)
    return jsonify(project)
# The URL rule must declare <project_id> so Flask can pass it to the view.
@app.route('/api/projects/<project_id>/start', methods=['POST'])
def api_start(project_id):
    """Start a web project by running its configured start command(s).

    Args:
        project_id: The ``id`` of the project, taken from the URL.

    Returns:
        A JSON message once the command(s) have been handed to
        ``run_command``; 404 if the project does not exist; 400 if it is
        not a web project or its working directory is missing.
    """
    projects = load_projects()
    project = next((p for p in projects if p['id'] == project_id), None)
    if project is None:
        return jsonify({'error': '项目不存在'}), 404
    # Same default as get_project_status: a missing type means 'web'.
    if project.get('type', 'web') != 'web':
        return jsonify({'error': '只有Web服务支持启动'}), 400

    # A null/missing directory is treated as '' (i.e. ROOT_DIR itself).
    directory = project.get('directory') or ''
    if directory.startswith('/'):
        cwd = directory
    else:
        cwd = os.path.join(ROOT_DIR, directory)
    # Must be a directory: a plain file would pass an existence check but
    # cannot be used as a working directory.
    if not os.path.isdir(cwd):
        return jsonify({'error': f'目录不存在: {cwd}'}), 400

    start_cmds = project.get('start_cmds')
    if start_cmds:
        # Several named commands: launch each with its own log label.
        for name, info in start_cmds.items():
            run_command(info['cmd'], cwd, project_id, f'start-{name}')
    else:
        cmd = project.get('start_cmd') or 'python3 app.py'
        run_command(cmd, cwd, project_id, 'start')
    return jsonify({'message': '服务启动中...'})
# The URL rule must declare <project_id> so Flask can pass it to the view.
@app.route('/api/projects/<project_id>/stop', methods=['POST'])
def api_stop(project_id):
    """Stop a web project by signalling the processes on its ports.

    Args:
        project_id: The ``id`` of the project, taken from the URL.

    Returns:
        A JSON message with the summed result of ``stop_process`` over the
        project's ports; 404 if the project does not exist; 400 if it is
        not a web project.
    """
    projects = load_projects()
    project = next((p for p in projects if p['id'] == project_id), None)
    if project is None:
        return jsonify({'error': '项目不存在'}), 404
    # Same default as get_project_status: a missing type means 'web'.
    if project.get('type', 'web') != 'web':
        return jsonify({'error': '只有Web服务支持停止'}), 400

    stopped = sum(stop_process(port) for port in project.get('ports', []))
    return jsonify({'message': f'已停止 {stopped} 个进程'})
# The URL rule must declare <project_id> so Flask can pass it to the view.
@app.route('/api/projects/<project_id>/run', methods=['POST'])
def api_run(project_id):
    """Run a CLI project's configured command in the background.

    Args:
        project_id: The ``id`` of the project, taken from the URL.

    Returns:
        A JSON message once the command has been handed to ``run_command``;
        404 if the project does not exist; 400 if it is not a CLI project,
        has no ``run_cmd``, or its working directory is missing.
    """
    projects = load_projects()
    project = next((p for p in projects if p['id'] == project_id), None)
    if project is None:
        return jsonify({'error': '项目不存在'}), 404
    if project.get('type') != 'cli':
        return jsonify({'error': '只有CLI工具支持运行'}), 400

    # A null/missing directory is treated as '' (i.e. ROOT_DIR itself).
    directory = project.get('directory') or ''
    if directory.startswith('/'):
        cwd = directory
    else:
        cwd = os.path.join(ROOT_DIR, directory)

    cmd = project.get('run_cmd', '')
    if not cmd:
        return jsonify({'error': '未配置运行命令'}), 400
    # Same check as api_start; otherwise the failure would only show up in
    # the background thread's error log.
    if not os.path.isdir(cwd):
        return jsonify({'error': f'目录不存在: {cwd}'}), 400

    run_command(cmd, cwd, project_id, 'run')
    return jsonify({'message': 'CLI工具已开始运行...'})
# The URL rule must declare <project_id> so Flask can pass it to the view.
@app.route('/api/projects/<project_id>/log')
def api_log(project_id):
    """Return the last 1000 lines of a project's log file.

    Args:
        project_id: The ``id`` of the project, taken from the URL.

    Returns:
        JSON ``{'log': text}``; a placeholder text when no log file exists.
    """
    from collections import deque

    log_file = os.path.join(BASE_DIR, 'logs', f'{project_id}.log')
    try:
        with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
            # A bounded deque keeps only the newest 1000 lines while
            # streaming, instead of loading the whole file.
            tail = deque(f, maxlen=1000)
    except FileNotFoundError:
        return jsonify({'log': '暂无日志'})
    return jsonify({'log': ''.join(tail)})
@app.route('/api/projects/add', methods=['POST'])
def api_add_project():
    """Create a new project from the JSON request body and persist it.

    The body must be a JSON object with a non-empty string ``name`` and a
    ``type``. Type-specific fields (ports/start_cmd/health_url/admin_url
    for ``web``, cron/cron_cmd for ``cron``, run_cmd for ``cli``) are
    copied when present.

    Returns:
        JSON with the created project, or a 400 JSON error when required
        fields are missing or the generated id already exists.
    """
    import re

    # silent=True yields None for a missing or malformed JSON body.
    data = request.get_json(silent=True)
    if (
        not isinstance(data, dict)
        or not isinstance(data.get('name'), str)
        or not data['name']
        or not data.get('type')
    ):
        return jsonify({'error': '缺少必要字段'}), 400

    projects = load_projects()

    # Derive the id from the name: lower-case, and every character outside
    # [a-z0-9-] becomes '-'.
    # NOTE(review): a name with no ASCII letters or digits becomes all
    # dashes, so such names of equal length collide.
    project_id = re.sub(r'[^a-z0-9-]', '-', data['name'].lower())

    if any(p['id'] == project_id for p in projects):
        return jsonify({'error': '项目ID已存在'}), 400

    new_project = {
        'id': project_id,
        'name': data['name'],
        'type': data['type'],
        'description': data.get('description', ''),
        'directory': data.get('directory', ''),
        'version': data.get('version', 'v1.0.0'),
        'git_repo': data.get('git_repo'),
    }

    if data['type'] == 'web':
        new_project['ports'] = data.get('ports', [])
        new_project['start_cmd'] = data.get('start_cmd', 'python3 app.py')
        new_project['health_url'] = data.get('health_url')
        new_project['admin_url'] = data.get('admin_url')
    elif data['type'] == 'cron':
        new_project['cron'] = data.get('cron')
        new_project['cron_cmd'] = data.get('cron_cmd')
    elif data['type'] == 'cli':
        new_project['run_cmd'] = data.get('run_cmd')

    projects.append(new_project)
    save_projects(projects)
    return jsonify({'message': '项目添加成功', 'project': new_project})
# The URL rule must declare <project_id> so Flask can pass it to the view.
@app.route('/api/projects/<project_id>', methods=['PUT'])
def api_update_project(project_id):
    """Update fields of an existing project from the JSON request body.

    Every key in the body except ``id`` overwrites the stored value.

    Args:
        project_id: The ``id`` of the project, taken from the URL.

    Returns:
        JSON with the updated project; 404 if the project does not exist;
        400 if the body is not a JSON object.
    """
    projects = load_projects()
    project = next((p for p in projects if p['id'] == project_id), None)
    if project is None:
        return jsonify({'error': '项目不存在'}), 404

    # silent=True yields None for a missing or malformed JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '无效的请求数据'}), 400

    for key, value in data.items():
        if key != 'id':  # the id is immutable
            project[key] = value
    save_projects(projects)
    return jsonify({'message': '更新成功', 'project': project})
# The URL rule must declare <project_id> so Flask can pass it to the view.
@app.route('/api/projects/<project_id>', methods=['DELETE'])
def api_delete_project(project_id):
    """Remove a project from the configuration file.

    Args:
        project_id: The ``id`` of the project, taken from the URL.

    Returns:
        A JSON success message, or a 404 JSON error if no project has
        that id.
    """
    projects = load_projects()
    remaining = [p for p in projects if p['id'] != project_id]
    # Nothing filtered out means the id was not present.
    if len(remaining) == len(projects):
        return jsonify({'error': '项目不存在'}), 404
    save_projects(remaining)
    return jsonify({'message': '删除成功'})
def _parse_cron_lines(lines, source, has_user_field, filename=None):
    """Turn crontab lines into entry dicts, skipping blanks and comments."""
    # System-style crontabs carry an extra user column before the command.
    min_fields = 7 if has_user_field else 6
    entries = []
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith('#'):
            continue
        parts = line.split()
        if len(parts) < min_fields:
            continue  # too short to be a schedule line (e.g. VAR=value)
        entry = {'source': source}
        if filename is not None:
            entry['file'] = filename
        entry['expression'] = ' '.join(parts[:5])
        if has_user_field:
            entry['user'] = parts[5]
            entry['command'] = ' '.join(parts[6:])
        else:
            entry['command'] = ' '.join(parts[5:])
        entry['line'] = line
        entries.append(entry)
    return entries


def get_system_crons():
    """Collect cron entries from the user crontab, /etc/crontab and /etc/cron.d.

    Each source is best-effort: a source that is missing or unreadable is
    skipped without affecting the others.

    Returns:
        A list of dicts with ``source`` (``'user'``, ``'system'`` or
        ``'cron.d'``), ``expression`` (the five schedule fields),
        ``command`` and ``line``; system and cron.d entries also carry
        ``user``, and cron.d entries carry ``file``.
    """
    crons = []

    # Current user's crontab.
    try:
        result = subprocess.run(
            ['crontab', '-l'],
            capture_output=True, text=True, errors='replace', timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        pass  # crontab unavailable or timed out: no user entries
    else:
        crons.extend(
            _parse_cron_lines(result.stdout.splitlines(), 'user', False)
        )

    # System-wide /etc/crontab.
    try:
        with open('/etc/crontab', 'r', errors='replace') as f:
            crons.extend(_parse_cron_lines(f, 'system', True))
    except OSError:
        pass  # file missing or unreadable

    # Drop-in files under /etc/cron.d.
    cron_d_dir = '/etc/cron.d'
    try:
        filenames = os.listdir(cron_d_dir)
    except OSError:
        filenames = []
    for filename in filenames:
        filepath = os.path.join(cron_d_dir, filename)
        if not os.path.isfile(filepath):
            continue
        # Handled per file so one unreadable file does not hide the rest.
        try:
            with open(filepath, 'r', errors='replace') as f:
                crons.extend(_parse_cron_lines(f, 'cron.d', True, filename))
        except OSError:
            continue

    return crons
def parse_cron_expression(expr):
    """Render a five-field cron expression as a short Chinese description.

    Handles ``*``, ``*/N`` and literal values for minute and hour, literal
    day-of-month, numeric month (1-12) and numeric weekday (0-7, where both
    0 and 7 mean Sunday). Field values it does not recognise are echoed
    with the matching unit suffix or prefix.

    Args:
        expr: Cron schedule string, e.g. ``'*/5 * * * *'``.

    Returns:
        The description, or ``expr`` unchanged if it does not consist of
        exactly five whitespace-separated fields.
    """
    fields = expr.split()
    if len(fields) != 5:
        return expr
    minute, hour, day, month, weekday = fields

    # Minute part.
    if minute == '*':
        minute_desc = '每分钟'
    elif minute.startswith('*/'):
        minute_desc = f'每{minute[2:]}分钟'
    else:
        minute_desc = f'{minute}分'

    # Hour part, combined with the minute description so that step minutes
    # read naturally instead of showing a raw '*/N'.
    if hour == '*':
        desc = [minute_desc]
    elif hour.startswith('*/'):
        desc = [f'每{hour[2:]}小时', minute_desc]
    else:
        desc = [f'{hour}点{minute_desc}']

    # Day of month.
    if day != '*':
        desc.append(f'{day}号')

    # Month: normalise in-range numbers (e.g. '01' -> '1'); echo the rest.
    if month != '*':
        try:
            month_number = int(month)
        except ValueError:
            month_number = None
        if month_number is not None and 1 <= month_number <= 12:
            desc.append(f'{month_number}月')
        else:
            desc.append(f'{month}月')

    # Weekday: cron accepts both 0 and 7 for Sunday.
    if weekday != '*':
        weekdays = ['周日', '周一', '周二', '周三', '周四', '周五', '周六']
        try:
            weekday_number = int(weekday)
        except ValueError:
            weekday_number = None
        if weekday_number is not None and 0 <= weekday_number <= 7:
            desc.append(weekdays[weekday_number % 7])
        else:
            desc.append(f'周{weekday}')

    return ' '.join(desc)
@app.route('/api/crons')
def api_crons():
    """Return the system cron entries, each with a readable description."""
    described = [
        {**entry, 'description': parse_cron_expression(entry['expression'])}
        for entry in get_system_crons()
    ]
    return jsonify({'crons': described})
if __name__ == '__main__':
    # Create the log directory up front.
    logs_dir = os.path.join(BASE_DIR, 'logs')
    os.makedirs(logs_dir, exist_ok=True)

    # Startup banner.
    separator = "=" * 50
    banner = (
        separator,
        "项目服务管理面板",
        "访问地址: http://localhost:19013",
        separator,
    )
    for banner_line in banner:
        print(banner_line)

    # Listen on all interfaces, port 19013, with Flask debug mode off.
    app.run(host='0.0.0.0', port=19013, debug=False)