"""Product parameter crawling system - admin backend."""
import asyncio
import json
from datetime import datetime
from pathlib import Path

from flask import Flask, render_template, jsonify, request
from flask_cors import CORS

app = Flask(__name__)
CORS(app)

# All persistent state lives as JSON files under <project root>/data.
BASE_DIR = Path(__file__).parent.parent
DATA_DIR = BASE_DIR / 'data'
PRODUCTS_FILE = DATA_DIR / 'products.json'
TASKS_FILE = DATA_DIR / 'tasks.json'


def load_products():
    """Return the parsed product store, or an empty store if the file is absent."""
    if not PRODUCTS_FILE.exists():
        return {"products": [], "last_update": None}
    raw = PRODUCTS_FILE.read_text(encoding='utf-8')
    return json.loads(raw)


def load_tasks():
    """Return the parsed task store, or an empty store if the file is absent."""
    if not TASKS_FILE.exists():
        return {"tasks": []}
    raw = TASKS_FILE.read_text(encoding='utf-8')
    return json.loads(raw)


# ============ Page routes ============

@app.route('/')
def index():
    """Render the dashboard landing page."""
    return render_template('index.html')


@app.route('/products')
def products_page():
    """Render the product listing page."""
    return render_template('products.html')


@app.route('/spiders')
def spiders_page():
    """Render the spider management page."""
    return render_template('spiders.html')


@app.route('/tasks')
def tasks_page():
    """Render the task history page."""
    return render_template('tasks.html')


@app.route('/config')
def config_page():
    """Render the configuration page."""
    return render_template('config.html')


# ============ API endpoints ============

@app.route('/api/stats')
def api_stats():
    """Return summary counters: product count, last update, task totals."""
    store = load_products()
    task_list = load_tasks().get("tasks", [])
    running_count = sum(1 for entry in task_list if entry.get("status") == "running")
    payload = {
        "total_products": len(store.get("products", [])),
        "last_update": store.get("last_update"),
        "total_tasks": len(task_list),
        "running_tasks": running_count,
    }
    return jsonify(payload)


@app.route('/api/products')
def api_products():
    """Return the whole product store as JSON."""
    return jsonify(load_products())


@app.route('/api/spiders')
def api_spiders():
    """Return the known spiders, annotated with their latest recorded run."""
    task_list = load_tasks().get("tasks", [])

    spiders = [
        {
            "name": "byd",
            "display_name": "比亚迪汽车",
            "description": "爬取比亚迪官网车型参数",
            "url": "https://www.byd.com/cn/",
            "status": "available"
        }
    ]

    # Attach the last run time/status: the latest task in file order that
    # belongs to each spider (tasks are appended chronologically).
    for entry in spiders:
        latest = next(
            (t for t in reversed(task_list) if t.get("spider") == entry["name"]),
            None,
        )
        if latest is not None:
            entry["last_run"] = latest.get("end_time")
            entry["last_status"] = latest.get("status")

    return jsonify(spiders)
@app.route('/api/tasks')
def api_tasks():
    """Return up to 50 tasks as JSON, most recently started first.

    Tasks with a missing or null ``start_time`` sort as the empty string,
    so they end up at the tail of the list.
    """
    tasks = load_tasks().get("tasks", [])
    # `or ""` also covers an explicit null, which would otherwise make the
    # sort compare None against str and raise TypeError.
    tasks.sort(key=lambda task: task.get("start_time") or "", reverse=True)
    return jsonify(tasks[:50])


def _save_json(path, payload):
    """Write *payload* to *path* as pretty-printed UTF-8 JSON.

    The parent directory is created first so that the first run does not
    fail when the data directory does not exist yet.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding='utf-8')


def _merge_byd_products(data, results):
    """Merge crawled BYD items into ``data["products"]`` in place.

    An item whose ``name`` equals that of an existing product updates the
    first such product and stamps ``updated_at``; any other item is
    appended with a generated id, brand, source and ``created_at``.
    """
    products = data["products"]
    for item in results:
        match = next((p for p in products if p.get("name") == item.get("name")), None)
        if match is not None:
            match.update(item)
            match["updated_at"] = datetime.now().isoformat()
            continue
        now = datetime.now()
        # The current list length is appended so that items created within
        # the same second still receive distinct ids.
        item["id"] = f"byd-{now.strftime('%Y%m%d%H%M%S')}-{len(products)}"
        item["brand"] = "比亚迪"
        item["source"] = "byd.com"
        item["created_at"] = now.isoformat()
        products.append(item)


async def _run_spider(spider_name, data):
    """Run the named spider and persist its results.

    Returns a result dict: ``{"success": True, "count": n}`` on success,
    otherwise ``{"success": False, "error": message}``. Exceptions raised
    while importing, crawling, merging or saving are caught and reported
    in the result dict so the caller can record the task as failed.
    """
    if spider_name != "byd":
        return {"success": False, "error": f"Unknown spider: {spider_name}"}
    try:
        # Imported lazily, inside the try, so a missing or broken spider
        # module is reported as a failed task instead of a server error.
        from spiders.byd import BYDSpider

        spider = BYDSpider({"headless": True})
        results = await spider.run()
        _merge_byd_products(data, results)
        _save_json(PRODUCTS_FILE, data)
        return {"success": True, "count": len(results)}
    except Exception as e:
        return {"success": False, "error": str(e)}


# The <spider_name> URL variable is required: without it Flask calls the
# view with no arguments and every request fails with TypeError.
@app.route('/api/run/<spider_name>', methods=['POST'])
def api_run_spider(spider_name):
    """Run a spider synchronously and return its result as JSON.

    A task record is written with status ``running`` before the crawl
    starts, then rewritten with ``completed`` or ``failed``, the end time
    and the result once the crawl returns. The request blocks until the
    crawl has finished.
    """
    import sys

    # Make the project root importable for `spiders.*`; add it only once so
    # sys.path does not grow with every request.
    base_dir = str(BASE_DIR)
    if base_dir not in sys.path:
        sys.path.insert(0, base_dir)

    data = load_products()

    # Record the task before running so it is visible while in progress.
    tasks = load_tasks()
    started = datetime.now()
    task = {
        "id": f"task-{started.strftime('%Y%m%d%H%M%S')}",
        "spider": spider_name,
        "status": "running",
        "start_time": started.isoformat()
    }
    tasks.setdefault("tasks", []).append(task)
    _save_json(TASKS_FILE, tasks)

    result = asyncio.run(_run_spider(spider_name, data))

    # `task` is the same dict object stored in `tasks`, so updating it here
    # updates the record that gets written back.
    task["status"] = "completed" if result.get("success") else "failed"
    task["end_time"] = datetime.now().isoformat()
    task["result"] = result
    _save_json(TASKS_FILE, tasks)

    return jsonify(result)


if __name__ == '__main__':
    print("=" * 50)
    print("产品参数爬取系统 - 后台管理")
    print("=" * 50)
    print("访问地址: http://localhost:19012")
    print("=" * 50)
    # NOTE(review): debug=True together with host='0.0.0.0' exposes the
    # Werkzeug interactive debugger on every network interface, which the
    # Flask docs warn allows arbitrary code execution. Keep this for local
    # development only; confirm before deploying.
    app.run(host='0.0.0.0', port=19012, debug=True)