功能:
- 多步骤爬取流程(入口页→列表页→详情页)
- 浏览器爬虫支持(Playwright,处理JS渲染)
- 比亚迪汽车爬虫示例
- 后台管理界面
- 数据存储和导出

技术栈:
- Python 3 + Flask
- Playwright (浏览器自动化)
- BeautifulSoup (HTML解析)

端口:
- API服务: 19011
- 后台管理: 19012
337 lines
9.6 KiB
Python
337 lines
9.6 KiB
Python
"""
产品参数爬取系统 - 主程序
"""
|
|
|
|
from flask import Flask, jsonify, request
|
|
from flask_cors import CORS
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import asyncio
|
|
import logging
|
|
|
|
# Logging: INFO level, with timestamp / logger name / level in every record.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Flask application with CORS enabled for all routes.
app = Flask(__name__)
CORS(app)

# Filesystem layout: data and log directories live next to this file and are
# created at import time if they do not exist yet.
BASE_DIR = Path(__file__).parent

DATA_DIR = BASE_DIR / 'data'
DATA_DIR.mkdir(exist_ok=True)
PRODUCTS_FILE = DATA_DIR / 'products.json'
TASKS_FILE = DATA_DIR / 'tasks.json'

LOGS_DIR = BASE_DIR / 'logs'
LOGS_DIR.mkdir(exist_ok=True)
|
|
|
|
|
|
# ============ 数据存储 ============
|
|
|
|
def load_products():
    """Read the product store from disk.

    Returns:
        The parsed JSON content of ``PRODUCTS_FILE`` when that file exists,
        otherwise a fresh ``{"products": [], "last_update": None}`` dict.
    """
    if not PRODUCTS_FILE.exists():
        return {"products": [], "last_update": None}

    raw_text = PRODUCTS_FILE.read_text(encoding='utf-8')
    return json.loads(raw_text)
|
|
|
|
|
|
def save_products(data):
    """Stamp *data* with the current time and write it to ``PRODUCTS_FILE``.

    Args:
        data: Product store dict. Its ``"last_update"`` key is overwritten in
            place with the current local time in ISO format before writing.
    """
    data["last_update"] = datetime.now().isoformat()
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    PRODUCTS_FILE.write_text(serialized, encoding='utf-8')
|
|
|
|
|
|
def load_tasks():
    """Read the task log from disk.

    Returns:
        The parsed JSON content of ``TASKS_FILE`` when that file exists,
        otherwise a fresh ``{"tasks": []}`` dict.
    """
    if not TASKS_FILE.exists():
        return {"tasks": []}

    raw_text = TASKS_FILE.read_text(encoding='utf-8')
    return json.loads(raw_text)
|
|
|
|
|
|
def save_tasks(data):
    """Serialize *data* as indented UTF-8 JSON and write it to ``TASKS_FILE``.

    Args:
        data: Task log dict to persist.
    """
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    TASKS_FILE.write_text(serialized, encoding='utf-8')
|
|
|
|
|
|
# ============ API 路由 ============
|
|
|
|
@app.route('/')
def index():
    """Return service metadata and the map of main API endpoints as JSON."""
    endpoint_map = {
        "products": "/api/products",
        "tasks": "/api/tasks",
        "spiders": "/api/spiders",
        "run": "/api/run/<spider_name>",
    }
    service_info = {
        "name": "Product Crawler",
        "version": "1.0.0",
        "description": "产品参数爬取系统",
        "endpoints": endpoint_map,
    }
    return jsonify(service_info)
