Introduction: The Value and Technical Challenges of Recruitment Data Mining
In the era of big data, recruitment platform data carries significant commercial and research value. As one of China's leading classified-information platforms, 58同城 aggregates a massive volume of corporate job postings in its recruitment channel, and this data matters for market analysis, talent-trend forecasting, competitor research, and similar work. However, as websites continually upgrade their anti-crawling defenses, traditional static crawlers struggle with dynamic rendering, encrypted requests, behavioral verification, and other layered protections. This article walks through how to use modern Python crawling techniques to build an efficient, stable collection system for 58同城 job listings.
Technology Stack Overview: Core Components of a Modern Crawler
Before we start coding, let's review the key technologies used in this walkthrough:
Playwright: Microsoft's open-source browser automation tool; supports headless operation and handles JavaScript-rendered pages reliably
Asyncio: Python's built-in asynchronous I/O framework, used for highly concurrent fetching (a short sketch follows this list)
BeautifulSoup4 & PyQuery: two parsing engines kept as fallbacks for each other, making data extraction more robust
Redis: distributed task queue and deduplication store
Pydantic: data validation and serialization
User-Agent pool & proxy IP pool: bypass basic anti-crawling checks
Browser fingerprinting techniques: simulate a realistic browser environment
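To give a feel for how the first two pieces fit together, here is a minimal, self-contained sketch (independent of the project code below) that launches headless Chromium with Playwright and fetches two pages concurrently with asyncio; the URLs are only placeholders.
python
import asyncio
from playwright.async_api import async_playwright

async def fetch_title(browser, url: str) -> str:
    # Each coroutine gets its own page; all pages share one browser process
    page = await browser.new_page()
    await page.goto(url, wait_until="domcontentloaded")
    title = await page.title()
    await page.close()
    return title

async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        # asyncio.gather drives both navigations concurrently
        titles = await asyncio.gather(
            fetch_title(browser, "https://example.com"),
            fetch_title(browser, "https://example.org"),
        )
        print(titles)
        await browser.close()

asyncio.run(main())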
Project Architecture Design
text
58同城 crawler architecture:
├── Anti-Anti-Crawler Layer
│   ├── Dynamic User-Agent rotation
│   ├── Proxy IP middleware
│   ├── Browser fingerprint simulation
│   └── Randomized request delays
├── Data Acquisition Layer
│   ├── Playwright async controller
│   ├── Page rendering engine
│   └── AJAX interceptor
├── Data Processing Layer
│   ├── HTML parser
│   ├── Data cleaner
│   └── Structured storage
└── Task Scheduler Layer
    ├── Redis queue management
    ├── Distributed scheduling
    └── Retry-on-failure mechanism
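For reference, the on-disk layout implied by the file-path comments in the code below looks roughly like this (the project root name is arbitrary):
text
58job_crawler/
├── config/
│   └── settings.py            # crawler, Redis, proxy and storage settings
├── models/
│   └── job_model.py           # Pydantic data models
├── core/
│   ├── anti_spider.py         # anti-crawling middleware
│   ├── playwright_crawler.py  # Playwright async crawler
│   ├── task_scheduler.py      # Redis-backed task scheduler
│   └── intelligent_parser.py  # salary/text parsing helpers
├── distributed/
│   └── worker.py              # optional distributed worker
├── data/                      # output directory
├── requirements.txt
├── .env                       # optional environment overrides
└── main.py                    # entry point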
Environment Setup and Dependency Installation
python
# requirements.txt
playwright==1.40.0
aiohttp==3.9.1
beautifulsoup4==4.12.2
pyquery==2.0.0
redis==5.0.1
pydantic==2.5.0
pydantic-settings==2.1.0
fake-useragent==1.4.0
lxml==4.9.3
pandas==2.1.4
Installation commands:
bash
pip install -r requirements.txt
playwright install chromium
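Before diving into the code, it is worth verifying that the two external services the project depends on are reachable: the bundled Chromium that Playwright just downloaded, and a local Redis instance. A small smoke test (the file name and the localhost Redis URL are assumptions, adjust them to your setup):
python
# smoke_test.py -- quick environment check, not part of the project code
import asyncio
import redis.asyncio as aioredis
from playwright.async_api import async_playwright

async def main():
    # 1. Redis round-trip
    r = aioredis.from_url("redis://localhost:6379/0", decode_responses=True)
    await r.set("58job:ping", "pong")
    print("Redis:", await r.get("58job:ping"))
    await r.close()

    # 2. Headless Chromium launch
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        print("Chromium version:", browser.version)
        await browser.close()

asyncio.run(main())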
Core Code Implementation
1. Configuration and Data Models
python
# config/settings.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional


class CrawlerSettings(BaseSettings):
    # Basic crawler settings
    crawl_depth: int = 3
    max_concurrent_tasks: int = 10
    request_timeout: int = 30
    retry_times: int = 3

    # Redis settings
    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_db: int = 0
    redis_password: Optional[str] = None

    # Proxy settings
    proxy_enabled: bool = True
    proxy_pool_url: str = "http://proxy-pool/api/get"

    # Storage settings
    save_format: str = "both"  # json, csv, both
    output_dir: str = "./data"

    model_config = SettingsConfigDict(env_file=".env")


# models/job_model.py
from pydantic import BaseModel, Field, field_validator
from datetime import datetime
from typing import Optional, List
import re


class CompanyInfo(BaseModel):
    name: str
    scale: Optional[str] = None       # company size
    industry: Optional[str] = None    # industry
    nature: Optional[str] = None      # ownership type


class JobSalary(BaseModel):
    min_salary: Optional[float] = None
    max_salary: Optional[float] = None
    unit: str = "月"                  # per month / year / hour
    is_negotiable: bool = False

    @field_validator('min_salary', 'max_salary', mode='before')
    @classmethod
    def parse_salary(cls, v):
        if isinstance(v, str):
            # Handle salary strings such as "8K" or "8-13K"; a range yields its leading figure
            match = re.search(r'(\d+\.?\d*)[kK千]?-?(\d+\.?\d*)?[kK千]?', v)
            if match and match.group(1):
                return float(match.group(1)) * 1000
            return None  # unparsable strings (e.g. "面议") become None
        return v


class JobInfo(BaseModel):
    # Basic information
    job_id: str
    title: str
    url: str
    publish_date: datetime

    # Salary
    salary: JobSalary

    # Company information
    company: CompanyInfo

    # Position details
    work_location: str
    experience: Optional[str] = None        # experience requirement
    education: Optional[str] = None         # education requirement
    job_type: Optional[str] = None          # job type
    job_description: Optional[str] = None
    job_requirements: Optional[str] = None

    # Benefits
    benefits: List[str] = []

    # Crawler metadata
    source: str = "58同城"
    crawl_time: datetime = Field(default_factory=datetime.now)
    page_url: Optional[str] = None
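A quick standalone check of the models above (the sample values are made up); the salary validator converts "8K"-style strings into numeric yuan amounts:
python
from models.job_model import JobSalary, CompanyInfo

salary = JobSalary(min_salary="8K", max_salary="13K")
print(salary.min_salary, salary.max_salary)   # 8000.0 13000.0

company = CompanyInfo(name="某科技公司", scale="100-499人")
print(company.model_dump())   # {'name': '某科技公司', 'scale': '100-499人', 'industry': None, 'nature': None}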
2. Anti-Crawling Strategy Implementation
python
# core/anti_spider.py
import random
import asyncio
from typing import Dict, List, Optional
from fake_useragent import UserAgent
import aiohttp


class AntiSpiderMiddleware:
    def __init__(self):
        self.ua = UserAgent()
        self.proxy_enabled = True      # checked by the Playwright crawler before requesting a proxy
        self.proxy_pool: List[Dict] = []
        self.load_balancing_index = 0

    async def rotate_user_agent(self) -> str:
        """Rotate the User-Agent dynamically."""
        agents = [
            self.ua.random,
            # Custom mobile User-Agent
            "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1",
            # Recent desktop Chrome
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        ]
        return random.choice(agents)

    async def get_proxy(self) -> Optional[Dict[str, str]]:
        """Fetch a proxy from the pool."""
        if not self.proxy_pool:
            await self.refresh_proxy_pool()
        if not self.proxy_pool:
            return None  # no proxies available; fall back to a direct connection
        proxy = self.proxy_pool[self.load_balancing_index % len(self.proxy_pool)]
        self.load_balancing_index += 1
        return {
            "server": f"http://{proxy['ip']}:{proxy['port']}",
            "username": proxy.get('username', ''),
            "password": proxy.get('password', '')
        }

    async def refresh_proxy_pool(self):
        """Refresh the proxy pool from the proxy service."""
        async with aiohttp.ClientSession() as session:
            async with session.get("http://proxy-pool/api/all") as resp:
                self.proxy_pool = await resp.json()

    def generate_browser_fingerprint(self) -> Dict[str, str]:
        """Generate a browser fingerprint."""
        return {
            "webgl_vendor": "Google Inc.",
            "renderer": "ANGLE (Intel, Intel(R) UHD Graphics Direct3D11 vs_5_0 ps_5_0)",
            "platform": "Win32",
            "hardware_concurrency": "8",
            "device_memory": "8",
            "timezone": "Asia/Shanghai",
            "language": "zh-CN",
            "screen_resolution": "1920x1080"
        }

    async def random_delay(self, min_sec: float = 1.0, max_sec: float = 3.0):
        """Random delay to mimic human behaviour."""
        await asyncio.sleep(random.uniform(min_sec, max_sec))
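The middleware can be sanity-checked in isolation; the demo below switches off the proxy because no proxy-pool service is assumed to be running:
python
import asyncio
from core.anti_spider import AntiSpiderMiddleware

async def demo():
    anti = AntiSpiderMiddleware()
    anti.proxy_enabled = False                   # no proxy-pool service in this demo
    print("UA:", await anti.rotate_user_agent())
    print("Fingerprint:", anti.generate_browser_fingerprint())
    await anti.random_delay(0.5, 1.0)            # simulated human pause

asyncio.run(demo())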
3. Playwright Async Crawler Core
python
# core/playwright_crawler.py
from playwright.async_api import async_playwright, Browser, Page, BrowserContext
import asyncio
from typing import Optional, Dict, List
import json
from urllib.parse import urljoin, urlparse
import logging

from core.anti_spider import AntiSpiderMiddleware

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PlaywrightCrawler:
    def __init__(self, anti_spider: AntiSpiderMiddleware):
        self.anti_spider = anti_spider
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None

    async def init_browser(self, headless: bool = True):
        """Initialise the browser instance."""
        playwright = await async_playwright().start()

        # Proxy configuration
        proxy_config = await self.anti_spider.get_proxy() if self.anti_spider.proxy_enabled else None

        # Launch the browser with flags that mimic a real user
        self.browser = await playwright.chromium.launch(
            headless=headless,
            proxy=proxy_config,
            args=[
                '--disable-blink-features=AutomationControlled',
                '--disable-dev-shm-usage',
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-web-security',
                '--disable-features=IsolateOrigins,site-per-process'
            ]
        )

        # Apply the fingerprint and User-Agent
        user_agent = await self.anti_spider.rotate_user_agent()
        fingerprint = self.anti_spider.generate_browser_fingerprint()

        self.context = await self.browser.new_context(
            user_agent=user_agent,
            viewport={'width': 1920, 'height': 1080},
            locale='zh-CN',
            timezone_id='Asia/Shanghai',
            extra_http_headers={
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
                'Cache-Control': 'max-age=0',
            }
        )

        # Inject JavaScript to hide the webdriver flag and other automation traces
        await self.context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', { get: () => false });
            // Override the plugins property
            Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
            // Override the languages property
            Object.defineProperty(navigator, 'languages', { get: () => ['zh-CN', 'zh', 'en'] });
        """)

        logger.info("浏览器初始化完成")

    async def crawl_job_list(self, url: str, max_pages: int = 10) -> List[str]:
        """Crawl the job list pages and collect detail-page links."""
        page = await self.context.new_page()
        job_urls = []

        try:
            await page.goto(url, wait_until="networkidle", timeout=60000)
            await self.anti_spider.random_delay(2, 4)

            current_page = 1
            while current_page <= max_pages:
                logger.info(f"正在抓取第{current_page}页列表")

                # Wait for the job list to load
                await page.wait_for_selector(".job-list", timeout=30000)

                # Extract job links
                job_items = await page.query_selector_all(".job-item")
                for item in job_items:
                    link = await item.query_selector("a.job-title")
                    if link:
                        href = await link.get_attribute("href")
                        if href and "job" in href:
                            full_url = urljoin(url, href)
                            job_urls.append(full_url)

                # Check for a next page
                next_button = await page.query_selector("a.next-page")
                if next_button and current_page < max_pages:
                    await next_button.click()
                    await page.wait_for_load_state("networkidle")
                    await self.anti_spider.random_delay(3, 5)
                    current_page += 1
                else:
                    break

        except Exception as e:
            logger.error(f"抓取列表页失败: {e}")
        finally:
            await page.close()

        return job_urls

    async def crawl_job_detail(self, url: str) -> Optional[Dict]:
        """Crawl a job detail page."""
        page = await self.context.new_page()

        try:
            # Intercept responses to capture API data
            api_data = {}

            async def intercept_response(response):
                if "api" in response.url or "interface" in response.url:
                    try:
                        api_data[response.url] = await response.json()
                    except Exception:
                        pass

            page.on("response", intercept_response)

            # Visit the page
            await page.goto(url, wait_until="networkidle", timeout=60000)
            await self.anti_spider.random_delay(1, 2)

            # Handle possible pop-ups
            await self.handle_popups(page)

            # Wait for the key content to load
            await page.wait_for_selector(".job-detail", timeout=20000)

            # Extract the page data
            job_data = await self.extract_job_data(page, url)

            # Merge in intercepted API data
            if api_data:
                job_data["api_data"] = api_data

            logger.info(f"成功抓取职位: {job_data.get('title', 'Unknown')}")
            return job_data

        except Exception as e:
            logger.error(f"抓取详情页失败 {url}: {e}")
            return None
        finally:
            await page.close()

    async def handle_popups(self, page: Page):
        """Dismiss various pop-ups."""
        popup_selectors = [
            ".popup-close",
            ".modal-close",
            "button:has-text('关闭')",
            "div[class*='mask'] button",
            "div[class*='dialog'] button"
        ]

        for selector in popup_selectors:
            try:
                close_btn = await page.query_selector(selector)
                if close_btn:
                    await close_btn.click(timeout=3000)
                    await asyncio.sleep(0.5)
            except Exception:
                continue

    async def extract_job_data(self, page: Page, url: str) -> Dict:
        """Extract job data from the page."""
        # Multiple selectors per field for robustness
        selectors = {
            "title": [".job-title", "h1.title", "div.title"],
            "salary": [".salary", ".job-salary", ".pay"],
            "company": [".company-name", ".comp-name", "a.comp-title"],
            "location": [".job-location", ".location", ".address"],
            "experience": [".job-experience", ".experience", ".req-exp"],
            "education": [".job-education", ".education", ".req-edu"],
            "description": [".job-description", ".des", ".job-detail-content"]
        }

        data = {"url": url}

        for field, selector_list in selectors.items():
            for selector in selector_list:
                try:
                    element = await page.query_selector(selector)
                    if element:
                        text = await element.text_content()
                        if text and text.strip():
                            data[field] = text.strip()
                            break
                except Exception:
                    continue

        # Extract benefits
        benefit_elements = await page.query_selector_all(".welfare-item, .benefit-item, .tag")
        benefits = []
        for elem in benefit_elements:
            text = await elem.text_content()
            if text and text.strip():
                benefits.append(text.strip())
        data["benefits"] = benefits

        # Extract the publication date
        try:
            date_elem = await page.query_selector(".publish-time, .update-time")
            if date_elem:
                data["publish_date"] = await date_elem.text_content()
        except Exception:
            data["publish_date"] = "未知"

        return data

    async def close(self):
        """Release browser resources."""
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
4. Asynchronous Task Scheduler
python
# core/task_scheduler.py
import asyncio
import json
import logging
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Optional

import redis.asyncio as aioredis  # asyncio support bundled with redis-py

from core.playwright_crawler import PlaywrightCrawler

logger = logging.getLogger(__name__)


class TaskStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    SUCCESS = "success"
    FAILED = "failed"


@dataclass
class CrawlTask:
    url: str
    priority: int = 1
    retry_count: int = 0
    max_retries: int = 3
    metadata: Optional[Dict] = None


class AsyncTaskScheduler:
    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.redis_url = redis_url
        self.redis: Optional[aioredis.Redis] = None
        self.task_queue = asyncio.Queue()
        self.active_tasks = set()

    async def init_redis(self):
        """Initialise the Redis connection."""
        self.redis = aioredis.from_url(
            self.redis_url,
            encoding="utf-8",
            decode_responses=True
        )
        logger.info("Redis连接初始化完成")

    async def add_task(self, task: CrawlTask):
        """Add a task to the queue."""
        await self.task_queue.put(task)

        # Also persist the task to Redis
        if self.redis:
            task_key = f"58job:task:{hash(task.url)}"
            await self.redis.hset(task_key, mapping={
                "url": task.url,
                "status": TaskStatus.PENDING.value,
                "priority": str(task.priority),
                "retry_count": str(task.retry_count)
            })

    async def get_task(self) -> Optional[CrawlTask]:
        """Get the next task, or None if the queue is momentarily empty."""
        try:
            task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
            return task
        except asyncio.TimeoutError:
            return None

    async def mark_task_done(self, task: CrawlTask, status: TaskStatus, result: Optional[Dict] = None):
        """Mark a task as finished and store its result."""
        if self.redis:
            task_key = f"58job:task:{hash(task.url)}"
            await self.redis.hset(task_key, "status", status.value)

            if result:
                result_key = f"58job:result:{hash(task.url)}"
                await self.redis.set(result_key, json.dumps(result, ensure_ascii=False))

        self.task_queue.task_done()

    async def process_tasks(self, crawler: PlaywrightCrawler, max_concurrent: int = 5):
        """Process tasks concurrently."""
        semaphore = asyncio.Semaphore(max_concurrent)

        async def process_single_task(task: CrawlTask):
            async with semaphore:
                try:
                    logger.info(f"开始处理任务: {task.url}")

                    # Crawl the detail page
                    result = await crawler.crawl_job_detail(task.url)

                    if result:
                        await self.mark_task_done(task, TaskStatus.SUCCESS, result)
                        logger.info(f"任务完成: {task.url}")
                    else:
                        await self.retry_task(task)

                except Exception as e:
                    logger.error(f"任务处理失败 {task.url}: {e}")
                    await self.retry_task(task)

        # Main task-processing loop
        while True:
            task = await self.get_task()
            if task:
                asyncio.create_task(process_single_task(task))
            else:
                # Sleep briefly to avoid busy-waiting
                await asyncio.sleep(0.1)

    async def retry_task(self, task: CrawlTask):
        """Retry a failed task."""
        task.retry_count += 1
        if task.retry_count <= task.max_retries:
            logger.warning(f"任务重试 {task.url}, 第{task.retry_count}次")
            # Mark the failed attempt as consumed so queue.join() can complete,
            # then re-queue the task after a back-off delay
            self.task_queue.task_done()
            await asyncio.sleep(task.retry_count * 2)
            await self.add_task(task)
        else:
            logger.error(f"任务最终失败 {task.url}, 已达最大重试次数")
            await self.mark_task_done(task, TaskStatus.FAILED)
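While the scheduler is running, progress can be inspected directly from Redis using the key scheme above (58job:task:* for task state, 58job:result:* for stored results). A small monitoring sketch, assuming the same localhost Redis instance (the script name is arbitrary):
python
# inspect_progress.py -- assumed monitoring helper, not part of the scheduler itself
import asyncio
from collections import Counter
import redis.asyncio as aioredis

async def main():
    r = aioredis.from_url("redis://localhost:6379/0", decode_responses=True)
    statuses = Counter()
    # Count tasks by status from the 58job:task:* hashes written by the scheduler
    async for key in r.scan_iter("58job:task:*"):
        statuses[await r.hget(key, "status")] += 1
    results = len([k async for k in r.scan_iter("58job:result:*")])
    print("task statuses:", dict(statuses), "| stored results:", results)
    await r.close()

asyncio.run(main())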
5. Main Program Entry Point
python
# main.py
import asyncio
import json
import csv
import logging
from datetime import datetime
from pathlib import Path
from typing import List

from config.settings import CrawlerSettings
from core.anti_spider import AntiSpiderMiddleware
from core.playwright_crawler import PlaywrightCrawler
from core.task_scheduler import AsyncTaskScheduler, CrawlTask
from models.job_model import JobInfo

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Job58Crawler:
    def __init__(self, settings):
        self.settings = settings
        self.anti_spider = AntiSpiderMiddleware()
        self.crawler = None
        self.scheduler = AsyncTaskScheduler()
        self.results = []

    async def init(self):
        """Initialise crawler components."""
        await self.scheduler.init_redis()
        self.crawler = PlaywrightCrawler(self.anti_spider)
        await self.crawler.init_browser(headless=True)

    async def crawl(self, start_urls: List[str], max_pages_per_url: int = 5):
        """Main crawl workflow."""
        # 1. Collect all job detail links
        all_job_urls = []
        for start_url in start_urls:
            logger.info(f"开始收集职位链接: {start_url}")
            job_urls = await self.crawler.crawl_job_list(start_url, max_pages_per_url)
            all_job_urls.extend(job_urls)
            logger.info(f"收集到 {len(job_urls)} 个职位链接")

        # Deduplicate
        all_job_urls = list(set(all_job_urls))
        logger.info(f"去重后共有 {len(all_job_urls)} 个唯一职位链接")

        # 2. Create tasks
        for url in all_job_urls:
            task = CrawlTask(url=url, priority=1)
            await self.scheduler.add_task(task)

        # 3. Start the task processor
        processor_task = asyncio.create_task(
            self.scheduler.process_tasks(self.crawler, self.settings.max_concurrent_tasks)
        )

        # 4. Wait for all tasks to finish
        await self.scheduler.task_queue.join()
        processor_task.cancel()

        # 5. Collect results
        await self.collect_results()

    async def collect_results(self):
        """Collect all results from Redis."""
        if self.scheduler.redis:
            # Fetch results of all successful tasks
            keys = await self.scheduler.redis.keys("58job:result:*")
            for key in keys:
                result_json = await self.scheduler.redis.get(key)
                if result_json:
                    try:
                        result = json.loads(result_json)
                        self.results.append(result)
                    except json.JSONDecodeError as e:
                        logger.error(f"解析结果失败 {key}: {e}")

        logger.info(f"共收集到 {len(self.results)} 条有效数据")

    async def save_results(self):
        """Save results to files."""
        output_dir = Path(self.settings.output_dir)
        output_dir.mkdir(exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        if self.settings.save_format in ["json", "both"]:
            json_path = output_dir / f"58_jobs_{timestamp}.json"
            with open(json_path, "w", encoding="utf-8") as f:
                json.dump(self.results, f, ensure_ascii=False, indent=2, default=str)
            logger.info(f"结果已保存到 JSON: {json_path}")

        if self.settings.save_format in ["csv", "both"] and self.results:
            csv_path = output_dir / f"58_jobs_{timestamp}.csv"
            self.save_to_csv(csv_path)
            logger.info(f"结果已保存到 CSV: {csv_path}")

    def save_to_csv(self, filepath: Path):
        """Save results in CSV format."""
        if not self.results:
            return

        # Collect every field that appears in any record
        all_keys = set()
        for item in self.results:
            all_keys.update(item.keys())

        with open(filepath, "w", newline="", encoding="utf-8-sig") as f:
            writer = csv.DictWriter(f, fieldnames=list(all_keys))
            writer.writeheader()
            writer.writerows(self.results)

    async def cleanup(self):
        """Release resources."""
        await self.crawler.close()
        if self.scheduler.redis:
            await self.scheduler.redis.close()


async def main():
    """Entry point."""
    # Settings
    settings = CrawlerSettings()

    # Start URLs (example: Python developer positions)
    start_urls = [
        "https://bj.58.com/python/pn1/",
        "https://sh.58.com/python/pn1/",
        "https://sz.58.com/python/pn1/",
        "https://hz.58.com/python/pn1/",
    ]

    # Create the crawler instance
    crawler = Job58Crawler(settings)

    try:
        # Initialise
        await crawler.init()

        # Run the crawl
        await crawler.crawl(start_urls, max_pages_per_url=3)

        # Save results
        await crawler.save_results()

        logger.info("爬虫任务完成!")

    except KeyboardInterrupt:
        logger.info("用户中断爬虫")
    except Exception as e:
        logger.error(f"爬虫运行失败: {e}", exc_info=True)
    finally:
        # Clean up
        await crawler.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
Advanced Optimization Techniques
1. Distributed Crawler Extension
python
# distributed/worker.py
import asyncio
import signal
from typing import Dict

import aiohttp


class DistributedWorker:
    def __init__(self, worker_id: str, master_url: str):
        self.worker_id = worker_id
        self.master_url = master_url
        self.running = True

    async def fetch_task(self):
        """Fetch a task from the master node."""
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.master_url}/api/task/acquire") as resp:
                return await resp.json()

    async def report_result(self, task_id: str, result: Dict):
        """Report a result back to the master node."""
        async with aiohttp.ClientSession() as session:
            await session.post(f"{self.master_url}/api/task/complete",
                               json={"task_id": task_id, "result": result})

    def signal_handler(self, signum, frame):
        """Signal handler: stop the worker loop gracefully."""
        self.running = False
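The worker class above only defines the building blocks. A possible run loop tying them together might look like the sketch below; the master URL, the shape of the task payload (a dict with "task_id" and "url" keys), and the placeholder result are all assumptions, not a fixed protocol:
python
# distributed/run_worker.py -- hypothetical wiring for DistributedWorker
import asyncio
import signal
from distributed.worker import DistributedWorker

async def run(worker: DistributedWorker):
    while worker.running:
        task = await worker.fetch_task()      # ask the master node for work
        if not task:
            await asyncio.sleep(2)            # nothing queued, back off briefly
            continue
        # Placeholder result; a real worker would drive PlaywrightCrawler here
        result = {"url": task.get("url"), "status": "done"}
        await worker.report_result(task["task_id"], result)

if __name__ == "__main__":
    worker = DistributedWorker(worker_id="worker-1", master_url="http://localhost:8000")
    signal.signal(signal.SIGINT, worker.signal_handler)  # Ctrl+C flips running to False
    asyncio.run(run(worker))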
2. Intelligent Parsing Strategy
python
# core/intelligent_parser.py
import re
from typing import Dict, Any


class IntelligentParser:
    @staticmethod
    def parse_salary(text: str) -> Dict[str, Any]:
        """Parse a salary string against a set of known patterns."""
        patterns = [
            # "8-13K" or "8K-13K"
            r'(\d+\.?\d*)[kK千]?-(\d+\.?\d*)[kK千]',
            # Negotiable
            r'面议|Negotiable',
            # "8K以上" (8K and above)
            r'(\d+\.?\d*)[kK千]以上',
            # "8万/年" (80,000 per year)
            r'(\d+\.?\d*)万/年',
        ]

        for pattern in patterns:
            match = re.search(pattern, text)
            if match:
                return {"pattern": pattern, "match": match.groups()}

        return {"pattern": "unknown", "match": None}
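A few quick calls show what the parser returns (run from the project root so core is importable):
python
from core.intelligent_parser import IntelligentParser

print(IntelligentParser.parse_salary("8-13K"))   # range pattern, match: ('8', '13')
print(IntelligentParser.parse_salary("面议"))     # negotiable pattern, match: ()
print(IntelligentParser.parse_salary("日结300"))  # {'pattern': 'unknown', 'match': None}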