"""
Product parameter crawler - main application.

v2.0.0 - the admin console is served from the same port as the public API.

Port: 19011
Frontend: http://localhost:19011
Admin: http://localhost:19011/admin
"""

from flask import Flask, jsonify, request, render_template
from flask_cors import CORS
import json
import os
from datetime import datetime
from pathlib import Path
import asyncio
import logging

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = Flask(__name__, template_folder='templates')
CORS(app)

# Path configuration: JSON stores live in ./data, logs in ./logs next to
# this file. Both directories are created at import time if missing.
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
DATA_DIR.mkdir(exist_ok=True)

PRODUCTS_FILE = DATA_DIR / 'products.json'
TASKS_FILE = DATA_DIR / 'tasks.json'
LOGS_DIR = BASE_DIR / 'logs'
LOGS_DIR.mkdir(exist_ok=True)


# ============ Data storage ============

def load_products():
    """Load the product store.

    Returns:
        The parsed content of ``PRODUCTS_FILE`` when the file exists,
        otherwise a fresh ``{"products": [], "last_update": None}`` dict.
    """
    if PRODUCTS_FILE.exists():
        return json.loads(PRODUCTS_FILE.read_text(encoding='utf-8'))
    return {"products": [], "last_update": None}


def save_products(data):
    """Write the product store to ``PRODUCTS_FILE``.

    Stamps ``data["last_update"]`` with the current local time (ISO format)
    before writing, so the passed dict is mutated.
    """
    data["last_update"] = datetime.now().isoformat()
    PRODUCTS_FILE.write_text(
        json.dumps(data, ensure_ascii=False, indent=2),
        encoding='utf-8'
    )


def load_tasks():
    """Load the task store.

    Returns:
        The parsed content of ``TASKS_FILE`` when the file exists,
        otherwise a fresh ``{"tasks": []}`` dict.
    """
    if TASKS_FILE.exists():
        return json.loads(TASKS_FILE.read_text(encoding='utf-8'))
    return {"tasks": []}


def save_tasks(data):
    """Write the task store to ``TASKS_FILE`` as UTF-8 JSON."""
    TASKS_FILE.write_text(
        json.dumps(data, ensure_ascii=False, indent=2),
        encoding='utf-8'
    )


def _product_params(product):
    """Return the product's ``params`` mapping, or ``{}`` if it is not a dict."""
    params = product.get("params")
    return params if isinstance(params, dict) else {}


def _matches_search(product, needle):
    """Return True if the lower-cased ``needle`` occurs in the name or params.

    The params are searched through their JSON text, so both keys and
    values can match.
    """
    # ``name`` may be missing, None or a non-string; normalise before lower().
    name = str(product.get("name") or "")
    if needle in name.lower():
        return True
    params_text = json.dumps(product.get("params", {}), ensure_ascii=False)
    return needle in params_text.lower()


# ============ Public API routes ============

@app.route('/')
def index():
    """Front page: a JSON description of the service and its endpoints."""
    return jsonify({
        "name": "Product Crawler",
        "version": "2.0.0",
        "description": "产品参数爬取系统",
        "endpoints": {
            "products": "/api/products",
            "tasks": "/api/tasks",
            "spiders": "/api/spiders",
            "run": "/api/run/<spider_name>",
            "admin": "/admin"
        }
    })


@app.route('/api/products')
def api_list_products():
    """List products, optionally filtered.

    Query parameters:
        brand: keep only products whose ``brand`` equals this value.
        search: case-insensitive substring matched against the product
            name and the JSON text of its params.
    """
    data = load_products()

    # Optional filters
    brand = request.args.get('brand')
    search = request.args.get('search')

    products = data.get("products", [])

    if brand:
        products = [p for p in products if p.get("brand") == brand]

    if search:
        needle = search.lower()
        products = [p for p in products if _matches_search(p, needle)]

    return jsonify({
        "products": products,
        "total": len(products),
        "last_update": data.get("last_update")
    })


# The URL variable is required: the view takes ``product_id`` as an argument.
@app.route('/api/products/<product_id>')
def api_get_product(product_id):
    """Return the product whose ``id`` equals ``product_id``, or a 404."""
    data = load_products()

    for product in data.get("products", []):
        if product.get("id") == product_id:
            return jsonify(product)

    return jsonify({"error": "Product not found"}), 404


