feat: 产品参数爬取系统 v1.0.0

功能:
- 多步骤爬取流程(入口页→列表页→详情页)
- 浏览器爬虫支持(Playwright,处理JS渲染)
- 比亚迪汽车爬虫示例
- 后台管理界面
- 数据存储和导出

技术栈:
- Python 3 + Flask
- Playwright (浏览器自动化)
- BeautifulSoup (HTML解析)

端口:
- API服务: 19011
- 后台管理: 19012
This commit is contained in:
2026-04-10 00:45:51 +08:00
commit 0ee0abbbd1
16 changed files with 1847 additions and 0 deletions

337
app.py Normal file
View File

@@ -0,0 +1,337 @@
"""
产品参数爬取系统 - 主程序
"""
from flask import Flask, jsonify, request
from flask_cors import CORS
import json
import os
from datetime import datetime
from pathlib import Path
import asyncio
import logging
# Logging configuration: INFO level, timestamp/name/level/message format.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Flask application; CORS is applied with flask_cors' default settings.
app = Flask(__name__)
CORS(app)

# Path configuration. Both directories are created next to this file at
# import time. LOGS_DIR is only created here; nothing in the visible code
# writes to it.
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
LOGS_DIR = BASE_DIR / 'logs'
DATA_DIR.mkdir(exist_ok=True)
LOGS_DIR.mkdir(exist_ok=True)
PRODUCTS_FILE = DATA_DIR / 'products.json'
TASKS_FILE = DATA_DIR / 'tasks.json'
# ============ 数据存储 ============
def load_products():
    """Return the product store parsed from ``PRODUCTS_FILE``.

    When the file does not exist, an empty store
    (``{"products": [], "last_update": None}``) is returned instead.
    """
    if not PRODUCTS_FILE.exists():
        return {"products": [], "last_update": None}
    raw_text = PRODUCTS_FILE.read_text(encoding='utf-8')
    return json.loads(raw_text)
def save_products(data):
    """Write the product store *data* to ``PRODUCTS_FILE`` as UTF-8 JSON.

    *data* is modified in place: its ``"last_update"`` key is set to the
    current local time in ISO-8601 form before serialization.
    """
    data["last_update"] = datetime.now().isoformat()
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    PRODUCTS_FILE.write_text(serialized, encoding='utf-8')
def load_tasks():
    """Return the task store parsed from ``TASKS_FILE``.

    When the file does not exist, ``{"tasks": []}`` is returned instead.
    """
    if not TASKS_FILE.exists():
        return {"tasks": []}
    raw_text = TASKS_FILE.read_text(encoding='utf-8')
    return json.loads(raw_text)
def save_tasks(data):
    """Write the task store *data* to ``TASKS_FILE`` as indented UTF-8 JSON."""
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    TASKS_FILE.write_text(serialized, encoding='utf-8')
# ============ API 路由 ============
@app.route('/')
def index():
    """Return a JSON description of the service and its main endpoints."""
    endpoints = {
        "products": "/api/products",
        "tasks": "/api/tasks",
        "spiders": "/api/spiders",
        "run": "/api/run/<spider_name>",
    }
    service_info = {
        "name": "Product Crawler",
        "version": "1.0.0",
        "description": "产品参数爬取系统",
        "endpoints": endpoints,
    }
    return jsonify(service_info)
