- 端口从 19011+19012 合并为 19011 - 前台API: http://localhost:19011 - 后台管理: http://localhost:19011/admin - 新增 templates 目录,整合管理页面模板 - 更新所有路由为 /admin 路径
391 lines
11 KiB
Python
391 lines
11 KiB
Python
"""
|
||
产品参数爬取系统 - 主程序
|
||
v2.0.0 - 合并后台管理到单端口
|
||
|
||
端口: 19011
|
||
前台: http://localhost:19011
|
||
后台: http://localhost:19011/admin
|
||
"""
|
||
|
||
from flask import Flask, jsonify, request, render_template
|
||
from flask_cors import CORS
|
||
import json
|
||
import os
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
import asyncio
|
||
import logging
|
||
|
||
# Logging: INFO level with timestamp, logger name, level and message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# One Flask application serves both the API and the admin pages;
# CORS is enabled for the whole app via flask_cors.
app = Flask(__name__, template_folder='templates')
CORS(app)

# Filesystem layout, anchored at the directory containing this file.
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
LOGS_DIR = BASE_DIR / 'logs'
PRODUCTS_FILE = DATA_DIR / 'products.json'
TASKS_FILE = DATA_DIR / 'tasks.json'

# Create the data and log directories at import time if they are missing.
DATA_DIR.mkdir(exist_ok=True)
LOGS_DIR.mkdir(exist_ok=True)
|
||
|
||
|
||
# ============ 数据存储 ============
|
||
|
||
def load_products():
    """Read the product store from PRODUCTS_FILE.

    Returns:
        The parsed JSON content of PRODUCTS_FILE, or the empty store
        ``{"products": [], "last_update": None}`` when the file does not exist.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    if not PRODUCTS_FILE.exists():
        return {"products": [], "last_update": None}
    raw_text = PRODUCTS_FILE.read_text(encoding='utf-8')
    return json.loads(raw_text)
|
||
|
||
|
||
def save_products(data):
    """Write the product store to PRODUCTS_FILE as pretty-printed UTF-8 JSON.

    Args:
        data: the product store dict. It is mutated in place: its
            "last_update" key is set to ``datetime.now().isoformat()``
            before serialization.
    """
    data["last_update"] = datetime.now().isoformat()
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    PRODUCTS_FILE.write_text(serialized, encoding='utf-8')
|
||
|
||
|
||
def load_tasks():
    """Read the task log from TASKS_FILE.

    Returns:
        The parsed JSON content of TASKS_FILE, or ``{"tasks": []}`` when the
        file does not exist.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    if not TASKS_FILE.exists():
        return {"tasks": []}
    raw_text = TASKS_FILE.read_text(encoding='utf-8')
    return json.loads(raw_text)
|
||
|
||
|
||
def save_tasks(data):
    """Write the task log *data* to TASKS_FILE as pretty-printed UTF-8 JSON."""
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    TASKS_FILE.write_text(serialized, encoding='utf-8')
|
||
|
||
|
||
# ============ 前台 API 路由 ============
|
||
|
||
@app.route('/')
def index():
    """Front-end root: return a JSON description of the service and its endpoints."""
    endpoint_map = {
        "products": "/api/products",
        "tasks": "/api/tasks",
        "spiders": "/api/spiders",
        "run": "/api/run/<spider_name>",
        "admin": "/admin",
    }
    service_info = {
        "name": "Product Crawler",
        "version": "2.0.0",
        "description": "产品参数爬取系统",
        "endpoints": endpoint_map,
    }
    return jsonify(service_info)
|
||
|
||
|
||
@app.route('/api/products')
def api_list_products():
    """List stored products, optionally filtered.

    Query parameters:
        brand: keep only products whose "brand" equals this value exactly.
        search: case-insensitive substring, matched against the product
            "name" and against the JSON-serialized "params".

    Returns:
        JSON object with "products" (the filtered list), "total" (its
        length) and "last_update" (taken from the store).
    """
    data = load_products()
    brand = request.args.get('brand')
    search = request.args.get('search')

    products = data.get("products", [])

    if brand:
        products = [p for p in products if p.get("brand") == brand]

    if search:
        needle = search.lower()

        def matches(product):
            # A stored "name" may be null or a non-string value; calling
            # .lower() on it directly would turn the request into a 500.
            name = product.get("name")
            name_text = "" if name is None else str(name)
            if needle in name_text.lower():
                return True
            params_text = json.dumps(product.get("params", {}), ensure_ascii=False)
            return needle in params_text.lower()

        products = [p for p in products if matches(p)]

    return jsonify({
        "products": products,
        "total": len(products),
        "last_update": data.get("last_update"),
    })