@app.route('/api/products', methods=['POST'])
def api_add_product():
    """Add a product from the JSON request body.

    Responds with 400 when the body is missing, is not valid JSON, or is
    not a JSON object. When the body carries no ``id``, one is generated
    from the brand and the current timestamp.
    """
    # silent=True: a malformed body yields None so we answer with our own
    # JSON error instead of the framework's default error page.
    product = request.get_json(silent=True)

    if not product or not isinstance(product, dict):
        return jsonify({"error": "Invalid data"}), 400

    data = load_products()
    now = datetime.now()

    # Generate an ID when the client did not supply one
    if not product.get("id"):
        product["id"] = f"{product.get('brand', 'unknown')}-{now.strftime('%Y%m%d%H%M%S')}"

    # One timestamp for both fields so a new record starts out consistent.
    product["created_at"] = now.isoformat()
    product["updated_at"] = now.isoformat()

    data["products"].append(product)
    save_products(data)

    return jsonify({"success": True, "product": product})


@app.route('/api/products/<product_id>', methods=['PUT'])
def api_update_product(product_id):
    """Merge the JSON request body into the product with id ``product_id``.

    Responds with 400 when the body is not a JSON object and with 404 when
    no product has that id.
    """
    updates = request.get_json(silent=True)

    # dict.update() below needs a mapping; reject anything else up front.
    if not isinstance(updates, dict):
        return jsonify({"error": "Invalid data"}), 400

    data = load_products()

    for product in data["products"]:
        if product.get("id") == product_id:
            product.update(updates)
            product["updated_at"] = datetime.now().isoformat()
            save_products(data)
            return jsonify({"success": True, "product": product})

    return jsonify({"error": "Product not found"}), 404


@app.route('/api/products/<product_id>', methods=['DELETE'])
def api_delete_product(product_id):
    """Delete every product whose ``id`` equals ``product_id``; 404 if none."""
    data = load_products()

    original_count = len(data["products"])
    data["products"] = [p for p in data["products"] if p.get("id") != product_id]

    if len(data["products"]) < original_count:
        save_products(data)
        return jsonify({"success": True})

    return jsonify({"error": "Product not found"}), 404


@app.route('/api/spiders')
def api_list_spiders():
    """List the available spiders with status derived from the task log."""
    spiders = [
        {
            "name": "byd",
            "display_name": "比亚迪汽车",
            "description": "爬取比亚迪官网车型参数",
            "status": "available",
            "last_run": None
        },
        {
            "name": "custom",
            "display_name": "自定义爬虫",
            "description": "通过配置自定义爬虫规则",
            "status": "available",
            "last_run": None
        }
    ]

    # Fold the recorded tasks into each spider's status
    tasks = load_tasks().get("tasks", [])
    for spider in spiders:
        for task in tasks:
            if task.get("spider") != spider["name"]:
                continue
            # A task that is still running has no end_time yet; do not let
            # it erase the end time of an earlier finished run.
            if task.get("end_time"):
                spider["last_run"] = task.get("end_time")
            if task.get("status") == "running":
                spider["status"] = "running"

    return jsonify(spiders)


# The URL variable is required: the view takes ``spider_name`` as an argument.
@app.route('/api/run/<spider_name>', methods=['POST'])
def api_run_spider(spider_name):
    """Run the named spider synchronously and record the run as a task.

    A task record is written with status ``running`` before the spider
    starts and updated to ``completed`` or ``failed`` afterwards. The
    response body is the spider result dict (``success`` plus ``count`` or
    ``error``).
    """

    async def run_spider():
        try:
            if spider_name == "byd":
                from spiders.byd import BYDSpider

                spider = BYDSpider({"headless": True})
                results = await spider.run()

                # Load the store only after the crawl has finished, so
                # products changed while the spider was running are not
                # overwritten by a stale snapshot.
                data = load_products()

                # Merge the results into the store
                for item in results:
                    # Update in place when a product with this name exists
                    existing = False
                    for stored in data["products"]:
                        if stored.get("name") == item.get("name"):
                            stored.update(item)
                            stored["updated_at"] = datetime.now().isoformat()
                            existing = True
                            break

                    if not existing:
                        # The current list length is appended so that items
                        # created within the same second get distinct ids.
                        item["id"] = f"byd-{datetime.now().strftime('%Y%m%d%H%M%S')}-{len(data['products'])}"
                        item["brand"] = "比亚迪"
                        item["source"] = "byd.com"
                        item["created_at"] = datetime.now().isoformat()
                        data["products"].append(item)

                save_products(data)
                return {"success": True, "count": len(results)}

            else:
                return {"success": False, "error": f"Unknown spider: {spider_name}"}

        except Exception as e:
            # Boundary handler: any spider failure becomes a failed task
            # instead of a 500; the traceback is kept in the log.
            logger.exception(f"爬虫运行失败: {e}")
            return {"success": False, "error": str(e)}

    # Record the task before starting
    tasks = load_tasks()
    task = {
        "id": f"task-{datetime.now().strftime('%Y%m%d%H%M%S')}",
        "spider": spider_name,
        "status": "running",
        "start_time": datetime.now().isoformat(),
        "end_time": None,
        "result": None
    }
    tasks["tasks"].append(task)
    save_tasks(tasks)

    # Run the spider; this blocks the request until the crawl finishes
    result = asyncio.run(run_spider())

    # Update the task record with the outcome
    task["status"] = "completed" if result.get("success") else "failed"
    task["end_time"] = datetime.now().isoformat()
    task["result"] = result
    save_tasks(tasks)

    return jsonify(result)