|
|
|
|
|
|
@app.route('/api/products')
def api_list_products():
    """List stored products, optionally filtered by query parameters.

    Query parameters:
        brand: Keep only products whose ``brand`` equals this value exactly.
        search: Case-insensitive substring matched against the product name
            and against the JSON-serialized ``params`` of each product.

    Returns:
        JSON object with the filtered ``products``, their ``total`` count and
        the store's ``last_update`` timestamp.
    """
    data = load_products()

    brand = request.args.get('brand')
    search = request.args.get('search')

    products = data.get("products", [])

    if brand:
        products = [p for p in products if p.get("brand") == brand]

    if search:
        needle = search.lower()

        def matches(product):
            # Products arrive as arbitrary JSON, so ``name`` may be null or a
            # non-string; coerce instead of calling .lower() on it directly.
            name = product.get("name")
            name_text = "" if name is None else str(name)
            if needle in name_text.lower():
                return True
            # A null ``params`` would serialize as "null" and produce bogus
            # matches; treat it as an empty mapping.
            params_text = json.dumps(product.get("params") or {}, ensure_ascii=False)
            return needle in params_text.lower()

        products = [p for p in products if matches(p)]

    return jsonify({
        "products": products,
        "total": len(products),
        "last_update": data.get("last_update"),
    })
|
|
|
|
|
|
@app.route('/api/products/<product_id>')
def api_get_product(product_id):
    """Return the first stored product whose ``id`` equals *product_id*.

    Responds with the product as JSON, or a 404 error object when no product
    has that id.
    """
    catalog = load_products().get("products", [])
    found = next(
        (entry for entry in catalog if entry.get("id") == product_id),
        None,
    )
    if found is None:
        return jsonify({"error": "Product not found"}), 404
    return jsonify(found)
|
|
|
|
|
|
@app.route('/api/products', methods=['POST'])
def api_add_product():
    """Append a product described by the JSON request body to the store.

    The body must be a non-empty JSON object. When it carries no ``id``, one
    is generated as ``<brand>-<YYYYmmddHHMMSS>`` (brand defaults to
    ``unknown``), with a numeric suffix added if that id is already taken.
    ``created_at`` and ``updated_at`` are set to the current time.

    Returns:
        ``{"success": True, "product": ...}`` on success, or a 400 error
        object when the body is empty or not a JSON object.
    """
    product = request.get_json()

    # A JSON array/string/number is truthy but has no .get(); reject it here
    # instead of failing with a 500 further down.
    if not product or not isinstance(product, dict):
        return jsonify({"error": "Invalid data"}), 400

    data = load_products()
    # Tolerate a store file that lacks the "products" key.
    products = data.setdefault("products", [])

    now = datetime.now()

    if not product.get("id"):
        base_id = f"{product.get('brand', 'unknown')}-{now.strftime('%Y%m%d%H%M%S')}"
        # The timestamp has one-second resolution, so two products of the same
        # brand added within a second would otherwise share an id.
        taken_ids = [existing.get("id") for existing in products]
        candidate = base_id
        suffix = 1
        while candidate in taken_ids:
            candidate = f"{base_id}-{suffix}"
            suffix += 1
        product["id"] = candidate

    timestamp = now.isoformat()
    product["created_at"] = timestamp
    product["updated_at"] = timestamp

    products.append(product)
    save_products(data)

    return jsonify({"success": True, "product": product})
|
|
|
|
|
|
@app.route('/api/products/<product_id>', methods=['PUT'])
def api_update_product(product_id):
    """Merge the JSON request body into the product identified by *product_id*.

    The body must be a JSON object; its keys overwrite the stored product's
    keys, except that the product keeps its ``id``. ``updated_at`` is set to
    the current time.

    Returns:
        ``{"success": True, "product": ...}`` on success, a 400 error object
        when the body is not a JSON object, or a 404 error object when no
        product has that id.
    """
    updates = request.get_json()

    # dict.update(None) or update(<list>) would raise and surface as a 500.
    if not isinstance(updates, dict):
        return jsonify({"error": "Invalid data"}), 400

    data = load_products()

    for product in data.get("products", []):
        if product.get("id") != product_id:
            continue
        product.update(updates)
        # The URL identifies the resource; do not let the body re-key it.
        product["id"] = product_id
        product["updated_at"] = datetime.now().isoformat()
        save_products(data)
        return jsonify({"success": True, "product": product})

    return jsonify({"error": "Product not found"}), 404
