Features:
- Multi-step crawl flow (entry page → list page → detail page)
- Browser-based crawling (Playwright, for JS-rendered pages)
- BYD car spider example
- Admin UI
- Data storage and export

Tech stack:
- Python 3 + Flask
- Playwright (browser automation)
- BeautifulSoup (HTML parsing)

Ports:
- API service: 19011
- Admin UI: 19012
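The spider below relies on helpers inherited from `crawler.browser.BrowserSpider` that are not defined in this file. Here is a minimal sketch of the assumed interface, inferred from the call sites below; names and defaults are assumptions, not the actual implementation:

from typing import Any, Dict, Optional
from bs4 import BeautifulSoup

class BrowserSpider:
    def __init__(self, config: Dict[str, Any]): ...               # e.g. {"headless": False}
    async def init_browser(self) -> None: ...                     # launch Playwright
    async def close_browser(self) -> None: ...                    # shut it down
    async def fetch_page(self, url: str, wait_for: str = "",
                         timeout: int = 30000) -> Optional[str]: ...  # rendered HTML, None on failure
    def parse_html(self, html: str) -> BeautifulSoup: ...         # wrap HTML in BeautifulSoup
    def extract_text(self, elem) -> str: ...                      # safe text, "" when elem is None
    def extract_attr(self, elem, attr: str) -> str: ...           # safe attribute lookup
    def save_result(self, item: Dict) -> None: ...                # persist one record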
"""
|
|
比亚迪汽车爬虫
|
|
"""
|
|
|
|
import asyncio
|
|
from typing import Dict, List
|
|
from crawler.browser import BrowserSpider
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|


class BYDSpider(BrowserSpider):
    """BYD car spider."""

    name = "byd"

    # Site configuration
    BASE_URL = "https://www.byd.com"
    CAR_LIST_URL = "https://www.byd.com/cn/car/"
    async def get_car_models(self) -> List[Dict]:
        """Fetch the list of car models."""
        logger.info("Fetching BYD car model list...")

        html = await self.fetch_page(
            url=self.CAR_LIST_URL,
            wait_for=".car-list",
            timeout=30000,
        )

        if not html:
            logger.error("Failed to load the car list page")
            return []

        soup = self.parse_html(html)
        car_items = []

        # Parse the model list. The selectors below are examples; adjust them
        # to match the actual structure of the BYD site.

        # Approach 1: find all model cards
        items = soup.select(".car-item, .model-item, .product-card")

        for item in items:
            name_elem = item.select_one(".name, .title, h3, h4")
            link_elem = item.select_one("a")
            img_elem = item.select_one("img")

            car_info = {
                "name": self.extract_text(name_elem),
                "url": self.extract_attr(link_elem, "href"),
                "image": self.extract_attr(img_elem, "src"),
            }

            if car_info["name"]:
                # Resolve root-relative URLs against the site root
                if car_info["url"] and car_info["url"].startswith("/"):
                    car_info["url"] = self.BASE_URL + car_info["url"]

                car_items.append(car_info)

        logger.info(f"Found {len(car_items)} car models")
        return car_items
    async def get_car_detail(self, car_url: str, car_name: str) -> Dict:
        """Fetch the spec details for one car model."""
        logger.info(f"Fetching details for: {car_name}")

        html = await self.fetch_page(
            url=car_url,
            wait_for=".detail, .params, .specification",
            timeout=30000,
        )

        if not html:
            return {"name": car_name, "url": car_url, "params": {}}

        soup = self.parse_html(html)
        params = {}

        # Parse the spec parameters.
        # Approach 1: table layout
        tables = soup.select("table, .param-table, .spec-table")
        for table in tables:
            rows = table.select("tr, .param-row")
            for row in rows:
                cells = row.select("td, th, .param-name, .param-value")
                if len(cells) >= 2:
                    key = self.extract_text(cells[0])
                    value = self.extract_text(cells[1])
                    if key:
                        params[key] = value

        # Approach 2: item-list layout
        param_items = soup.select(".param-item, .spec-item, .detail-item")
        for item in param_items:
            name_elem = item.select_one(".name, .label, .param-name")
            value_elem = item.select_one(".value, .param-value")

            if name_elem and value_elem:
                key = self.extract_text(name_elem)
                value = self.extract_text(value_elem)
                if key:
                    params[key] = value

        # Approach 3: definition lists
        dl_items = soup.select("dl")
        for dl in dl_items:
            dts = dl.select("dt")
            dds = dl.select("dd")
            for dt, dd in zip(dts, dds):
                key = self.extract_text(dt)
                value = self.extract_text(dd)
                if key:
                    params[key] = value

        return {
            "name": car_name,
            "url": car_url,
            "params": params,
        }
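    # For reference, the dict returned above is shaped like
    # (keys fixed, values illustrative):
    #   {"name": "<model name>", "url": "<detail page url>",
    #    "params": {"<spec label>": "<spec value>", ...}}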

    async def run(self) -> List[Dict]:
        """Run the crawl: model list first, then each detail page."""
        results = []

        try:
            await self.init_browser()

            # Step 1: fetch the model list
            cars = await self.get_car_models()

            if not cars:
                logger.warning("No car models found")
                return results

            # Step 2: fetch each model's detail page
            for car in cars:
                if car.get("url"):
                    detail = await self.get_car_detail(car["url"], car["name"])
                    detail["image"] = car.get("image", "")
                    results.append(detail)

                    # Persist the record
                    self.save_result(detail)

            logger.info(f"Crawl finished: {len(results)} car models")

        finally:
            await self.close_browser()

        return results

    def run_sync(self) -> List[Dict]:
        """Synchronous entry point."""
        return asyncio.run(self.run())
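

# Illustrative helper, not part of the original code: one possible implementation
# of the "data storage and export" feature from the header, flattening the result
# dicts produced by run() into a CSV file.
def export_to_csv(results: List[Dict], path: str) -> None:
    """Sketch of a CSV exporter for crawl results (assumed structure)."""
    import csv

    # Union of all spec labels across models, so every row has the same columns.
    param_keys = sorted({k for r in results for k in r.get("params", {})})
    with open(path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(["name", "url", "image"] + param_keys)
        for r in results:
            params = r.get("params", {})
            writer.writerow(
                [r.get("name", ""), r.get("url", ""), r.get("image", "")]
                + [params.get(k, "") for k in param_keys]
            )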


# Test entry point
if __name__ == "__main__":
    spider = BYDSpider({"headless": False})  # headless=False shows the browser window
    results = spider.run_sync()

    import json
    print(json.dumps(results, ensure_ascii=False, indent=2))
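    # Optionally persist the results via the illustrative helper above:
    # export_to_csv(results, "byd_cars.csv")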