热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

无锁队列理解

    由于普通锁的粒度比较大,以至于在并发量高的环境下,锁对于并发性能影响很大,本文章对无锁队列做探索,该无锁队列目前只支持单读单写,上干货     该队列由链表组成,每个节点有


        由于普通锁的粒度比较大,以至于在并发量高的环境下,锁对于并发性能影响很大,本文章对无锁队列做探索,该无锁队列目前只支持单读单写,上干货



         该队列由链表组成,每个节点有N个泛型T组成,该队列实现对T类型元素单读单写的无锁操作,可以方便的用在单生产者消费者模型中



        队列的主要元素如下:

        yqueue_t的实现,每次批量分配⼀批元素,减少内存的分配和释放(解决不断动态内存分配的问题)。
        yqueue_t内部由⼀个⼀个chunk组成,每个chunk保存N个元素

 

        当队列空间不⾜时每次分配⼀个chunk_t,每个chunk_t能存储N个元素。
        在数据出队列后,队列有多余空间的时候,回收的chunk也不是⻢上释放,⽽是根据局部性原理先回收到spare_chunk⾥⾯,当再次需要分配chunk_t的时候从spare_chunk中获取。
       yqueue_t内部有三个chunk_t类型指针以及对应的索引位置:begin_chunk/begin_pos:begin_chunk⽤于指向队列头的chunk,begin_pos⽤于指向队列第⼀个 元素在当前chunk中的位置。                back_chunk/back_pos:back_chunk⽤于指向队列尾的chunk,back_po⽤于指向队列最后⼀个元
      素在当前chunk的位置。
      end_chunk/end_pos:由于chunk是批量分配的,所以end_chunk⽤于指向分配的最后⼀个chunk位
置。
      这⾥特别需要注意区分back_chunk/back_pos和end_chunk/end_pos的作⽤:
      back_chunk/back_pos:对应的是元素存储位置;
     end_chunk/end_pos:决定是否要分配chunk或者回收chunk

 

//Back position may point to invalid memory if the queue is empty,
//while begin & end positions are always valid. Begin position is
//accessed exclusively be queue reader (front/pop), while back and
//end positions are accessed exclusively by queue writer (back/push).
chunk_t *begin_chunk; // 链表头结点
int begin_pos; // 起始点
chunk_t *back_chunk; // 队列中最后一个元素所在的链表结点
int back_pos; // 尾部
chunk_t *end_chunk; // 拿来扩容的,总是指向链表的最后一个结点
int end_pos;
// 链表结点称之为chunk_t
struct chunk_t
{
T values[N]; //每个chunk_t可以容纳N个T类型的元素,以后就以一个chunk_t为单位申请内存
chunk_t *prev;
chunk_t *next;
};
inline yqueue_t()
{
begin_chunk = (chunk_t *)malloc(sizeof(chunk_t));
alloc_assert(begin_chunk);
begin_pos = 0;
back_chunk = NULL; //back_chunk总是指向队列中最后一个元素所在的chunk,现在还没有元素,所以初始为空
back_pos = 0;
end_chunk = begin_chunk; //end_chunk总是指向链表的最后一个chunk
end_pos = 0;
}



       
队列操作函数


      