|
|
|
|
|
|
@app.route('/api/products/<product_id>', methods=['DELETE'])
def api_delete_product(product_id):
    """Remove every stored product whose ``id`` equals *product_id*.

    Responds with ``{"success": True}`` when at least one product was removed
    (the store is then saved), or a 404 error object when nothing matched.
    """
    data = load_products()
    current = data["products"]
    remaining = [entry for entry in current if entry.get("id") != product_id]

    # Same length means nothing was filtered out, i.e. the id is unknown.
    if len(remaining) == len(current):
        return jsonify({"error": "Product not found"}), 404

    data["products"] = remaining
    save_products(data)
    return jsonify({"success": True})
|
|
|
|
|
|
@app.route('/api/spiders')
def api_list_spiders():
    """Return the available spiders, annotated from the task log.

    For each spider, ``last_run`` is the ``end_time`` of the last task in the
    log recorded for it, and ``status`` becomes ``"running"`` when any of its
    recorded tasks has status ``"running"``.
    """
    definitions = (
        ("byd", "比亚迪汽车", "爬取比亚迪官网车型参数"),
        ("custom", "自定义爬虫", "通过配置自定义爬虫规则"),
    )
    catalog = [
        {
            "name": name,
            "display_name": display_name,
            "description": description,
            "status": "available",
            "last_run": None,
        }
        for name, display_name, description in definitions
    ]

    history = load_tasks().get("tasks", [])
    for entry in catalog:
        runs = [record for record in history if record.get("spider") == entry["name"]]
        if not runs:
            continue
        # The last logged run wins, even when its end_time is still None.
        entry["last_run"] = runs[-1].get("end_time")
        if any(record.get("status") == "running" for record in runs):
            entry["status"] = "running"

    return jsonify(catalog)
|
|
|
|
|
|
@app.route('/api/run/<spider_name>', methods=['POST'])
def api_run_spider(spider_name):
    """Run the named spider synchronously and record the run in the task log.

    A task entry with status ``"running"`` is written before the crawl starts
    and is finalized (``"completed"`` / ``"failed"``, ``end_time``, ``result``)
    once it ends. Only the ``"byd"`` spider is implemented; any other name
    yields a failed result.

    Returns:
        JSON result: ``{"success": True, "count": n}`` on success, otherwise
        ``{"success": False, "error": "..."}``.
    """

    async def run_spider():
        try:
            if spider_name != "byd":
                return {"success": False, "error": f"Unknown spider: {spider_name}"}

            from spiders.byd import BYDSpider

            spider = BYDSpider({"headless": True})
            results = await spider.run()

            # Load the store only after the crawl has finished: a snapshot
            # taken before it would overwrite any products added or edited
            # through the API while the spider was running.
            data = load_products()
            _merge_byd_results(data, results)
            save_products(data)
            return {"success": True, "count": len(results)}

        except Exception as e:
            # Top-level boundary for the crawl: log with traceback and report
            # the failure to the caller instead of raising.
            logger.exception("爬虫运行失败: %s", e)
            return {"success": False, "error": str(e)}

    # Record the task as running before the crawl starts.
    started = datetime.now()
    task = {
        "id": f"task-{started.strftime('%Y%m%d%H%M%S')}",
        "spider": spider_name,
        "status": "running",
        "start_time": started.isoformat(),
        "end_time": None,
        "result": None,
    }
    tasks = load_tasks()
    tasks.setdefault("tasks", []).append(task)
    save_tasks(tasks)

    result = asyncio.run(run_spider())

    _record_task_outcome(task, result)

    return jsonify(result)


def _merge_byd_results(data, results):
    """Merge crawled BYD items into the product store dict *data* in place.

    An item whose ``name`` equals that of an already stored product updates
    the first such product; any other item is appended with a generated id
    and BYD brand/source metadata.
    """
    products = data.setdefault("products", [])
    for item in results:
        target = next(
            (p for p in products if p.get("name") == item.get("name")),
            None,
        )
        if target is not None:
            target.update(item)
            target["updated_at"] = datetime.now().isoformat()
            continue

        item["id"] = f"byd-{datetime.now().strftime('%Y%m%d%H%M%S')}-{len(products)}"
        item["brand"] = "比亚迪"
        item["source"] = "byd.com"
        item["created_at"] = datetime.now().isoformat()
        products.append(item)


