Qwen3-TTS-12Hz-1.7B-VoiceDesign批量处理技巧:高效生成大量语音内容
如果你正在用Qwen3-TTS-12Hz-1.7B-VoiceDesign做有声书、视频配音或者批量内容创作,肯定遇到过这样的问题:一次只能生成一段语音,效率太低了。手动一段一段处理,几百条内容得花上大半天时间,还容易出错。
其实,这个模型本身支持批量处理,只是官方文档里没怎么强调。今天我就来分享几个实战技巧,让你能一次性生成成百上千条语音,效率提升几十倍。这些方法都是我在实际项目中验证过的,从简单的脚本到复杂的任务调度,一步步带你掌握。
1. 为什么需要批量处理?
先说说批量处理到底能解决什么问题。假设你要做一本有声书,有50个章节,每个章节平均10段话。如果手动一段段生成,你得:
- 复制文本到界面
- 点击生成按钮
- 等待生成完成
- 下载音频文件
- 重命名文件
- 重复以上步骤500次
这还没算上中间可能出现的网络问题、生成失败、文件保存错误等等。整个过程枯燥又容易出错。
批量处理就是把所有这些步骤自动化,你只需要准备好文本文件,运行一个脚本,然后去喝杯咖啡,回来的时候所有音频都已经生成好了,文件名也自动整理得整整齐齐。
2. 环境准备与基础配置
在开始批量处理之前,先确保你的环境已经正确设置。如果你还没安装Qwen3-TTS,可以按照下面的步骤快速搭建。
2.1 安装基础环境
# 创建Python虚拟环境 conda create -n qwen3-tts-batch python=3.10 -y conda activate qwen3-tts-batch # 安装Qwen3-TTS pip install qwen-tts # 可选:安装FlashAttention加速(能提升30-40%速度) pip install flash-attn --no-build-isolation2.2 验证安装
创建一个简单的测试脚本,确保模型能正常工作:
import torch from qwen_tts import Qwen3TTSModel # 加载VoiceDesign模型 model = Qwen3TTSModel.from_pretrained( "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign", device_map="cuda:0", # 如果用CPU就改成"cpu" torch_dtype=torch.bfloat16, attn_implementation="flash_attention_2", # 如果装了FlashAttention ) print("模型加载成功!") print(f"设备:{model.device}") print(f"模型类型:{type(model).__name__}")如果这段代码能正常运行,说明环境配置没问题。接下来就可以开始批量处理的实战了。
3. 最简单的批量处理脚本
我们先从最基础的开始,写一个能批量处理多个文本的脚本。这个脚本虽然简单,但已经能解决80%的批量生成需求。
3.1 准备文本数据
创建一个文本文件,比如叫texts.txt,每行一段要生成语音的文本:
欢迎收听今天的科技新闻播报。 人工智能技术正在快速发展,改变我们的生活。 Qwen3-TTS提供了高质量的语音合成能力。 批量处理可以大幅提升工作效率。再创建一个声音描述文件descriptions.txt,每行对应一个声音描述:
沉稳的新闻播报员声音,语速适中,发音清晰 年轻活泼的科技博主声音,语速稍快,充满活力 专业的讲解员声音,语调平稳,富有权威感 亲切的助手声音,语速自然,听起来舒服3.2 基础批量处理脚本
import torch import soundfile as sf import os from pathlib import Path from qwen_tts import Qwen3TTSModel from tqdm import tqdm # 进度条库,方便查看进度 class SimpleBatchProcessor: def __init__(self, model_name="Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign"): """初始化批量处理器""" print("正在加载模型...") self.model = Qwen3TTSModel.from_pretrained( model_name, device_map="cuda:0", torch_dtype=torch.bfloat16, attn_implementation="flash_attention_2", ) print("模型加载完成!") def process_batch(self, texts, descriptions, output_dir="output"): """ 批量处理文本 参数: texts: 文本列表 descriptions: 声音描述列表 output_dir: 输出目录 """ # 创建输出目录 os.makedirs(output_dir, exist_ok=True) # 检查输入长度 if len(texts) != len(descriptions): print("警告:文本和描述数量不一致,将使用循环匹配") # 使用进度条显示处理进度 for i, (text, desc) in enumerate(tqdm(zip(texts, descriptions), total=len(texts))): try: # 生成语音 wavs, sr = self.model.generate_voice_design( text=text, language="Chinese", # 根据实际语言修改 instruct=desc, ) # 保存音频文件 filename = f"output_{i+1:03d}.wav" output_path = os.path.join(output_dir, filename) sf.write(output_path, wavs[0], sr) # 同时保存文本和描述信息 info_path = os.path.join(output_dir, f"output_{i+1:03d}.txt") with open(info_path, 'w', encoding='utf-8') as f: f.write(f"文本:{text}\n") f.write(f"描述:{desc}\n") f.write(f"音频文件:{filename}\n") except Exception as e: print(f"处理第{i+1}条时出错:{e}") # 保存错误信息 error_path = os.path.join(output_dir, f"error_{i+1:03d}.txt") with open(error_path, 'w', encoding='utf-8') as f: f.write(f"文本:{text}\n") f.write(f"描述:{desc}\n") f.write(f"错误:{str(e)}\n") def from_files(self, text_file, desc_file, output_dir="output"): """从文件读取文本和描述进行批量处理""" # 读取文本文件 with open(text_file, 'r', encoding='utf-8') as f: texts = [line.strip() for line in f if line.strip()] # 读取描述文件 with open(desc_file, 'r', encoding='utf-8') as f: descriptions = [line.strip() for line in f if line.strip()] print(f"读取到 {len(texts)} 条文本,{len(descriptions)} 条描述") # 开始处理 self.process_batch(texts, descriptions, output_dir) # 使用示例 if __name__ == "__main__": # 创建处理器 processor = SimpleBatchProcessor() # 从文件批量处理 processor.from_files( text_file="texts.txt", desc_file="descriptions.txt", output_dir="batch_output" ) print("批量处理完成!")这个脚本的核心思路很简单:读取文本文件,循环处理每一行,生成语音并保存。加了进度条和错误处理,处理过程中能清楚看到进度,出错也不会中断整个流程。
4. 高级批量处理:任务调度与并发
当你要处理成百上千条语音时,简单的循环可能还不够快。这时候就需要考虑任务调度和并发处理了。
4.1 使用多进程加速
Python的多进程可以充分利用多核CPU,显著提升处理速度。下面是一个多进程版本的批量处理器:
import torch import soundfile as sf import os import multiprocessing as mp from pathlib import Path from functools import partial from tqdm import tqdm import json class MultiProcessBatchProcessor: def __init__(self, model_name="Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign", num_workers=None): """ 多进程批量处理器 参数: model_name: 模型名称 num_workers: 进程数,默认使用CPU核心数 """ self.model_name = model_name self.num_workers = num_workers or mp.cpu_count() print(f"初始化多进程处理器,使用 {self.num_workers} 个进程") def _worker_init(self): """每个工作进程的初始化函数""" # 每个进程独立加载模型 global worker_model worker_model = Qwen3TTSModel.from_pretrained( self.model_name, device_map="cuda:0", torch_dtype=torch.bfloat16, ) def _process_single(self, task, output_dir): """处理单个任务""" idx, text, desc = task try: # 生成语音 wavs, sr = worker_model.generate_voice_design( text=text, language="Chinese", instruct=desc, ) # 保存音频 filename = f"output_{idx:04d}.wav" output_path = os.path.join(output_dir, filename) sf.write(output_path, wavs[0], sr) # 保存元数据 meta = { "index": idx, "text": text, "description": desc, "audio_file": filename, "status": "success" } meta_path = os.path.join(output_dir, f"output_{idx:04d}.json") with open(meta_path, 'w', encoding='utf-8') as f: json.dump(meta, f, ensure_ascii=False, indent=2) return (idx, "success", None) except Exception as e: # 保存错误信息 error_meta = { "index": idx, "text": text, "description": desc, "error": str(e), "status": "failed" } error_path = os.path.join(output_dir, f"error_{idx:04d}.json") with open(error_path, 'w', encoding='utf-8') as f: json.dump(error_meta, f, ensure_ascii=False, indent=2) return (idx, "failed", str(e)) def process_batch(self, texts, descriptions, output_dir="mp_output"): """多进程批量处理""" # 创建输出目录 os.makedirs(output_dir, exist_ok=True) # 准备任务列表 tasks = [(i+1, text, desc) for i, (text, desc) in enumerate(zip(texts, descriptions))] # 创建进程池 with mp.Pool( processes=self.num_workers, initializer=self._worker_init, initargs=() ) as pool: # 使用partial固定output_dir参数 process_func = partial(self._process_single, output_dir=output_dir) # 使用imap_unordered获取结果(顺序不重要) results = [] with tqdm(total=len(tasks), desc="批量处理进度") as pbar: for result in pool.imap_unordered(process_func, tasks): results.append(result) pbar.update(1) # 统计结果 success_count = sum(1 for r in results if r[1] == "success") failed_count = len(results) - success_count print(f"\n处理完成!成功:{success_count},失败:{failed_count}") if failed_count > 0: print("失败的任务:") for idx, status, error in results: if status == "failed": print(f" 任务 {idx}: {error}") return results # 使用示例 if __name__ == "__main__": # 准备测试数据 sample_texts = [f"这是第{i}条测试文本,用于批量语音生成。" for i in range(1, 101)] sample_descs = ["清晰的中文播音声音,语速适中"] * 100 # 创建多进程处理器 processor = MultiProcessBatchProcessor(num_workers=4) # 使用4个进程 # 开始批量处理 results = processor.process_batch( texts=sample_texts, descriptions=sample_descs, output_dir="multiprocess_output" )这个多进程版本的关键点:
- 每个进程独立加载模型:避免模型在进程间共享的问题
- 使用进程池:管理多个工作进程
- 进度条显示:即使多进程也能看到整体进度
- 完善的错误处理:失败的任务不会影响其他任务
4.2 任务队列与调度
对于更复杂的场景,比如要处理几千条数据,可能还需要考虑任务队列和调度。这里我用一个简单的基于文件的任务队列:
import json import time from datetime import datetime import threading from queue import Queue class TaskScheduler: """任务调度器""" def __init__(self, task_file, max_workers=4): """ 初始化调度器 参数: task_file: 任务配置文件 max_workers: 最大工作线程数 """ self.task_file = task_file self.max_workers = max_workers self.task_queue = Queue() self.results = [] self.lock = threading.Lock() # 加载任务 self.tasks = self._load_tasks() def _load_tasks(self): """从文件加载任务""" with open(self.task_file, 'r', encoding='utf-8') as f: tasks = json.load(f) print(f"加载了 {len(tasks)} 个任务") return tasks def _worker(self, worker_id): """工作线程函数""" # 每个线程独立加载模型 model = Qwen3TTSModel.from_pretrained( "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign", device_map="cuda:0", torch_dtype=torch.bfloat16, ) while True: try: # 获取任务 task = self.task_queue.get(timeout=1) if task is None: # 结束信号 break task_id, text, desc = task # 处理任务 start_time = time.time() wavs, sr = model.generate_voice_design( text=text, language="Chinese", instruct=desc, ) process_time = time.time() - start_time # 保存结果 result = { "task_id": task_id, "status": "success", "process_time": process_time, "timestamp": datetime.now().isoformat() } with self.lock: self.results.append(result) print(f"Worker {worker_id}: 完成任务 {task_id},耗时 {process_time:.2f}秒") except Exception as e: error_result = { "task_id": task_id if 'task_id' in locals() else "unknown", "status": "failed", "error": str(e), "timestamp": datetime.now().isoformat() } with self.lock: self.results.append(error_result) print(f"Worker {worker_id}: 任务失败,错误:{e}") finally: self.task_queue.task_done() def run(self, output_dir="scheduled_output"): """运行调度器""" # 创建输出目录 os.makedirs(output_dir, exist_ok=True) # 将任务加入队列 for task in self.tasks: self.task_queue.put(task) # 创建工作线程 threads = [] for i in range(self.max_workers): t = threading.Thread(target=self._worker, args=(i+1,)) t.start() threads.append(t) # 等待所有任务完成 self.task_queue.join() # 发送结束信号 for _ in range(self.max_workers): self.task_queue.put(None) # 等待所有线程结束 for t in threads: t.join() # 保存结果报告 report_path = os.path.join(output_dir, "processing_report.json") with open(report_path, 'w', encoding='utf-8') as f: json.dump(self.results, f, ensure_ascii=False, indent=2) # 统计信息 success_count = sum(1 for r in self.results if r["status"] == "success") failed_count = len(self.results) - success_count print(f"\n调度完成!") print(f"总任务数:{len(self.tasks)}") print(f"成功:{success_count}") print(f"失败:{failed_count}") if success_count > 0: avg_time = sum(r["process_time"] for r in self.results if r["status"] == "success") / success_count print(f"平均处理时间:{avg_time:.2f}秒") return self.results # 创建任务配置文件的示例 def create_task_config(num_tasks=50, output_file="tasks.json"): """创建示例任务配置文件""" tasks = [] for i in range(1, num_tasks + 1): task = [ i, # 任务ID f"这是第{i}个任务生成的语音内容,用于演示批量处理能力。", # 文本 "专业的中文讲解声音,语速适中,发音清晰" # 声音描述 ] tasks.append(task) with open(output_file, 'w', encoding='utf-8') as f: json.dump(tasks, f, ensure_ascii=False, indent=2) print(f"已创建任务配置文件:{output_file}") return tasks5. 资源管理与优化
批量处理大量语音时,资源管理很重要。显存不够、内存泄漏、文件太多等问题都需要提前考虑。
5.1 显存优化技巧
class OptimizedBatchProcessor: """优化资源使用的批量处理器""" def __init__(self): self.model = None self.current_batch = 0 def load_model(self, use_optimization=True): """加载模型并应用优化""" kwargs = { "pretrained_model_name_or_path": "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign", "device_map": "cuda:0", "torch_dtype": torch.bfloat16, # 使用bfloat16节省显存 } if use_optimization: kwargs.update({ "attn_implementation": "flash_attention_2", # 使用FlashAttention "low_cpu_mem_usage": True, # 减少CPU内存使用 }) self.model = Qwen3TTSModel.from_pretrained(**kwargs) # 启用推理模式 self.model.eval() # 如果有多个GPU,可以设置模型并行 if torch.cuda.device_count() > 1: print(f"检测到 {torch.cuda.device_count()} 个GPU,启用模型并行") self.model.parallelize() def process_with_memory_control(self, texts, descriptions, batch_size=4): """ 分批处理,控制显存使用 参数: batch_size: 每批处理的数量,根据显存调整 """ total = len(texts) results = [] for i in range(0, total, batch_size): batch_texts = texts[i:i+batch_size] batch_descs = descriptions[i:i+batch_size] print(f"处理批次 {i//batch_size + 1}/{(total + batch_size - 1)//batch_size}") batch_results = [] for text, desc in zip(batch_texts, batch_descs): try: with torch.no_grad(): # 禁用梯度计算,节省显存 wavs, sr = self.model.generate_voice_design( text=text, language="Chinese", instruct=desc, ) batch_results.append((text, wavs[0], sr, "success")) except torch.cuda.OutOfMemoryError: print("显存不足,尝试清理缓存") torch.cuda.empty_cache() batch_results.append((text, None, None, "out_of_memory")) except Exception as e: batch_results.append((text, None, None, f"error: {str(e)}")) results.extend(batch_results) # 每处理完一批,清理缓存 torch.cuda.empty_cache() time.sleep(0.5) # 稍微休息一下 return results def cleanup(self): """清理资源""" if self.model is not None: del self.model self.model = None torch.cuda.empty_cache() print("资源已清理")5.2 文件管理策略
生成大量音频文件时,好的文件管理策略能让后续处理方便很多:
import shutil from datetime import datetime class FileManager: """文件管理器""" def __init__(self, base_dir="batch_output"): self.base_dir = base_dir self.session_id = datetime.now().strftime("%Y%m%d_%H%M%S") self.session_dir = os.path.join(base_dir, self.session_id) # 创建目录结构 self._create_directory_structure() def _create_directory_structure(self): """创建标准化的目录结构""" directories = { "audio": "音频文件", "text": "原始文本", "metadata": "元数据", "logs": "日志文件", "errors": "错误文件", "temp": "临时文件" } for key, dir_name in directories.items(): path = os.path.join(self.session_dir, dir_name) os.makedirs(path, exist_ok=True) setattr(self, f"{key}_dir", path) print(f"创建会话目录:{self.session_dir}") def save_audio(self, audio_data, sample_rate, filename, metadata=None): """保存音频文件""" # 生成唯一文件名 timestamp = datetime.now().strftime("%H%M%S") unique_name = f"{filename}_{timestamp}.wav" audio_path = os.path.join(self.audio_dir, unique_name) # 保存音频 sf.write(audio_path, audio_data, sample_rate) # 保存元数据 if metadata: meta_path = os.path.join(self.metadata_dir, f"{filename}_{timestamp}.json") with open(meta_path, 'w', encoding='utf-8') as f: json.dump(metadata, f, ensure_ascii=False, indent=2) return audio_path def archive_session(self, format="zip"): """归档整个会话""" archive_name = f"{self.session_dir}.{format}" if format == "zip": shutil.make_archive(self.session_dir, 'zip', self.session_dir) print(f"已归档:{archive_name}") return archive_name else: print(f"不支持的归档格式:{format}") return None def generate_report(self, results): """生成处理报告""" report = { "session_id": self.session_id, "start_time": datetime.now().isoformat(), "total_tasks": len(results), "success_count": sum(1 for r in results if r[3] == "success"), "failed_count": sum(1 for r in results if r[3] != "success"), "details": [] } for text, audio, sr, status in results: detail = { "text": text[:100] + "..." if len(text) > 100 else text, "status": status, "has_audio": audio is not None } report["details"].append(detail) report_path = os.path.join(self.logs_dir, "processing_report.json") with open(report_path, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) print(f"报告已生成:{report_path}") return report6. 错误处理与重试机制
批量处理中难免会遇到各种错误,好的错误处理机制能让流程更稳定。
6.1 智能重试策略
import time from functools import wraps def retry_on_failure(max_retries=3, delay=1, backoff=2): """ 重试装饰器 参数: max_retries: 最大重试次数 delay: 初始延迟(秒) backoff: 延迟倍数 """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): last_exception = None current_delay = delay for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: last_exception = e if attempt < max_retries - 1: print(f"第{attempt + 1}次尝试失败,{current_delay}秒后重试... 错误:{str(e)[:100]}") time.sleep(current_delay) current_delay *= backoff else: print(f"所有{max_retries}次尝试都失败了") raise last_exception return wrapper return decorator class RobustBatchProcessor: """带重试机制的批量处理器""" def __init__(self): self.model = None @retry_on_failure(max_retries=3, delay=2) def generate_with_retry(self, text, description): """带重试的生成函数""" if self.model is None: self._load_model() return self.model.generate_voice_design( text=text, language="Chinese", instruct=description, ) def _load_model(self): """加载模型""" print("正在加载模型...") self.model = Qwen3TTSModel.from_pretrained( "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign", device_map="cuda:0", torch_dtype=torch.bfloat16, ) print("模型加载完成") def process_with_error_recovery(self, tasks): """带错误恢复的处理""" results = [] error_log = [] for task_id, text, desc in tasks: try: # 尝试生成 wavs, sr = self.generate_with_retry(text, desc) results.append((task_id, wavs[0], sr, "success")) except Exception as e: # 记录错误 error_info = { "task_id": task_id, "text": text, "description": desc, "error": str(e), "timestamp": datetime.now().isoformat() } error_log.append(error_info) # 尝试简化任务重试 print(f"任务{task_id}失败,尝试简化处理...") try: # 简化描述重试 simple_desc = "清晰的中文发音" wavs, sr = self.generate_with_retry(text, simple_desc) results.append((task_id, wavs[0], sr, "recovered")) print(f"任务{task_id}恢复成功") except: results.append((task_id, None, None, "failed")) print(f"任务{task_id}最终失败") return results, error_log6.2 监控与日志
import logging from logging.handlers import RotatingFileHandler class BatchProcessingMonitor: """批量处理监控器""" def __init__(self, log_dir="logs"): self.log_dir = log_dir os.makedirs(log_dir, exist_ok=True) # 设置日志 self._setup_logging() # 监控数据 self.metrics = { "start_time": time.time(), "processed_count": 0, "success_count": 0, "failed_count": 0, "total_audio_duration": 0, "errors": [] } def _setup_logging(self): """配置日志系统""" log_file = os.path.join(self.log_dir, f"batch_process_{datetime.now().strftime('%Y%m%d')}.log") # 创建logger self.logger = logging.getLogger("BatchProcessor") self.logger.setLevel(logging.INFO) # 文件处理器(自动轮转) file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5 ) file_handler.setLevel(logging.INFO) # 控制台处理器 console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) # 格式 formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) self.logger.addHandler(file_handler) self.logger.addHandler(console_handler) def log_task_start(self, task_id, text_preview): """记录任务开始""" self.logger.info(f"开始处理任务 {task_id}: {text_preview[:50]}...") def log_task_success(self, task_id, duration, audio_length): """记录任务成功""" self.metrics["processed_count"] += 1 self.metrics["success_count"] += 1 self.metrics["total_audio_duration"] += audio_length self.logger.info(f"任务 {task_id} 成功完成,处理时间:{duration:.2f}s,音频长度:{audio_length:.2f}s") def log_task_failure(self, task_id, error): """记录任务失败""" self.metrics["processed_count"] += 1 self.metrics["failed_count"] += 1 self.metrics["errors"].append({ "task_id": task_id, "error": str(error), "time": datetime.now().isoformat() }) self.logger.error(f"任务 {task_id} 失败:{error}") def generate_summary(self): """生成处理摘要""" total_time = time.time() - self.metrics["start_time"] summary = { "session_duration": total_time, "total_tasks": self.metrics["processed_count"], "success_rate": self.metrics["success_count"] / max(self.metrics["processed_count"], 1), "total_audio_duration": self.metrics["total_audio_duration"], "average_time_per_task": total_time / max(self.metrics["processed_count"], 1), "error_count": len(self.metrics["errors"]) } self.logger.info("处理摘要:" + json.dumps(summary, ensure_ascii=False, indent=2)) return summary7. 实战案例:有声书批量生成
最后,我们来看一个完整的实战案例——批量生成有声书。
7.1 项目结构
audiobook_project/ ├── chapters/ # 章节文本 │ ├── chapter_01.txt │ ├── chapter_02.txt │ └── ... ├── voices/ # 声音配置 │ ├── narrator.json # 旁白声音描述 │ ├── male_chars.json # 男性角色声音 │ └── female_chars.json # 女性角色声音 ├── scripts/ # 处理脚本 ├── output/ # 输出目录 └── config.yaml # 配置文件7.2 配置文件示例
# config.yaml project: name: "科幻小说有声书" language: "Chinese" output_format: "wav" sample_rate: 24000 voices: narrator: description: "沉稳的男性旁白声音,语速适中,富有感染力" model: "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign" protagonist: description: "年轻的男性主角声音,充满活力,略带紧张感" model: "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign" female_lead: description: "温柔的女性角色声音,语调柔和,情感丰富" model: "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign" processing: batch_size: 8 max_workers: 4 retry_attempts: 3 output_dir: "output/audiobook"7.3 主处理脚本
import yaml import os from pathlib import Path class AudiobookGenerator: """有声书生成器""" def __init__(self, config_path="config.yaml"): # 加载配置 with open(config_path, 'r', encoding='utf-8') as f: self.config = yaml.safe_load(f) # 初始化组件 self.file_manager = FileManager( base_dir=self.config['processing']['output_dir'] ) self.monitor = BatchProcessingMonitor( log_dir=os.path.join(self.file_manager.session_dir, "logs") ) # 加载模型 self.models = self._load_models() def _load_models(self): """加载所有需要的模型""" models = {} voice_configs = self.config['voices'] for voice_name, config in voice_configs.items(): print(f"加载声音模型:{voice_name}") model = Qwen3TTSModel.from_pretrained( config['model'], device_map="cuda:0", torch_dtype=torch.bfloat16, attn_implementation="flash_attention_2", ) models[voice_name] = model return models def process_chapter(self, chapter_file, voice_type="narrator"): """处理单个章节""" chapter_name = Path(chapter_file).stem print(f"处理章节:{chapter_name}") # 读取章节文本 with open(chapter_file, 'r', encoding='utf-8') as f: lines = [line.strip() for line in f if line.strip()] # 获取对应的模型和声音描述 model = self.models[voice_type] voice_desc = self.config['voices'][voice_type]['description'] # 批量处理段落 results = [] for i, line in enumerate(lines): self.monitor.log_task_start(f"{chapter_name}_{i}", line) try: start_time = time.time() # 生成语音 wavs, sr = model.generate_voice_design( text=line, language=self.config['project']['language'], instruct=voice_desc, ) process_time = time.time() - start_time audio_duration = len(wavs[0]) / sr # 保存文件 filename = f"{chapter_name}_para_{i+1:03d}" metadata = { "chapter": chapter_name, "paragraph": i+1, "text": line, "voice": voice_type, "process_time": process_time } audio_path = self.file_manager.save_audio( wavs[0], sr, filename, metadata ) self.monitor.log_task_success( f"{chapter_name}_{i}", process_time, audio_duration ) results.append({ "success": True, "audio_path": audio_path, "metadata": metadata }) except Exception as e: self.monitor.log_task_failure(f"{chapter_name}_{i}", str(e)) results.append({ "success": False, "error": str(e), "text": line }) return results def generate_full_audiobook(self, chapters_dir="chapters"): """生成完整的有声书""" print("开始生成有声书...") # 获取所有章节文件 chapter_files = sorted(Path(chapters_dir).glob("*.txt")) all_results = [] for chapter_file in chapter_files: chapter_results = self.process_chapter( chapter_file, voice_type="narrator" # 默认使用旁白声音 ) all_results.extend(chapter_results) # 生成最终报告 summary = self.monitor.generate_summary() # 创建播放列表 self._create_playlist(all_results) print(f"有声书生成完成!共处理 {len(all_results)} 个段落") return all_results def _create_playlist(self, results): """创建播放列表文件""" playlist = [] for result in results: if result["success"]: playlist.append({ "file": result["audio_path"], "metadata": result["metadata"] }) playlist_file = os.path.join( self.file_manager.session_dir, "playlist.json" ) with open(playlist_file, 'w', encoding='utf-8') as f: json.dump(playlist, f, ensure_ascii=False, indent=2) print(f"播放列表已创建:{playlist_file}") return playlist_file # 使用示例 if __name__ == "__main__": # 创建生成器 generator = AudiobookGenerator("config.yaml") # 生成完整有声书 results = generator.generate_full_audiobook("chapters") # 归档结果 generator.file_manager.archive_session() print("项目完成!")8. 总结
批量处理Qwen3-TTS-12Hz-1.7B-VoiceDesign生成语音,核心是要做好任务管理、资源优化和错误处理。从简单的循环脚本到复杂的多进程调度,关键是找到适合自己需求的方案。
实际用下来,对于几百条的小批量任务,用基础的多进程脚本就足够了。如果要处理成千上万条,或者需要长时间稳定运行,那就需要考虑更完善的任务队列和监控系统。
资源管理方面,显存是最需要关注的。根据你的显卡情况调整批量大小,用好bfloat16和FlashAttention这些优化技巧,能明显提升处理效率。
错误处理也很重要,特别是网络不稳定或者显存不足的时候。好的重试机制和错误恢复策略,能让批量处理流程更稳定可靠。
最后,文件管理和日志记录虽然看起来是小事,但在实际项目中能省去很多麻烦。标准化的目录结构、完整的处理日志,能让后续的查找和问题排查方便很多。
如果你刚开始尝试批量处理,建议先从简单的脚本开始,慢慢增加功能。遇到问题多看看日志,根据实际情况调整参数。批量处理一旦跑顺了,效率提升是非常明显的。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。