product-crawler/crawler/browser.py
hubian 0ee0abbbd1 feat: product parameter crawler system v1.0.0
Features:
- multi-step crawl flow (entry page → list page → detail page; see the configuration sketch below)
- browser-based crawling (Playwright, for JS-rendered pages)
- example spider for BYD Auto
- admin interface
- data storage and export

Tech stack:
- Python 3 + Flask
- Playwright (browser automation)
- BeautifulSoup (HTML parsing)

Ports:
- API service: 19011
- admin interface: 19012
2026-04-10 00:45:51 +08:00
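
The multi-step flow above is driven by step configurations consumed by BrowserSpider.run_steps (defined below). As a rough sketch only, a two-step list → detail configuration might look like this; the URLs, selectors, and field names are illustrative, not taken from the repository:

steps = [
    {
        "name": "list page",
        "url": "https://example.com/products",   # hypothetical entry URL
        "wait_for": ".product-item",             # hypothetical selector
        "parse": {
            "selector": ".product-item",
            "extract": {
                "title": ".product-title::text",  # sub-element text rule
                "url": "href",                    # attribute of the item element
            },
        },
    },
    {
        "name": "detail pages",
        "url_from": "previous.url",  # follow each "url" from the previous step
        "wait_for": ".spec-table",
        "parse": {
            "selector": ".spec-row",
            "extract": {
                "param": ".spec-name::text",
                "value": ".spec-value::text",
            },
        },
    },
]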

"""
浏览器爬虫 - 用于处理JS渲染的页面
"""
import asyncio
import json
from typing import Dict, List, Any, Optional
from crawler.base import BaseSpider
import logging
logger = logging.getLogger(__name__)


class BrowserSpider(BaseSpider):
    """Browser-based spider built on Playwright."""

    name = "browser"

    def __init__(self, config: Optional[Dict] = None):
        super().__init__(config)
        self.browser = None
        self.context = None
        self.page = None
        self._playwright = None

    async def init_browser(self):
        """Start Playwright and open a browser, context, and page."""
        try:
            from playwright.async_api import async_playwright
        except ImportError:
            raise RuntimeError(
                "Playwright is required: pip install playwright && playwright install chromium"
            )
        self._playwright = await async_playwright().start()
        self.browser = await self._playwright.chromium.launch(
            headless=self.config.get("headless", True)
        )
        # Only override the default User-Agent when one is configured
        context_kwargs = {}
        if self.config.get("user_agent"):
            context_kwargs["user_agent"] = self.config["user_agent"]
        self.context = await self.browser.new_context(**context_kwargs)
        self.page = await self.context.new_page()
        logger.info("Browser initialized")

    async def close_browser(self):
        """Close the page, context, browser, and Playwright driver."""
        if self.page:
            await self.page.close()
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self._playwright:
            await self._playwright.stop()
        # Drop the closed handles so a later fetch_page() re-initializes
        self.page = None
        self.context = None
        self.browser = None
        self._playwright = None
        logger.info("Browser closed")

    async def fetch_page(self, url: str, wait_for: Optional[str] = None, timeout: int = 30000) -> str:
        """
        Fetch a page's rendered HTML.

        Args:
            url: page URL
            wait_for: CSS selector to wait for before reading the page
            timeout: timeout in milliseconds

        Returns:
            the rendered HTML, or an empty string on failure
        """
        if not self.page:
            await self.init_browser()
        logger.info(f"Fetching page: {url}")
        try:
            await self.page.goto(url, timeout=timeout)
            if wait_for:
                await self.page.wait_for_selector(wait_for, timeout=timeout)
            # Wait for network activity to settle so JS-rendered content is present
            await self.page.wait_for_load_state("networkidle", timeout=timeout)
            return await self.page.content()
        except Exception as e:
            logger.error(f"Failed to fetch {url}: {e}")
            return ""

    async def extract_data(self, html: str, parse_config: Dict) -> List[Dict]:
        """
        Extract data from HTML according to a parse configuration.

        Args:
            html: page HTML
            parse_config: parse configuration with a "selector" for the item
                elements and an "extract" mapping of field names to rules

        Returns:
            list of extracted items
        """
        results = []
        soup = self.parse_html(html)
        selector = parse_config.get("selector")
        if not selector:
            return results
        elements = self.css_select(soup, selector)
        extract_rules = parse_config.get("extract", {})
        for elem in elements:
            item = {}
            for field, rule in extract_rules.items():
                if rule.startswith("."):
                    # Sub-element CSS selector; an optional "::text" suffix is
                    # stripped and the sub-element's text content is extracted
                    sub_elem = elem.select_one(rule.split("::")[0])
                    item[field] = self.extract_text(sub_elem)
                elif rule == "text":
                    item[field] = self.extract_text(elem)
                elif rule == "href":
                    item[field] = self.extract_attr(elem, "href")
                elif rule == "src":
                    item[field] = self.extract_attr(elem, "src")
                else:
                    # Any other rule is treated as an attribute name
                    item[field] = self.extract_attr(elem, rule)
            if item:
                results.append(item)
        return results

    async def run_steps(self, steps: List[Dict], start_url: Optional[str] = None) -> List[Dict]:
        """
        Run a multi-step crawl.

        Args:
            steps: list of step configurations
            start_url: starting URL for the first step

        Returns:
            results of the final step
        """
        all_results = []
        previous_data = []
        for i, step in enumerate(steps):
            step_name = step.get("name", f"step {i + 1}")
            logger.info(f"Running step: {step_name}")
            # Determine the URL(s) for this step
            if i == 0:
                urls = [step.get("url", start_url)]
            else:
                # Take URLs from the previous step's results
                url_from = step.get("url_from", "")
                if url_from == "previous.url":
                    urls = [item.get("url", "") for item in previous_data if item.get("url")]
                else:
                    urls = [url_from]
            step_results = []
            for url in urls:
                if not url:
                    continue
                # Resolve relative URLs against the step's own URL (or start_url)
                if url.startswith("/"):
                    base_url = step.get("url", start_url)
                    url = urljoin(base_url, url)
                # Fetch the rendered page
                html = await self.fetch_page(
                    url=url,
                    wait_for=step.get("wait_for"),
                    timeout=step.get("timeout", 30000),
                )
                if not html:
                    continue
                # Extract data according to the step's parse configuration
                parse_config = step.get("parse", {})
                if parse_config:
                    data = await self.extract_data(html, parse_config)
                    # Record the page each item came from
                    for item in data:
                        if "url" not in item:
                            item["source_url"] = url
                    step_results.extend(data)
            previous_data = step_results
            # Only the final step's results are kept as the return value
            all_results = step_results if i == len(steps) - 1 else []
        return all_results
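

# --- Usage sketch (not part of the original file) ---
# A minimal example of driving the spider end to end, assuming BaseSpider
# accepts a plain config dict. The URL and selectors are placeholders.
if __name__ == "__main__":
    import asyncio

    async def main():
        spider = BrowserSpider(config={"headless": True})
        demo_steps = [{
            "name": "list page",
            "url": "https://example.com/products",  # placeholder URL
            "wait_for": ".product-item",
            "parse": {
                "selector": ".product-item",
                "extract": {"title": ".product-title::text", "url": "href"},
            },
        }]
        try:
            results = await spider.run_steps(demo_steps)
            for item in results:
                print(item)
        finally:
            await spider.close_browser()

    asyncio.run(main())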