inline T &front() // 返回的是引用,是个左值,调用者可以通过其修改容器的值
{
return begin_chunk->values[begin_pos];
}
// Returns reference to the back element of the queue.
// If the queue is empty, behaviour is undefined.
inline T &back() // 返回的是引用,是个左值,调用者可以通过其修改容器的值
{
return back_chunk->values[back_pos];
}
// Adds an element to the back end of the queue.
inline void push()
{
back_chunk = end_chunk;
back_pos = end_pos; //
if (++end_pos != N) //end_pos==N表明这个链表结点已经满了
return;
chunk_t *sc = spare_chunk.xchg(NULL); // 为什么设置为NULL? 因为如果把之前值取出来了则没有spare chunk了,所以设置为NULL
if (sc) // 如果有spare chunk则继续复用它
{
end_chunk->next = sc;
sc->prev = end_chunk;
}
else // 没有则重新分配
{
end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一个chunk
alloc_assert(end_chunk->next);
end_chunk->next->prev = end_chunk;
}
end_chunk = end_chunk->next;
end_pos = 0;
}
// Removes element from the back end of the queue. In other words
// it rollbacks last push to the queue. Take care: Caller is
// responsible for destroying the object being unpushed.
// The caller must also guarantee that the queue isn't empty when
// unpush is called. It cannot be done automatically as the read
// side of the queue can be managed by different, completely
// unsynchronised thread.
// 必须要保证队列不为空,参考ypipe_t的uwrite
inline void unpush()
{
// First, move 'back' one position backwards.
if (back_pos) // 从尾部删除元素
--back_pos;
else
{
back_pos = N - 1; // 回退到前一个chunk
back_chunk = back_chunk->prev;
}
// Now, move 'end' position backwards. Note that obsolete end chunk
// is not used as a spare chunk. The analysis shows that doing so
// would require free and atomic operation per chunk deallocated
// instead of a simple free.
if (end_pos) // 意味着当前的chunk还有其他元素占有
--end_pos;
else
{
end_pos = N - 1; // 当前chunk没有元素占用,则需要将整个chunk释放
end_chunk = end_chunk->prev;
free(end_chunk->next);
end_chunk->next = NULL;
}
}
// Removes an element from the front end of the queue.
inline void pop()
{
if (++begin_pos == N) // 删除满一个chunk才回收chunk
{
chunk_t *o = begin_chunk;
begin_chunk = begin_chunk->next;
begin_chunk->prev = NULL;
begin_pos = 0;
// 'o' has been more recently used than spare_chunk,
// so for cache reasons we'll get rid of the spare and
// use 'o' as the spare.
chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,总是保存最新的空闲块而释放先前的空闲快
free(cs);
}
}



这⾥的front()或者back()函数,需要注意的返回的是左值引⽤,我们可以修改其值。

对于先进后出队列⽽⾔:

begin_chunk->values[begin_pos]代表队列头可读元素, 读取队列头元素即是读取begin_pos位置

的元素;

back_chunk->values[back_pos]代表队列尾可写元素,写⼊元素时则是更新back_pos位置的元素,

要确保元素真正⽣效,还需要调⽤push函数更新back_pos的位置,避免下次更新的时候⼜是更新当前

back_pos位置对应的元素。

下面为无锁同步的重要操作方法

// 设置新值,返回旧值
inline T *xchg (T *val_) //原子操
{
#if defined ZMQ_ATOMIC_PTR_ATOMIC_H
return (T*) atomic_swap_ptr (&ptr, val_);
#elif defined ZMQ_ATOMIC_PTR_TILE
return (T*) arch_atomic_exchange (&ptr, val_);
#elif defined ZMQ_ATOMIC_PTR_X86
T *old;
__asm__ volatile (
"lock; xchg %0, %2"
: "=r" (old), "=m" (ptr)
: "m" (ptr), "0" (val_));
return old;
#elif defined ZMQ_ATOMIC_PTR_MUTEX
sync.lock ();
T *old = (T*) ptr;
ptr = val_;
sync.unlock ();
return old;
#else
#error atomic_ptr is not implemented for this platform
#endif
}
// Perform atomic 'compare and swap' operation on the pointer.
// The pointer is compared to 'cmp' argument and if they are
// equal, its value is set to 'val'. Old value of the pointer
// is returned.
// 原来的值(ptr指向)如果和 comp_的值相同则更新为val_,并返回原来的ptr
inline T *cas (T *cmp_, T *val_)//原子操作
{
#if defined ZMQ_ATOMIC_PTR_ATOMIC_H
return (T*) atomic_cas_ptr (&ptr, cmp_, val_);
#elif defined ZMQ_ATOMIC_PTR_TILE
return (T*) arch_atomic_val_compare_and_exchange (&ptr, cmp_, val_);
#elif defined ZMQ_ATOMIC_PTR_X86
T *old;
__asm__ volatile (
"lock; cmpxchg %2, %3"
: "=a" (old), "=m" (ptr)
: "r" (val_), "m" (ptr), "0" (cmp_)
: "cc");
return old;
#else
#error atomic_ptr is not implemented for this platform
#endif
}


 



下面讲一下无锁队列的读写操作方法

读写队列操作的重要元素如下:

yqueue_t queue;
// Points to the first un-flushed item. This variable is used
// exclusively by writer thread.
T *w;//指向第一个未刷新的元素,只被写线程使用
// Points to the first un-prefetched item. This variable is used
// exclusively by reader thread.
T *r;//指向第一个还没预提取的元素,只被读线程使用
// Points to the first item to be flushed in the future.
T *f;//指向下一轮要被刷新的一批元素中的第一个
// The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using
// atomic operations.
atomic_ptr_t c;//读写线程共享的指针,指向每一轮刷新的起点(看代码的时候会详细说)。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)


