commit 0ee0abbbd1814d65a65099e7aaf3f55b64a68dba Author: hubian <908234780@qq.com> Date: Fri Apr 10 00:45:51 2026 +0800 feat: 产品参数爬取系统 v1.0.0 功能: - 多步骤爬取流程(入口页→列表页→详情页) - 浏览器爬虫支持(Playwright,处理JS渲染) - 比亚迪汽车爬虫示例 - 后台管理界面 - 数据存储和导出 技术栈: - Python 3 + Flask - Playwright (浏览器自动化) - BeautifulSoup (HTML解析) 端口: - API服务: 19011 - 后台管理: 19012 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c3110a5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,14 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] + +# Data files +data/*.json + +# Logs +logs/ +*.log + +# Environment +.env +venv/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..f6a47d9 --- /dev/null +++ b/README.md @@ -0,0 +1,65 @@ +# Product Crawler - 产品参数爬取系统 + +> 自动从官方网站爬取产品参数信息,支持多步骤爬取流程配置 + +## 项目结构 + +``` +product-crawler/ +├── app.py # 主程序入口 +├── crawler/ +│ ├── __init__.py +│ ├── base.py # 爬虫基类 +│ ├── browser.py # 浏览器爬虫(处理JS渲染) +│ ├── static.py # 静态页面爬虫 +│ └── pipelines.py # 数据处理管道 +├── spiders/ # 具体网站爬虫配置 +│ ├── __init__.py +│ ├── byd.py # 比亚迪爬虫 +│ └── templates.py # 爬虫模板 +├── admin/ # 后台管理 +│ ├── app.py +│ └── templates/ +├── data/ # 数据存储 +│ └── products.json +├── config/ +│ └── settings.py # 配置文件 +└── requirements.txt +``` + +## 功能特点 + +1. **多步骤爬取流程** + - 入口页面 → 产品列表 → 产品详情 + - 可配置每一步的解析规则 + +2. **多种爬取方式** + - 静态爬虫:requests + BeautifulSoup + - 浏览器爬虫:Playwright/Selenium(处理JS渲染) + +3. **后台管理** + - 爬虫任务管理 + - 爬取结果查看 + - 定时任务配置 + +4. **数据存储** + - JSON文件存储 + - 支持导出CSV/Excel + +## 快速开始 + +```bash +# 安装依赖 +pip install -r requirements.txt + +# 运行主服务 +python app.py + +# 后台管理 +python admin/app.py +``` + +## 访问地址 + +- API服务: http://localhost:19011 +- 后台管理: http://localhost:19012 \ No newline at end of file diff --git a/admin/app.py b/admin/app.py new file mode 100644 index 0000000..f7766c8 --- /dev/null +++ b/admin/app.py @@ -0,0 +1,185 @@ +""" +产品参数爬取系统 - 后台管理 +""" + +from flask import Flask, render_template, jsonify, request +from flask_cors import CORS +import json +from datetime import datetime +from pathlib import Path +import asyncio + +app = Flask(__name__) +CORS(app) + +BASE_DIR = Path(__file__).parent.parent +DATA_DIR = BASE_DIR / 'data' +PRODUCTS_FILE = DATA_DIR / 'products.json' +TASKS_FILE = DATA_DIR / 'tasks.json' + + +def load_products(): + if PRODUCTS_FILE.exists(): + return json.loads(PRODUCTS_FILE.read_text(encoding='utf-8')) + return {"products": [], "last_update": None} + + +def load_tasks(): + if TASKS_FILE.exists(): + return json.loads(TASKS_FILE.read_text(encoding='utf-8')) + return {"tasks": []} + + +# ============ 页面路由 ============ + +@app.route('/') +def index(): + return render_template('index.html') + + +@app.route('/products') +def products_page(): + return render_template('products.html') + + +@app.route('/spiders') +def spiders_page(): + return render_template('spiders.html') + + +@app.route('/tasks') +def tasks_page(): + return render_template('tasks.html') + + +@app.route('/config') +def config_page(): + return render_template('config.html') + + +# ============ API代理 ============ + +@app.route('/api/stats') +def api_stats(): + """获取统计信息""" + data = load_products() + tasks = load_tasks().get("tasks", []) + + return jsonify({ + "total_products": len(data.get("products", [])), + "last_update": data.get("last_update"), + "total_tasks": len(tasks), + "running_tasks": len([t for t in tasks if t.get("status") == "running"]) + }) + + +@app.route('/api/products') +def api_products(): + """获取产品列表""" + data = load_products() + return jsonify(data) 
+ + +@app.route('/api/spiders') +def api_spiders(): + """获取爬虫列表""" + tasks = load_tasks().get("tasks", []) + + spiders = [ + { + "name": "byd", + "display_name": "比亚迪汽车", + "description": "爬取比亚迪官网车型参数", + "url": "https://www.byd.com/cn/", + "status": "available" + } + ] + + # 更新最后运行时间 + for spider in spiders: + for task in reversed(tasks): + if task.get("spider") == spider["name"]: + spider["last_run"] = task.get("end_time") + spider["last_status"] = task.get("status") + break + + return jsonify(spiders) + + +@app.route('/api/tasks') +def api_tasks(): + """获取任务列表""" + tasks = load_tasks().get("tasks", []) + tasks.sort(key=lambda x: x.get("start_time", ""), reverse=True) + return jsonify(tasks[:50]) + + +@app.route('/api/run/', methods=['POST']) +def api_run_spider(spider_name): + """运行爬虫""" + import sys + sys.path.insert(0, str(BASE_DIR)) + + data = load_products() + + async def run_spider(): + try: + if spider_name == "byd": + from spiders.byd import BYDSpider + + spider = BYDSpider({"headless": True}) + results = await spider.run() + + for item in results: + existing = False + for i, p in enumerate(data["products"]): + if p.get("name") == item.get("name"): + data["products"][i].update(item) + data["products"][i]["updated_at"] = datetime.now().isoformat() + existing = True + break + + if not existing: + item["id"] = f"byd-{datetime.now().strftime('%Y%m%d%H%M%S')}-{len(data['products'])}" + item["brand"] = "比亚迪" + item["source"] = "byd.com" + item["created_at"] = datetime.now().isoformat() + data["products"].append(item) + + PRODUCTS_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8') + return {"success": True, "count": len(results)} + + return {"success": False, "error": f"Unknown spider: {spider_name}"} + + except Exception as e: + return {"success": False, "error": str(e)} + + # 记录任务 + tasks = load_tasks() + task = { + "id": f"task-{datetime.now().strftime('%Y%m%d%H%M%S')}", + "spider": spider_name, + "status": "running", + "start_time": datetime.now().isoformat() + } + tasks["tasks"].append(task) + TASKS_FILE.write_text(json.dumps(tasks, ensure_ascii=False, indent=2), encoding='utf-8') + + result = asyncio.run(run_spider()) + + task["status"] = "completed" if result.get("success") else "failed" + task["end_time"] = datetime.now().isoformat() + task["result"] = result + TASKS_FILE.write_text(json.dumps(tasks, ensure_ascii=False, indent=2), encoding='utf-8') + + return jsonify(result) + + +if __name__ == '__main__': + print("=" * 50) + print("产品参数爬取系统 - 后台管理") + print("=" * 50) + print(f"访问地址: http://localhost:19012") + print("=" * 50) + + app.run(host='0.0.0.0', port=19012, debug=True) \ No newline at end of file diff --git a/admin/templates/config.html b/admin/templates/config.html new file mode 100644 index 0000000..ec64680 --- /dev/null +++ b/admin/templates/config.html @@ -0,0 +1,102 @@ + + + + + + 系统配置 - 产品爬取系统 + + + + +
[admin/templates/config.html body: HTML markup lost in extraction. Recoverable text: a "系统配置" (system configuration) page, subtitled "爬虫系统配置信息", with three cards: 服务配置 (API端口 19011, 后台端口 19012); 爬虫配置 (请求超时 30秒, 重试次数 3次, 请求间隔 1秒, 并发限制 3); and 使用说明 with four steps: 1. open 爬虫管理 and pick a spider, 2. click 运行 to start crawling, 3. view results under 产品数据 after completion, 4. export the data as JSON/CSV.]
\ No newline at end of file
diff --git a/admin/templates/index.html b/admin/templates/index.html
new file mode 100644
index 0000000..4358589
--- /dev/null
+++ b/admin/templates/index.html
@@ -0,0 +1,199 @@
[admin/templates/index.html: HTML markup lost in extraction. Recoverable text: the "仪表盘" (dashboard) page, titled "产品爬取系统 - 后台管理" and subtitled "产品参数爬取系统概览", with stat cards for 产品总数, 爬虫数量 (1), 任务总数 and 最后更新, plus a "最近任务" panel with a "查看全部" link and a "加载中..." loading placeholder.]
\ No newline at end of file
diff --git a/admin/templates/products.html b/admin/templates/products.html
new file mode 100644
index 0000000..ee425c1
--- /dev/null
+++ b/admin/templates/products.html
@@ -0,0 +1,142 @@
[admin/templates/products.html: HTML markup lost in extraction. Recoverable text: the "产品数据" (product data) page, titled "产品数据 - 产品爬取系统" and subtitled "查看爬取的产品参数数据", with a "加载中..." loading placeholder for the product table.]
\ No newline at end of file
diff --git a/admin/templates/spiders.html b/admin/templates/spiders.html
new file mode 100644
index 0000000..97a1799
--- /dev/null
+++ b/admin/templates/spiders.html
@@ -0,0 +1,136 @@
[admin/templates/spiders.html: HTML markup lost in extraction. Recoverable text: the "爬虫管理" (spider management) page, titled "爬虫管理 - 产品爬取系统" and subtitled "配置和运行爬虫任务", with a "加载中..." loading placeholder for the spider list.]
\ No newline at end of file
diff --git a/admin/templates/tasks.html b/admin/templates/tasks.html
new file mode 100644
index 0000000..4f45491
--- /dev/null
+++ b/admin/templates/tasks.html
@@ -0,0 +1,102 @@
[admin/templates/tasks.html: HTML markup lost in extraction. Recoverable text: the "任务记录" (task history) page, titled "任务记录 - 产品爬取系统" and subtitled "查看爬虫任务执行历史", with a "加载中..." loading placeholder for the task list.]
+ + + + \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..c0e42c8 --- /dev/null +++ b/app.py @@ -0,0 +1,337 @@ +""" +产品参数爬取系统 - 主程序 +""" + +from flask import Flask, jsonify, request +from flask_cors import CORS +import json +import os +from datetime import datetime +from pathlib import Path +import asyncio +import logging + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +app = Flask(__name__) +CORS(app) + +# 路径配置 +BASE_DIR = Path(__file__).parent +DATA_DIR = BASE_DIR / 'data' +DATA_DIR.mkdir(exist_ok=True) + +PRODUCTS_FILE = DATA_DIR / 'products.json' +TASKS_FILE = DATA_DIR / 'tasks.json' +LOGS_DIR = BASE_DIR / 'logs' +LOGS_DIR.mkdir(exist_ok=True) + + +# ============ 数据存储 ============ + +def load_products(): + """加载产品数据""" + if PRODUCTS_FILE.exists(): + return json.loads(PRODUCTS_FILE.read_text(encoding='utf-8')) + return {"products": [], "last_update": None} + + +def save_products(data): + """保存产品数据""" + data["last_update"] = datetime.now().isoformat() + PRODUCTS_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8') + + +def load_tasks(): + """加载任务数据""" + if TASKS_FILE.exists(): + return json.loads(TASKS_FILE.read_text(encoding='utf-8')) + return {"tasks": []} + + +def save_tasks(data): + """保存任务数据""" + TASKS_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8') + + +# ============ API 路由 ============ + +@app.route('/') +def index(): + """首页""" + return jsonify({ + "name": "Product Crawler", + "version": "1.0.0", + "description": "产品参数爬取系统", + "endpoints": { + "products": "/api/products", + "tasks": "/api/tasks", + "spiders": "/api/spiders", + "run": "/api/run/" + } + }) + + +@app.route('/api/products') +def api_list_products(): + """获取产品列表""" + data = load_products() + + # 支持筛选 + brand = request.args.get('brand') + search = request.args.get('search') + + products = data.get("products", []) + + if brand: + products = [p for p in products if p.get("brand") == brand] + + if search: + search_lower = search.lower() + products = [p for p in products if + search_lower in p.get("name", "").lower() or + search_lower in json.dumps(p.get("params", {}), ensure_ascii=False).lower()] + + return jsonify({ + "products": products, + "total": len(products), + "last_update": data.get("last_update") + }) + + +@app.route('/api/products/') +def api_get_product(product_id): + """获取产品详情""" + data = load_products() + + for product in data.get("products", []): + if product.get("id") == product_id: + return jsonify(product) + + return jsonify({"error": "Product not found"}), 404 + + +@app.route('/api/products', methods=['POST']) +def api_add_product(): + """添加产品""" + product = request.get_json() + + if not product: + return jsonify({"error": "Invalid data"}), 400 + + data = load_products() + + # 生成ID + if not product.get("id"): + product["id"] = f"{product.get('brand', 'unknown')}-{datetime.now().strftime('%Y%m%d%H%M%S')}" + + product["created_at"] = datetime.now().isoformat() + product["updated_at"] = datetime.now().isoformat() + + data["products"].append(product) + save_products(data) + + return jsonify({"success": True, "product": product}) + + +@app.route('/api/products/', methods=['PUT']) +def api_update_product(product_id): + """更新产品""" + updates = request.get_json() + + data = load_products() + + for i, product in enumerate(data["products"]): + if product.get("id") == product_id: + 
data["products"][i].update(updates) + data["products"][i]["updated_at"] = datetime.now().isoformat() + save_products(data) + return jsonify({"success": True, "product": data["products"][i]}) + + return jsonify({"error": "Product not found"}), 404 + + +@app.route('/api/products/', methods=['DELETE']) +def api_delete_product(product_id): + """删除产品""" + data = load_products() + + original_count = len(data["products"]) + data["products"] = [p for p in data["products"] if p.get("id") != product_id] + + if len(data["products"]) < original_count: + save_products(data) + return jsonify({"success": True}) + + return jsonify({"error": "Product not found"}), 404 + + +@app.route('/api/spiders') +def api_list_spiders(): + """获取可用爬虫列表""" + spiders = [ + { + "name": "byd", + "display_name": "比亚迪汽车", + "description": "爬取比亚迪官网车型参数", + "status": "available", + "last_run": None + }, + { + "name": "custom", + "display_name": "自定义爬虫", + "description": "通过配置自定义爬虫规则", + "status": "available", + "last_run": None + } + ] + + # 检查任务记录 + tasks = load_tasks().get("tasks", []) + for spider in spiders: + for task in tasks: + if task.get("spider") == spider["name"]: + spider["last_run"] = task.get("end_time") + if task.get("status") == "running": + spider["status"] = "running" + + return jsonify(spiders) + + +@app.route('/api/run/', methods=['POST']) +def api_run_spider(spider_name): + """运行爬虫""" + data = load_products() + + async def run_spider(): + try: + if spider_name == "byd": + from spiders.byd import BYDSpider + + spider = BYDSpider({"headless": True}) + results = await spider.run() + + # 保存结果 + for item in results: + # 检查是否已存在 + existing = False + for i, p in enumerate(data["products"]): + if p.get("name") == item.get("name"): + # 更新 + data["products"][i].update(item) + data["products"][i]["updated_at"] = datetime.now().isoformat() + existing = True + break + + if not existing: + item["id"] = f"byd-{datetime.now().strftime('%Y%m%d%H%M%S')}-{len(data['products'])}" + item["brand"] = "比亚迪" + item["source"] = "byd.com" + item["created_at"] = datetime.now().isoformat() + data["products"].append(item) + + save_products(data) + return {"success": True, "count": len(results)} + + else: + return {"success": False, "error": f"Unknown spider: {spider_name}"} + + except Exception as e: + logger.error(f"爬虫运行失败: {e}") + return {"success": False, "error": str(e)} + + # 记录任务 + tasks = load_tasks() + task = { + "id": f"task-{datetime.now().strftime('%Y%m%d%H%M%S')}", + "spider": spider_name, + "status": "running", + "start_time": datetime.now().isoformat(), + "end_time": None, + "result": None + } + tasks["tasks"].append(task) + save_tasks(tasks) + + # 运行爬虫 + result = asyncio.run(run_spider()) + + # 更新任务状态 + task["status"] = "completed" if result.get("success") else "failed" + task["end_time"] = datetime.now().isoformat() + task["result"] = result + save_tasks(tasks) + + return jsonify(result) + + +@app.route('/api/tasks') +def api_list_tasks(): + """获取任务列表""" + tasks = load_tasks().get("tasks", []) + + # 按时间倒序 + tasks.sort(key=lambda x: x.get("start_time", ""), reverse=True) + + # 限制返回数量 + limit = request.args.get('limit', 50, type=int) + tasks = tasks[:limit] + + return jsonify(tasks) + + +@app.route('/api/export') +def api_export(): + """导出数据""" + format = request.args.get('format', 'json') + data = load_products() + + if format == 'json': + return jsonify(data) + + elif format == 'csv': + import io + import csv + + output = io.StringIO() + writer = csv.writer(output) + + # 写入表头 + if data["products"]: + headers = ["id", 
"name", "brand", "source"] + list(data["products"][0].get("params", {}).keys()) + writer.writerow(headers) + + for product in data["products"]: + row = [ + product.get("id", ""), + product.get("name", ""), + product.get("brand", ""), + product.get("source", "") + ] + for key in headers[4:]: + row.append(product.get("params", {}).get(key, "")) + writer.writerow(row) + + output.seek(0) + return output.getvalue(), 200, { + "Content-Type": "text/csv; charset=utf-8", + "Content-Disposition": "attachment; filename=products.csv" + } + + return jsonify({"error": "Unsupported format"}), 400 + + +if __name__ == '__main__': + print("=" * 60) + print("产品参数爬取系统") + print("=" * 60) + print(f"API地址: http://localhost:19011") + print(f"后台管理: http://localhost:19012") + print("=" * 60) + + app.run(host='0.0.0.0', port=19011, debug=True) \ No newline at end of file diff --git a/config/settings.py b/config/settings.py new file mode 100644 index 0000000..d9b3583 --- /dev/null +++ b/config/settings.py @@ -0,0 +1,77 @@ +""" +产品参数爬取系统 - 配置文件 +""" + +# 服务配置 +SERVER_CONFIG = { + "host": "0.0.0.0", + "port": 19011, + "debug": True, +} + +ADMIN_CONFIG = { + "host": "0.0.0.0", + "port": 19012, + "debug": True, +} + +# 爬虫配置 +CRAWLER_CONFIG = { + "timeout": 30, + "retry_times": 3, + "retry_delay": 2, + "concurrent_limit": 3, # 并发限制 + "request_delay": 1, # 请求间隔(秒) + "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", +} + +# 数据存储 +DATA_DIR = "data" +PRODUCTS_FILE = "products.json" +TASKS_FILE = "tasks.json" + +# 浏览器配置 +BROWSER_CONFIG = { + "headless": True, + "timeout": 30000, +} + +# 爬虫任务配置示例 +DEFAULT_SPIDERS = { + "byd": { + "name": "比亚迪汽车", + "enabled": True, + "start_url": "https://www.byd.com/cn/", + "steps": [ + { + "name": "获取车型列表", + "type": "browser", + "url": "https://www.byd.com/cn/", + "wait_for": ".car-list", + "parse": { + "type": "css", + "selector": ".car-item a", + "extract": { + "name": "text", + "url": "href" + } + } + }, + { + "name": "获取车型详情", + "type": "browser", + "url_from": "previous.url", + "wait_for": ".car-detail", + "parse": { + "type": "css", + "selector": ".param-item", + "extract": { + "param_name": ".name::text", + "param_value": ".value::text" + } + } + } + ], + "schedule": "0 2 * * *", # 每天凌晨2点 + } +} \ No newline at end of file diff --git a/crawler/__init__.py b/crawler/__init__.py new file mode 100644 index 0000000..ddbcf44 --- /dev/null +++ b/crawler/__init__.py @@ -0,0 +1,3 @@ +# crawler module +from .base import BaseSpider +from .browser import BrowserSpider \ No newline at end of file diff --git a/crawler/base.py b/crawler/base.py new file mode 100644 index 0000000..46cf10e --- /dev/null +++ b/crawler/base.py @@ -0,0 +1,108 @@ +""" +爬虫基类 +""" + +import time +import requests +from abc import ABC, abstractmethod +from bs4 import BeautifulSoup +from typing import Dict, List, Any, Optional +import logging + +logger = logging.getLogger(__name__) + + +class BaseSpider(ABC): + """爬虫基类""" + + name = "base" + + def __init__(self, config: Dict = None): + self.config = config or {} + self.session = requests.Session() + self.session.headers.update({ + "User-Agent": self.config.get("user_agent", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36") + }) + self.results = [] + + @abstractmethod + def start_requests(self): + """生成初始请求""" + pass + + @abstractmethod + def parse(self, response, **kwargs): + """解析响应""" + pass + + def request(self, url: str, method: str = "GET", **kwargs) -> 
Optional[requests.Response]: + """发送请求""" + retry_times = self.config.get("retry_times", 3) + retry_delay = self.config.get("retry_delay", 2) + timeout = self.config.get("timeout", 30) + + for attempt in range(retry_times): + try: + logger.info(f"请求: {url} (尝试 {attempt + 1}/{retry_times})") + + response = self.session.request( + method=method, + url=url, + timeout=timeout, + **kwargs + ) + response.raise_for_status() + + # 请求间隔 + delay = self.config.get("request_delay", 1) + if delay > 0: + time.sleep(delay) + + return response + + except requests.exceptions.RequestException as e: + logger.warning(f"请求失败: {e}") + if attempt < retry_times - 1: + time.sleep(retry_delay) + else: + logger.error(f"请求最终失败: {url}") + return None + + return None + + def parse_html(self, html: str) -> BeautifulSoup: + """解析HTML""" + return BeautifulSoup(html, "lxml") + + def extract_text(self, element) -> str: + """提取元素文本""" + if element is None: + return "" + return element.get_text(strip=True) + + def extract_attr(self, element, attr: str) -> str: + """提取元素属性""" + if element is None: + return "" + return element.get(attr, "") + + def css_select(self, soup: BeautifulSoup, selector: str) -> List: + """CSS选择器""" + return soup.select(selector) + + def css_select_one(self, soup: BeautifulSoup, selector: str): + """CSS选择器(单个)""" + return soup.select_one(selector) + + def save_result(self, data: Dict): + """保存结果""" + self.results.append(data) + + def get_results(self) -> List[Dict]: + """获取所有结果""" + return self.results + + def clear_results(self): + """清空结果""" + self.results = [] \ No newline at end of file diff --git a/crawler/browser.py b/crawler/browser.py new file mode 100644 index 0000000..618af1c --- /dev/null +++ b/crawler/browser.py @@ -0,0 +1,201 @@ +""" +浏览器爬虫 - 用于处理JS渲染的页面 +""" + +import asyncio +import json +from typing import Dict, List, Any, Optional +from crawler.base import BaseSpider +import logging + +logger = logging.getLogger(__name__) + + +class BrowserSpider(BaseSpider): + """浏览器爬虫(使用Playwright)""" + + name = "browser" + + def __init__(self, config: Dict = None): + super().__init__(config) + self.browser = None + self.context = None + self.page = None + self._playwright = None + + async def init_browser(self): + """初始化浏览器""" + try: + from playwright.async_api import async_playwright + + self._playwright = await async_playwright().start() + self.browser = await self._playwright.chromium.launch( + headless=self.config.get("headless", True) + ) + self.context = await self.browser.new_context( + user_agent=self.config.get("user_agent", "") + ) + self.page = await self.context.new_page() + logger.info("浏览器初始化成功") + + except ImportError: + raise RuntimeError("请安装 playwright: pip install playwright && playwright install chromium") + + async def close_browser(self): + """关闭浏览器""" + if self.page: + await self.page.close() + if self.context: + await self.context.close() + if self.browser: + await self.browser.close() + if self._playwright: + await self._playwright.stop() + logger.info("浏览器已关闭") + + async def fetch_page(self, url: str, wait_for: str = None, timeout: int = 30000) -> str: + """ + 获取页面内容 + + Args: + url: 页面URL + wait_for: 等待元素选择器 + timeout: 超时时间(毫秒) + + Returns: + 页面HTML + """ + if not self.page: + await self.init_browser() + + logger.info(f"访问页面: {url}") + + try: + await self.page.goto(url, timeout=timeout) + + if wait_for: + await self.page.wait_for_selector(wait_for, timeout=timeout) + + # 等待页面稳定 + await self.page.wait_for_load_state("networkidle", timeout=timeout) + + html = await 
self.page.content() + return html + + except Exception as e: + logger.error(f"获取页面失败: {e}") + return "" + + async def extract_data(self, html: str, parse_config: Dict) -> List[Dict]: + """ + 根据配置提取数据 + + Args: + html: 页面HTML + parse_config: 解析配置 + + Returns: + 提取的数据列表 + """ + results = [] + soup = self.parse_html(html) + + selector = parse_config.get("selector") + if not selector: + return results + + elements = self.css_select(soup, selector) + extract_rules = parse_config.get("extract", {}) + + for elem in elements: + item = {} + for field, rule in extract_rules.items(): + if rule.startswith("."): + # CSS选择器 + sub_elem = elem.select_one(rule.split("::")[0]) + if "::text" in rule: + item[field] = self.extract_text(sub_elem) + else: + item[field] = self.extract_text(sub_elem) + elif rule == "text": + item[field] = self.extract_text(elem) + elif rule == "href": + item[field] = self.extract_attr(elem, "href") + elif rule == "src": + item[field] = self.extract_attr(elem, "src") + else: + item[field] = self.extract_attr(elem, rule) + + if item: + results.append(item) + + return results + + async def run_steps(self, steps: List[Dict], start_url: str = None) -> List[Dict]: + """ + 执行多步骤爬取 + + Args: + steps: 步骤配置列表 + start_url: 起始URL + + Returns: + 最终结果 + """ + all_results = [] + previous_data = [] + + for i, step in enumerate(steps): + step_name = step.get("name", f"步骤{i+1}") + logger.info(f"执行步骤: {step_name}") + + # 获取URL + if i == 0: + url = step.get("url", start_url) + else: + # 从上一步结果获取URL + url_template = step.get("url_from", "") + if url_template == "previous.url": + urls = [item.get("url", "") for item in previous_data if item.get("url")] + else: + urls = [url_template] + + urls = urls if isinstance(urls, list) else [urls] + + step_results = [] + for url in urls: + if not url: + continue + + # 完整URL处理 + if url.startswith("/"): + base_url = step.get("url", start_url) + from urllib.parse import urljoin + url = urljoin(base_url, url) + + # 获取页面 + html = await self.fetch_page( + url=url, + wait_for=step.get("wait_for"), + timeout=step.get("timeout", 30000) + ) + + if not html: + continue + + # 解析数据 + parse_config = step.get("parse", {}) + if parse_config: + data = await self.extract_data(html, parse_config) + + # 合并URL信息 + for item in data: + if "url" not in item: + item["source_url"] = url + + step_results.extend(data) + + previous_data = step_results + all_results = step_results if i == len(steps) - 1 else [] + + return all_results \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..dd1a681 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +flask>=2.0.0 +flask-cors>=3.0.0 +requests>=2.28.0 +beautifulsoup4>=4.11.0 +lxml>=4.9.0 +playwright>=1.30.0 +apscheduler>=3.9.0 +python-dateutil>=2.8.0 \ No newline at end of file diff --git a/spiders/__init__.py b/spiders/__init__.py new file mode 100644 index 0000000..39fe794 --- /dev/null +++ b/spiders/__init__.py @@ -0,0 +1,2 @@ +# spiders module +from .byd import BYDSpider \ No newline at end of file diff --git a/spiders/byd.py b/spiders/byd.py new file mode 100644 index 0000000..808b4b3 --- /dev/null +++ b/spiders/byd.py @@ -0,0 +1,166 @@ +""" +比亚迪汽车爬虫 +""" + +import asyncio +from typing import Dict, List +from crawler.browser import BrowserSpider +import logging + +logger = logging.getLogger(__name__) + + +class BYDSpider(BrowserSpider): + """比亚迪汽车爬虫""" + + name = "byd" + + # 网站配置 + BASE_URL = "https://www.byd.com" + CAR_LIST_URL = "https://www.byd.com/cn/car/" + + async def 
get_car_models(self) -> List[Dict]: + """获取车型列表""" + logger.info("开始获取比亚迪车型列表...") + + html = await self.fetch_page( + url=self.CAR_LIST_URL, + wait_for=".car-list", + timeout=30000 + ) + + if not html: + logger.error("获取车型列表页面失败") + return [] + + soup = self.parse_html(html) + car_items = [] + + # 解析车型列表(需要根据实际页面结构调整选择器) + # 这里是示例选择器,实际需要查看比亚迪官网结构 + + # 方式1: 查找所有车型卡片 + items = soup.select(".car-item, .model-item, .product-card") + + for item in items: + name_elem = item.select_one(".name, .title, h3, h4") + link_elem = item.select_one("a") + img_elem = item.select_one("img") + + car_info = { + "name": self.extract_text(name_elem), + "url": self.extract_attr(link_elem, "href"), + "image": self.extract_attr(img_elem, "src"), + } + + if car_info["name"]: + # 处理相对URL + if car_info["url"] and car_info["url"].startswith("/"): + car_info["url"] = self.BASE_URL + car_info["url"] + + car_items.append(car_info) + + logger.info(f"找到 {len(car_items)} 个车型") + return car_items + + async def get_car_detail(self, car_url: str, car_name: str) -> Dict: + """获取车型详情""" + logger.info(f"获取车型详情: {car_name}") + + html = await self.fetch_page( + url=car_url, + wait_for=".detail, .params, .specification", + timeout=30000 + ) + + if not html: + return {"name": car_name, "url": car_url, "params": {}} + + soup = self.parse_html(html) + params = {} + + # 解析参数表格 + # 方式1: 表格形式 + tables = soup.select("table, .param-table, .spec-table") + for table in tables: + rows = table.select("tr, .param-row") + for row in rows: + cells = row.select("td, th, .param-name, .param-value") + if len(cells) >= 2: + key = self.extract_text(cells[0]) + value = self.extract_text(cells[1]) + if key: + params[key] = value + + # 方式2: 列表形式 + param_items = soup.select(".param-item, .spec-item, .detail-item") + for item in param_items: + name_elem = item.select_one(".name, .label, .param-name") + value_elem = item.select_one(".value, .param-value") + + if name_elem and value_elem: + key = self.extract_text(name_elem) + value = self.extract_text(value_elem) + if key: + params[key] = value + + # 方式3: 定义列表 + dl_items = soup.select("dl") + for dl in dl_items: + dts = dl.select("dt") + dds = dl.select("dd") + for dt, dd in zip(dts, dds): + key = self.extract_text(dt) + value = self.extract_text(dd) + if key: + params[key] = value + + return { + "name": car_name, + "url": car_url, + "params": params + } + + async def run(self) -> List[Dict]: + """运行爬虫""" + results = [] + + try: + await self.init_browser() + + # 步骤1: 获取车型列表 + cars = await self.get_car_models() + + if not cars: + logger.warning("未找到任何车型") + return results + + # 步骤2: 获取每个车型的详情 + for car in cars: + if car.get("url"): + detail = await self.get_car_detail(car["url"], car["name"]) + detail["image"] = car.get("image", "") + results.append(detail) + + # 保存到结果 + self.save_result(detail) + + logger.info(f"爬取完成,共 {len(results)} 个车型") + + finally: + await self.close_browser() + + return results + + def run_sync(self) -> List[Dict]: + """同步运行入口""" + return asyncio.run(self.run()) + + +# 测试入口 +if __name__ == "__main__": + spider = BYDSpider({"headless": False}) # 设置为False可以看到浏览器 + results = spider.run_sync() + + import json + print(json.dumps(results, ensure_ascii=False, indent=2)) \ No newline at end of file
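
For reference, a minimal sketch of exercising the new API end to end. It assumes the main service from app.py is running locally on port 19011 and that the run endpoint is registered as /api/run/<spider_name>, which the api_run_spider(spider_name) signature implies:

```python
import requests

BASE = "http://localhost:19011"

# Trigger the BYD spider. The request blocks until the crawl finishes,
# because api_run_spider executes the spider synchronously via asyncio.run().
run = requests.post(f"{BASE}/api/run/byd", timeout=600)
print(run.json())  # {"success": true, "count": N} or {"success": false, "error": "..."}

# List stored products, optionally filtered by brand or a search keyword.
products = requests.get(f"{BASE}/api/products", params={"brand": "比亚迪"}).json()
print(products["total"], "products, last update:", products["last_update"])

# Export everything as CSV (format=json returns the raw product store instead).
csv_resp = requests.get(f"{BASE}/api/export", params={"format": "csv"})
with open("products.csv", "wb") as f:
    f.write(csv_resp.content)
```

Note that the selectors in spiders/byd.py are placeholder guesses (".car-item, .model-item, .product-card", etc.), so a first run may legitimately return zero items until they are adjusted against the live byd.com markup.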