|
||
|
||
|
||
@app.route('/api/products/<product_id>')
def api_get_product(product_id):
    """Return the first stored product whose "id" equals *product_id*.

    Responds with the product as JSON, or ``{"error": "Product not found"}``
    and HTTP 404 when no product matches.
    """
    candidates = (
        entry
        for entry in load_products().get("products", [])
        if entry.get("id") == product_id
    )
    found = next(candidates, None)
    if found is None:
        return jsonify({"error": "Product not found"}), 404
    return jsonify(found)
|
||
|
||
|
||
@app.route('/api/products', methods=['POST'])
def api_add_product():
    """Create a product from the JSON request body.

    The body must be a non-empty JSON object. When it carries no "id", one
    is generated as ``<brand>-<YYYYmmddHHMMSS>`` (brand defaults to
    "unknown"), with a numeric suffix appended if that id is already taken.
    "created_at" and "updated_at" are set to the current time.

    Returns:
        ``{"success": True, "product": ...}`` on success, or
        ``{"error": "Invalid data"}`` with HTTP 400 for an empty or
        non-object body.
    """
    product = request.get_json()

    # A JSON list/string/number is truthy but has no .get(); reject it here
    # instead of failing later with an AttributeError (HTTP 500).
    if not product or not isinstance(product, dict):
        return jsonify({"error": "Invalid data"}), 400

    data = load_products()
    products = data.setdefault("products", [])
    now = datetime.now()

    if not product.get("id"):
        base_id = f"{product.get('brand', 'unknown')}-{now.strftime('%Y%m%d%H%M%S')}"
        # The timestamp has one-second resolution, so two products of the
        # same brand created within a second would otherwise share an id.
        taken_ids = [p.get("id") for p in products]
        candidate = base_id
        suffix = 1
        while candidate in taken_ids:
            candidate = f"{base_id}-{suffix}"
            suffix += 1
        product["id"] = candidate

    timestamp = now.isoformat()
    product["created_at"] = timestamp
    product["updated_at"] = timestamp

    products.append(product)
    save_products(data)

    return jsonify({"success": True, "product": product})
|
||
|
||
|
||
@app.route('/api/products/<product_id>', methods=['PUT'])
def api_update_product(product_id):
    """Merge the JSON request body into the product with id *product_id*.

    The body must be a JSON object; its keys overwrite the stored product's
    keys and "updated_at" is refreshed. Only the first product with a
    matching id is updated.

    Returns:
        ``{"success": True, "product": ...}`` on success,
        ``{"error": "Invalid data"}`` with HTTP 400 for a non-object body,
        or ``{"error": "Product not found"}`` with HTTP 404.
    """
    updates = request.get_json()

    # dict.update() raises on None / scalars / arbitrary lists; answer 400
    # rather than letting that surface as a server error.
    if not isinstance(updates, dict):
        return jsonify({"error": "Invalid data"}), 400

    data = load_products()

    for product in data.get("products", []):
        if product.get("id") == product_id:
            product.update(updates)
            product["updated_at"] = datetime.now().isoformat()
            save_products(data)
            return jsonify({"success": True, "product": product})

    return jsonify({"error": "Product not found"}), 404
|
||
|
||
|
||
@app.route('/api/products/<product_id>', methods=['DELETE'])
def api_delete_product(product_id):
    """Remove every stored product whose "id" equals *product_id*.

    Responds with ``{"success": True}`` when at least one product was
    removed, otherwise ``{"error": "Product not found"}`` and HTTP 404.
    The store is only rewritten when something was actually removed.
    """
    data = load_products()
    remaining = [entry for entry in data["products"] if entry.get("id") != product_id]

    nothing_removed = len(remaining) >= len(data["products"])
    data["products"] = remaining
    if nothing_removed:
        return jsonify({"error": "Product not found"}), 404

    save_products(data)
    return jsonify({"success": True})
