热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

linux进阶50——无锁CAS

1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰
1. 概念

比较并交换(compare and swap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某⼀数据时由于执行顺序不确定性以及中断的不可预知性产⽣的数据不一致问题。

有了CAS,我们就可以用它来实现各种无锁(lock free)的数据结构。

2. 实现原理

该操作通过将内存中的值与指定数据进行比较,当数值⼀样时将内存中的数据替换为新的值。

实现方式1:

//输入reg的地址,判断reg的值与oldval是否相等
//如果相等,那么就将newval赋值给reg;否则reg保持不变
//最终将reg原先的值返回回去int compare_and_swap(int *reg, int oldval, int newval)
{int old_ref_val = *reg;if(old_reg_val == oldval)*reg = newval;return old_reg_val;
}

实现方式2:

//输入一个pAddr的地址,在函数内部判断其的值是否与期望值nExpected相等
//如果相等那么就将pAddr的值改为nNew并同时返回true;否则就返回false,什么都不做bool compare_and_swap(int *pAddr, int nExpected, int nNew)
{if(*pAddr == nExpected){*pAddr = nNew;return true;}elsereturn false;
}

在上面的两种实现中第二种形式更好,因为它返回bool值让调用者知道是否更新成功。

3. 应用层实现

因为CAS是原子操作,所以在各种库的原子库中都有对应的CAS实现方式。

3.1 gcc/g++中的CAS

对于gcc、g++编译器来讲,其原子操作中包含下面两个函数,是专门用来做CAS的:

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);

3.2 windows下的CAS

在Windows下,你可以使用下面的Windows API来完成CAS:

InterlockedCompareExchange ( __inout LONG volatile *Target,__in LONG Exchange,__in LONG Comperand);

3.3 c++中的CAS

C++11标准库引入了原子操作,包含在头文件中,下面是专门用于CAS操作的接口:

template
bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired );
template
bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );

4. 无锁队列的实现

此处我们只考虑队列出队列和进队列的并发问题:

  • 出队列:出队列时,要保证只有一个线程在对头结点进行出队列的操作,否则就会发生错乱
  • 入队列:入队列时,也一样,保证只有一个线程在对尾节点进行入队列的操作,否则就会发生错乱

4.1 代码实现

queue_cas.h

