第四篇:Checkpointing 和中断机制源码分析
请关注公众号【碳硅化合物AI】
概述
Checkpointing(检查点)机制使 LangGraph 能够持久化执行状态,支持故障恢复、状态回滚和人机交互。中断机制允许在执行过程中暂停,等待人工干预。本文档深入分析检查点的保存和恢复流程,以及中断机制的实现原理。
入口类及说明
核心类关系
Checkpointing 系统的核心是BaseCheckpointSaver接口,中断机制通过GraphInterrupt异常和interrupt()函数实现。
关键类说明
BaseCheckpointSaver
BaseCheckpointSaver类位于libs/checkpoint/langgraph/checkpoint/base/__init__.py:116,定义了检查点保存器的接口。它必须实现:
获取检查点:
get_tuple():根据配置获取检查点元组list():列出匹配的检查点
保存检查点:
put():保存检查点和元数据put_writes():保存待处理的写入
异步版本:如果支持异步执行,需要实现
aget_tuple()、aput()等异步方法
Checkpoint
Checkpoint是一个 TypedDict,定义在libs/checkpoint/langgraph/checkpoint/base/__init__.py,包含:
版本信息:
v:检查点格式版本id:检查点唯一标识符ts:时间戳
状态数据:
channel_values:通道的当前值channel_versions:通道的版本号versions_seen:已看到的版本(用于中断检测)
元数据:
updated_channels:本次更新的通道列表
GraphInterrupt
GraphInterrupt类位于libs/langgraph/langgraph/errors.py:84,是中断异常。它:
- 包含中断信息:
args包含Interrupt对象序列 - 传播机制:在子图中被捕获,只在根图抛出
- 恢复支持:中断信息包含恢复所需的 ID
关键流程描述
检查点保存流程
每个 super-step 结束后,PregelLoop 会保存检查点:
检查点恢复流程
从检查点恢复执行时:
中断机制流程
中断机制支持两种方式:静态中断(interrupt_before/after)和动态中断(interrupt() 函数)。
静态中断流程
动态中断流程(interrupt() 函数)
实现关键点说明
1. Thread 和 Checkpoint 概念
- Thread:一个线程 ID 对应一系列相关的检查点,用于多租户场景
- Checkpoint:线程在某个时间点的状态快照
- Checkpoint ID:可以指定检查点 ID 从特定点恢复
2. 检查点版本控制
通道使用版本号跟踪更新:
- 每次更新通道时,版本号递增
versions_seen记录上次中断时看到的版本- 通过比较版本号判断是否有新更新
3. 待处理写入(Pending Writes)
当节点执行失败时:
- 其他成功节点的写入被保存为
pending_writes - 恢复执行时,这些写入会被应用到相应的任务
- 避免重复执行已成功的节点
4. 中断检测机制
should_interrupt()函数检查是否需要中断:
def should_interrupt( checkpoint: Checkpoint, interrupt_nodes: All | Sequence[str], tasks: Iterable[PregelExecutableTask], ) -> list[PregelExecutableTask]: # 检查是否有通道更新 any_updates_since_prev_interrupt = any( version > seen.get(chan, null_version) for chan, version in checkpoint["channel_versions"].items() ) # 检查任务是否在中断列表中 return [ task for task in tasks if task.name in interrupt_nodes ] if any_updates_since_prev_interrupt else []5. 中断恢复机制
恢复中断时:
- 使用
Command(resume=...)提供恢复值 - 恢复值通过
map_command()映射到相应的中断 - 中断值存储在
PregelScratchpad中 - 节点重新执行时,
interrupt()函数返回恢复值
6. 序列化支持
检查点使用SerializerProtocol序列化:
- 默认使用
JsonPlusSerializer - 支持 LangChain 和 LangGraph 原语
- 支持日期时间、枚举等类型
总结说明
Checkpointing 和中断机制通过以下方式实现了持久化执行和人机交互:
- 检查点保存:每个 super-step 保存状态,支持故障恢复
- 版本控制:使用版本号跟踪更新,支持中断检测
- 待处理写入:保存部分成功的结果,避免重复执行
- 静态中断:通过
interrupt_before/after在特定节点中断 - 动态中断:通过
interrupt()函数在节点内部中断 - 恢复机制:使用
Command(resume=...)恢复执行
关键设计决策:
- 版本跟踪:通过版本号高效检测状态变化
- 延迟应用:待处理写入在恢复时应用,避免重复执行
- 异常机制:使用异常实现中断,简化控制流
- 序列化抽象:通过
SerializerProtocol支持多种序列化格式
理解 Checkpointing 和中断机制有助于:
- 实现故障恢复(从检查点恢复执行)
- 实现人机交互(在关键点中断等待输入)
- 优化性能(避免重复执行)
- 实现状态回滚(回到之前的检查点)
下一篇文档将深入分析状态管理和节点执行,了解状态更新、合并和错误处理机制。