无锁队列重要的操作方法

// Write an item to the pipe. Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are neverflushed down the stream.
// 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。
inline void write(const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back() = value_;
queue.push();
// Move the "flush up to here" poiter.
if (!incomplete_)
f = &queue.back(); // 记录要刷新的位置
}
#ifdef ZMQ_HAVE_OPENVMS
#pragma message restore
#endif
// Pop an incomplete item from the pipe. Returns true is such
// item exists, false otherwise.
inline bool unwrite(T *value_)
{
if (f == &queue.back())
return false;
queue.unpush();
*value_ = queue.back();
return true;
}
// Flush all the completed items into the pipe. Returns false if
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
// 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。
inline bool flush()
{
// If there are no un-flushed items, do nothing.
if (w == f) // 不需要刷新,即是还没有新元素加入
return true;
// Try to set 'c' to 'f'.
if (c.cas(w, f) != w) // 尝试将c设置为f,即是准备更新w的位置
{
// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
c.set(f);
w = f;
return false; // 线程看到flush返回false之后会发送一个消息给读线程,这个是需要写业务去做处理
}
// Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to 'f'.
w = f;
return true;
}
// Check whether item is available for reading.
// 这里面有两个点,一个是检查是否有数据可读,一个是预取
inline bool check_read()
{
// Was the value prefetched already? If so, return.
if (&queue.front() != r && r) //判断是否在前几次调用read函数时已经预取数据了return true;
return true;
// There's no prefetched value, so let us prefetch more values.
// Prefetching is to simply retrieve the
// pointer from c in atomic fashion. If there are no
// items to prefetch, set c to NULL (using compare-and-swap).
r = c.cas(&queue.front(), NULL);//尝试预取数据
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when items
// are being deallocated.
if (&queue.front() == r || !r)//判断是否成功预取数据
return false;
// There was at least one value prefetched.
return true;
}
// Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read(T *value_)
{
// Try to prefetch a value.
if (!check_read())
return false;
// There was at least one value prefetched.
// Return it to the caller.
*value_ = queue.front();
queue.pop();
return true;
}
// Applies the function fn to the first elemenent in the pipe
// and returns the value returned by the fn.
// The pipe mustn't be empty or the function crashes.
inline bool probe(bool (*fn)(T &))
{
bool rc = check_read();
// zmq_assert(rc);
return (*fn)(queue.front());
}




推荐阅读
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • IB 物理真题解析:比潜热、理想气体的应用
    本文是对2017年IB物理试卷paper 2中一道涉及比潜热、理想气体和功率的大题进行解析。题目涉及液氧蒸发成氧气的过程,讲解了液氧和氧气分子的结构以及蒸发后分子之间的作用力变化。同时,文章也给出了解题技巧,建议根据得分点的数量来合理分配答题时间。最后,文章提供了答案解析,标注了每个得分点的位置。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • 标题: ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 李逍遥寻找仙药的迷阵之旅
    本文讲述了少年李逍遥为了救治婶婶的病情,前往仙灵岛寻找仙药的故事。他需要穿越一个由M×N个方格组成的迷阵,有些方格内有怪物,有些方格是安全的。李逍遥需要避开有怪物的方格,并经过最少的方格,找到仙药。在寻找的过程中,他还会遇到神秘人物。本文提供了一个迷阵样例及李逍遥找到仙药的路线。 ... [详细]
  • 基于dlib的人脸68特征点提取(眨眼张嘴检测)python版本
    文章目录引言开发环境和库流程设计张嘴和闭眼的检测引言(1)利用Dlib官方训练好的模型“shape_predictor_68_face_landmarks.dat”进行68个点标定 ... [详细]
  • Spring常用注解(绝对经典),全靠这份Java知识点PDF大全
    本文介绍了Spring常用注解和注入bean的注解,包括@Bean、@Autowired、@Inject等,同时提供了一个Java知识点PDF大全的资源链接。其中详细介绍了ColorFactoryBean的使用,以及@Autowired和@Inject的区别和用法。此外,还提到了@Required属性的配置和使用。 ... [详细]
  • 本文介绍了H5游戏性能优化和调试技巧,包括从问题表象出发进行优化、排除外部问题导致的卡顿、帧率设定、减少drawcall的方法、UI优化和图集渲染等八个理念。对于游戏程序员来说,解决游戏性能问题是一个关键的任务,本文提供了一些有用的参考价值。摘要长度为183字。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
author-avatar
手机用户2602933971
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有