MPSC Queue:多生产者单消费者无锁队列
SPSC无锁队列,参考https://blog.csdn.net/qq_46105170/article/details/157458924。
概述
MPSC(Multi-Producer Single-Consumer)队列是一种允许多个生产者线程同时写入、但只有一个消费者线程读取的无锁数据结构。典型应用场景包括:多个业务线程向单一日志线程发送消息、多个工作线程向单一聚合线程汇报结果等。
与 SPSC 的核心区别
在 SPSC 队列中,生产者可以直接写入数据然后用store推进tail,因为只有一个生产者,不存在竞争。但在 MPSC 中,多个生产者可能同时尝试写入,如果都读到相同的tail值,就会写入同一个槽位,导致数据丢失。
MPSC 的核心挑战有两个:槽位竞争问题和写入可见性问题。槽位竞争是指多个生产者必须以某种方式"抢占"槽位,确保每个槽位只被一个生产者使用。写入可见性是指消费者需要知道某个槽位的数据是否已经写入完成。
解决方案
针对槽位竞争,我们使用 CAS(Compare-And-Swap)操作。生产者先读取当前tail,然后尝试原子地将其推进。如果 CAS 成功,该生产者就"拥有"了这个槽位;如果失败,说明其他生产者抢先了,需要重试。
针对写入可见性,问题在于:生产者必须先通过 CAS 推进tail来抢占槽位,然后才能写入数据。这意味着tail的推进发生在数据写入之前。消费者如果只看tail,可能会读到尚未写入的槽位。解决方案是为每个槽位增加一个ready标志,生产者写入数据后将其置为true,消费者在读取前检查这个标志。
实现
#include<array>#include<atomic>#include<cstddef>#include<emmintrin.h>template<typenameT,size_t Cap>classMPSCQueue{// 缓存行对齐,避免 false sharingalignas(64)std::atomic<size_t>head{0};// 消费者读取位置alignas(64)std::atomic<size_t>tail{0};// 生产者写入位置alignas(64)std::array<T,Cap+1>buffer;// 环形缓冲区,+1 用于区分空和满alignas(64)std::array<std::atomic<bool>,Cap+1>ready;// 槽位就绪标志// 计算下一个位置,环形缓冲区回绕staticsize_tnext_pos(size_t pos){return(pos+1)%(Cap+1);}public:// 构造函数:初始化所有 ready 标志为 falseMPSCQueue(){for(auto&r:ready)r.store(false,std::memory_order_relaxed);}// 生产者调用:多线程安全boolpush(constT&val){size_t pos,next;// 第一步:用 CAS 抢占槽位do{pos=tail.load(std::memory_order_relaxed);next=next_pos(pos);// 检查队列是否已满if(next==head.load(std::memory_order_relaxed))returnfalse;}while(!tail.compare_exchange_weak(pos,next,std::memory_order_relaxed,std::memory_order_relaxed));// 第二步:写入数据(此时我们独占这个槽位)buffer[pos]=val;// 第三步:标记槽位就绪,使用 release 确保数据写入对消费者可见ready[pos].store(true,std::memory_order_release);returntrue;}// 消费者调用:单线程boolpop(T&val){size_t pos=head.load(std::memory_order_relaxed);// 检查队列是否为空if(pos==tail.load(std::memory_order_acquire))returnfalse;// 自旋等待数据就绪// 槽位已被生产者抢占,但数据可能还未写入完成while(!ready[pos].load(std::memory_order_acquire))_mm_pause();// CPU 提示:正在自旋,降低功耗和总线竞争// 读取数据val=std::move(buffer[pos]);// 清除就绪标志ready[pos].store(false,std::memory_order_relaxed);// 推进 head,release 确保上述操作对生产者可见head.store(next_pos(pos),std::memory_order_release);returntrue;}};内存序分析
在 CAS 操作中我们使用relaxed是因为 CAS 本身只是抢占槽位,此时还没有数据需要同步。真正的同步点在ready标志上。
ready[pos].store(true, release)与ready[pos].load(acquire)构成同步关系,确保生产者对buffer[pos]的写入在消费者读取之前完成。
head.store(next, release)确保消费者对数据的读取在生产者看到head推进之前完成,防止生产者过早覆盖数据。
检查队列满时的head.load可以用relaxed,因为看到旧值最多导致误判队列已满而放弃写入,不会导致正确性问题。
我们使用compare_exchange_weak而不是strong是因为 weak 版本允许"伪失败"(spurious failure),即使比较成功也可能返回失败。但由于我们已经在循环中,伪失败只是多一次重试。weak 版本在某些架构(如 ARM)上能生成更高效的指令。
消费者行为说明
pop函数的行为是:如果队列为空(head == tail),立即返回false;如果队列非空但槽位数据未就绪,自旋等待。这种设计假设生产者写入很快完成,自旋时间很短。
如果需要纯非阻塞版本,可以在ready检查失败时也返回false,让调用者决定如何处理。
总结
MPSC 队列相比 SPSC 的主要变化是生产者端使用 CAS 竞争槽位,以及引入ready标志解决写入可见性问题。消费者端逻辑相对简单,因为只有一个消费者,不需要竞争。这种设计在多个生产者、单个消费者的场景下提供了高效的无锁通信机制。