def _record_task_outcome(task, result):
    """Finalize *task* with *result* and persist it into the task log."""
    task["status"] = "completed" if result.get("success") else "failed"
    task["end_time"] = datetime.now().isoformat()
    task["result"] = result

    # Re-read the log rather than saving the pre-crawl snapshot, so tasks
    # recorded by other requests during the crawl are not lost.
    tasks = load_tasks()
    entries = tasks.setdefault("tasks", [])
    for position, entry in enumerate(entries):
        # Ids have one-second resolution; start_time disambiguates.
        if (entry.get("id") == task["id"]
                and entry.get("start_time") == task["start_time"]):
            entries[position] = task
            break
    else:
        entries.append(task)
    save_tasks(tasks)
|
|
|
|
|
|
@app.route('/api/tasks')
def api_list_tasks():
    """Return recorded tasks, newest first, as a JSON array.

    Query parameters:
        limit: Maximum number of tasks to return (default 50). Values that
            are not integers fall back to the default; negative values are
            treated as 0.
    """
    tasks = load_tasks().get("tasks", [])

    # ``or ""`` also covers a stored null start_time, which would otherwise
    # make the sort compare None with str and raise TypeError.
    tasks.sort(key=lambda entry: entry.get("start_time") or "", reverse=True)

    limit = request.args.get('limit', 50, type=int)
    # A negative limit would slice from the end (dropping the oldest entries)
    # rather than limiting the count.
    limit = max(limit, 0)

    return jsonify(tasks[:limit])
|
|
|
|
|
|
@app.route('/api/export')
def api_export():
    """Export the product store.

    Query parameters:
        format: ``json`` (default) returns the whole store as JSON; ``csv``
            returns a CSV attachment with the columns ``id``, ``name``,
            ``brand``, ``source`` followed by one column per parameter key
            found in any product's ``params``.

    Returns:
        The export in the requested format, or a 400 error object for any
        other ``format`` value.
    """
    export_format = request.args.get('format', 'json')
    data = load_products()

    if export_format == 'json':
        return jsonify(data)

    if export_format == 'csv':
        import csv
        import io

        def params_of(product):
            # ``params`` may be missing, null or not an object in stored data.
            params = product.get("params")
            return params if isinstance(params, dict) else {}

        products = data.get("products", [])

        # Collect parameter keys across ALL products (first-seen order);
        # using only the first product's keys would silently drop every
        # parameter that product happens not to have.
        param_keys = list(dict.fromkeys(
            key for product in products for key in params_of(product)
        ))

        output = io.StringIO()
        writer = csv.writer(output)

        if products:
            writer.writerow(["id", "name", "brand", "source"] + param_keys)

            for product in products:
                params = params_of(product)
                row = [
                    product.get("id", ""),
                    product.get("name", ""),
                    product.get("brand", ""),
                    product.get("source", ""),
                ]
                row.extend(params.get(key, "") for key in param_keys)
                writer.writerow(row)

        return output.getvalue(), 200, {
            "Content-Type": "text/csv; charset=utf-8",
            "Content-Disposition": "attachment; filename=products.csv",
        }

    return jsonify({"error": "Unsupported format"}), 400
|
|
|
|
|
|
if __name__ == '__main__':
    # Startup banner with the service addresses.
    print("=" * 60)
    print("产品参数爬取系统")
    print("=" * 60)
    print("API地址: http://localhost:19011")
    print("后台管理: http://localhost:19012")
    print("=" * 60)

    # The server listens on all interfaces, so the interactive Werkzeug
    # debugger (which allows arbitrary code execution) must not be on by
    # default. Enable it explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes", "on")

    app.run(host='0.0.0.0', port=19011, debug=debug_enabled)