|
||
|
||
|
||
@app.route('/api/spiders')
def api_list_spiders():
    """Return the built-in spider catalogue annotated from the task log.

    Each entry has "name", "display_name", "description", "status" and
    "last_run". For every recorded task of a spider:
      * "last_run" becomes that task's "end_time" when it has one (tasks
        are visited in stored order, so the last finished task wins);
      * "status" becomes "running" if the task's status is "running".
    """
    spiders = [
        {
            "name": "byd",
            "display_name": "比亚迪汽车",
            "description": "爬取比亚迪官网车型参数",
            "status": "available",
            "last_run": None,
        },
        {
            "name": "custom",
            "display_name": "自定义爬虫",
            "description": "通过配置自定义爬虫规则",
            "status": "available",
            "last_run": None,
        },
    ]

    tasks = load_tasks().get("tasks", [])
    for spider in spiders:
        for task in tasks:
            if task.get("spider") != spider["name"]:
                continue
            # A task that is still running has end_time None; it must not
            # erase the end time of an earlier, finished run.
            end_time = task.get("end_time")
            if end_time is not None:
                spider["last_run"] = end_time
            if task.get("status") == "running":
                spider["status"] = "running"

    return jsonify(spiders)
|
||
|
||
|
||
def _merge_byd_results(data, results):
    """Merge crawled BYD items into ``data["products"]`` in place.

    An item whose "name" matches an existing product updates that product
    and refreshes its "updated_at"; any other item is appended with a
    generated id plus brand, source and "created_at" fields.
    """
    products = data.setdefault("products", [])
    for item in results:
        existing = next(
            (p for p in products if p.get("name") == item.get("name")),
            None,
        )
        if existing is not None:
            existing.update(item)
            existing["updated_at"] = datetime.now().isoformat()
            continue

        item["id"] = f"byd-{datetime.now().strftime('%Y%m%d%H%M%S')}-{len(products)}"
        item["brand"] = "比亚迪"
        item["source"] = "byd.com"
        item["created_at"] = datetime.now().isoformat()
        products.append(item)


@app.route('/api/run/<spider_name>', methods=['POST'])
def api_run_spider(spider_name):
    """Run the named spider synchronously and record the run in the task log.

    A task entry with status "running" is written before the crawl starts
    and is updated to "completed" or "failed" afterwards. Only the "byd"
    spider is implemented; any other name yields a failed result.

    Returns:
        JSON ``{"success": True, "count": N}`` on success, otherwise
        ``{"success": False, "error": "..."}``.
    """

    async def run_spider():
        if spider_name != "byd":
            return {"success": False, "error": f"Unknown spider: {spider_name}"}

        # Imported lazily, as in the original, so the app starts even when
        # the spider's dependencies are unavailable.
        from spiders.byd import BYDSpider

        spider = BYDSpider({"headless": True})
        # NOTE(review): results is presumably a list of dicts; it is only
        # used via len(), .get() and item assignment. Confirm against BYDSpider.
        results = await spider.run()

        # Load the store only after the crawl has finished, so products
        # added or edited by other requests during the crawl are not
        # overwritten with a stale snapshot.
        data = load_products()
        _merge_byd_results(data, results)
        save_products(data)
        return {"success": True, "count": len(results)}

    started = datetime.now()
    task = {
        "id": f"task-{started.strftime('%Y%m%d%H%M%S')}",
        "spider": spider_name,
        "status": "running",
        "start_time": started.isoformat(),
        "end_time": None,
        "result": None,
    }
    tasks = load_tasks()
    tasks.setdefault("tasks", []).append(task)
    save_tasks(tasks)

    # Request boundary: any failure (in the spider or in asyncio.run itself)
    # is recorded on the task instead of leaving it stuck in "running".
    try:
        result = asyncio.run(run_spider())
    except Exception as e:
        logger.exception("爬虫运行失败: %s", e)
        result = {"success": False, "error": str(e)}

    task["status"] = "completed" if result.get("success") else "failed"
    task["end_time"] = datetime.now().isoformat()
    task["result"] = result

    # Re-read the log before writing: other requests may have appended
    # tasks while the crawl was running. Match on id and start_time because
    # the id only has one-second resolution.
    tasks = load_tasks()
    stored = tasks.setdefault("tasks", [])
    for position, entry in enumerate(stored):
        if entry.get("id") == task["id"] and entry.get("start_time") == task["start_time"]:
            stored[position] = task
            break
    else:
        stored.append(task)
    save_tasks(tasks)

    return jsonify(result)
|
||
|
||
|
||
@app.route('/api/tasks')
def api_list_tasks():
    """Return recorded tasks, newest first.

    Query parameters:
        limit: maximum number of tasks to return (default 50). Values that
            are not integers fall back to the default; negative values are
            treated as 0.
    """
    tasks = load_tasks().get("tasks", [])

    # "or ''" keeps the sort key a string even if start_time is stored as null.
    ordered = sorted(tasks, key=lambda t: t.get("start_time") or "", reverse=True)

    limit = request.args.get('limit', 50, type=int)
    # A negative slice bound would drop items from the end of the list
    # rather than return none, so clamp it.
    limit = max(limit, 0)

    return jsonify(ordered[:limit])