//queue_cas.h
#include template
class Queue
{
public:Queue(); //构造函数~Queue(); //析构函数
public:void push(ElemType elem); //入队列bool pop(); //出队列void show(); //打印队列的内容
private:struct _qNode //队列节点{_qNode(): _next(nullptr) { } _qNode(ElemType elem): _elem(elem), _next(nullptr) { } ElemType _elem;struct _qNode *_next;};
private:struct _qNode *_head; //头结点struct _qNode *_tail; //尾节点
};template
Queue::Queue()
{_head = _tail =new _qNode();
}template
Queue::~Queue()
{while(_head != nullptr){struct _qNode *tempNode = _head;_head = _head->_next;delete tempNode;}
}template
void Queue::push(ElemType elem)
{//创建一个新的节点struct _qNode *newNode = new struct _qNode(elem);struct _qNode *p = _tail;struct _qNode *oldp = _tail;do{while(p->_next != nullptr)p = p->_next;} while(__sync_bool_compare_and_swap(&_tail->_next, nullptr, newNode) != true);__sync_bool_compare_and_swap(&_tail, oldp, newNode);
}template
bool Queue::pop()
{struct _qNode *p;do {p = _head;if(p->_next == nullptr)return false;} while(__sync_bool_compare_and_swap(&_head, p , p->_next) != true);delete p;return true;
}template
void Queue::show()
{struct _qNode* tempNode &#61; _head->_next;if(tempNode &#61;&#61; nullptr){std::cout <<"Empty" <_elem <<" ";tempNode &#61; tempNode->_next;}std::cout <}

上面为无锁队列的实现代码&#xff0c;我们假定此队列中头结点不存储数据(当做哨兵)&#xff0c;尾节点存储数据

 

其使用到CAS的核心函数就是push()和pop()函数&#xff0c;在下面我们将_sync_bool_compare_and_swap()函数调用称之为CAS操作。

push()如下&#xff1a;

  • 假设线程T1和T2都执行push()函数&#xff0c;当线程T1先执行do-while中的CAS操作然后发现其尾节点后为空&#xff0c;于是就执行do-while中的CAS操作将尾节点_tail的_next指针赋值为newNode&#xff0c;然后退出do-while循环&#xff0c;调用第二个CAS操作将尾节点指针向后移动一位
  • 由于CAS是一个原子操作&#xff0c;所以即使同时T2线程了也调用了do-while中的CAS操作&#xff0c;但是其判断p->_next不为空&#xff0c;因为T1线程已经将尾节点向后移动了&#xff0c;所以其只能继续执行do&#xff0c;将p向后移动&#xff0c;重新移动到尾节点继续重新判断&#xff0c;直到成功为止....
  • 为什么push()函数的最后一个CAS操作不需要判断是否执行成功&#xff0c;因为&#xff1a;

如果有一个线程T1&#xff0c;它的while中的CAS如果成功的话&#xff0c;那么其它所有的随后线程的CAS都会失败&#xff0c;然后就会再循环

此时&#xff0c;如果T1线程还没有更新tail指针&#xff0c;其它的线程继续失败&#xff0c;因为tail->next不是NULL了

直到T1线程更新完tail指针&#xff0c;于是其它的线程中的某个线程就可以得到新的tail指针&#xff0c;继续往下走了


  • do作用域中为什么要使用while将p指针向后移动&#xff1a;

假设T1线程在调用第二个CAS操作更新_tail指针之前&#xff0c;T1线程停掉或者挂掉了&#xff0c;那么其它线程就会进入死循环

template
void Queue::push(ElemType elem)
{//创建一个新的节点struct _qNode *newNode &#61; new struct _qNode(elem);struct _qNode *p &#61; _tail;struct _qNode *oldp &#61; _tail;do{//不断的向后指&#xff0c;直到直到尾节点为止while(p->_next !&#61; nullptr)p &#61; p->_next;} while(__sync_bool_compare_and_swap(&p->_next, nullptr, newNode) !&#61; true); //如果p没有移动到真正的尾节点上&#xff0c;那么继续执行do//当CAS函数执行成功之后&#xff0c;那么执行这个CAS函数&#xff0c;将尾节点指针向后移动1位__sync_bool_compare_and_swap(&_tail, oldp, newNode);
}

pop()如下&#xff1a;

  • 原理与push()同理&#xff0c;假设线程T1和线程T2都执行pop()操作&#xff0c;假设T1先执行CAS操作将_head向后移动了一位&#xff0c;并且删除了原先的头指针
  • 那么当T2再执行时发现T1更新过后的_head指针(移动了)与一开始获取的头指针p不相等了&#xff0c;那么就继续执行do作用域重新获取头指针&#xff0c;然后重新进行CAS操作

template
bool Queue::pop()
{struct _qNode *p;do {//获取_head指针p &#61; _head;if(p->_next &#61;&#61; nullptr)return false;} while(__sync_bool_compare_and_swap(&_head, p , p->_next) !&#61; true); //判断头结点是否被移动过&#xff0c;如果移动过那么就执行do内部重新获取_head指针//删除头指针delete p;return true;
}

4.2 测试代码

//queue_cas_test.cpp
#include "queue_cas.h"int main()
{Queue queue;queue.push(1);queue.push(2);queue.push(3);queue.show();queue.pop();queue.show();queue.pop();queue.show();queue.pop();queue.show();queue.push(1);queue.show();queue.push(2);queue.show();
}

编译运行&#xff1a;

[root&#64;192 CAS]# g&#43;&#43; queue_cas.cpp -o queue_cas
[root&#64;192 CAS]# ls
queue_cas queue_cas.cpp queue_cas.h
[root&#64;192 CAS]# ./queue_cas
1 2 3
2 3
3
Empty
1
1 2
[root&#64;192 CAS]#

5. 无锁队列性能测试

5.1 stl库中的queue

#include
#include
#include
#include
#include using namespace std;#define FOR_LOOP_NUM 10000000 //队列push和pop操作函数中for循环的次数static std::queue _queue; //队列
static std::mutex _mutex; //队列操作要用到的互斥锁static int push_count; //队列总共push的次数
static int pop_count; //队列总共pop的次数typedef void *(*thread_func_t)(void *arg);void *queue_push(void *arg)
{for(int i &#61; 0; i }void *queue_pop(void *arg)
{while(true){_mutex.lock();if(_queue.size() > 0){_queue.pop();pop_count&#43;&#43;;}_mutex.unlock();if(pop_count >&#61; FOR_LOOP_NUM)break;}return NULL;
}void test_queue(thread_func_t push_func, thread_func_t pop_func, void *arg)
{clock_t start &#61; clock();pthread_t push_tid;if(pthread_create(&push_tid, NULL, push_func, arg) !&#61; 0){perror("pthread_create");}pthread_t pop_tid;if(pthread_create(&pop_tid, NULL, pop_func, arg) !&#61; 0){perror("pthread_create");}pthread_join(push_tid, NULL);pthread_join(pop_tid, NULL);clock_t end &#61; clock();printf("spend clock: %ld\n", (end - start) / CLOCKS_PER_SEC);
}int main()
{push_count &#61; 0;pop_count &#61; 0;test_queue(queue_push, queue_pop, NULL);printf("push_count:%d, pop_count:%d\n", push_count, pop_count);return 0;
}

编译运行&#xff1a;

[root&#64;192 queue_stl]# g&#43;&#43; -lpthread queue_stl.cpp -o queue_stl
[root&#64;192 queue_stl]# ls
queue_stl queue_stl.cpp
[root&#64;192 queue_stl]# ./queue_stl
spend clock: 15
push_count:10000000, pop_count:10000000
[root&#64;192 queue_stl]# ./queue_stl
spend clock: 16
push_count:10000000, pop_count:10000000
[root&#64;192 queue_stl]# ./queue_stl
spend clock: 15
push_count:10000000, pop_count:10000000
[root&#64;192 queue_stl]# ./queue_stl
spend clock: 14
push_count:10000000, pop_count:10000000
[root&#64;192 queue_stl]# ./queue_stl
spend clock: 17
push_count:10000000, pop_count:10000000
[root&#64;192 queue_stl]#

5.2 无锁队列

#include
#include
#include
#include "queue_cas.h"using namespace std;#define FOR_LOOP_NUM 10000000 //队列push和pop操作函数中for循环的次数static int push_count; //队列总共push的次数
static int pop_count; //队列总共pop的次数static Queue _queue;typedef void *(*thread_func_t)(void *arg);void *queue_push(void *arg)
{for(int i &#61; 0; i }void *queue_pop(void *arg)
{while(true){_queue.pop();pop_count&#43;&#43;;if(pop_count >&#61; FOR_LOOP_NUM)break;}return NULL;
}void test_queue(thread_func_t push_func, thread_func_t pop_func, void *arg)
{clock_t start &#61; clock();pthread_t push_tid;if(pthread_create(&push_tid, NULL, push_func, arg) !&#61; 0){perror("pthread_create");}pthread_t pop_tid;if(pthread_create(&pop_tid, NULL, pop_func, arg) !&#61; 0){perror("pthread_create");}pthread_join(push_tid, NULL);pthread_join(pop_tid, NULL);clock_t end &#61; clock();printf("spend clock: %ld\n", (end - start) / CLOCKS_PER_SEC);
}int main()
{push_count &#61; 0;pop_count &#61; 0;test_queue(queue_push, queue_pop, NULL);printf("push_count:%d, pop_count:%d\n", push_count, pop_count);return 0;
}

编译运行&#xff1a;

[root&#64;192 CAS]# g&#43;&#43; -lpthread queue_cas.cpp -o queue_cas
[root&#64;192 CAS]# ls
queue_cas queue_cas.cpp queue_cas.h queue_stl
[root&#64;192 CAS]# ./queue_cas
spend clock: 1
push_count:10000000, pop_count:10000000
[root&#64;192 CAS]# ./queue_cas
spend clock: 2
push_count:10000000, pop_count:10000000
[root&#64;192 CAS]# ./queue_cas
spend clock: 1
push_count:10000000, pop_count:10000000
[root&#64;192 CAS]# ./queue_cas
spend clock: 1
push_count:10000000, pop_count:10000000
[root&#64;192 CAS]# ./queue_cas
spend clock: 1
push_count:10000000, pop_count:10000000
[root&#64;192 CAS]# ./queue_cas
spend clock: 2
push_count:10000000, pop_count:10000000
[root&#64;192 CAS]#

对比发现&#xff0c;无锁队列的效率更高。


推荐阅读
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 李逍遥寻找仙药的迷阵之旅
    本文讲述了少年李逍遥为了救治婶婶的病情,前往仙灵岛寻找仙药的故事。他需要穿越一个由M×N个方格组成的迷阵,有些方格内有怪物,有些方格是安全的。李逍遥需要避开有怪物的方格,并经过最少的方格,找到仙药。在寻找的过程中,他还会遇到神秘人物。本文提供了一个迷阵样例及李逍遥找到仙药的路线。 ... [详细]
  • 基于layUI的图片上传前预览功能的2种实现方式
    本文介绍了基于layUI的图片上传前预览功能的两种实现方式:一种是使用blob+FileReader,另一种是使用layUI自带的参数。通过选择文件后点击文件名,在页面中间弹窗内预览图片。其中,layUI自带的参数实现了图片预览功能。该功能依赖于layUI的上传模块,并使用了blob和FileReader来读取本地文件并获取图像的base64编码。点击文件名时会执行See()函数。摘要长度为169字。 ... [详细]
  • 本文讨论了使用差分约束系统求解House Man跳跃问题的思路与方法。给定一组不同高度,要求从最低点跳跃到最高点,每次跳跃的距离不超过D,并且不能改变给定的顺序。通过建立差分约束系统,将问题转化为图的建立和查询距离的问题。文章详细介绍了建立约束条件的方法,并使用SPFA算法判环并输出结果。同时还讨论了建边方向和跳跃顺序的关系。 ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 本文介绍了解决二叉树层序创建问题的方法。通过使用队列结构体和二叉树结构体,实现了入队和出队操作,并提供了判断队列是否为空的函数。详细介绍了解决该问题的步骤和流程。 ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • OO第一单元自白:简单多项式导函数的设计与bug分析
    本文介绍了作者在学习OO的第一次作业中所遇到的问题及其解决方案。作者通过建立Multinomial和Monomial两个类来实现多项式和单项式,并通过append方法将单项式组合为多项式,并在此过程中合并同类项。作者还介绍了单项式和多项式的求导方法,并解释了如何利用正则表达式提取各个单项式并进行求导。同时,作者还对自己在输入合法性判断上的不足进行了bug分析,指出了自己在处理指数情况时出现的问题,并总结了被hack的原因。 ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 本文介绍了一种轻巧方便的工具——集算器,通过使用集算器可以将文本日志变成结构化数据,然后可以使用SQL式查询。集算器利用集算语言的优点,将日志内容结构化为数据表结构,SPL支持直接对结构化的文件进行SQL查询,不再需要安装配置第三方数据库软件。本文还详细介绍了具体的实施过程。 ... [详细]
  • 本文介绍了Codeforces Round #321 (Div. 2)比赛中的问题Kefa and Dishes,通过状压和spfa算法解决了这个问题。给定一个有向图,求在不超过m步的情况下,能获得的最大权值和。点不能重复走。文章详细介绍了问题的题意、解题思路和代码实现。 ... [详细]
  • 代理模式的详细介绍及应用场景
    代理模式是一种在软件开发中常用的设计模式,通过在客户端和目标对象之间增加一层中间层,让代理对象代替目标对象进行访问,从而简化系统的复杂性。代理模式可以根据不同的使用目的分为远程代理、虚拟代理、Copy-on-Write代理、保护代理、防火墙代理、智能引用代理和Cache代理等几种。本文将详细介绍代理模式的原理和应用场景。 ... [详细]
  • 本文介绍了Java集合库的使用方法,包括如何方便地重复使用集合以及下溯造型的应用。通过使用集合库,可以方便地取用各种集合,并将其插入到自己的程序中。为了使集合能够重复使用,Java提供了一种通用类型,即Object类型。通过添加指向集合的对象句柄,可以实现对集合的重复使用。然而,由于集合只能容纳Object类型,当向集合中添加对象句柄时,会丢失其身份或标识信息。为了恢复其本来面貌,可以使用下溯造型。本文还介绍了Java 1.2集合库的特点和优势。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
author-avatar
多米音乐_35794462
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有