@app.route('/api/products')
def api_list_products():
    """List stored products, optionally filtered by query parameters.

    Query parameters:
        brand: keep only products whose ``brand`` equals this value exactly.
        search: case-insensitive substring matched against each product's
            name and against its JSON-serialized ``params``.

    Returns:
        JSON object with ``products`` (the filtered list), ``total`` (its
        length) and ``last_update`` (taken from the store).
    """
    data = load_products()
    products = data.get("products", [])

    brand = request.args.get('brand')
    if brand:
        products = [p for p in products if p.get("brand") == brand]

    search = request.args.get('search')
    if search:
        needle = search.lower()

        def matches(product):
            # ``name`` may be stored as null or as a non-string value; coerce
            # it so one bad record cannot fail the whole listing.
            name_text = str(product.get("name") or "")
            if needle in name_text.lower():
                return True
            # A null ``params`` is treated as empty rather than serialized as
            # the literal text "null", which would produce false matches.
            params_text = json.dumps(product.get("params") or {}, ensure_ascii=False)
            return needle in params_text.lower()

        products = [p for p in products if matches(p)]

    return jsonify({
        "products": products,
        "total": len(products),
        "last_update": data.get("last_update"),
    })
@app.route('/api/products/<product_id>')
def api_get_product(product_id):
    """Return the first stored product whose ``id`` equals *product_id*.

    Responds with a 404 JSON error when no product matches.
    """
    candidates = load_products().get("products", [])
    found = next((p for p in candidates if p.get("id") == product_id), None)
    if found is None:
        return jsonify({"error": "Product not found"}), 404
    return jsonify(found)
@app.route('/api/products', methods=['POST'])
def api_add_product():
    """Add a product from the JSON request body to the store.

    The body must be a non-empty JSON object. When it carries no ``id``, one
    is generated as ``<brand>-<YYYYmmddHHMMSS>`` (brand defaults to
    ``unknown``), with a numeric suffix appended if that id is already taken.
    ``created_at`` and ``updated_at`` are set to the current time.

    Returns:
        ``{"success": True, "product": ...}`` on success, or a 400 JSON error
        when the body is empty or not a JSON object.
    """
    product = request.get_json()
    # Reject lists, strings and numbers as well as empty bodies: the code
    # below needs a mapping and would otherwise fail with a 500.
    if not product or not isinstance(product, dict):
        return jsonify({"error": "Invalid data"}), 400

    data = load_products()
    products = data.setdefault("products", [])

    # Generate an id if the client did not supply one.
    if not product.get("id"):
        base_id = f"{product.get('brand', 'unknown')}-{datetime.now().strftime('%Y%m%d%H%M%S')}"
        candidate = base_id
        suffix = 1
        # The timestamp has one-second resolution, so two products of the
        # same brand added within a second would otherwise share an id.
        while any(p.get("id") == candidate for p in products):
            candidate = f"{base_id}-{suffix}"
            suffix += 1
        product["id"] = candidate

    now = datetime.now().isoformat()
    product["created_at"] = now
    product["updated_at"] = now

    products.append(product)
    save_products(data)
    return jsonify({"success": True, "product": product})
@app.route('/api/products/<product_id>', methods=['PUT'])
def api_update_product(product_id):
    """Merge the JSON request body into the product identified by *product_id*.

    The body must be a JSON object; its keys overwrite the stored product's
    keys. ``updated_at`` is refreshed, and the product's ``id`` is kept equal
    to *product_id* even if the body tries to change it.

    Returns:
        ``{"success": True, "product": ...}`` on success, a 400 JSON error
        when the body is not a JSON object, or a 404 JSON error when no
        product has that id.
    """
    updates = request.get_json()
    # ``dict.update(None)`` or ``update("text")`` would raise and surface as
    # a 500, so validate the payload shape up front.
    if not isinstance(updates, dict):
        return jsonify({"error": "Invalid data"}), 400

    data = load_products()
    for product in data.get("products", []):
        if product.get("id") != product_id:
            continue
        product.update(updates)
        # Keep the record addressable under the URL it was updated through.
        product["id"] = product_id
        product["updated_at"] = datetime.now().isoformat()
        save_products(data)
        return jsonify({"success": True, "product": product})
    return jsonify({"error": "Product not found"}), 404
