Files
product-crawler/app.py
hubian 7b20773c29 feat: v2.0.0 - 合并后台管理到单端口
- 端口从 19011+19012 合并为 19011
- 前台API: http://localhost:19011
- 后台管理: http://localhost:19011/admin
- 新增 templates 目录,整合管理页面模板
- 更新所有路由为 /admin 路径
2026-04-13 10:59:00 +08:00

391 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
产品参数爬取系统 - 主程序
v2.0.0 - 合并后台管理到单端口
端口: 19011
前台: http://localhost:19011
后台: http://localhost:19011/admin
"""
from flask import Flask, jsonify, request, render_template
from flask_cors import CORS
import json
import os
from datetime import datetime
from pathlib import Path
import asyncio
import logging
# Configure logging: INFO level, each record shows time, logger name, level and message.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Flask application; HTML templates are looked up in the 'templates' folder.
app = Flask(__name__, template_folder='templates')
# Enable CORS on the app (no options passed, so flask_cors defaults apply).
CORS(app)
# Path configuration: the data and logs directories sit next to this file and
# are created at import time if they do not exist yet.
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
DATA_DIR.mkdir(exist_ok=True)
# JSON files used as the product store and the task history.
PRODUCTS_FILE = DATA_DIR / 'products.json'
TASKS_FILE = DATA_DIR / 'tasks.json'
LOGS_DIR = BASE_DIR / 'logs'
LOGS_DIR.mkdir(exist_ok=True)
# ============ 数据存储 ============
def load_products():
    """Load the product store from ``PRODUCTS_FILE``.

    Returns:
        The parsed JSON document, or an empty store
        (``{"products": [], "last_update": None}``) when the file is absent.
    """
    if not PRODUCTS_FILE.exists():
        return {"products": [], "last_update": None}
    raw_text = PRODUCTS_FILE.read_text(encoding='utf-8')
    return json.loads(raw_text)
def save_products(data):
    """Stamp ``data`` with the current time and write it to ``PRODUCTS_FILE``.

    The ``last_update`` key is set on the caller's dict in place before the
    document is serialized as indented, non-ASCII-escaped JSON.
    """
    data["last_update"] = datetime.now().isoformat()
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    PRODUCTS_FILE.write_text(serialized, encoding='utf-8')
def load_tasks():
    """Load the task history from ``TASKS_FILE``.

    Returns:
        The parsed JSON document, or ``{"tasks": []}`` when the file is absent.
    """
    if not TASKS_FILE.exists():
        return {"tasks": []}
    raw_text = TASKS_FILE.read_text(encoding='utf-8')
    return json.loads(raw_text)
def save_tasks(data):
    """Write the task history ``data`` to ``TASKS_FILE`` as indented JSON."""
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    TASKS_FILE.write_text(serialized, encoding='utf-8')
# ============ 前台 API 路由 ============
@app.route('/')
def index():
    """Front page: describe the service and list its endpoints as JSON."""
    endpoints = {
        "products": "/api/products",
        "tasks": "/api/tasks",
        "spiders": "/api/spiders",
        "run": "/api/run/<spider_name>",
        "admin": "/admin",
    }
    service_info = {
        "name": "Product Crawler",
        "version": "2.0.0",
        "description": "产品参数爬取系统",
        "endpoints": endpoints,
    }
    return jsonify(service_info)
@app.route('/api/products')
def api_list_products():
    """List products, optionally filtered by query parameters.

    Query parameters:
        brand: keep only products whose ``brand`` equals this value exactly.
        search: case-insensitive substring matched against the product name
            and against the JSON-serialized ``params`` of each product.

    Returns:
        JSON with ``products``, ``total`` (count after filtering) and the
        store's ``last_update`` timestamp.
    """
    data = load_products()
    brand = request.args.get('brand')
    search = request.args.get('search')
    products = data.get("products", [])

    if brand:
        products = [p for p in products if p.get("brand") == brand]

    if search:
        needle = search.lower()

        def matches(product):
            # Products are stored without validation, so "name" may be null
            # or a non-string; coerce it instead of failing the whole listing.
            name = product.get("name")
            name_text = "" if name is None else str(name)
            if needle in name_text.lower():
                return True
            params_text = json.dumps(product.get("params", {}), ensure_ascii=False)
            return needle in params_text.lower()

        products = [p for p in products if matches(p)]

    return jsonify({
        "products": products,
        "total": len(products),
        "last_update": data.get("last_update"),
    })
@app.route('/api/products/<product_id>')
def api_get_product(product_id):
    """Return the product whose ``id`` equals ``product_id``, or a 404 error."""
    candidates = (
        entry
        for entry in load_products().get("products", [])
        if entry.get("id") == product_id
    )
    found = next(candidates, None)
    if found is None:
        return jsonify({"error": "Product not found"}), 404
    return jsonify(found)
@app.route('/api/products', methods=['POST'])
def api_add_product():
    """Add a product from the JSON request body.

    The body must be a non-empty JSON object. When it carries no ``id``, one
    is generated from the brand (``unknown`` if absent) and the current time.
    ``created_at`` and ``updated_at`` are set to the same timestamp.

    Returns:
        ``{"success": True, "product": ...}`` on success, or a 400 error
        when the body is empty or not a JSON object.
    """
    product = request.get_json()
    # A JSON list/string/number would pass a bare truthiness check and then
    # crash on ``.get``; only a non-empty object is a valid product.
    if not isinstance(product, dict) or not product:
        return jsonify({"error": "Invalid data"}), 400

    data = load_products()
    now = datetime.now()
    if not product.get("id"):
        product["id"] = f"{product.get('brand', 'unknown')}-{now.strftime('%Y%m%d%H%M%S')}"

    # One timestamp for both fields so a fresh product reads as unmodified.
    timestamp = now.isoformat()
    product["created_at"] = timestamp
    product["updated_at"] = timestamp

    # Tolerate a store file that lacks the "products" key.
    data.setdefault("products", []).append(product)
    save_products(data)
    return jsonify({"success": True, "product": product})
@app.route('/api/products/<product_id>', methods=['PUT'])
def api_update_product(product_id):
    """Merge the JSON request body into the product with id ``product_id``.

    The body must be a JSON object; its keys overwrite the stored product's
    keys and ``updated_at`` is refreshed.

    Returns:
        ``{"success": True, "product": ...}`` on success, a 400 error when
        the body is not a JSON object, or a 404 error when no product matches.
    """
    updates = request.get_json()
    # ``dict.update`` would raise on null or on most non-object bodies and
    # surface as a 500; reject those up front instead.
    if not isinstance(updates, dict):
        return jsonify({"error": "Invalid data"}), 400

    data = load_products()
    for product in data.get("products", []):
        if product.get("id") != product_id:
            continue
        # NOTE(review): the body may overwrite "id" itself; confirm whether
        # renaming a product through PUT is intended.
        product.update(updates)
        product["updated_at"] = datetime.now().isoformat()
        save_products(data)
        return jsonify({"success": True, "product": product})

    return jsonify({"error": "Product not found"}), 404
@app.route('/api/products/<product_id>', methods=['DELETE'])
def api_delete_product(product_id):
    """Remove every product whose ``id`` equals ``product_id``.

    Returns ``{"success": True}`` when something was removed, otherwise a
    404 error; the store is only rewritten when it actually changed.
    """
    data = load_products()
    remaining = [entry for entry in data["products"] if entry.get("id") != product_id]
    if len(remaining) >= len(data["products"]):
        return jsonify({"error": "Product not found"}), 404
    data["products"] = remaining
    save_products(data)
    return jsonify({"success": True})
@app.route('/api/spiders')
def api_list_spiders():
    """List the available spiders, annotated from the task history.

    For each spider, ``last_run`` is the ``end_time`` of the latest recorded
    task (in stored order) that has finished, and ``status`` becomes
    ``"running"`` when any recorded task for it is still marked running.

    Returns:
        A JSON list of spider descriptors.
    """
    spiders = [
        {
            "name": "byd",
            "display_name": "比亚迪汽车",
            "description": "爬取比亚迪官网车型参数",
            "status": "available",
            "last_run": None,
        },
        {
            "name": "custom",
            "display_name": "自定义爬虫",
            "description": "通过配置自定义爬虫规则",
            "status": "available",
            "last_run": None,
        },
    ]
    by_name = {spider["name"]: spider for spider in spiders}

    for task in load_tasks().get("tasks", []):
        spider = by_name.get(task.get("spider"))
        if spider is None:
            continue
        # A task that is still running has end_time None; it must not erase
        # the timestamp of an earlier finished run.
        end_time = task.get("end_time")
        if end_time:
            spider["last_run"] = end_time
        if task.get("status") == "running":
            spider["status"] = "running"

    return jsonify(spiders)
def _merge_byd_results(results):
    """Merge scraped BYD items into the product store and persist it.

    Items whose ``name`` matches a stored product update that product;
    all others are appended with a generated id and BYD brand/source fields.
    """
    # Load the store only now, after the crawl has finished, so writes made
    # by other requests while the spider was running are not overwritten.
    data = load_products()
    products = data.setdefault("products", [])
    for item in results:
        now = datetime.now()
        existing = next(
            (p for p in products if p.get("name") == item.get("name")),
            None,
        )
        if existing is not None:
            existing.update(item)
            existing["updated_at"] = now.isoformat()
            continue
        # The list length suffix keeps ids unique within the same second.
        item["id"] = f"byd-{now.strftime('%Y%m%d%H%M%S')}-{len(products)}"
        item["brand"] = "比亚迪"
        item["source"] = "byd.com"
        item["created_at"] = now.isoformat()
        products.append(item)
    save_products(data)


@app.route('/api/run/<spider_name>', methods=['POST'])
def api_run_spider(spider_name):
    """Run the named spider synchronously and record the run as a task.

    A task entry is stored as ``running`` before the crawl and rewritten as
    ``completed`` or ``failed`` afterwards, together with the result.

    Returns:
        JSON ``{"success": True, "count": n}`` on success, or
        ``{"success": False, "error": ...}`` for an unknown spider or when
        the crawl raised.
    """
    async def run_spider():
        try:
            if spider_name != "byd":
                return {"success": False, "error": f"Unknown spider: {spider_name}"}
            # Imported lazily so the spider's dependencies are only needed
            # when it is actually run; an import failure is reported below.
            from spiders.byd import BYDSpider
            spider = BYDSpider({"headless": True})
            results = await spider.run()
            _merge_byd_results(results)
            return {"success": True, "count": len(results)}
        except Exception as e:
            # Boundary handler: log with traceback and report the failure.
            logger.exception("爬虫运行失败: %s", e)
            return {"success": False, "error": str(e)}

    # Record the task as running before the crawl starts.
    started = datetime.now()
    task = {
        "id": f"task-{started.strftime('%Y%m%d%H%M%S')}",
        "spider": spider_name,
        "status": "running",
        "start_time": started.isoformat(),
        "end_time": None,
        "result": None,
    }
    tasks = load_tasks()
    tasks.setdefault("tasks", []).append(task)
    save_tasks(tasks)

    result = asyncio.run(run_spider())

    task["status"] = "completed" if result.get("success") else "failed"
    task["end_time"] = datetime.now().isoformat()
    task["result"] = result

    # Re-read the history so tasks recorded by other requests during the
    # crawl survive. Ids have one-second resolution, so start_time is also
    # compared to pick the right entry.
    tasks = load_tasks()
    entries = tasks.setdefault("tasks", [])
    for position, entry in enumerate(entries):
        if entry.get("id") == task["id"] and entry.get("start_time") == task["start_time"]:
            entries[position] = task
            break
    else:
        entries.append(task)
    save_tasks(tasks)

    return jsonify(result)
@app.route('/api/tasks')
def api_list_tasks():
    """List recorded tasks, newest first.

    Query parameters:
        limit: maximum number of tasks to return (default 50; a value that
            is not an integer falls back to the default, negatives act as 0).

    Returns:
        A JSON list of task entries.
    """
    tasks = load_tasks().get("tasks", [])
    # ``or ""`` keeps the sort working when start_time is stored as null.
    tasks.sort(key=lambda entry: entry.get("start_time") or "", reverse=True)

    limit = request.args.get('limit', 50, type=int)
    # A negative slice bound would drop tasks from the end instead of
    # limiting the count, so clamp it.
    limit = max(limit, 0)
    return jsonify(tasks[:limit])
@app.route('/api/export')
def api_export():
    """Export the product store.

    Query parameters:
        format: ``json`` (default) returns the whole store; ``csv`` returns
            a downloadable CSV with the columns id, name, brand, source
            followed by one column per parameter key.

    Returns:
        The export in the requested format, or a 400 error for any other
        ``format`` value.
    """
    export_format = request.args.get('format', 'json')
    data = load_products()

    if export_format == 'json':
        return jsonify(data)
    if export_format != 'csv':
        return jsonify({"error": "Unsupported format"}), 400

    import csv
    import io

    products = data.get("products", [])
    base_columns = ["id", "name", "brand", "source"]

    # Collect parameter keys across ALL products, in first-seen order, so
    # parameters missing from the first product still get a column.
    param_columns = {}
    for product in products:
        for key in (product.get("params") or {}):
            param_columns.setdefault(key, None)

    output = io.StringIO()
    writer = csv.writer(output)
    # With no products the CSV body stays empty (no header row).
    if products:
        writer.writerow(base_columns + list(param_columns))
        for product in products:
            params = product.get("params") or {}
            row = [product.get(column, "") for column in base_columns]
            row.extend(params.get(key, "") for key in param_columns)
            writer.writerow(row)

    return output.getvalue(), 200, {
        "Content-Type": "text/csv; charset=utf-8",
        "Content-Disposition": "attachment; filename=products.csv",
    }
# ============ 后台管理页面路由 ============
@app.route('/admin')
def admin_index():
    """Admin landing page: render the ``index.html`` template."""
    template_name = 'index.html'
    return render_template(template_name)
@app.route('/admin/products')
def admin_products():
    """Admin product management page: render ``products.html``."""
    template_name = 'products.html'
    return render_template(template_name)
@app.route('/admin/spiders')
def admin_spiders():
    """Admin spider management page: render ``spiders.html``."""
    template_name = 'spiders.html'
    return render_template(template_name)
@app.route('/admin/tasks')
def admin_tasks():
    """Admin task management page: render ``tasks.html``."""
    template_name = 'tasks.html'
    return render_template(template_name)
@app.route('/admin/config')
def admin_config():
    """Admin configuration page: render ``config.html``."""
    template_name = 'config.html'
    return render_template(template_name)
# ============ 后台管理 API统计 ============
@app.route('/api/admin/stats')
def api_admin_stats():
    """Admin statistics: product count, last update, task totals as JSON."""
    store = load_products()
    task_entries = load_tasks().get("tasks", [])
    running_count = sum(
        1 for entry in task_entries if entry.get("status") == "running"
    )
    stats = {
        "total_products": len(store.get("products", [])),
        "last_update": store.get("last_update"),
        "total_tasks": len(task_entries),
        "running_tasks": running_count,
    }
    return jsonify(stats)
if __name__ == '__main__':
    # Script entry point: print a startup banner, then serve on port 19011.
    banner = "=" * 60
    print(banner)
    print("产品参数爬取系统 v2.0.0")
    print(banner)
    print("API地址: http://localhost:19011")
    print("后台管理: http://localhost:19011/admin")
    print(banner)
    # The server binds to all interfaces, and the Werkzeug debugger lets any
    # client that can reach it execute arbitrary code. Keep debug mode off
    # unless it is explicitly requested through FLASK_DEBUG.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(host='0.0.0.0', port=19011, debug=debug_enabled)