news 2026/4/3 6:08:26

Python 爬虫框架设计:类封装与工程化实践

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python 爬虫框架设计:类封装与工程化实践

在 1688 数据采集等爬虫场景中,类封装能实现代码的复用与解耦,工程化则保障爬虫的稳定性、可维护性和可扩展性。本文将结合 1688 爬虫的实际需求,从框架设计原则核心类封装工程化配套模块实战落地,完整讲解爬虫框架的设计与实现。

一、框架设计原则与整体架构

1. 核心设计原则

爬虫框架需遵循开闭原则(对扩展开放、对修改关闭)、单一职责(每个模块只做一件事)和依赖注入(模块间通过配置解耦),同时需适配 1688 的反爬特性(如动态渲染、IP 封禁)。

2. 整体架构分层

将爬虫拆分为5 个核心层,层与层之间通过接口交互,降低耦合:

层级职责核心实现方式
配置层管理爬虫参数(如代理、UA、爬取关键词)、存储配置等YAML/JSON 配置文件 + 配置类
请求层封装 HTTP 请求,处理反爬(代理、UA、延迟)、异常重试基础请求类 + 反爬中间件
解析层解析网页 / 接口数据,提取目标字段(如 1688 商品标题、价格)解析器基类 + 业务解析子类
存储层处理数据持久化(CSV/MySQL/MongoDB),支持数据去重存储基类 + 多存储实现子类
调度层管理爬取任务(分页、多线程 / 异步)、监控任务状态调度器类 + 任务队列

二、核心类封装实现

基于分层架构,我们通过类的继承与多态封装通用逻辑,再针对 1688 场景实现具体业务。

1. 环境准备

安装必备依赖:

bash

运行

pip install requests beautifulsoup4 pyyaml fake-useragent playwright pymongo mysql-connector-python playwright install chromium # 处理动态页面

2. 配置层封装(Config 类)

通过 YAML 配置文件管理参数,避免硬编码,便于后续修改。

配置文件(config.yaml)

yaml

# 爬虫基础配置 spider: keyword: "手机壳" # 1688搜索关键词 max_page: 5 # 最大爬取页数 delay: 3 # 请求延迟(秒) retry_times: 3 # 失败重试次数 # 反爬配置 anti_crawl: user_agent_pool: ["Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36...", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) ..."] proxy_pool: ["http://127.0.0.1:7890", "http://username:password@proxy.example.com:8080"] # 代理池 # 存储配置 storage: type: "csv" # 可选:csv/mongo/mysql csv_path: "./1688_products.csv" mongo: uri: "mongodb://localhost:27017/" db: "1688_spider" collection: "products" mysql: host: "localhost" port: 3306 user: "root" password: "123456" db: "1688_spider"

配置类封装(config.py)

python

运行

import yaml from typing import Dict, List, Any class Config: """配置管理类,加载并解析YAML配置文件""" def __init__(self, config_path: str = "./config.yaml"): self.config_path = config_path self.config = self._load_config() def _load_config(self) -> Dict[str, Any]: """加载YAML配置""" try: with open(self.config_path, "r", encoding="utf-8") as f: return yaml.safe_load(f) except FileNotFoundError: raise Exception(f"配置文件{self.config_path}不存在") except yaml.YAMLError as e: raise Exception(f"配置文件解析错误:{e}") def get(self, key: str, default: Any = None) -> Any: """按层级获取配置,如'spider.keyword'""" keys = key.split(".") value = self.config for k in keys: if k not in value: return default value = value[k] return value # 测试配置类 if __name__ == "__main__": config = Config() print(config.get("spider.keyword")) # 输出:手机壳 print(config.get("anti_crawl.proxy_pool")) # 输出代理池列表

3. 请求层封装(BaseRequest 类)

封装 HTTP 请求的通用逻辑(反爬、重试、延迟),支持同步请求和动态页面请求(Playwright)。

python

运行