@app.route('/api/products/<product_id>', methods=['DELETE'])
def api_delete_product(product_id):
    """Remove every stored product whose ``id`` equals *product_id*.

    Responds with ``{"success": True}`` when at least one product was
    removed, otherwise with a 404 JSON error (the store is not rewritten).
    """
    data = load_products()
    remaining = [p for p in data["products"] if p.get("id") != product_id]
    nothing_removed = len(remaining) >= len(data["products"])
    data["products"] = remaining
    if nothing_removed:
        return jsonify({"error": "Product not found"}), 404
    save_products(data)
    return jsonify({"success": True})
@app.route('/api/spiders')
def api_list_spiders():
    """Return the available spiders annotated with their task history.

    For each spider, ``last_run`` is the ``end_time`` of the last recorded
    task for that spider (in stored order), and ``status`` becomes
    ``"running"`` when any recorded task for it has that status.
    """
    catalog = (
        ("byd", "比亚迪汽车", "爬取比亚迪官网车型参数"),
        ("custom", "自定义爬虫", "通过配置自定义爬虫规则"),
    )
    spiders = [
        {
            "name": name,
            "display_name": display_name,
            "description": description,
            "status": "available",
            "last_run": None,
        }
        for name, display_name, description in catalog
    ]

    # Overlay information from the recorded tasks.
    history = load_tasks().get("tasks", [])
    for spider in spiders:
        runs = [t for t in history if t.get("spider") == spider["name"]]
        if not runs:
            continue
        spider["last_run"] = runs[-1].get("end_time")
        if any(t.get("status") == "running" for t in runs):
            spider["status"] = "running"
    return jsonify(spiders)
def _merge_byd_results(data, results):
    """Merge crawled BYD items into the product store *data* in place.

    An item whose ``name`` equals that of an existing product updates that
    product (first match wins) and refreshes its ``updated_at``. Any other
    item is appended with a generated id, brand, source and ``created_at``.
    """
    products = data.setdefault("products", [])
    for item in results:
        existing = next(
            (p for p in products if p.get("name") == item.get("name")),
            None,
        )
        if existing is not None:
            existing.update(item)
            existing["updated_at"] = datetime.now().isoformat()
            continue
        # The current list length keeps ids distinct within one batch even
        # though the timestamp only has one-second resolution.
        item["id"] = f"byd-{datetime.now().strftime('%Y%m%d%H%M%S')}-{len(products)}"
        item["brand"] = "比亚迪"
        item["source"] = "byd.com"
        item["created_at"] = datetime.now().isoformat()
        products.append(item)


@app.route('/api/run/<spider_name>', methods=['POST'])
def api_run_spider(spider_name):
    """Run the named spider synchronously and record the run as a task.

    A task entry with status ``"running"`` is stored before the spider
    starts, then set to ``"completed"`` or ``"failed"`` afterwards. Only the
    ``byd`` spider is implemented; any other name yields a failed result.

    Returns:
        JSON ``{"success": True, "count": n}`` on success, or
        ``{"success": False, "error": "..."}`` on failure.
    """

    async def run_spider():
        try:
            if spider_name != "byd":
                return {"success": False, "error": f"Unknown spider: {spider_name}"}

            from spiders.byd import BYDSpider
            spider = BYDSpider({"headless": True})
            results = await spider.run()

            # Load the store only after the crawl has finished. A snapshot
            # taken before it would overwrite products that were added or
            # edited through the API while the spider was running.
            data = load_products()
            _merge_byd_results(data, results)
            save_products(data)
            return {"success": True, "count": len(results)}
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error lost.
            logger.exception("爬虫运行失败: %s", e)
            return {"success": False, "error": str(e)}

    # Record the task before starting.
    started_at = datetime.now()
    task = {
        "id": f"task-{started_at.strftime('%Y%m%d%H%M%S')}",
        "spider": spider_name,
        "status": "running",
        "start_time": started_at.isoformat(),
        "end_time": None,
        "result": None,
    }
    tasks = load_tasks()
    tasks.setdefault("tasks", []).append(task)
    save_tasks(tasks)

    # Run the spider. Failures raised by asyncio.run itself are turned into
    # a failed result so the task never stays "running" forever.
    try:
        result = asyncio.run(run_spider())
    except Exception as e:
        logger.exception("爬虫运行失败: %s", e)
        result = {"success": False, "error": str(e)}

    # Finalize the task record.
    task["status"] = "completed" if result.get("success") else "failed"
    task["end_time"] = datetime.now().isoformat()
    task["result"] = result

    # Re-read the task list so entries written by other requests during the
    # crawl are preserved. Match on id and start_time because ids only have
    # one-second resolution.
    tasks = load_tasks()
    entries = tasks.setdefault("tasks", [])
    for position, entry in enumerate(entries):
        if entry.get("id") == task["id"] and entry.get("start_time") == task["start_time"]:
            entries[position] = task
            break
    else:
        entries.append(task)
    save_tasks(tasks)

    return jsonify(result)
