Features:
- Multi-step crawl flow (entry page → list page → detail page; see the config sketch below)
- Browser-based crawling (Playwright, handles JS-rendered pages)
- Example spider for BYD Auto
- Admin interface
- Data storage and export

Tech stack:
- Python 3 + Flask
- Playwright (browser automation)
- BeautifulSoup (HTML parsing)

Ports:
- API service: 19011
- Admin interface: 19012
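
A flow like that is expressed as a list of step configurations passed to
BrowserSpider.run_steps. A sketch of the shape, with hypothetical URLs and
selectors:

    [
        {"name": "entry", "url": "https://example.com/",
         "parse": {"selector": ".channel a", "extract": {"url": "href"}}},
        {"name": "list", "url_from": "previous.url",
         "parse": {"selector": ".car-item a", "extract": {"url": "href"}}},
        {"name": "detail", "url_from": "previous.url",
         "parse": {"selector": ".detail", "extract": {"title": ".name::text"}}},
    ]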
"""
|
||
浏览器爬虫 - 用于处理JS渲染的页面
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
from typing import Dict, List, Any, Optional
|
||
from crawler.base import BaseSpider
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class BrowserSpider(BaseSpider):
    """Browser-based spider (uses Playwright)."""

    name = "browser"

    def __init__(self, config: Optional[Dict] = None):
        super().__init__(config)
        self.browser = None
        self.context = None
        self.page = None
        self._playwright = None

    async def init_browser(self):
        """Initialize the browser."""
        try:
            from playwright.async_api import async_playwright

            self._playwright = await async_playwright().start()
            self.browser = await self._playwright.chromium.launch(
                headless=self.config.get("headless", True)
            )
            self.context = await self.browser.new_context(
                user_agent=self.config.get("user_agent", "")
            )
            self.page = await self.context.new_page()
            logger.info("Browser initialized")

        except ImportError:
            raise RuntimeError(
                "Please install playwright: pip install playwright && playwright install chromium"
            )
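
    # Config keys read by this class (values here are illustrative):
    #
    #     spider = BrowserSpider({
    #         "headless": False,                # show the browser window
    #         "user_agent": "Mozilla/5.0 ...",  # custom UA string
    #     })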

    async def close_browser(self):
        """Close the browser."""
        if self.page:
            await self.page.close()
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        if self._playwright:
            await self._playwright.stop()
        logger.info("Browser closed")

    async def fetch_page(self, url: str, wait_for: Optional[str] = None, timeout: int = 30000) -> str:
        """
        Fetch page content.

        Args:
            url: Page URL
            wait_for: CSS selector of an element to wait for
            timeout: Timeout in milliseconds

        Returns:
            Page HTML, or an empty string on failure
        """
        if not self.page:
            await self.init_browser()

        logger.info(f"Visiting page: {url}")

        try:
            await self.page.goto(url, timeout=timeout)

            if wait_for:
                await self.page.wait_for_selector(wait_for, timeout=timeout)

            # Wait for the page to settle (no network activity)
            await self.page.wait_for_load_state("networkidle", timeout=timeout)

            html = await self.page.content()
            return html

        except Exception as e:
            logger.error(f"Failed to fetch page: {e}")
            return ""

    async def extract_data(self, html: str, parse_config: Dict) -> List[Dict]:
        """
        Extract data according to the parse configuration.

        Args:
            html: Page HTML
            parse_config: Parse configuration

        Returns:
            List of extracted items
        """
        results = []
        soup = self.parse_html(html)

        selector = parse_config.get("selector")
        if not selector:
            return results

        elements = self.css_select(soup, selector)
        extract_rules = parse_config.get("extract", {})

        for elem in elements:
            item = {}
            for field, rule in extract_rules.items():
                if rule.startswith("."):
                    # Sub-element CSS selector; only text extraction is
                    # supported here, so an optional "::text" suffix is
                    # accepted and stripped.
                    sub_elem = elem.select_one(rule.split("::")[0])
                    item[field] = self.extract_text(sub_elem)
                elif rule == "text":
                    item[field] = self.extract_text(elem)
                elif rule == "href":
                    item[field] = self.extract_attr(elem, "href")
                elif rule == "src":
                    item[field] = self.extract_attr(elem, "src")
                else:
                    # Any other rule is treated as an attribute name
                    item[field] = self.extract_attr(elem, rule)

            if item:
                results.append(item)

        return results
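
    # The extract rules support a few forms (hypothetical fields/selectors):
    #
    #     {
    #         "selector": ".car-item",          # one element per item
    #         "extract": {
    #             "title": ".name::text",       # text of a sub-element
    #             "link": "href",               # attribute of the item element
    #             "image": "src",
    #             "sku": "data-id",             # any other string = attribute name
    #         },
    #     }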

    async def run_steps(self, steps: List[Dict], start_url: Optional[str] = None) -> List[Dict]:
        """
        Run a multi-step crawl.

        Args:
            steps: List of step configurations
            start_url: Starting URL

        Returns:
            Results of the final step
        """
        all_results = []
        previous_data = []

        for i, step in enumerate(steps):
            step_name = step.get("name", f"step {i + 1}")
            logger.info(f"Running step: {step_name}")

            # Resolve the URL(s) for this step
            if i == 0:
                urls = [step.get("url", start_url)]
            else:
                # Take URLs from the previous step's results
                url_template = step.get("url_from", "")
                if url_template == "previous.url":
                    urls = [item.get("url", "") for item in previous_data if item.get("url")]
                else:
                    urls = [url_template]

            step_results = []
            for url in urls:
                if not url:
                    continue

                # Resolve relative URLs against the step's base URL
                if url.startswith("/"):
                    base_url = step.get("url", start_url)
                    url = urljoin(base_url, url)

                # Fetch the page
                html = await self.fetch_page(
                    url=url,
                    wait_for=step.get("wait_for"),
                    timeout=step.get("timeout", 30000),
                )
                if not html:
                    continue

                # Parse the data
                parse_config = step.get("parse", {})
                if parse_config:
                    data = await self.extract_data(html, parse_config)

                    # Record where each item came from
                    for item in data:
                        if "url" not in item:
                            item["source_url"] = url

                    step_results.extend(data)

            previous_data = step_results
            # Only the final step's results are returned
            all_results = step_results if i == len(steps) - 1 else []

        return all_results
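

# ---------------------------------------------------------------------------
# Minimal usage sketch (assumes BaseSpider needs no setup beyond the config
# dict; the URL, selectors, and step names below are illustrative, not real).
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    async def _demo():
        spider = BrowserSpider({"headless": True})
        try:
            results = await spider.run_steps([
                {
                    "name": "list",
                    "url": "https://example.com/cars",  # hypothetical entry URL
                    "wait_for": ".car-item",            # wait for the JS-rendered list
                    "parse": {
                        "selector": ".car-item a",
                        "extract": {"title": "text", "url": "href"},
                    },
                },
                {
                    "name": "detail",
                    "url_from": "previous.url",         # follow URLs from the list step
                    "parse": {
                        "selector": ".detail",
                        "extract": {"spec": "text"},
                    },
                },
            ])
            print(results)
        finally:
            await spider.close_browser()

    asyncio.run(_demo())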