开篇:语音聊天到底难在哪
“对着手机说一句,对方秒回”听起来简单,背后却是一条超长链路:麦克风采集 → 前端编码 → 网络传输 → 云端 ASR → LLM 推理 → TTS → 音频回传 → 播放器渲染。任何一环掉链子,用户就会吐槽“卡顿、延迟、机器人味儿”。
用 com.google.genai 做语音聊天,核心挑战可以浓缩成三点:
- 音频流是“胖数据”,64 kbit/s 的 Opus 单路 20 ms 帧,每秒就要 50 个包,网络抖一下就直接“炸麦”。
- 全双工场景下,ASR、LLM、TTS 三个模型要并行跑,还要共享上下文,资源调度比传统 HTTP 请求复杂一个量级。
- Google 的 API 默认走全球负载均衡,第一次 TLS 握手 + OAuth 刷新就可能 300 ms,不加优化直接输在起跑线。
下面把我踩过的坑、调优脚本、线上配置一条线捋清,让你少熬两周夜。
接入方式选哪家:gRPC vs REST vs WebSocket
com.google.genai 对外暴露三套端口:
| 协议 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| gRPC (HTTP/2) | 自带流式、多路复用、官方 Python/Java SDK 原生支持 | 端口 443 需允许 HTTP/2,部分老旧代理会降级 | 低延迟双向流,生产首选 |
| REST (JSON) | 调试简单,curl 一把梭 | 无服务器推送,只能轮询,延迟>500 ms | 后台离线批处理 |
| WebSocket | 浏览器直接开麦 | 需要自己做帧同步、重连、指数退避 | H5 网页 Demo |
结论:
- 终端到服务器走 gRPC 流式;
- 纯后台任务(例如把 1 万小时录音批量转文字)用 REST;
- WebSocket 留给快速原型,上线前务必迁到 gRPC。
环境配置 3 步走
- 开通服务
Cloud Console → Vertex AI → “Generative AI” → 勾选 “Speech/LLM/TTS” API,记下 Project ID。 - 建服务账号
IAM & Admin → Service Accounts → 新建genai-voice-chat→ 角色Vertex AI User→ 下载 JSON。 - 装 SDK
把刚才的 JSON 路径写进环境变量,后面代码会自动卷:# Python 3.10+ 虚拟环境 pip install google-cloud-aiplatform==1.38.0 google-genai==0.3.0export GOOGLE_APPLICATION_CREDENTIALS=/secure/genai-voice-chat.json
音频流处理:Python 示例(带注释)
下面这段代码演示“麦克风 → 实时 ASR → LLM → TTS → 扬声器”全双工回路,单线程异步,方便你插到 asyncio 框架里。关键逻辑:
- 用
pyaudio以 20 ms 帧喂给 gRPC; - ASR 返回
is_final后触发 LLM; - LLM 每输出一个 sentence 就调用 TTS,TTS 返回的音频流直接塞进
pyaudio输出缓冲区; - 全程 ring-buffer 缓存,网络抖动时自动补包。
import asyncio, pyaudio, logging, time from google.api_core import retry from google.genai import speech, llm, tts FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 16000 CHUNK = 320 # 20 ms class VoiceChat: def __init__(self): self.speech_client = speech.SpeechClient() self.llm_client = llm.LLMClient() self.tts_client = tts.TextToSpeechClient() self.audio = pyaudio.PyAudio() self.in_stream = self.audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) self.out_stream = self.audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, output=True, frames_per_buffer=CHUNK) async def listen(self): """Producer:把麦克风帧推给 ASR 流""" config = speech.RecognitionConfig( encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16, sample_rate_hertz=RATE, language_code="en-US", enable_automatic_punctuation=True, ) streaming_config = speech.StreamingRecognitionConfig( config=config, interim_results=True ) # 双向流 requests = self.audio_request_generator() responses = self.speech_client.streaming_recognize( requests, timeout=300 ) async for response in responses: for result in response.results: if result.is_final: transcript = result.alternatives[0].transcript logging.info("ASR: %s", transcript) # 直接调度 LLM,不阻塞 asyncio.create_task(self.think_and_speak(transcript)) async def audio_request_generator(self): """异步生成器,yield 音频帧""" while True: data = await asyncio.to_thread(self.in_stream.read, CHUNK) yield speech.StreamingRecognizeRequest(audio_content=data) async def think_and_speak(self, transcript): """Consumer:LLM + TTS""" prompt = f"User: {transcript}\nAssistant:" # 流式 LLM,返回 sentence 级切片 llm_stream = self.llm_client.predict_stream( model="gemini-pro", prompt=prompt, max_tokens=150 ) assistant_text = "" async for piece in llm_stream: assistant_text += piece # 简单断句,遇到句号就发 TTS if piece.endswith((".", "!", "?")): await self.speak(assistant_text) assistant_text = "" @retry.Retry(predicate=retry.if_transient_error) async def speak(self, text): """TTS 并播放""" tts_resp = await asyncio.to_thread( self.tts_client.synthesize_speech, input=tts.SynthesisInput(text=text), voice=tts.VoiceSelectionParams( language_code="en-US", name="en-US-Wavenet-D" ), audio_config=tts.AudioConfig( audio_encoding=tts.AudioEncoding.LINEAR16 ), ) # 直接写扬声器 await asyncio.to_thread(self.out_stream.write, tts_resp.audio_content) if __name__ == "__main__": logging.basicConfig(level=logging.INFO) vc = VoiceChat() asyncio.run(vc.listen())时间复杂度:
- ASR 流式识别为 O(n) 帧级增量,n 为音频采样点数;
- LLM 生成 O(m) token,m<150 时平均延迟 180 ms;
- TTS 实时因子 0 < RTF < 0.3,整体链路延迟 P95 可压到 600 ms(见下节指标)。
Java 双工示例(Android 端)
Android 官方 Sample 已经封装好AudioRecord+ gRPC,这里只贴关键片段:
// proto 双向流 StreamObserver<StreamingRecognizeRequest> requestObserver = speechStub.streamingRecognize(new StreamObserver<>() { @Override public void onNext(StreamingRecognizeResponse resp) { if (resp.getResultsCount() > 0 && resp.getResults(0).getIsFinal()) { String txt = resp.getResults(0) .getAlternatives(0) .getTranscript(); // 切换到 UI 线程 runOnUiThread(() -> sendToLLM(txt)); } } ... }); // 麦克风循环 while (recording) { short[] buf = new short[320]; audioRecord.read(buf, 0, 320); requestObserver.onNext( StreamingRecognizeRequest.newBuilder() .setAudioContent(ByteString.copyFrom(short2bytes(buf))) .build()); }要点:
- 用
audioRecord.getTimestamp打 WallClock,方便后端做漂移校准; - gRPC 通道加
keepAliveWithoutCalls=true(),防止 NAT 超时断流。
性能指标与优化清单
延迟拆解(实测 Pixel 6 + Wi-Fi 6,美国西海岸 endpoint)
- 麦克风采集 + 前端编码:20 ms
- 网络 RTT:40 ms
- ASR 首帧响应:120 ms
- LLM 首 token:80 ms
- TTS 首包:100 ms
- 播放器缓冲:60 ms
合计 420 ms,P95 600 ms,达到“准实时”门槛。
并发模型
单核 Gemini Pro 可支撑 120 QPS(Query Per Second);若每通对话平均 7 轮,则 1 vCPU ≈ 17 路并发。生产建议:- K8s HPA 按 CPU 70% 扩容;
- 把 ASR/TTS 与 LLM 拆成独立 Pod,避免互相挤占。
省流技巧
- 启用
voice_activity_detection,静音段直接丢包,省 30% 流量; - TTS 选
MP3_64K比LINEAR16小 4 倍,解码 CPU 增加 <5%,移动端更划算; - gRPC 打开
gzip压缩,文本 payload 可再降 60%。
- 启用
生产环境注意事项
认证管理
- 把服务账号 JSON 挂进 K8s Secret,不要打包进镜像;
每 12 小时调用auth.refresh(),防止 401 风暴。
- 把服务账号 JSON 挂进 K8s Secret,不要打包进镜像;
错误重试
gRPC 状态码映射:UNAVAILABLE/DEADLINE_EXCEEDED→ 指数退避,最大 3 次;RESOURCE_EXHAUSTED→ 立刻限流,等待配额窗口;INVALID_ARGUMENT→ 直接抛给客户端,避免死循环。
监控
用 OpenTelemetry 把asr_latency/llm_latency/tts_latency打成 Histogram,P99>1 s 就 paging;
音频层再挂packet_loss、jitter,一眼定位是网络还是模型。隐私合规
欧盟用户先过 GDPR:- 录音落盘前调用
speech_client.delete_recognizer()清除临时日志; - 提供“一键遗忘”接口,LLM 上下文 24 h 后强制淘汰。
- 录音落盘前调用
扩展思考题
- 多语言混说场景下,如何动态切换 ASR
language_code而不重启流? - 如果用户网络掉到 3G,音频码率自适应降到 24 kbit/s,TTS 音质如何同步降级?
- 在浏览器里直接用 WebRTC + Insertable Streams,能否把 gRPC 音频帧封装成 RTP,绕过 WebSocket?
把这三个问题想透,你的语音聊天就真正从“能跑”进化到“能抗”。
—— 先记录到这里,祝各位上线不炸服,延迟稳稳压在 500 ms 以内。