由于普通锁的粒度比较大,以至于在并发量高的环境下,锁对于并发性能影响很大,本文章对无锁队列做探索,该无锁队列目前只支持单读单写,上干货
该队列由链表组成,每个节点有N个泛型T组成,该队列实现对T类型元素单读单写的无锁操作,可以方便的用在单生产者消费者模型中
队列的主要元素如下:
yqueue_t的实现,每次批量分配⼀批元素,减少内存的分配和释放(解决不断动态内存分配的问题)。
yqueue_t内部由⼀个⼀个chunk组成,每个chunk保存N个元素
当队列空间不⾜时每次分配⼀个chunk_t,每个chunk_t能存储N个元素。
在数据出队列后,队列有多余空间的时候,回收的chunk也不是⻢上释放,⽽是根据局部性原理先回收到spare_chunk⾥⾯,当再次需要分配chunk_t的时候从spare_chunk中获取。
yqueue_t内部有三个chunk_t类型指针以及对应的索引位置:begin_chunk/begin_pos:begin_chunk⽤于指向队列头的chunk,begin_pos⽤于指向队列第⼀个 元素在当前chunk中的位置。 back_chunk/back_pos:back_chunk⽤于指向队列尾的chunk,back_po⽤于指向队列最后⼀个元
素在当前chunk的位置。
end_chunk/end_pos:由于chunk是批量分配的,所以end_chunk⽤于指向分配的最后⼀个chunk位
置。
这⾥特别需要注意区分back_chunk/back_pos和end_chunk/end_pos的作⽤:
back_chunk/back_pos:对应的是元素存储位置;
end_chunk/end_pos:决定是否要分配chunk或者回收chunk
//Back position may point to invalid memory if the queue is empty,
//while begin & end positions are always valid. Begin position is
//accessed exclusively be queue reader (front/pop), while back and
//end positions are accessed exclusively by queue writer (back/push).
chunk_t *begin_chunk; // 链表头结点
int begin_pos; // 起始点
chunk_t *back_chunk; // 队列中最后一个元素所在的链表结点
int back_pos; // 尾部
chunk_t *end_chunk; // 拿来扩容的,总是指向链表的最后一个结点
int end_pos;
// 链表结点称之为chunk_t
struct chunk_t
{
T values[N]; //每个chunk_t可以容纳N个T类型的元素,以后就以一个chunk_t为单位申请内存
chunk_t *prev;
chunk_t *next;
};
inline yqueue_t()
{
begin_chunk = (chunk_t *)malloc(sizeof(chunk_t));
alloc_assert(begin_chunk);
begin_pos = 0;
back_chunk = NULL; //back_chunk总是指向队列中最后一个元素所在的chunk,现在还没有元素,所以初始为空
back_pos = 0;
end_chunk = begin_chunk; //end_chunk总是指向链表的最后一个chunk
end_pos = 0;
}
队列操作函数
inline T &front() // 返回的是引用,是个左值,调用者可以通过其修改容器的值
{
return begin_chunk->values[begin_pos];
}
// Returns reference to the back element of the queue.
// If the queue is empty, behaviour is undefined.
inline T &back() // 返回的是引用,是个左值,调用者可以通过其修改容器的值
{
return back_chunk->values[back_pos];
}
// Adds an element to the back end of the queue.
inline void push()
{
back_chunk = end_chunk;
back_pos = end_pos; //
if (++end_pos != N) //end_pos==N表明这个链表结点已经满了
return;
chunk_t *sc = spare_chunk.xchg(NULL); // 为什么设置为NULL? 因为如果把之前值取出来了则没有spare chunk了,所以设置为NULL
if (sc) // 如果有spare chunk则继续复用它
{
end_chunk->next = sc;
sc->prev = end_chunk;
}
else // 没有则重新分配
{
end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一个chunk
alloc_assert(end_chunk->next);
end_chunk->next->prev = end_chunk;
}
end_chunk = end_chunk->next;
end_pos = 0;
}
// Removes element from the back end of the queue. In other words
// it rollbacks last push to the queue. Take care: Caller is
// responsible for destroying the object being unpushed.
// The caller must also guarantee that the queue isn't empty when
// unpush is called. It cannot be done automatically as the read
// side of the queue can be managed by different, completely
// unsynchronised thread.
// 必须要保证队列不为空,参考ypipe_t的uwrite
inline void unpush()
{
// First, move 'back' one position backwards.
if (back_pos) // 从尾部删除元素
--back_pos;
else
{
back_pos = N - 1; // 回退到前一个chunk
back_chunk = back_chunk->prev;
}
// Now, move 'end' position backwards. Note that obsolete end chunk
// is not used as a spare chunk. The analysis shows that doing so
// would require free and atomic operation per chunk deallocated
// instead of a simple free.
if (end_pos) // 意味着当前的chunk还有其他元素占有
--end_pos;
else
{
end_pos = N - 1; // 当前chunk没有元素占用,则需要将整个chunk释放
end_chunk = end_chunk->prev;
free(end_chunk->next);
end_chunk->next = NULL;
}
}
// Removes an element from the front end of the queue.
inline void pop()
{
if (++begin_pos == N) // 删除满一个chunk才回收chunk
{
chunk_t *o = begin_chunk;
begin_chunk = begin_chunk->next;
begin_chunk->prev = NULL;
begin_pos = 0;
// 'o' has been more recently used than spare_chunk,
// so for cache reasons we'll get rid of the spare and
// use 'o' as the spare.
chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,总是保存最新的空闲块而释放先前的空闲快
free(cs);
}
}
这⾥的front()或者back()函数,需要注意的返回的是左值引⽤,我们可以修改其值。
对于先进后出队列⽽⾔:
begin_chunk->values[begin_pos]代表队列头可读元素, 读取队列头元素即是读取begin_pos位置
的元素;
back_chunk->values[back_pos]代表队列尾可写元素,写⼊元素时则是更新back_pos位置的元素,
要确保元素真正⽣效,还需要调⽤push函数更新back_pos的位置,避免下次更新的时候⼜是更新当前
back_pos位置对应的元素。
下面为无锁同步的重要操作方法
// 设置新值,返回旧值
inline T *xchg (T *val_) //原子操
{
#if defined ZMQ_ATOMIC_PTR_ATOMIC_H
return (T*) atomic_swap_ptr (&ptr, val_);
#elif defined ZMQ_ATOMIC_PTR_TILE
return (T*) arch_atomic_exchange (&ptr, val_);
#elif defined ZMQ_ATOMIC_PTR_X86
T *old;
__asm__ volatile (
"lock; xchg %0, %2"
: "=r" (old), "=m" (ptr)
: "m" (ptr), "0" (val_));
return old;
#elif defined ZMQ_ATOMIC_PTR_MUTEX
sync.lock ();
T *old = (T*) ptr;
ptr = val_;
sync.unlock ();
return old;
#else
#error atomic_ptr is not implemented for this platform
#endif
}
// Perform atomic 'compare and swap' operation on the pointer.
// The pointer is compared to 'cmp' argument and if they are
// equal, its value is set to 'val'. Old value of the pointer
// is returned.
// 原来的值(ptr指向)如果和 comp_的值相同则更新为val_,并返回原来的ptr
inline T *cas (T *cmp_, T *val_)//原子操作
{
#if defined ZMQ_ATOMIC_PTR_ATOMIC_H
return (T*) atomic_cas_ptr (&ptr, cmp_, val_);
#elif defined ZMQ_ATOMIC_PTR_TILE
return (T*) arch_atomic_val_compare_and_exchange (&ptr, cmp_, val_);
#elif defined ZMQ_ATOMIC_PTR_X86
T *old;
__asm__ volatile (
"lock; cmpxchg %2, %3"
: "=a" (old), "=m" (ptr)
: "r" (val_), "m" (ptr), "0" (cmp_)
: "cc");
return old;
#else
#error atomic_ptr is not implemented for this platform
#endif
}
下面讲一下无锁队列的读写操作方法
读写队列操作的重要元素如下:
yqueue_t
queue;
// Points to the first un-flushed item. This variable is used
// exclusively by writer thread.
T *w;//指向第一个未刷新的元素,只被写线程使用
// Points to the first un-prefetched item. This variable is used
// exclusively by reader thread.
T *r;//指向第一个还没预提取的元素,只被读线程使用
// Points to the first item to be flushed in the future.
T *f;//指向下一轮要被刷新的一批元素中的第一个
// The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using
// atomic operations.
atomic_ptr_tc;//读写线程共享的指针,指向每一轮刷新的起点(看代码的时候会详细说)。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)
无锁队列重要的操作方法
// Write an item to the pipe. Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are neverflushed down the stream.
// 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。
inline void write(const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back() = value_;
queue.push();
// Move the "flush up to here" poiter.
if (!incomplete_)
f = &queue.back(); // 记录要刷新的位置
}
#ifdef ZMQ_HAVE_OPENVMS
#pragma message restore
#endif
// Pop an incomplete item from the pipe. Returns true is such
// item exists, false otherwise.
inline bool unwrite(T *value_)
{
if (f == &queue.back())
return false;
queue.unpush();
*value_ = queue.back();
return true;
}
// Flush all the completed items into the pipe. Returns false if
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
// 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。
inline bool flush()
{
// If there are no un-flushed items, do nothing.
if (w == f) // 不需要刷新,即是还没有新元素加入
return true;
// Try to set 'c' to 'f'.
if (c.cas(w, f) != w) // 尝试将c设置为f,即是准备更新w的位置
{
// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
c.set(f);
w = f;
return false; // 线程看到flush返回false之后会发送一个消息给读线程,这个是需要写业务去做处理
}
// Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to 'f'.
w = f;
return true;
}
// Check whether item is available for reading.
// 这里面有两个点,一个是检查是否有数据可读,一个是预取
inline bool check_read()
{
// Was the value prefetched already? If so, return.
if (&queue.front() != r && r) //判断是否在前几次调用read函数时已经预取数据了return true;
return true;
// There's no prefetched value, so let us prefetch more values.
// Prefetching is to simply retrieve the
// pointer from c in atomic fashion. If there are no
// items to prefetch, set c to NULL (using compare-and-swap).
r = c.cas(&queue.front(), NULL);//尝试预取数据
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when items
// are being deallocated.
if (&queue.front() == r || !r)//判断是否成功预取数据
return false;
// There was at least one value prefetched.
return true;
}
// Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read(T *value_)
{
// Try to prefetch a value.
if (!check_read())
return false;
// There was at least one value prefetched.
// Return it to the caller.
*value_ = queue.front();
queue.pop();
return true;
}
// Applies the function fn to the first elemenent in the pipe
// and returns the value returned by the fn.
// The pipe mustn't be empty or the function crashes.
inline bool probe(bool (*fn)(T &))
{
bool rc = check_read();
// zmq_assert(rc);
return (*fn)(queue.front());
}