import requests import time import random from typing import Dict, Any, Optional from fake_useragent import UserAgent from playwright.sync_api import sync_playwright from config import Config class BaseRequest: """请求基类,封装通用请求逻辑""" def __init__(self, config: Config): self.config = config self.ua = UserAgent() self.retry_times = self.config.get("spider.retry_times", 3) self.delay = self.config.get("spider.delay", 2) self.proxy_pool = self.config.get("anti_crawl.proxy_pool", []) self.ua_pool = self.config.get("anti_crawl.user_agent_pool", []) def _get_random_proxy(self) -> Optional[str]: """随机获取代理""" return random.choice(self.proxy_pool) if self.proxy_pool else None def _get_random_ua(self) -> str: """随机获取User-Agent""" return random.choice(self.ua_pool) if self.ua_pool else self.ua.random def _add_delay(self) -> None: """请求延迟,防反爬""" time.sleep(random.uniform(self.delay, self.delay + 2)) def get(self, url: str, params: Optional[Dict] = None, headers: Optional[Dict] = None) -> Optional[str]: """同步GET请求,支持重试和反爬""" headers = headers or {} headers["User-Agent"] = self._get_random_ua() proxies = {"http": self._get_random_proxy(), "https": self._get_random_proxy()} if self._get_random_proxy() else None for retry in range(self.retry_times): try: self._add_delay() resp = requests.get(url, params=params, headers=headers, proxies=proxies, timeout=10) resp.raise_for_status() # 抛出HTTP错误 return resp.text except Exception as e: print(f"请求失败(第{retry+1}次重试):{e}") time.sleep(2 ** retry) # 指数退避重试 return None def get_dynamic(self, url: str) -> Optional[Dict[str, Any]]: """动态页面请求(Playwright),返回关键数据""" result = {"title": None, "price": None, "sales": None} with sync_playwright() as p: browser = p.chromium.launch(headless=True) page = browser.new_page(user_agent=self._get_random_ua()) try: page.goto(url, timeout=30000) # 提取1688商品详情页关键数据(需根据页面结构调整) result["title"] = page.locator(".detail-title").inner_text() if page.locator(".detail-title").count() > 0 else None result["price"] = page.locator(".price").inner_text() if page.locator(".price").count() > 0 else None result["sales"] = page.locator(".sales-volume").inner_text() if page.locator(".sales-volume").count() > 0 else None self._add_delay() except Exception as e: print(f"动态页面请求失败:{e}") finally: browser.close() return result # 测试请求类 if __name__ == "__main__": config = Config() request = BaseRequest(config) html = request.get("https://s.1688.com/selloffer/offer_search.htm?keywords=手机壳") print(html[:500]) # 输出页面前500字符

4. 解析层封装(BaseParser 类)

封装数据解析的通用接口,子类实现具体的 1688 页面解析逻辑。

python

运行

from bs4 import BeautifulSoup from typing import List, Dict, Any from config import Config class BaseParser: """解析器基类,定义解析接口""" def __init__(self, config: Config): self.config = config def parse(self, html: str) -> List[Dict[str, Any]]: """解析接口,子类必须实现""" raise NotImplementedError("子类需实现parse方法") class Ali1688ListParser(BaseParser): """1688商品列表页解析器""" def parse(self, html: str) -> List[Dict[str, Any]]: """解析商品列表页,提取标题、价格、链接""" soup = BeautifulSoup(html, "lxml") products = soup.select(".sm-offer-item") result = [] for item in products: # 提取字段(需根据1688页面结构实时调整) title_elem = item.select_one(".offer-title a") price_elem = item.select_one(".price") link_elem = item.select_one(".offer-title a") if not (title_elem and price_elem and link_elem): continue product = { "title": title_elem.get("title", "").strip(), "price": price_elem.text.strip(), "link": link_elem.get("href", "").strip(), "source": "1688" } result.append(product) return result # 测试解析类 if __name__ == "__main__": config = Config() request = BaseRequest(config) parser = Ali1688ListParser(config) html = request.get("https://s.1688.com/selloffer/offer_search.htm?keywords=手机壳") if html: products = parser.parse(html) print(f"解析到{len(products)}个商品:") print(products[:2])

5. 存储层封装(BaseStorage 类)

支持多存储方式(CSV/MySQL/MongoDB),通过子类实现具体存储逻辑。

python

运行