@app.route('/api/tasks')
def api_list_tasks():
    """Return recorded tasks, newest first.

    Query parameters:
        limit: maximum number of tasks to return (default 50). Values that
            are not integers fall back to the default; negative values are
            treated as 0.
    """
    tasks = load_tasks().get("tasks", [])
    # Sort by start time, newest first. A null start_time sorts as the empty
    # string instead of raising on a None/str comparison.
    tasks.sort(key=lambda t: t.get("start_time") or "", reverse=True)
    # Cap the number of returned tasks. A negative slice bound would drop
    # items from the end rather than limit the result, so clamp it.
    limit = request.args.get('limit', 50, type=int)
    tasks = tasks[:max(limit, 0)]
    return jsonify(tasks)
@app.route('/api/export')
def api_export():
    """Export the product store as JSON or CSV.

    Query parameters:
        format: ``json`` (default) returns the whole store; ``csv`` returns
            an attachment named ``products.csv``.

    The CSV has the columns ``id``, ``name``, ``brand``, ``source`` followed
    by one column per parameter key found in any product, in first-seen
    order. Products lacking a parameter get an empty cell. With no products
    the CSV body is empty. Any other format yields a 400 JSON error.
    """
    export_format = request.args.get('format', 'json')
    data = load_products()

    if export_format == 'json':
        return jsonify(data)

    if export_format == 'csv':
        import csv
        import io

        def params_of(product):
            # Treat a missing, null or non-mapping ``params`` as empty.
            params = product.get("params")
            return params if isinstance(params, dict) else {}

        products = data.get("products", [])
        base_columns = ["id", "name", "brand", "source"]
        # Collect parameter keys across ALL products. Taking them from the
        # first product only would silently drop parameters that appear in
        # later ones. dict.fromkeys de-duplicates while preserving order.
        param_columns = list(dict.fromkeys(
            key for product in products for key in params_of(product)
        ))

        output = io.StringIO()
        writer = csv.writer(output)
        if products:
            # Header row, then one row per product.
            writer.writerow(base_columns + param_columns)
            for product in products:
                params = params_of(product)
                row = [product.get(column, "") for column in base_columns]
                row.extend(params.get(key, "") for key in param_columns)
                writer.writerow(row)

        return output.getvalue(), 200, {
            "Content-Type": "text/csv; charset=utf-8",
            "Content-Disposition": "attachment; filename=products.csv",
        }

    return jsonify({"error": "Unsupported format"}), 400
if __name__ == '__main__':
    # The server binds to all interfaces, and Flask's debug mode enables the
    # Werkzeug interactive debugger, which lets anyone who can reach the port
    # execute arbitrary code. Debug mode is therefore opt-in through the
    # FLASK_DEBUG environment variable instead of being always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in (
        "1", "true", "yes", "on",
    )

    banner = "=" * 60
    print(banner)
    print("产品参数爬取系统")
    print(banner)
    print("API地址: http://localhost:19011")
    print("后台管理: http://localhost:19012")
    print(banner)
    app.run(host='0.0.0.0', port=19011, debug=debug_enabled)