Extending the YOLO12 API: a Python Wrapper Class for Batch Images with Automatic Result Aggregation
1. Introduction: from single-image tests to batch processing
If you have played with single-image detection in the YOLO12 WebUI, watching it draw bounding boxes in a flash, you have probably thought: this is cool, but in real work I routinely need to process hundreds or thousands of images, and I can't upload them one at a time.
That is exactly the problem this article solves.
The YOLO12 image ships with a capable FastAPI service, but driving it directly for batch jobs means writing your own loops, managing files, handling errors, and rolling a pile of scattered detection results up into a useful statistical report. The process is tedious and error-prone.
This article walks you through building a Python wrapper class that turns the YOLO12 API into a far more convenient tool. With it, a few lines of code let you:
- Process an entire folder of images in one call
- Automatically retry failed requests
- Aggregate scattered detections into clear statistical reports
- Export results to CSV or JSON with a single method call
Whether you are analyzing surveillance screenshots, organizing a smart photo album, or validating batches in industrial quality inspection, this wrapper will save you a lot of time.
2. Why wrap the API: the pain of calling it directly
Before writing any code, let's look at the problems you run into when you drive the YOLO12 API by hand for a batch job.
2.1 The tedium of doing it manually
Suppose you need to process 100 images. With the most naive approach, the code looks roughly like this:
```python
import requests
import os
from pathlib import Path

# 1. Build the image list
image_folder = "./test_images"
image_files = [f for f in os.listdir(image_folder)
               if f.endswith(('.jpg', '.png'))]

# 2. Send requests one by one
results = []
for image_file in image_files:
    image_path = os.path.join(image_folder, image_file)
    try:
        # Send the request
        with open(image_path, 'rb') as f:
            files = {'file': f}
            response = requests.post('http://localhost:8000/predict', files=files)

        # Parse the result
        if response.status_code == 200:
            result = response.json()
            results.append({
                'filename': image_file,
                'detections': result.get('detections', []),
                'success': True
            })
        else:
            results.append({
                'filename': image_file,
                'error': f"HTTP {response.status_code}",
                'success': False
            })
    except Exception as e:
        results.append({
            'filename': image_file,
            'error': str(e),
            'success': False
        })

# 3. Aggregate the results by hand (the really painful code comes later...)
```

And this is just the beginning; the real trouble comes afterwards.
2.2 The four main problems you will hit
Problem 1: error handling is painful. The network can be flaky, an image can be in the wrong format, the API can be temporarily unavailable. Each failure mode needs its own handling, and the code fills up with try/except blocks.
Problem 2: scattered results are hard to analyze. 100 images come back as 100 independent JSON payloads. To learn how many people or cars were detected in total, you have to write your own loop over every result and tally the counts by hand.
Problem 3: no progress feedback. On a large batch you have no idea how much is done or how much is left; you just wait.
Problem 4: missing conveniences. Saving results to a file, filtering out low-confidence detections, sorting by category... all of it has to be built from scratch.
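To make problem 2 concrete, here is a minimal sketch of the manual tallying you would otherwise write. The sample results are made up for illustration, in the same per-image dict shape used in the naive loop above:

```python
from collections import Counter

# Hypothetical per-image results, in the shape the naive loop produces
results = [
    {'filename': 'a.jpg', 'success': True,
     'detections': [{'class': 'person', 'confidence': 0.91},
                    {'class': 'car', 'confidence': 0.80}]},
    {'filename': 'b.jpg', 'success': True,
     'detections': [{'class': 'person', 'confidence': 0.66}]},
    {'filename': 'c.jpg', 'success': False, 'error': 'HTTP 500'},
]

# Tally detections per class across all successful images
counts = Counter(
    det['class']
    for r in results if r.get('success')
    for det in r['detections']
)
print(counts)  # Counter({'person': 2, 'car': 1})
```

This is only the simplest statistic; averages, per-file breakdowns, and exports each need more hand-written code, which is exactly what the wrapper below automates.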
Our wrapper class exists to solve exactly these problems, so you get the most work done with the least code.
3. Designing the wrapper: core features and architecture
3.1 Design goals: simple but capable
A good wrapper should be like a Swiss Army knife: simple on the outside, complete on the inside. Our YOLO12 wrapper class aims for:
- One-call batch processing: pass a folder path and every image in it is processed
- Smart error handling: network failures are retried automatically; bad files are skipped and logged
- Automatic aggregation: per-category detection counts, average confidences, and more, computed for you
- Visible progress: live progress display with an estimate of remaining time
- Flexible export: CSV, JSON, and Excel output formats
- Simple configuration: confidence threshold, model version, and so on are plain parameters
3.2 Core architecture: three layers
The wrapper uses a three-layer design, each layer with its own responsibility:
```
YOLO12BatchProcessor (main class)
├── Config layer
│   ├── API address and port
│   ├── Model parameters (confidence, IOU threshold)
│   └── Processing parameters (concurrency, retry count)
├── Executor layer
│   ├── Single-image detection
│   ├── Batch task scheduling
│   └── Error handling and retries
└── Result layer
    ├── Raw result storage
    ├── Statistics computation
    └── Export in multiple formats
```

This keeps the code structure clear and makes it easy to add features later.
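As a minimal, self-contained sketch of this separation (the names here are illustrative stand-ins, not the final implementation), each layer can start life as a small object, with a fake detector standing in for the real API:

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass
class Config:                      # config layer: plain parameters
    api_url: str = "http://localhost:8000"
    confidence_threshold: float = 0.25
    max_workers: int = 4
    max_retries: int = 3

class ResultStore:                 # result layer: storage + statistics
    def __init__(self):
        self.detections: List[Dict] = []
    def add(self, dets: List[Dict]):
        self.detections.extend(dets)
    def counts(self) -> Dict[str, int]:
        out: Dict[str, int] = {}
        for d in self.detections:
            out[d['class']] = out.get(d['class'], 0) + 1
        return out

class Executor:                    # executor layer: runs detection per image
    def __init__(self, cfg: Config, detect: Callable[[str], List[Dict]]):
        self.cfg, self.detect = cfg, detect
    def run(self, paths: List[str], store: ResultStore):
        for p in paths:
            # Filter by the configured confidence threshold
            dets = [d for d in self.detect(p)
                    if d['confidence'] >= self.cfg.confidence_threshold]
            store.add(dets)

# Wire the layers together with a fake detector in place of the HTTP call
fake_detect = lambda path: [{'class': 'person', 'confidence': 0.9},
                            {'class': 'cat', 'confidence': 0.1}]
store = ResultStore()
Executor(Config(), fake_detect).run(["a.jpg", "b.jpg"], store)
print(store.counts())  # {'person': 2}
```

Swapping the fake detector for a real HTTP call is all the full class below has to add on top of this skeleton, which is why the layering pays off.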
4. The full implementation: the YOLO12BatchProcessor class
Now for the complete implementation. The class is designed to be practical: you can copy it as-is or adapt it to your own needs.
4.1 Base version: the core features
```python
import requests
import os
import time
import json
from pathlib import Path
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
import csv


@dataclass
class DetectionResult:
    """Result of a single detection call."""
    filename: str
    success: bool
    detections: List[Dict] = field(default_factory=list)
    error: Optional[str] = None
    processing_time: float = 0.0


@dataclass
class BatchStatistics:
    """Aggregate statistics for a batch run."""
    total_images: int = 0
    processed_images: int = 0
    failed_images: int = 0
    total_detections: int = 0
    category_counts: Dict[str, int] = field(default_factory=dict)
    avg_confidence: Dict[str, List[float]] = field(default_factory=dict)
    start_time: Optional[float] = None
    end_time: Optional[float] = None

    @property
    def success_rate(self) -> float:
        """Percentage of images processed without error."""
        if self.processed_images == 0:
            return 0.0
        return (self.processed_images - self.failed_images) / self.processed_images * 100


class YOLO12BatchProcessor:
    """Batch-processing wrapper around the YOLO12 HTTP API."""

    def __init__(self,
                 api_url: str = "http://localhost:8000",
                 confidence_threshold: float = 0.25,
                 max_workers: int = 4,
                 max_retries: int = 3):
        """
        Initialize the processor.

        Args:
            api_url: YOLO12 API address; defaults to port 8000 on localhost
            confidence_threshold: detections below this confidence are filtered out
            max_workers: maximum concurrency; tune for your hardware
            max_retries: how many times to retry a failed request
        """
        self.api_url = api_url.rstrip('/')
        self.predict_url = f"{self.api_url}/predict"
        self.confidence_threshold = confidence_threshold
        self.max_workers = max_workers
        self.max_retries = max_retries
        self.results: List[DetectionResult] = []
        self.statistics = BatchStatistics()

    def detect_single_image(self, image_path: str) -> DetectionResult:
        """
        Detect objects in a single image.

        Args:
            image_path: path to the image file

        Returns:
            A DetectionResult holding either the detections or an error message.
        """
        start_time = time.time()
        result = DetectionResult(filename=os.path.basename(image_path), success=False)

        # Make sure the file exists
        if not os.path.exists(image_path):
            result.error = f"File not found: {image_path}"
            result.processing_time = time.time() - start_time
            return result

        # Send the request, retrying on failure
        for attempt in range(self.max_retries):
            try:
                with open(image_path, 'rb') as f:
                    files = {'file': f}
                    # Pass the confidence threshold along (if the API supports it)
                    data = {'confidence': self.confidence_threshold}
                    response = requests.post(self.predict_url,
                                             files=files,
                                             data=data,
                                             timeout=30)

                if response.status_code == 200:
                    payload = response.json()
                    # Filter out low-confidence detections
                    result.detections = [
                        det for det in payload.get('detections', [])
                        if det.get('confidence', 0) >= self.confidence_threshold
                    ]
                    result.success = True
                    break  # success, leave the retry loop
                else:
                    result.error = f"HTTP {response.status_code}: {response.text}"
                    if attempt < self.max_retries - 1:
                        time.sleep(1)  # wait a second before retrying

            except requests.exceptions.Timeout:
                result.error = f"Request timed out (attempt {attempt + 1}/{self.max_retries})"
                if attempt < self.max_retries - 1:
                    time.sleep(2)
            except requests.exceptions.ConnectionError:
                result.error = f"Connection failed (attempt {attempt + 1}/{self.max_retries})"
                if attempt < self.max_retries - 1:
                    time.sleep(2)
            except Exception as e:
                result.error = f"Unexpected error: {e} (attempt {attempt + 1}/{self.max_retries})"
                if attempt < self.max_retries - 1:
                    time.sleep(1)

        result.processing_time = time.time() - start_time
        return result

    def process_batch(self, image_paths: List[str],
                      show_progress: bool = True) -> BatchStatistics:
        """
        Process a list of images concurrently.

        Args:
            image_paths: list of image file paths
            show_progress: whether to print a progress indicator

        Returns:
            Statistics for the batch run.
        """
        self.statistics = BatchStatistics()
        self.statistics.start_time = time.time()
        self.statistics.total_images = len(image_paths)
        self.results = []

        print(f"Processing {len(image_paths)} images...")
        print(f"Workers: {self.max_workers}, confidence threshold: {self.confidence_threshold}")

        processed_count = 0
        failed_count = 0

        # Run the detections in a thread pool
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_path = {
                executor.submit(self.detect_single_image, path): path
                for path in image_paths
            }

            # Handle tasks as they complete
            for future in as_completed(future_to_path):
                result = future.result()
                self.results.append(result)
                processed_count += 1

                if result.success:
                    self._update_statistics(result)
                else:
                    failed_count += 1
                    if show_progress:
                        print(f"\n  [!] Failed: {result.filename} - {result.error}")

                # Show progress
                if show_progress:
                    progress = processed_count / len(image_paths) * 100
                    print(f"\rProgress: {processed_count}/{len(image_paths)} ({progress:.1f}%)",
                          end="", flush=True)

        if show_progress:
            print()  # final newline

        self.statistics.processed_images = processed_count
        self.statistics.failed_images = failed_count
        self.statistics.end_time = time.time()
        return self.statistics

    def _update_statistics(self, result: DetectionResult):
        """Fold one successful result into the running statistics."""
        for detection in result.detections:
            category = detection.get('class', 'unknown')
            confidence = detection.get('confidence', 0)
            # Per-category count
            self.statistics.category_counts[category] = \
                self.statistics.category_counts.get(category, 0) + 1
            self.statistics.total_detections += 1
            # Collect confidences for later averaging
            self.statistics.avg_confidence.setdefault(category, []).append(confidence)

    def process_folder(self, folder_path: str,
                       extensions: List[str] = None,
                       show_progress: bool = True) -> BatchStatistics:
        """
        Process every image in a folder.

        Args:
            folder_path: folder to scan
            extensions: image extensions to accept,
                default ['.jpg', '.jpeg', '.png', '.bmp', '.webp']
            show_progress: whether to print progress

        Returns:
            Statistics for the batch run.
        """
        if extensions is None:
            extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.webp']

        folder = Path(folder_path)
        if not folder.exists():
            raise ValueError(f"Folder not found: {folder_path}")

        # Collect image files (deduplicated, in case the filesystem is case-insensitive)
        image_paths = set()
        for ext in extensions:
            image_paths.update(str(p) for p in folder.glob(f'*{ext}'))
            image_paths.update(str(p) for p in folder.glob(f'*{ext.upper()}'))

        if not image_paths:
            print(f"Warning: no supported image files found in {folder_path}")
            return BatchStatistics()

        return self.process_batch(sorted(image_paths), show_progress)
```

This base version already covers the core features: batch processing, retries on failure, and progress display. It still lacks result aggregation and export, which we add next.
4.2 Enhanced version: statistics and export
Let's add some utility methods that make working with the results easier:
```python
class YOLO12BatchProcessor:
    # ... __init__ and the base methods from above ...

    def get_summary(self) -> Dict[str, Any]:
        """Build a summary of the last batch run."""
        if not self.statistics.start_time:
            return {"status": "not started"}

        total_time = self.statistics.end_time - self.statistics.start_time
        avg_time_per_image = (total_time / self.statistics.processed_images
                              if self.statistics.processed_images > 0 else 0)

        # Per-category confidence statistics
        category_stats = {}
        for category, confidences in self.statistics.avg_confidence.items():
            if confidences:
                category_stats[category] = {
                    'count': self.statistics.category_counts.get(category, 0),
                    'avg_confidence': sum(confidences) / len(confidences),
                    'min_confidence': min(confidences),
                    'max_confidence': max(confidences)
                }

        # Sort categories by detection count
        sorted_categories = sorted(category_stats.items(),
                                   key=lambda x: x[1]['count'],
                                   reverse=True)

        return {
            "status": "finished" if self.statistics.end_time else "running",
            "total_images": self.statistics.total_images,
            "succeeded": self.statistics.processed_images - self.statistics.failed_images,
            "failed": self.statistics.failed_images,
            "success_rate": f"{self.statistics.success_rate:.1f}%",
            "total_detections": self.statistics.total_detections,
            "total_time": f"{total_time:.2f}s",
            "avg_time_per_image": f"{avg_time_per_image:.2f}s",
            "category_stats": dict(sorted_categories),
            "start_time": datetime.fromtimestamp(
                self.statistics.start_time).strftime('%Y-%m-%d %H:%M:%S'),
            "end_time": (datetime.fromtimestamp(
                self.statistics.end_time).strftime('%Y-%m-%d %H:%M:%S')
                if self.statistics.end_time else "unfinished")
        }

    def export_to_csv(self, output_path: str, include_detections: bool = True):
        """
        Export the results to a CSV file.

        Args:
            output_path: output file path
            include_detections: whether to include per-image detail rows
        """
        with open(output_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)

            # Summary block
            writer.writerow(["YOLO12 batch processing summary"])
            writer.writerow(["exported_at", datetime.now().strftime('%Y-%m-%d %H:%M:%S')])
            writer.writerow([])

            summary = self.get_summary()
            writer.writerow(["Summary"])
            for key, value in summary.items():
                if key != "category_stats":
                    writer.writerow([key, value])
            writer.writerow([])

            # Per-image results
            if include_detections:
                writer.writerow(["Per-image results"])
                writer.writerow(["filename", "status", "detections", "time_s", "error"])
                for result in self.results:
                    writer.writerow([
                        result.filename,
                        "ok" if result.success else "failed",
                        len(result.detections),
                        f"{result.processing_time:.3f}",
                        result.error or ""
                    ])

            # Per-category statistics
            writer.writerow([])
            writer.writerow(["Category statistics"])
            writer.writerow(["category", "count", "avg_conf", "min_conf", "max_conf"])
            for category, stats in summary.get("category_stats", {}).items():
                writer.writerow([
                    category,
                    stats['count'],
                    f"{stats['avg_confidence']:.3f}",
                    f"{stats['min_confidence']:.3f}",
                    f"{stats['max_confidence']:.3f}"
                ])

        print(f"Results exported to: {output_path}")

    def export_detections_json(self, output_path: str):
        """Export the full detection results to a JSON file."""
        export_data = {
            "metadata": {
                "export_time": datetime.now().isoformat(),
                "processor": "YOLO12BatchProcessor",
                "confidence_threshold": self.confidence_threshold
            },
            "summary": self.get_summary(),
            "results": [
                {
                    "filename": r.filename,
                    "success": r.success,
                    "detection_count": len(r.detections),
                    "processing_time": r.processing_time,
                    "error": r.error,
                    "detections": r.detections if r.success else []
                }
                for r in self.results
            ]
        }
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2, ensure_ascii=False)
        print(f"Detailed results exported to: {output_path}")

    def print_statistics(self):
        """Print a statistics report to the console."""
        summary = self.get_summary()
        print("\n" + "=" * 60)
        print("YOLO12 batch processing report")
        print("=" * 60)
        print("\nOverview:")
        print(f"  Total images:     {summary['total_images']}")
        print(f"  Succeeded:        {summary['succeeded']}")
        print(f"  Failed:           {summary['failed']}")
        print(f"  Success rate:     {summary['success_rate']}")
        print(f"  Total detections: {summary['total_detections']}")
        print(f"  Total time:       {summary['total_time']}")
        print(f"  Avg per image:    {summary['avg_time_per_image']}")
        print("\nTiming:")
        print(f"  Started:  {summary['start_time']}")
        print(f"  Finished: {summary['end_time']}")
        print("\nCategories (by count):")
        for category, stats in summary['category_stats'].items():
            print(f"  {category}: {stats['count']} "
                  f"(avg confidence: {stats['avg_confidence']:.3f})")
        print("\n" + "=" * 60)
```

4.3 Advanced features: filtering and search
Let's add a few more conveniences to make the class even more capable:
```python
class YOLO12BatchProcessor:
    # ... all of the code above ...

    def filter_by_confidence(self, min_confidence: float = None,
                             max_confidence: float = None) -> List[Dict]:
        """
        Filter detections by confidence.

        Args:
            min_confidence: lower confidence bound
            max_confidence: upper confidence bound

        Returns:
            The filtered detections, sorted by confidence descending.
        """
        filtered = []
        for result in self.results:
            if not result.success:
                continue
            for detection in result.detections:
                conf = detection.get('confidence', 0)
                # Apply the filter bounds
                if min_confidence is not None and conf < min_confidence:
                    continue
                if max_confidence is not None and conf > max_confidence:
                    continue
                filtered.append({
                    'filename': result.filename,
                    'category': detection.get('class', 'unknown'),
                    'confidence': conf,
                    'bbox': detection.get('bbox', []),
                    'processing_time': result.processing_time
                })
        # Sort by confidence, highest first
        filtered.sort(key=lambda x: x['confidence'], reverse=True)
        return filtered

    def search_by_category(self, category: str,
                           min_confidence: float = 0.0) -> List[Dict]:
        """
        Find all detections of a given category.

        Args:
            category: category name to search for
            min_confidence: lower confidence bound

        Returns:
            Matching detections, sorted by confidence descending.
        """
        results = []
        for result in self.results:
            if not result.success:
                continue
            for detection in result.detections:
                if detection.get('class', '').lower() == category.lower():
                    if detection.get('confidence', 0) >= min_confidence:
                        results.append({
                            'filename': result.filename,
                            'confidence': detection.get('confidence', 0),
                            'bbox': detection.get('bbox', []),
                            'processing_time': result.processing_time
                        })
        results.sort(key=lambda x: x['confidence'], reverse=True)
        return results

    def get_images_with_category(self, category: str,
                                 min_count: int = 1) -> List[str]:
        """
        List images containing at least min_count objects of a category.

        Args:
            category: category name
            min_count: minimum number of matching detections

        Returns:
            Matching filenames, sorted by detection count descending.
        """
        image_counts = {}
        for result in self.results:
            if not result.success:
                continue
            count = sum(1 for det in result.detections
                        if det.get('class', '').lower() == category.lower())
            if count >= min_count:
                image_counts[result.filename] = count
        # Sort by count, highest first
        sorted_images = sorted(image_counts.items(), key=lambda x: x[1], reverse=True)
        return [filename for filename, count in sorted_images]
```

5. Hands-on: three usage scenarios
With the wrapper class now fully featured, let's see how to use it in real work.
5.1 Scenario 1: batch-processing surveillance screenshots
Suppose you have a week of surveillance screenshots and want to count the people and vehicles that appear each day:
```python
# Scenario 1: surveillance screenshot analysis
def analyze_surveillance_footage():
    """Analyze surveillance screenshots and count people and vehicles."""
    # 1. Create a processor instance
    processor = YOLO12BatchProcessor(
        api_url="http://localhost:8000",   # YOLO12 API address
        confidence_threshold=0.3,          # higher threshold to cut false positives
        max_workers=8,                     # more workers for faster throughput
        max_retries=2
    )

    # 2. Process the whole folder
    print("Analyzing surveillance screenshots...")
    stats = processor.process_folder(
        folder_path="./surveillance_footage",
        extensions=['.jpg', '.png']
    )

    # 3. Print the statistics
    processor.print_statistics()

    # 4. Export detailed results
    processor.export_to_csv("surveillance_analysis.csv")
    processor.export_detections_json("surveillance_detections.json")

    # 5. Focus on people and cars
    print("\nHighlights:")
    person_images = processor.get_images_with_category("person", min_count=3)
    car_images = processor.get_images_with_category("car", min_count=1)
    print(f"  Images with 3+ people: {len(person_images)}")
    print(f"  Images with vehicles:  {len(car_images)}")

    # 6. High-confidence person detections
    high_confidence = processor.filter_by_confidence(min_confidence=0.7)
    people_detections = processor.search_by_category("person", min_confidence=0.5)
    print("\nDetails:")
    print(f"  Person detections:                 {len(people_detections)}")
    print(f"  High-confidence (>0.7) detections: {len(high_confidence)}")

    return processor
```

5.2 Scenario 2: auto-tagging a photo album
If you have a large photo collection and want YOLO12 to tag what is in each picture:
```python
# Scenario 2: smart photo album tagging
def auto_tag_photo_album():
    """Automatically tag the objects found in an album."""
    processor = YOLO12BatchProcessor(
        api_url="http://localhost:8000",
        confidence_threshold=0.25,   # default threshold
        max_workers=4
    )

    print("Tagging album...")
    # Process the photo folder
    stats = processor.process_folder("./photos")

    # Report
    print("\nAlbum analysis finished!")
    print(f"  Analyzed {stats.total_images} photos")
    print(f"  Found {len(stats.category_counts)} distinct object classes")

    # Most common objects
    top_categories = sorted(stats.category_counts.items(),
                            key=lambda x: x[1], reverse=True)[:5]
    print("\nTop 5 object classes:")
    for i, (category, count) in enumerate(top_categories, 1):
        print(f"  {i}. {category}: {count}")

    # Export an album tag file
    album_tags = {}
    for result in processor.results:
        if result.success and result.detections:
            # The distinct object classes in this photo
            tags = sorted({det.get('class', 'unknown') for det in result.detections})
            album_tags[result.filename] = {
                "tags": tags,
                "detection_count": len(result.detections)
            }
    with open("photo_tags.json", "w", encoding="utf-8") as f:
        json.dump(album_tags, f, indent=2, ensure_ascii=False)

    print("\nTags saved to photo_tags.json")
    return processor
```

5.3 Scenario 3: batch validation for industrial quality inspection
In industrial production, YOLO12 can be used to detect product defects:
```python
# Scenario 3: industrial quality inspection
def quality_inspection_batch():
    """Batch-check product images and flag defective items."""
    processor = YOLO12BatchProcessor(
        api_url="http://192.168.1.100:8000",  # production API
        confidence_threshold=0.6,             # high threshold for precision
        max_workers=2                         # be conservative in production
    )

    print("Running quality inspection...")
    # This assumes a custom defect-detection model has replaced the default
    # COCO weights, so classes like "scratch" and "crack" are available.
    stats = processor.process_folder("./product_images")

    # Aggregate defects by type
    defect_types = ["scratch", "crack", "dent", "stain"]
    defect_results = {}
    for defect in defect_types:
        defects = processor.search_by_category(defect, min_confidence=0.5)
        defect_results[defect] = {
            "count": len(defects),
            "files": sorted({d['filename'] for d in defects})
        }

    print("\nInspection report:")
    total_defects = sum(r["count"] for r in defect_results.values())
    total_images = stats.total_images
    print(f"  Products inspected: {total_images}")
    print(f"  Defects found:      {total_defects}")
    if total_images > 0:
        print(f"  Defect rate:        {total_defects / total_images * 100:.1f}%")
    else:
        print("  Defect rate:        N/A")

    print("\nDefects by type:")
    for defect_type, r in defect_results.items():
        if r["count"] > 0:
            print(f"  {defect_type}: {r['count']} (in {len(r['files'])} products)")

    # Write the inspection report
    with open("quality_report.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["product_id", "filename", "defect_type", "confidence", "verdict"])
        for result in processor.results:
            if not result.success:
                continue
            for detection in result.detections:
                if detection.get('class') in defect_types:
                    writer.writerow([
                        result.filename.rsplit(".", 1)[0],
                        result.filename,
                        detection.get('class'),
                        f"{detection.get('confidence', 0):.3f}",
                        "defective"   # every row here is a defect detection
                    ])

    print("\nReport saved to quality_report.csv")
    return processor, defect_results
```

6. Performance tuning and practical tips
6.1 Tuning parameters for best performance
Depending on your hardware and workload, adjust these parameters:
```python
# Example tuning profiles
def optimize_for_different_scenarios():
    """Suggested configurations for different situations."""
    scenarios = {
        "quick_test": {
            "confidence_threshold": 0.25,
            "max_workers": 1,   # single thread, no resource contention
            "max_retries": 1    # fail fast
        },
        "production": {
            "confidence_threshold": 0.5,  # favor precision
            "max_workers": 2,             # stability first
            "max_retries": 3              # retry automatically
        },
        "bulk_processing": {
            "confidence_threshold": 0.3,
            "max_workers": 8,   # high concurrency
            "max_retries": 2
        },
        "edge_device": {
            "confidence_threshold": 0.4,  # fewer false positives
            "max_workers": 1,             # limited resources
            "max_retries": 2
        }
    }
    return scenarios
```

6.2 Error handling and logging
In production, solid error handling and logging matter:
```python
import logging
from logging.handlers import RotatingFileHandler


class YOLO12BatchProcessorWithLogging(YOLO12BatchProcessor):
    """Processor variant with file and console logging."""

    def __init__(self, *args, log_file: str = "yolo12_batch.log", **kwargs):
        super().__init__(*args, **kwargs)

        # Configure the logger
        self.logger = logging.getLogger("YOLO12BatchProcessor")
        self.logger.setLevel(logging.INFO)

        # File log with rotation (max 10 MB per file, 5 backups)
        file_handler = RotatingFileHandler(
            log_file, maxBytes=10 * 1024 * 1024, backupCount=5
        )
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))

        # Console log for warnings and above
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.WARNING)

        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)

    def process_batch(self, image_paths: List[str],
                      show_progress: bool = True) -> BatchStatistics:
        """Wrap batch processing with log statements."""
        self.logger.info(f"Starting batch, images: {len(image_paths)}")
        self.logger.info(f"Config: confidence={self.confidence_threshold}, "
                         f"workers={self.max_workers}")
        try:
            stats = super().process_batch(image_paths, show_progress)
            self.logger.info(f"Batch done: {stats.processed_images} processed, "
                             f"{stats.failed_images} failed")
            self.logger.info(f"Detections: {stats.total_detections}, "
                             f"success rate: {stats.success_rate:.1f}%")
            # Log the per-category counts
            for category, count in stats.category_counts.items():
                self.logger.info(f"Category {category}: {count} detections")
            return stats
        except Exception as e:
            self.logger.error(f"Batch failed: {e}", exc_info=True)
            raise
```

6.3 Integrating with an existing system
If you want to integrate YOLO12 into an existing Python project:
```python
import hashlib

# Integration example
class ExistingSystemWithYOLO12:
    """Example of wiring YOLO12 into an existing system."""

    def __init__(self):
        self.yolo_processor = YOLO12BatchProcessor(
            api_url="http://yolo12-api:8000",  # Docker container or remote API
            confidence_threshold=0.35
        )
        self.processed_cache = {}  # cache of previous results

    def process_image_with_cache(self, image_path: str) -> Dict:
        """Process an image, skipping work if we have seen it before."""
        # Hash the file contents to get a cache key
        with open(image_path, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()

        # Cache hit?
        if file_hash in self.processed_cache:
            print(f"Using cached result: {image_path}")
            cached = self.processed_cache[file_hash]
            return {
                'success': True,
                'detections': cached['detections'],
                'error': None,
                'cached': True
            }

        # Process the new image
        print(f"Processing new image: {image_path}")
        result = self.yolo_processor.detect_single_image(image_path)

        # Cache successful results only
        if result.success:
            self.processed_cache[file_hash] = {
                'filename': result.filename,
                'detections': result.detections,
                'hash': file_hash,
                'timestamp': time.time()
            }
            # Simple eviction: keep at most 1000 entries,
            # dropping the oldest when the cache is full
            if len(self.processed_cache) > 1000:
                oldest_key = min(self.processed_cache,
                                 key=lambda k: self.processed_cache[k]['timestamp'])
                del self.processed_cache[oldest_key]

        return {
            'success': result.success,
            'detections': result.detections if result.success else [],
            'error': result.error,
            'cached': False
        }

    def batch_process_with_callback(self, image_paths: List[str],
                                    callback=None) -> List[Dict]:
        """Process a list of images, invoking a callback after each one."""
        results = []
        for i, image_path in enumerate(image_paths):
            result = self.process_image_with_cache(image_path)
            results.append(result)
            # Invoke the callback, if one was given
            if callback:
                callback({
                    'index': i,
                    'total': len(image_paths),
                    'filename': image_path,
                    'result': result
                })
        return results
```

7. Conclusion
With this YOLO12 batch-processing wrapper, a bare API becomes a solid production tool. Let's recap its core value:
7.1 What it solves
- Automated batch processing: from manual single-image calls to automated batches, a speedup of dozens of times
- Smarter error handling: automatic retries and error logging make runs more reliable
- Automated aggregation: per-category counts and average confidences roll up into a clear report
- Visible progress: a live progress display so you always know where the job stands
- Flexible export: CSV and JSON output for downstream analysis
7.2 Where it helps in practice
- Surveillance analysis: process large sets of screenshots and count foot and vehicle traffic
- Album management: auto-tag photo content and build a smart album
- Industrial QC: batch-detect product defects and generate inspection reports
- Data labeling: auto-generate annotations for machine learning projects
- Research: test a model's behavior across scenarios at scale
7.3 Next steps
To make the tool even more capable, consider:
- Visualization: automatically generate visual reports of the detection results
- Video support: extend the tool to extract and detect on video frames
- Database storage: persist results to a database for querying and analysis
- A web UI: turn it into a web app so non-developers can use it
- More models: support switching between YOLO versions or other detectors
The code in this wrapper is fully usable: copy it into your project and adapt it to your needs. Its biggest virtue is practicality; there is nothing flashy, but every feature earns its keep in real work.
Remember: the best tool is not the one with the most features, but the one that fits your hand. This YOLO12 batch processor aims to be exactly that: simple, reliable, practical.
Get more AI images
Want to explore more AI images and use cases? Visit the CSDN星图镜像广场 (CSDN Star-Map image gallery), which offers a rich set of prebuilt images covering LLM inference, image generation, video generation, model fine-tuning, and more, with one-click deployment.