import csv import pymongo import mysql.connector from typing import List, Dict, Any from config import Config class BaseStorage: """存储基类,定义存储接口""" def __init__(self, config: Config): self.config = config def save(self, data: List[Dict[str, Any]]) -> None: """存储接口,子类必须实现""" raise NotImplementedError("子类需实现save方法") class CsvStorage(BaseStorage): """CSV存储类""" def __init__(self, config: Config): super().__init__(config) self.csv_path = self.config.get("storage.csv_path", "./products.csv") # 初始化CSV文件并写入表头 with open(self.csv_path, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=["title", "price", "link", "source"]) writer.writeheader() def save(self, data: List[Dict[str, Any]]) -> None: """将数据追加写入CSV""" with open(self.csv_path, "a", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=["title", "price", "link", "source"]) writer.writerows(data) print(f"成功写入{len(data)}条数据到CSV:{self.csv_path}") class MongoStorage(BaseStorage): """MongoDB存储类""" def __init__(self, config: Config): super().__init__(config) self.client = pymongo.MongoClient(self.config.get("storage.mongo.uri")) self.db = self.client[self.config.get("storage.mongo.db")] self.collection = self.db[self.config.get("storage.mongo.collection")] # 创建唯一索引,避免重复存储 self.collection.create_index("link", unique=True) def save(self, data: List[Dict[str, Any]]) -> None: """将数据写入MongoDB,自动去重""" if not data: return try: self.collection.insert_many(data, ordered=False) print(f"成功写入{len(data)}条数据到MongoDB") except pymongo.errors.BulkWriteError as e: # 忽略重复数据错误 print(f"部分数据重复,实际写入{len(data) - len(e.details['writeErrors'])}条") # 测试存储类 if __name__ == "__main__": config = Config() storage = CsvStorage(config) # 模拟数据 test_data = [ {"title": "苹果15手机壳", "price": "10.00", "link": "https://example.com/1", "source": "1688"}, {"title": "华为Mate60手机壳", "price": "8.50", "link": "https://example.com/2", "source": "1688"} ] storage.save(test_data)

6. 调度层封装(SpiderScheduler 类)

管理爬取任务的生命周期(分页、任务分发),整合请求、解析、存储模块。

python

运行

from typing import List, Dict, Any from config import Config from request import BaseRequest from parser import Ali1688ListParser from storage import BaseStorage, CsvStorage, MongoStorage class SpiderScheduler: """爬虫调度器,整合各模块并管理爬取任务""" def __init__(self, config: Config): self.config = config self.request = BaseRequest(config) self.parser = Ali1688ListParser(config) self.storage = self._init_storage() self.keyword = self.config.get("spider.keyword") self.max_page = self.config.get("spider.max_page") def _init_storage(self) -> BaseStorage: """根据配置初始化存储类""" storage_type = self.config.get("storage.type", "csv") if storage_type == "csv": return CsvStorage(self.config) elif storage_type == "mongo": return MongoStorage(self.config) else: raise ValueError(f"不支持的存储类型:{storage_type}") def build_url(self, page: int) -> str: """构建1688搜索页URL""" from urllib.parse import quote return f"https://s.1688.com/selloffer/offer_search.htm?keywords={quote(self.keyword)}&page={page}" def run(self) -> None: """启动爬虫任务""" print(f"开始爬取1688关键词【{self.keyword}】,共{self.max_page}页") all_data = [] for page in range(1, self.max_page + 1): print(f"正在爬取第{page}页...") url = self.build_url(page) html = self.request.get(url) if not html: print(f"第{page}页爬取失败,跳过") continue # 解析数据 page_data = self.parser.parse(html) if page_data: all_data.extend(page_data) # 实时存储 self.storage.save(page_data) print(f"爬取完成,总计获取{len(all_data)}条商品数据") # 测试调度器 if __name__ == "__main__": config = Config() scheduler = SpiderScheduler(config) scheduler.run()

三、工程化配套模块

1. 日志系统(Logging)

替换 print 语句,使用 Python 标准库logging实现分级日志(INFO/ERROR),便于问题排查。

python

运行

import logging import os def init_logger() -> None: """初始化日志系统""" # 创建日志目录 if not os.path.exists("logs"): os.makedirs("logs") # 配置日志格式 log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" # 写入文件 + 控制台输出 logging.basicConfig( level=logging.INFO, format=log_format, handlers=[ logging.FileHandler("logs/1688_spider.log", encoding="utf-8"), logging.StreamHandler() ] ) # 在调度器中使用日志 if __name__ == "__main__": init_logger() logger = logging.getLogger(__name__) logger.info("爬虫启动") try: config = Config() scheduler = SpiderScheduler(config) scheduler.run() logger.info("爬虫结束") except Exception as e: logger.error(f"爬虫异常:{e}", exc_info=True)

2. 异常处理体系

在核心模块中定义自定义异常,便于精准捕获和处理不同类型的错误:

python

运行

# exceptions.py class SpiderRequestError(Exception): """请求异常""" pass class SpiderParseError(Exception): """解析异常""" pass class SpiderStorageError(Exception): """存储异常""" pass # 在请求层中抛出自定义异常 def get(self, url: str) -> Optional[str]: for retry in range(self.retry_times): try: # ... 原有逻辑 ... return resp.text except Exception as e: if retry == self.retry_times - 1: raise SpiderRequestError(f"请求{url}失败:{e}") time.sleep(2 ** retry) return None

