并发(Concurrency)是指多个单元同时、并行被执行,而并发执行单元对共享资源(硬件资源和软件上的全局变量,静态变量等)的访问很容易导致竞态(Race Conditions)。
概念:Linux驱动之并发与竞态
竞争状态的分类:
对称多处理器(SMP)的多个CPU | SMP是一种紧耦合、共享存储的系统类型,因为多个CPU同时共享系统总线,因此可以访问共同的外设和存储器 |
单CPU内进程与抢占它的进程 | linux2.6以后支持内核抢占调度,一个进程在内核执行的时候可能耗完了自己的时间片,也可能被另一个高优先级进程打断,进程和抢占它的进程访问共享资源,竞态发生 |
中断(硬中断、软中断、Tasklet、底半部)与进程之间 | 中断打断进程,中断打断中断(中断程序访问进程或另一个中断正在访问的资源,则竞态发生) |
PS:中断下半部机制 - 软中断及tasklet
竞态的解决方法是:
保证对共享资源的互斥访问(即一个执行单元在访问共享资源时,其他的执行单元被禁止访问)访问共享资源的代码区域被称为临界区(critical sections),临界区需要被以某种互斥机制加以保护。
Linux常见互斥机制:中断屏蔽、原子操作、自旋锁和信号量、互斥体等
程序在运行时内存实际的访问顺序和程序代码编写的访问顺序不一定一致,这就是内存乱序访问。内存乱序访问行为出现的理由是为了提升程序运行时的性能。内存乱序访问主要发生在两个阶段:
防止编译乱序:
未加屏障 | 加编译屏障 |
// test.cpp int x, y, r; void f() { x = r; y = 1; } |
int x, y, r; void f() { x = r; __asm__ __volatile__("" ::: "memory"); y = 1; } |
| 编译后,对于x的内存访问必定在y赋值之前 |
执行乱序:(主要表现在多CPU上)
如果是单核CPU,执行程序时碰到依赖点(如f=1;while(f==0);//会等待f=1执行完,再执行while),会等待,因此程序员感受不到乱序;
但是,这个依赖点等待对于其他核是不可见的,例如:
CPU0:
while(f==0);//wait
printf(x);
CPU1:
x=42;
f=1;
执行乱序的解决方法:
DMB DSB ISB 简介
概括的讲:ISB>DSB>DMB
屏障指令 | 功能 | 应用 | 解释 |
DMB(Data memory barrier) | 数据内存屏障:DMB可以继续执行之后的指令,只要这条指令不是内存访问指令; | core0:write A;DMB;write B core1:Load B;Load A | 写入A完成后才能写入B,因此加载B的值正确是,A的值也必然正确 |
DSB(Data Synchronization Barrier) | 数据同步指令:等待DSB之前的所有指令完成(包括指令前的所有缓存,跳转预测,TLB维护操作) | ||
ISB(Instruction Synchronization Barrier) | 指令同步屏障:Flush流水线,使指令之后执行的指令都是从缓存或内存中获得的 |
更详细的可以参考:Linux内核同步机制之(一):原子操作
要了解原子操作,首先需要了解LDREX,STREX指令,
首先我们看一下LDR和STR的含义:
后缀EX其实是Exclusive(独占的);
LDREX和STREX总结:
之后我们可以来看原子操作的源代码:(以atomic_add()和atomic_add_return()为例)
//保证原子操作的输入参数都是atomic结构体,因此可以对原子操作进行计数
typedef struct {int counter;
} atomic_t;
#if __LINUX_ARM_ARCH__ >= 6 ----------------------(1)
static inline void atomic_add(int i, atomic_t *v)
{unsigned long tmp;int result;
//prefetchw : 将counter的值读入内存中prefetchw(&v->counter); -------------------------(2)__asm__ __volatile__("@ atomic_add\n" ------------------(3)
"1: ldrex %0, [%3]\n" --------------------------(4)
" add %0, %0, %4\n" --------------------------(5)
" strex %1, %0, [%3]\n" -------------------------(6)
" teq %1, #0\n" -----------------------------(7)
" bne 1b": "=&r" (result), "=&r" (tmp), "+Qo" (v->counter) ---对应%0,%1,%2: "r" (&v->counter), "Ir" (i) -------------对应%3,%4: "cc");
}#else#ifdef CONFIG_SMP
#error SMP not supported on pre-ARMv6 CPUs
#endifstatic inline int atomic_add_return(int i, atomic_t *v)
{unsigned long flags;int val;raw_local_irq_save(flags);val = v->counter;v->counter = val += i;raw_local_irq_restore(flags);return val;
}
#define atomic_add(i, v) (void) atomic_add_return(i, v)#endif
最终要的是下面的代码的含义
__asm__ __volatile__("@ atomic_add\n" //__asm_ _volatile()表示下面的汇编代码编译不要优化;@表示该行是注释
"1: ldrex %0, [%3]\n" //%3就是"r",%0就是"=&r",从内存中读(&v-counter),并存到另一个寄存器中
" add %0, %0, %4\n" //%0寄存器记录的是v-counter的值,这里 的操作是寄存器值+1
" strex %1, %0, [%3]\n" //将寄存器值写回内存中,即更新内存中v->counter值,如果成功%1寄存器值为0,否则为1
" teq %1, #0\n" //比较%1的值是不是0,若不是,跳回第一步重新执行
" bne 1b"
备注:
%3就是input operand list中的"r" (&v->counter),r是限制符(constraint),用来告诉编译器gcc,去选择一个通用寄存器保存该操作数吧。%0对应output openrand list中的"=&r" (result),=表示该操作数是write only的,&表示该操作数是一个earlyclobber operand,具体是什么意思呢?编译器在处理嵌入式汇编的时候,倾向使用尽可能少的寄存器,如果output operand没有&修饰的话,汇编指令中的input和output操作数会使用同样一个寄存器。因此,&确保了%3和%0使用不同的寄存器。
(5)完成步骤(4)后,%0这个output操作数已经被赋值为atomic_t变量的old value,毫无疑问,这里的操作是要给old value加上i。这里%4对应"Ir" (i),这里“I”这个限制符对应ARM平台,表示这是一个有特定限制的立即数,该数必须是0~255之间的一个整数通过rotation的操作得到的一个32bit的立即数。这是和ARM的data-processing instructions如何解析立即数有关的。每个指令32个bit,其中12个bit被用来表示立即数,其中8个bit是真正的数据,4个bit用来表示如何rotation。更详细的内容请参考ARM ARM文档。
(6)这一步将修改后的new value保存在atomic_t变量中。是否能够正确的操作的状态标记保存在%1操作数中,也就是"=&r" (tmp)。
(7)检查memory update的操作是否正确完成,如果OK,皆大欢喜,如果发生了问题(有其他的内核路径插入),那么需要跳转到lable 1那里,从新进行一次read-modify-write的操作
原子操作源码总结:
ldrex(内存读数据到寄存器)---->atomic.counter操作(+,-,等)------>strex(尝试写数据到内存中)
但是:如果原子操作过程中,如果发生过并发的访问,那么strex会执行失败 ,跳回ldrex重新执行 !!!!
整型原子操作:
Function Name | Explain |
void atomic_set(atomic *v,int i) | 设置原子变量v->counter为1 |
atomic v=ATOMIC_INIT(0) | 定义原子变量v,并初始化v->counter为0 |
atomic_read(atomic *v) | 返回原子变量的值 |
void atomic_add(int i,atomic *v) | 原子变量值加i |
void atomic_sub(int i,atomic *v) | 原子变量值减i |
void atomic_inc(atomic *v) | 原子变量值自增 |
void atomic_dec(atomic *v) | 原子变量值自减 |
int atomic_inc_and_test(atomic *v) int atomic_dec_and_test(atomic *v) int atomic_sub_and_test(int i,atomic *v) | 操作(自增,自减,减)并返回,测试操作是否为0; 为0返回true 否则返回false |
void atomic_add_return(int i,atomic *v) void atomic_sub_return(int i,atomic *v) void atomic_inc_return(int i,atomic *v) void atomic_dec_return(int i,atomic *v) | 操作并返回新的值; 先返回测试值,再操作!!!!! |
位原子操作:
void set_bit(nr,void *addr) | 设置addr地址的第nr位(该位写1) |
void clear_bit(nr,void *addr) | 清除addr地址的第nr位(该位写0) |
void change_bit(nr,void *addr) | addr地址的第nr位取反 |
test_bit(nr,void *addr) | 测试addr地址的第nr位,返回值 |
int test_and_set_bit(nr,void *addr) int test_and_clear_bit(nr,void *addr) int test_and_change_bit(nr,void *addr) | 测试并操作位,返回测试值 |
static atomic_t xxx_available=ATOMIC_INIT(1); //定义原子变量xxx_available,初值为1
static int xxx_open(struct inode *inode,struct file *filep)
{
....if(!atomic_dec_and_test(&xxx_available)){atomic_inc(&xxx_available);return -EBUSY; //已经打开}
....return 0;
}
static int xxx_release(struct inode *inode,struct file *filep)
{atomic_inc(&xxx_available); //释放设备return 0;
}
假设设备未打开,则xxx_available=1;
打开设备,执行xxx_open,执行atomic_dec_and_test(),先返回值1,再执行xxx_avaiable--;此时xxx_available=0,设备打开;
另一个进程也想打开设备(二次打开),执行xxx_open,执行atomic_dec_and_test(),因为设备已经打开,返回xxx_available值0,在让xxx_available=-1,if条件满足,执行xxx_available++,返回EBUSY;
释放设备,xxx_available=0+1;
4.自旋锁