# 产品参数爬取系统
#
# 功能:
#   - 多步骤爬取流程(入口页 → 列表页 → 详情页)
#   - 浏览器爬虫支持(Playwright,处理JS渲染)
#   - 比亚迪汽车爬虫示例
#   - 后台管理界面
#   - 数据存储和导出
#
# 技术栈:
#   - Python 3 + Flask
#   - Playwright (浏览器自动化)
#   - BeautifulSoup (HTML解析)
#
# 端口:
#   - API服务: 19011
#   - 后台管理: 19012
"""
|
|
产品参数爬取系统 - 后台管理
|
|
"""
|
|
|
|
from flask import Flask, render_template, jsonify, request
|
|
from flask_cors import CORS
|
|
import json
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import asyncio
|
|
|
|
app = Flask(__name__)
|
|
CORS(app)
|
|
|
|
BASE_DIR = Path(__file__).parent.parent
|
|
DATA_DIR = BASE_DIR / 'data'
|
|
PRODUCTS_FILE = DATA_DIR / 'products.json'
|
|
TASKS_FILE = DATA_DIR / 'tasks.json'
|
|
|
|
|
|
def load_products():
|
|
if PRODUCTS_FILE.exists():
|
|
return json.loads(PRODUCTS_FILE.read_text(encoding='utf-8'))
|
|
return {"products": [], "last_update": None}
|
|
|
|
|
|
def load_tasks():
|
|
if TASKS_FILE.exists():
|
|
return json.loads(TASKS_FILE.read_text(encoding='utf-8'))
|
|
return {"tasks": []}
|
|
|
|
|
|
# ============ 页面路由 ============
|
|
|
|
@app.route('/')
|
|
def index():
|
|
return render_template('index.html')
|
|
|
|
|
|
@app.route('/products')
|
|
def products_page():
|
|
return render_template('products.html')
|
|
|
|
|
|
@app.route('/spiders')
|
|
def spiders_page():
|
|
return render_template('spiders.html')
|
|
|
|
|
|
@app.route('/tasks')
|
|
def tasks_page():
|
|
return render_template('tasks.html')
|
|
|
|
|
|
@app.route('/config')
|
|
def config_page():
|
|
return render_template('config.html')
|
|
|
|
|
|
# ============ API代理 ============
|
|
|
|
@app.route('/api/stats')
|
|
def api_stats():
|
|
"""获取统计信息"""
|
|
data = load_products()
|
|
tasks = load_tasks().get("tasks", [])
|
|
|
|
return jsonify({
|
|
"total_products": len(data.get("products", [])),
|
|
"last_update": data.get("last_update"),
|
|
"total_tasks": len(tasks),
|
|
"running_tasks": len([t for t in tasks if t.get("status") == "running"])
|
|
})
|
|
|
|
|
|
@app.route('/api/products')
|
|
def api_products():
|
|
"""获取产品列表"""
|
|
data = load_products()
|
|
return jsonify(data)
|
|
|
|
|
|
@app.route('/api/spiders')
|
|
def api_spiders():
|
|
"""获取爬虫列表"""
|
|
tasks = load_tasks().get("tasks", [])
|
|
|
|
spiders = [
|
|
{
|
|
"name": "byd",
|
|
"display_name": "比亚迪汽车",
|
|
"description": "爬取比亚迪官网车型参数",
|
|
"url": "https://www.byd.com/cn/",
|
|
"status": "available"
|
|
}
|
|
]
|
|
|
|
# 更新最后运行时间
|
|
for spider in spiders:
|
|
for task in reversed(tasks):
|
|
if task.get("spider") == spider["name"]:
|
|
spider["last_run"] = task.get("end_time")
|
|
spider["last_status"] = task.get("status")
|
|
break
|
|
|
|
return jsonify(spiders)
|
|
|
|
|
|
@app.route('/api/tasks')
|
|
def api_tasks():
|
|
"""获取任务列表"""
|
|
tasks = load_tasks().get("tasks", [])
|
|
tasks.sort(key=lambda x: x.get("start_time", ""), reverse=True)
|
|
return jsonify(tasks[:50])
|
|
|
|
|
|
@app.route('/api/run/<spider_name>', methods=['POST'])
|
|
def api_run_spider(spider_name):
|
|
"""运行爬虫"""
|
|
import sys
|
|
sys.path.insert(0, str(BASE_DIR))
|
|
|
|
data = load_products()
|
|
|
|
async def run_spider():
|
|
try:
|
|
if spider_name == "byd":
|
|
from spiders.byd import BYDSpider
|
|
|
|
spider = BYDSpider({"headless": True})
|
|
results = await spider.run()
|
|
|
|
for item in results:
|
|
existing = False
|
|
for i, p in enumerate(data["products"]):
|
|
if p.get("name") == item.get("name"):
|
|
data["products"][i].update(item)
|
|
data["products"][i]["updated_at"] = datetime.now().isoformat()
|
|
existing = True
|
|
break
|
|
|
|
if not existing:
|
|
item["id"] = f"byd-{datetime.now().strftime('%Y%m%d%H%M%S')}-{len(data['products'])}"
|
|
item["brand"] = "比亚迪"
|
|
item["source"] = "byd.com"
|
|
item["created_at"] = datetime.now().isoformat()
|
|
data["products"].append(item)
|
|
|
|
PRODUCTS_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')
|
|
return {"success": True, "count": len(results)}
|
|
|
|
return {"success": False, "error": f"Unknown spider: {spider_name}"}
|
|
|
|
except Exception as e:
|
|
return {"success": False, "error": str(e)}
|
|
|
|
# 记录任务
|
|
tasks = load_tasks()
|
|
task = {
|
|
"id": f"task-{datetime.now().strftime('%Y%m%d%H%M%S')}",
|
|
"spider": spider_name,
|
|
"status": "running",
|
|
"start_time": datetime.now().isoformat()
|
|
}
|
|
tasks["tasks"].append(task)
|
|
TASKS_FILE.write_text(json.dumps(tasks, ensure_ascii=False, indent=2), encoding='utf-8')
|
|
|
|
result = asyncio.run(run_spider())
|
|
|
|
task["status"] = "completed" if result.get("success") else "failed"
|
|
task["end_time"] = datetime.now().isoformat()
|
|
task["result"] = result
|
|
TASKS_FILE.write_text(json.dumps(tasks, ensure_ascii=False, indent=2), encoding='utf-8')
|
|
|
|
return jsonify(result)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
print("=" * 50)
|
|
print("产品参数爬取系统 - 后台管理")
|
|
print("=" * 50)
|
|
print(f"访问地址: http://localhost:19012")
|
|
print("=" * 50)
|
|
|
|
app.run(host='0.0.0.0', port=19012, debug=True) |