|
||
|
||
|
||
@app.route('/api/export')
def api_export():
    """Export the product store.

    Query parameters:
        format: "json" (default) returns the whole store as JSON; "csv"
            returns a CSV attachment named products.csv.

    CSV layout: the columns id, name, brand, source, followed by one column
    per key found in any product's "params" (first-seen order). Products
    missing a given param get an empty cell. No header row is written when
    there are no products.

    Returns:
        The export, or ``{"error": "Unsupported format"}`` with HTTP 400.
    """
    export_format = request.args.get('format', 'json')
    data = load_products()

    if export_format == 'json':
        return jsonify(data)

    if export_format != 'csv':
        return jsonify({"error": "Unsupported format"}), 400

    import csv
    import io

    def params_of(product):
        # "params" may be absent, null, or not an object in stored data.
        params = product.get("params")
        return params if isinstance(params, dict) else {}

    products = data.get("products", [])
    base_columns = ["id", "name", "brand", "source"]
    # Collect param keys across ALL products; using only the first product's
    # keys would silently drop params that appear on later products.
    # dict.fromkeys de-duplicates while keeping first-seen order.
    param_columns = list(dict.fromkeys(
        key for product in products for key in params_of(product)
    ))

    output = io.StringIO()
    writer = csv.writer(output)

    if products:
        writer.writerow(base_columns + param_columns)

    for product in products:
        params = params_of(product)
        row = [product.get(column, "") for column in base_columns]
        row.extend(params.get(key, "") for key in param_columns)
        writer.writerow(row)

    return output.getvalue(), 200, {
        "Content-Type": "text/csv; charset=utf-8",
        "Content-Disposition": "attachment; filename=products.csv",
    }
|
||
|
||
|
||
# ============ 后台管理页面路由 ============
|
||
|
||
@app.route('/admin')
def admin_index():
    """Admin home page: render the 'index.html' template."""
    template_name = 'index.html'
    return render_template(template_name)
|
||
|
||
|
||
@app.route('/admin/products')
def admin_products():
    """Admin product management page: render the 'products.html' template."""
    template_name = 'products.html'
    return render_template(template_name)
|
||
|
||
|
||
@app.route('/admin/spiders')
def admin_spiders():
    """Admin spider management page: render the 'spiders.html' template."""
    template_name = 'spiders.html'
    return render_template(template_name)
|
||
|
||
|
||
@app.route('/admin/tasks')
def admin_tasks():
    """Admin task management page: render the 'tasks.html' template."""
    template_name = 'tasks.html'
    return render_template(template_name)
|
||
|
||
|
||
@app.route('/admin/config')
def admin_config():
    """Admin configuration page: render the 'config.html' template."""
    template_name = 'config.html'
    return render_template(template_name)
|
||
|
||
|
||
# ============ 后台管理 API(统计) ============
|
||
|
||
@app.route('/api/admin/stats')
def api_admin_stats():
    """Return summary counters for the admin dashboard.

    The JSON response contains the number of stored products, the store's
    "last_update" value, the number of recorded tasks, and how many of
    those tasks have status "running".
    """
    product_store = load_products()
    task_list = load_tasks().get("tasks", [])

    running_count = sum(1 for entry in task_list if entry.get("status") == "running")
    stats = {
        "total_products": len(product_store.get("products", [])),
        "last_update": product_store.get("last_update"),
        "total_tasks": len(task_list),
        "running_tasks": running_count,
    }
    return jsonify(stats)
|
||
|
||
|
||
if __name__ == '__main__':
    # Startup banner with the API and admin URLs.
    print("=" * 60)
    print("产品参数爬取系统 v2.0.0")
    print("=" * 60)
    print("API地址: http://localhost:19011")
    print("后台管理: http://localhost:19011/admin")
    print("=" * 60)

    # SECURITY: the server binds to all interfaces (0.0.0.0). Flask's debug
    # mode enables the Werkzeug interactive debugger, which lets anyone who
    # can reach the port execute arbitrary Python. Debug is therefore off
    # unless explicitly requested through the FLASK_DEBUG environment variable.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1", "true", "yes", "on",
    }

    app.run(host='0.0.0.0', port=19011, debug=debug_enabled)