3. 代理池集成

对于大规模爬取,可对接第三方代理池(如阿布云、快代理)或自建代理池,通过 API 动态获取可用代理:

python

运行

def _get_proxy_from_pool(self) -> Optional[str]: """从代理池API获取可用代理""" proxy_api = "http://proxy.example.com/get_proxy" try: resp = requests.get(proxy_api, timeout=5) return resp.json().get("proxy") except Exception as e: print(f"获取代理失败:{e}") return None

四、工程化扩展与最佳实践

1. 多线程 / 异步爬取

针对单线程效率低的问题,可使用concurrent.futures.ThreadPoolExecutor实现多线程,或用aiohttp实现异步爬取(注意 1688 的反爬限制,避免并发过高)。

python

运行

from concurrent.futures import ThreadPoolExecutor def run_multi_thread(self) -> None: """多线程爬取""" with ThreadPoolExecutor(max_workers=3) as executor: # 控制并发数 executor.map(self.crawl_page, range(1, self.max_page + 1)) def crawl_page(self, page: int) -> None: """单页爬取逻辑,供线程调用""" url = self.build_url(page) html = self.request.get(url) if html: page_data = self.parser.parse(html) self.storage.save(page_data)

2. 爬虫监控与告警

通过prometheus+grafana监控爬虫的爬取量、失败率,或通过邮件 / 钉钉机器人在爬虫异常时发送告警:

python

运行

import smtplib from email.mime.text import MIMEText def send_alert_email(message: str) -> None: """发送告警邮件""" msg = MIMEText(message, "plain", "utf-8") msg["Subject"] = "1688爬虫异常告警" msg["From"] = "sender@example.com" msg["To"] = "receiver@example.com" smtp = smtplib.SMTP_SSL("smtp.example.com", 465) smtp.login("sender@example.com", "password") smtp.sendmail("sender@example.com", ["receiver@example.com"], msg.as_string()) smtp.quit()

3. 合规与维护

  1. 遵守 robots 协议:1688 的robots.txt(https://www.1688.com/robots.txt)明确禁止爬取的路径需严格规避。
  2. 定期更新解析规则:1688 页面结构会频繁变更,需定期检查并调整 CSS 选择器 / XPath。
  3. 数据去重与清洗:通过商品链接、ID 等唯一键去重,对价格、销量等字段做格式清洗(如去除非数字字符)。
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/1 12:08:27

【Open-AutoGLM离线队列设计全揭秘】:掌握高并发任务调度核心技术

第一章:Open-AutoGLM离线队列设计概述Open-AutoGLM 是一个面向大语言模型任务调度的自动化系统,其核心组件之一为离线队列模块。该模块负责接收批量推理请求、管理资源分配并保障高吞吐下的任务稳定性。通过解耦请求提交与执行过程,系统能够在…

作者头像 李华
网站建设 2026/3/31 4:50:51

Open-AutoGLM官方资源入口全梳理(附未公开API访问方式)

第一章:Open-AutoGLM 开发资源社区获取渠道 官方 GitHub 仓库 Open-AutoGLM 的核心开发资源集中托管于其官方 GitHub 仓库,是获取源码、提交问题和参与贡献的首要入口。开发者可通过以下命令克隆项目: # 克隆 Open-AutoGLM 主仓库 git clone…

作者头像 李华
网站建设 2026/3/26 19:02:41

为什么90%的Open-AutoGLM开发者都卡在资源获取环节?真相在这里

第一章:Open-AutoGLM 开发资源社区获取渠道 Open-AutoGLM 作为一个面向自动化生成语言模型开发的开源项目,其生态系统的活跃度高度依赖于开发者社区的参与和资源共享。获取该项目的核心开发资源、最新更新以及协作机会,主要依赖以下几个官方与…

作者头像 李华
网站建设 2026/3/29 7:25:12

智慧交通道路路面玻璃碴子检测数据集VOC+YOLO格式1276张1类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件)图片数量(jpg文件个数):1276标注数量(xml文件个数):1276标注数量(txt文件个数):1276标注类别…

作者头像 李华
网站建设 2026/3/29 5:08:08

RAG知识库准确率提升手册(保姆级教程),从入门到精通就看这篇!

在RAG系统中,提升知识库文档的 召回准确率,对于提高整个系统的用户体验至关重要。 今天,我就从文档 切割粒度、检索后排序、混合检索、RAG-Fusion 这几个方面,详细介绍如何提升知识库文档的召回准确率,希望对你有所帮…

作者头像 李华