@app.route('/api/tasks')
def api_list_tasks():
    """List recorded tasks, newest first.

    Query parameters:
        limit: maximum number of tasks to return (default 50). Negative
            values are treated as 0.
    """
    tasks = load_tasks().get("tasks", [])

    # Newest first; a missing or null start_time sorts as the empty string
    tasks.sort(key=lambda t: t.get("start_time") or "", reverse=True)

    # Cap the number of returned tasks. A negative slice bound would drop
    # items from the end instead of limiting the count, so clamp at 0.
    limit = max(request.args.get('limit', 50, type=int), 0)
    tasks = tasks[:limit]

    return jsonify(tasks)


@app.route('/api/export')
def api_export():
    """Export the product store.

    Query parameters:
        format: ``json`` (default) returns the whole store; ``csv`` returns
            one row per product with the columns id, name, brand, source
            followed by one column per params key. Any other value yields
            a 400.
    """
    export_format = request.args.get('format', 'json')
    data = load_products()

    if export_format == 'json':
        return jsonify(data)

    elif export_format == 'csv':
        import io
        import csv

        output = io.StringIO()
        writer = csv.writer(output)

        products = data.get("products", [])
        base_fields = ["id", "name", "brand", "source"]

        if products:
            # Collect param keys across all products, in first-seen order,
            # so params present only on later products still get a column.
            param_keys = list(dict.fromkeys(
                key
                for product in products
                for key in _product_params(product)
            ))

            # Header row
            writer.writerow(base_fields + param_keys)

            for product in products:
                params = _product_params(product)
                row = [product.get(field, "") for field in base_fields]
                row.extend(params.get(key, "") for key in param_keys)
                writer.writerow(row)

        return output.getvalue(), 200, {
            "Content-Type": "text/csv; charset=utf-8",
            "Content-Disposition": "attachment; filename=products.csv"
        }

    return jsonify({"error": "Unsupported format"}), 400


# ============ Admin page routes ============

@app.route('/admin')
def admin_index():
    """Admin home page."""
    return render_template('index.html')


@app.route('/admin/products')
def admin_products():
    """Admin - product management page."""
    return render_template('products.html')


@app.route('/admin/spiders')
def admin_spiders():
    """Admin - spider management page."""
    return render_template('spiders.html')


@app.route('/admin/tasks')
def admin_tasks():
    """Admin - task management page."""
    return render_template('tasks.html')


@app.route('/admin/config')
def admin_config():
    """Admin - configuration page."""
    return render_template('config.html')


# ============ Admin API (statistics) ============

@app.route('/api/admin/stats')
def api_admin_stats():
    """Return product and task counts for the admin dashboard."""
    data = load_products()
    tasks = load_tasks().get("tasks", [])

    return jsonify({
        "total_products": len(data.get("products", [])),
        "last_update": data.get("last_update"),
        "total_tasks": len(tasks),
        "running_tasks": len([t for t in tasks if t.get("status") == "running"])
    })


if __name__ == '__main__':
    print("=" * 60)
    print("产品参数爬取系统 v2.0.0")
    print("=" * 60)
    print("API地址: http://localhost:19011")
    print("后台管理: http://localhost:19011/admin")
    print("=" * 60)

    # SECURITY: the server listens on all interfaces, and the Werkzeug
    # debugger allows code execution from the browser. Debug mode is
    # therefore opt-in: set FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host='0.0.0.0', port=19011, debug=debug_enabled)