""" 浏览器爬虫 - 用于处理JS渲染的页面 """ import asyncio import json from typing import Dict, List, Any, Optional from crawler.base import BaseSpider import logging logger = logging.getLogger(__name__) class BrowserSpider(BaseSpider): """浏览器爬虫(使用Playwright)""" name = "browser" def __init__(self, config: Dict = None): super().__init__(config) self.browser = None self.context = None self.page = None self._playwright = None async def init_browser(self): """初始化浏览器""" try: from playwright.async_api import async_playwright self._playwright = await async_playwright().start() self.browser = await self._playwright.chromium.launch( headless=self.config.get("headless", True) ) self.context = await self.browser.new_context( user_agent=self.config.get("user_agent", "") ) self.page = await self.context.new_page() logger.info("浏览器初始化成功") except ImportError: raise RuntimeError("请安装 playwright: pip install playwright && playwright install chromium") async def close_browser(self): """关闭浏览器""" if self.page: await self.page.close() if self.context: await self.context.close() if self.browser: await self.browser.close() if self._playwright: await self._playwright.stop() logger.info("浏览器已关闭") async def fetch_page(self, url: str, wait_for: str = None, timeout: int = 30000) -> str: """ 获取页面内容 Args: url: 页面URL wait_for: 等待元素选择器 timeout: 超时时间(毫秒) Returns: 页面HTML """ if not self.page: await self.init_browser() logger.info(f"访问页面: {url}") try: await self.page.goto(url, timeout=timeout) if wait_for: await self.page.wait_for_selector(wait_for, timeout=timeout) # 等待页面稳定 await self.page.wait_for_load_state("networkidle", timeout=timeout) html = await self.page.content() return html except Exception as e: logger.error(f"获取页面失败: {e}") return "" async def extract_data(self, html: str, parse_config: Dict) -> List[Dict]: """ 根据配置提取数据 Args: html: 页面HTML parse_config: 解析配置 Returns: 提取的数据列表 """ results = [] soup = self.parse_html(html) selector = parse_config.get("selector") if not selector: return results elements = self.css_select(soup, selector) extract_rules = parse_config.get("extract", {}) for elem in elements: item = {} for field, rule in extract_rules.items(): if rule.startswith("."): # CSS选择器 sub_elem = elem.select_one(rule.split("::")[0]) if "::text" in rule: item[field] = self.extract_text(sub_elem) else: item[field] = self.extract_text(sub_elem) elif rule == "text": item[field] = self.extract_text(elem) elif rule == "href": item[field] = self.extract_attr(elem, "href") elif rule == "src": item[field] = self.extract_attr(elem, "src") else: item[field] = self.extract_attr(elem, rule) if item: results.append(item) return results async def run_steps(self, steps: List[Dict], start_url: str = None) -> List[Dict]: """ 执行多步骤爬取 Args: steps: 步骤配置列表 start_url: 起始URL Returns: 最终结果 """ all_results = [] previous_data = [] for i, step in enumerate(steps): step_name = step.get("name", f"步骤{i+1}") logger.info(f"执行步骤: {step_name}") # 获取URL if i == 0: url = step.get("url", start_url) else: # 从上一步结果获取URL url_template = step.get("url_from", "") if url_template == "previous.url": urls = [item.get("url", "") for item in previous_data if item.get("url")] else: urls = [url_template] urls = urls if isinstance(urls, list) else [urls] step_results = [] for url in urls: if not url: continue # 完整URL处理 if url.startswith("/"): base_url = step.get("url", start_url) from urllib.parse import urljoin url = urljoin(base_url, url) # 获取页面 html = await self.fetch_page( url=url, wait_for=step.get("wait_for"), timeout=step.get("timeout", 30000) ) if not html: continue # 
    async def run_steps(self, steps: List[Dict], start_url: Optional[str] = None) -> List[Dict]:
        """
        Run a multi-step crawl.

        Args:
            steps: List of step configurations
            start_url: Starting URL

        Returns:
            The results of the final step
        """
        previous_data = []

        for i, step in enumerate(steps):
            step_name = step.get("name", f"step {i + 1}")
            logger.info(f"Running step: {step_name}")

            # Resolve this step's URLs
            if i == 0:
                urls = [step.get("url", start_url)]
            else:
                # Take URLs from the previous step's results
                url_template = step.get("url_from", "")
                if url_template == "previous.url":
                    urls = [item.get("url", "") for item in previous_data if item.get("url")]
                else:
                    urls = [url_template]

            step_results = []
            for url in urls:
                if not url:
                    continue

                # Resolve root-relative URLs against the step (or start) URL
                if url.startswith("/"):
                    base_url = step.get("url") or start_url or ""
                    url = urljoin(base_url, url)

                # Fetch the rendered page
                html = await self.fetch_page(
                    url=url,
                    wait_for=step.get("wait_for"),
                    timeout=step.get("timeout", 30000),
                )
                if not html:
                    continue

                # Parse the page
                parse_config = step.get("parse", {})
                if parse_config:
                    data = await self.extract_data(html, parse_config)
                    # Record which page each item came from
                    for item in data:
                        if "url" not in item:
                            item["source_url"] = url
                    step_results.extend(data)

            previous_data = step_results

        # previous_data now holds the final step's results
        return previous_data
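
# A minimal usage sketch. The URL, selectors, and step names below are
# illustrative assumptions; only the BrowserSpider methods themselves come
# from the class above.
if __name__ == "__main__":
    import asyncio

    async def main():
        # fetch_page() initializes the browser on first use, so no explicit
        # init_browser() call is needed here.
        spider = BrowserSpider(config={"headless": True})
        steps = [
            {
                # Step 1: collect article links from a (hypothetical) list page
                "name": "list page",
                "url": "https://example.com/news",
                "wait_for": "a.article",
                "parse": {
                    "selector": "a.article",
                    "extract": {"title": ".title::text", "url": "href"},
                },
            },
            {
                # Step 2: follow each collected URL and extract the body text
                "name": "detail pages",
                "url_from": "previous.url",
                "parse": {
                    "selector": "article",
                    "extract": {"body": "text"},
                },
            },
        ]
        try:
            # start_url doubles as the base for resolving relative links
            # collected in later steps.
            for item in await spider.run_steps(steps, start_url="https://example.com"):
                print(item)
        finally:
            await spider.close_browser()

    asyncio.run(main())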