Heygem数字人系统并发控制:任务队列管理避免资源冲突
1. 引言
1.1 业务场景描述
Heygem 数字人视频生成系统是一款基于 AI 技术的口型同步视频合成工具,广泛应用于虚拟主播、在线教育、企业宣传等场景。随着用户对批量处理能力的需求日益增长,系统在高并发任务下的稳定性与资源调度效率成为关键挑战。
在实际使用中,用户常需将同一段音频驱动多个数字人形象生成个性化视频。若缺乏有效的任务调度机制,直接并行执行多个生成任务极易导致 GPU 显存溢出、CPU 资源争抢、磁盘 I/O 阻塞等问题,最终引发任务失败或系统崩溃。
1.2 痛点分析
原始版本的 WebUI 在多任务提交时存在以下问题:
- 无排队机制:多个任务同时触发,模型加载频繁,造成显存抖动。
- 资源竞争严重:FFmpeg 视频编解码、特征提取和渲染阶段共享文件路径,易产生读写冲突。
- 缺乏状态管理:任务中断后无法恢复,历史记录丢失。
- 用户体验差:进度不可控,前端无反馈,用户误以为“卡死”。
1.3 方案预告
本文介绍为 Heygem 批量版 WebUI 增加的任务队列管理系统,通过引入轻量级任务队列架构实现:
- 任务有序执行,避免资源抢占
- 支持暂停、继续、清空队列操作
- 实时进度同步至前端界面
- 错误自动捕获与日志追踪
该方案已在生产环境中稳定运行,显著提升系统鲁棒性与用户体验。
2. 技术方案选型
2.1 可选方案对比
| 方案 | 优点 | 缺点 | 适用性 |
|---|---|---|---|
| 多线程 + Lock | 实现简单,开销小 | 容易死锁,难以扩展 | 小规模任务 |
| Redis + RQ | 成熟队列系统,支持持久化 | 需额外部署服务 | 分布式环境 |
| Celery + Broker | 功能强大,支持定时任务 | 结构复杂,依赖多 | 大型项目 |
| 内存队列 + 协程 | 轻量高效,无需外部依赖 | 断电数据丢失 | 本地单机应用 |
考虑到 Heygem 系统当前为单机部署模式,且目标是快速集成、低侵入改造,我们选择内存队列 + 主循环协程调度的方式,在保留原有架构基础上增强并发控制能力。
2.2 最终技术栈
- 任务队列:Python
queue.Queue(线程安全) - 调度器:独立后台线程运行主循环
- 状态通信:全局状态字典 + 回调函数通知 UI
- 异常处理:任务级 try-except 包裹,错误信息回传
- 持久化:任务完成后自动保存元数据到 JSON 文件
3. 实现步骤详解
3.1 环境准备
确保已安装核心依赖库:
pip install gradio==3.50.2 pillow numpy opencv-python ffmpeg-python修改start_app.sh启动脚本以启用后台调度线程:
#!/bin/bash export PYTHONPATH=$(pwd) python app.py --server_port=7860 --no_gradio_queue注意:禁用 Gradio 自带队列,防止与自定义队列冲突。
3.2 核心代码结构
项目目录结构调整如下:
heygem/ ├── app.py # 主入口 ├── task_queue.py # 任务队列模块 ├── processor.py # 视频处理逻辑 ├── webui.py # UI 构建 └── outputs/ # 输出目录3.3 任务队列模块设计
创建task_queue.py,定义任务类型与队列控制器:
import queue import threading import time import json from typing import Dict, Any, Callable class Task: def __init__(self, task_id: str, audio_path: str, video_path: str, callback: Callable): self.task_id = task_id self.audio_path = audio_path self.video_path = video_path self.callback = callback # 更新UI的回调 self.status = "pending" # pending, running, success, failed self.progress = 0 self.result_path = None self.error_msg = None def to_dict(self) -> Dict[str, Any]: return { "task_id": self.task_id, "status": self.status, "progress": self.progress, "result_path": self.result_path, "error_msg": self.error_msg, "video_name": self.video_path.split("/")[-1] } class TaskQueueManager: def __init__(self): self.queue = queue.Queue() self.running = False self.current_task = None self.lock = threading.Lock() self.history = [] # 存储已完成任务 def add_task(self, task: Task): with self.lock: self.queue.put(task) task.status = "queued" task.callback("add", task.to_dict()) def start(self): if not self.running: self.running = True thread = threading.Thread(target=self._process_loop, daemon=True) thread.start() def stop(self): self.running = False def _process_loop(self): while self.running: try: task = self.queue.get(timeout=1) with self.lock: self.current_task = task task.status = "running" task.callback("update", task.to_dict()) # 执行具体处理(模拟耗时操作) from processor import generate_talking_head try: result_path = generate_talking_head(task.audio_path, task.video_path) task.result_path = result_path task.status = "success" task.progress = 100 except Exception as e: task.status = "failed" task.error_msg = str(e) task.progress = 0 # 回调更新UI task.callback("update", task.to_dict()) # 加入历史 self.history.append(task.to_dict()) self.queue.task_done() except queue.Empty: continue except Exception as e: print(f"[Error] Task loop error: {e}") continue3.4 处理逻辑封装
processor.py中实现核心生成逻辑(简化版):
import time import os import random def generate_talking_head(audio_path: str, video_path: str) -> str: """模拟数字人视频生成过程""" print(f"Processing {video_path} with {audio_path}") # 模拟分步处理 steps = ["load_model", "extract_audio_features", "sync_lip", "render_video", "encode_output"] for i, step in enumerate(steps): time.sleep(1) # 模拟每步耗时 progress = int((i + 1) / len(steps) * 100) print(f"Progress: {progress}%") # 模拟输出路径 filename = f"{os.path.basename(video_path).split('.')[0]}_talk.mp4" output_path = f"./outputs/{filename}" # 创建空文件表示完成 open(output_path, 'w').close() return output_path3.5 WebUI 集成与状态同步
在webui.py中构建界面并与队列交互:
import gradio as gr from task_queue import TaskQueueManager import json task_manager = TaskQueueManager() def update_ui(action: str, data: dict): """接收任务状态变化并触发UI更新""" global task_history if action == "add": task_history = [data] + task_history elif action == "update": for i, t in enumerate(task_history): if t["task_id"] == data["task_id"]: task_history[i] = data break def create_batch_interface(): global task_history task_history = [] with gr.Blocks() as demo: gr.Markdown("# HeyGem 批量数字人视频生成") with gr.Tab("批量处理"): with gr.Row(): with gr.Column(scale=1): audio_input = gr.Audio(label="上传音频文件") video_files = gr.File(label="上传多个视频", file_count="multiple") btn_start = gr.Button("开始批量生成") with gr.Column(scale=2): progress_bar = gr.Slider(label="整体进度", value=0, maximum=100, interactive=False) current_task_name = gr.Textbox(label="当前处理") result_gallery = gr.Gallery(label="生成结果历史").style(columns=3) def start_batch_generation(audio, videos): if not audio or not videos: return "请先上传音频和视频!" audio_path = audio.name for idx, video in enumerate(videos): task_id = f"task_{int(time.time())}_{idx}" task = Task(task_id, audio_path, video.name, update_ui) task_manager.add_task(task) task_manager.start() return "任务已加入队列,正在处理..." btn_start.click( fn=start_batch_generation, inputs=[audio_input, video_files], outputs=[] ) # 定期刷新结果 def refresh_results(): return [[t["result_path"], t["video_name"]] for t in task_history if t["status"]=="success"] demo.load(fn=refresh_results, outputs=result_gallery, every=2) return demo3.6 实践问题与优化
问题1:Gradio 页面刷新阻塞
现象:使用every=2定时刷新时,长时间任务会导致页面卡顿。
解决方案:改用 WebSocket 主动推送机制,或采用局部组件更新而非全页重载。
问题2:任务中断后无法恢复
现象:重启服务后队列清空,未完成任务丢失。
优化措施:增加任务持久化层,将待处理任务序列化存储至本地 JSON 文件:
import atexit import signal # 保存队列状态 def save_queue_state(): pending_tasks = [] while not task_manager.queue.empty(): task = task_manager.queue.get() pending_tasks.append(task.to_dict()) json.dump(pending_tasks, open("queue_backup.json", "w")) atexit.register(save_queue_state) # 启动时恢复 if os.path.exists("queue_backup.json"): tasks = json.load(open("queue_backup.json")) for t in tasks: # 重建任务对象并重新加入队列 pass问题3:GPU 显存不足
现象:连续处理高清视频时出现 CUDA Out of Memory。
优化建议:
- 添加
torch.cuda.empty_cache()在任务结束后释放缓存 - 设置最大并发数限制(如最多同时处理 2 个任务)
- 提供“低内存模式”选项,启用帧抽样降负载
4. 总结
4.1 实践经验总结
通过本次二次开发,我们在不改变 Heygem 原有功能的前提下,成功实现了任务队列管理机制,解决了多任务并发带来的资源冲突问题。主要收获包括:
- 稳定性提升:任务按序执行,避免了模型重复加载和资源争抢。
- 用户体验改善:实时进度展示让用户清晰掌握处理状态。
- 可维护性增强:统一的任务生命周期管理便于调试与监控。
4.2 最佳实践建议
- 始终包裹任务执行体:每个任务都应使用 try-except 捕获异常,防止调度线程崩溃。
- 合理设置超时机制:对长时间无响应的任务进行强制终止。
- 提供手动干预接口:允许管理员暂停、跳过或重试特定任务。
- 日志分级记录:区分 INFO、WARNING、ERROR 日志,便于排查问题。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。