热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

自己动手实现自旋锁(spinlock)

大多数的并行程序都需要在底层使用锁机制进行同步,简单来讲,锁无非是一套简单的原语,它们保证程序(或进程)对某一资源的互斥访问来维持数据的一致性,如果没有锁机制作为保证,多个线程可能

大多数的并行程序都需要在底层使用锁机制进行同步,简单来讲,锁无非是一套简单的原语,它们保证程序(或进程)对某一资源的互斥访问来维持数据的一致性,如果没有锁机制作为保证,多个线程可能同时访问某一资源,假设没有精心设计的(很复杂)无锁算法保证程序正确执行,那么后果往往非常严重的。无锁算法难于使用,所以一般而言都使用锁来保证程序的一致性。

如果更新某一数据结构的操作比较缓慢,那么互斥的锁是一个比较好的选择,此时如果某一进程或线程被阻塞,操作系统会重新接管控制权,并调度其他进程(或线程)继续执行,原先被阻塞的进程处于睡眠状态。控制权的转换伴随着进程上下文的切换,而这往往是一个昂贵而耗时的操作,所以对于等待锁的时间比较短,那么应该使用其他更高效的方法。




自旋锁(spinlock)

自旋锁(Spinlock)是一种常用的互斥(Mutual Exclusion)同步原语(Synchronization Primitive),试图进入临界区(Critical Section)的线程使用忙等待(Busy Waiting)的方式检测锁的状态,若锁未被持有则尝试获取。与其他锁不同,自旋锁仅仅只是“自旋”,即不停地检查某一锁是否已经被解开,自旋锁是非常快的,所以加锁-解锁操作耗时很短,然而,自旋锁也不是万精油,当因互斥导致进程睡眠的时间很长时,使用自旋锁是不明智的选择。

下面我们考虑实现自己的自旋锁,首先我们需要一些原语,幸好GCC已经为我们提供了一些内置函数,

#define atomic_xadd(P, V) __sync_fetch_and_add((P), (V))
#define cmpxchg(P, O, N) __sync_val_compare_and_swap((P), (O), (N))
#define atomic_inc(P) __sync_add_and_fetch((P), 1)
#define atomic_dec(P) __sync_add_and_fetch((P), -1)
#define atomic_add(P, V) __sync_add_and_fetch((P), (V))
#define atomic_set_bit(P, V) __sync_or_and_fetch((P), 1<<(V))
#define atomic_clear_bit(P, V) __sync_and_and_fetch((P), ~(1<<(V)))

然而,我们也需要自己实现其他的几个原子操作,如下:

/* Compile read-write barrier */
#define barrier() asm volatile("": : :"memory")
/* Pause instruction to prevent excess processor bus usage */
#define cpu_relax() asm volatile("pause\n": : :"memory")
/* Atomic exchange (of various sizes) */
static inline void *xchg_64(void *ptr, void *x)
{
__asm__ __volatile__("xchgq %0,%1"
:"=r" ((unsigned long long) x)
:"m" (*(volatile long long *)ptr), "0" ((unsigned long long) x)
:"memory");
return x;
}
static inline unsigned xchg_32(void *ptr, unsigned x)
{
__asm__ __volatile__("xchgl %0,%1"
:"=r" ((unsigned) x)
:"m" (*(volatile unsigned *)ptr), "0" (x)
:"memory");
return x;
}
static inline unsigned short xchg_16(void *ptr, unsigned short x)
{
__asm__ __volatile__("xchgw %0,%1"
:"=r" ((unsigned short) x)
:"m" (*(volatile unsigned short *)ptr), "0" (x)
:"memory");
return x;
}
/* Test and set a bit */
static inline char atomic_bitsetandtest(void *ptr, int x)
{
char out;
__asm__ __volatile__("lock; bts %2,%1\n"
"sbb %0,%0\n"
:"=r" (out), "=m" (*(volatile long long *)ptr)
:"Ir" (x)
:"memory");
return out;
}

自旋锁可以使用交换原语实现,如下:

#define EBUSY 1
typedef unsigned spinlock;
static void spin_lock(spinlock *lock)
{
while (1)
{
if (!xchg_32(lock, EBUSY)) return;
while (*lock) cpu_relax();
}
}
static void spin_unlock(spinlock *lock)
{
barrier();
*lock = 0;
}
static int spin_trylock(spinlock *lock)
{
return xchg_32(lock, EBUSY);
}

上面的自旋锁已经能够工作,但是也会产生问题,因为多个线程可能产生竞争,因为在锁释放的时候其他的每个线程都想获得锁。这会导致处理器总线的负载增大,从而使性能降低,所以接下来我们将实现另外一种自旋锁,该自旋锁能够感知下一个获得锁的进程或线程,因此能够大大减轻处理器总线负载。

下面我们介绍另外一种自旋锁,MCS自旋锁,该锁使用链表维护申请者的请求序列,

typedef struct mcs_lock_t mcs_lock_t;
struct mcs_lock_t
{
mcs_lock_t *next;
int spin;
};
typedef struct mcs_lock_t *mcs_lock;
static void lock_mcs(mcs_lock *m, mcs_lock_t *me)
{
mcs_lock_t *tail;
me->next = NULL;
me->spin = 0;
tail = xchg_64(m, me);
/* No one there? */
if (!tail) return;
/* Someone there, need to link in */
tail->next = me;
/* Make sure we do the above setting of next. */
barrier();
/* Spin on my spin variable */
while (!me->spin) cpu_relax();
return;
}
static void unlock_mcs(mcs_lock *m, mcs_lock_t *me)
{
/* No successor yet? */
if (!me->next)
{
/* Try to atomically unlock */
if (cmpxchg(m, me, NULL) == me) return;
/* Wait for successor to appear */
while (!me->next) cpu_relax();
}
/* Unlock next one */
me->next->spin = 1;
}
static int trylock_mcs(mcs_lock *m, mcs_lock_t *me)
{
mcs_lock_t *tail;
me->next = NULL;
me->spin = 0;
/* Try to lock */
tail = cmpxchg(m, NULL, &me);
/* No one was there - can quickly return */
if (!tail) return 0;
return EBUSY;
}

当然,MCS锁也是有问题的,因为它的API除了需要传递锁的地址外,还需要传递另外一个结构,下面介绍另外一种自旋锁算法,K42锁算法,

typedef struct k42lock k42lock;
struct k42lock
{
k42lock *next;
k42lock *tail;
};
static void k42_lock(k42lock *l)
{
k42lock me;
k42lock *pred, *succ;
me.next = NULL;
barrier();
pred = xchg_64(&l->tail, &me);
if (pred)
{
me.tail = (void *) 1;
barrier();
pred->next = &me;
barrier();
while (me.tail) cpu_relax();
}
succ = me.next;
if (!succ)
{
barrier();
l->next = NULL;
if (cmpxchg(&l->tail, &me, &l->next) != &me)
{
while (!me.next) cpu_relax();
l->next = me.next;
}
}
else
{
l->next = succ;
}
}
static void k42_unlock(k42lock *l)
{
k42lock *succ = l->next;
barrier();
if (!succ)
{
if (cmpxchg(&l->tail, &l->next, NULL) == (void *) &l->next) return;
while (!l->next) cpu_relax();
succ = l->next;
}
succ->tail = NULL;
}
static int k42_trylock(k42lock *l)
{
if (!cmpxchg(&l->tail, NULL, &l->next)) return 0;
return EBUSY;
}

K42和MCS锁都需要遍历链表才能找到下一个最可能获得锁的进程(或线程),有时查找可能比较费时,所以我们再次改进后:

typedef struct listlock_t listlock_t;
struct listlock_t
{
listlock_t *next;
int spin;
};
typedef struct listlock_t *listlock;
#define LLOCK_FLAG (void *)1
static void listlock_lock(listlock *l)
{
listlock_t me;
listlock_t *tail;
/* Fast path - no users */
if (!cmpxchg(l, NULL, LLOCK_FLAG)) return;
me.next = LLOCK_FLAG;
me.spin = 0;
/* Convert into a wait list */
tail = xchg_64(l, &me);
if (tail)
{
/* Add myself to the list of waiters */
if (tail == LLOCK_FLAG) tail = NULL;
me.next = tail;
/* Wait for being able to go */
while (!me.spin) cpu_relax();
return;
}
/* Try to convert to an exclusive lock */
if (cmpxchg(l, &me, LLOCK_FLAG) == &me) return;
/* Failed - there is now a wait list */
tail = *l;
/* Scan to find who is after me */
while (1)
{
/* Wait for them to enter their next link */
while (tail->next == LLOCK_FLAG) cpu_relax();
if (tail->next == &me)
{
/* Fix their next pointer */
tail->next = NULL;
return;
}
tail = tail->next;
}
}
static void listlock_unlock(listlock *l)
{
listlock_t *tail;
listlock_t *tp;
while (1)
{
tail = *l;
barrier();
/* Fast path */
if (tail == LLOCK_FLAG)
{
if (cmpxchg(l, LLOCK_FLAG, NULL) == LLOCK_FLAG) return;
continue;
}
tp = NULL;
/* Wait for partially added waiter */
while (tail->next == LLOCK_FLAG) cpu_relax();
/* There is a wait list */
if (tail->next) break;
/* Try to convert to a single-waiter lock */
if (cmpxchg(l, tail, LLOCK_FLAG) == tail)
{
/* Unlock */
tail->spin = 1;
return;
}
cpu_relax();
}
/* A long list */
tp = tail;
tail = tail->next;
/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (tail->next == LLOCK_FLAG) cpu_relax();
if (!tail->next) break;
tp = tail;
tail = tail->next;
}
tp->next = NULL;
barrier();
/* Unlock */
tail->spin = 1;
}
static int listlock_trylock(listlock *l)
{
/* Simple part of a spin-lock */
if (!cmpxchg(l, NULL, LLOCK_FLAG)) return 0;
/* Failure! */
return EBUSY;

等等,还可以改进,可以在自旋锁里面嵌套一层自旋锁,

typedef struct bitlistlock_t bitlistlock_t;
struct bitlistlock_t
{
bitlistlock_t *next;
int spin;
};
typedef bitlistlock_t *bitlistlock;
#define BLL_USED ((bitlistlock_t *) -2LL)
static void bitlistlock_lock(bitlistlock *l)
{
bitlistlock_t me;
bitlistlock_t *tail;
/* Grab control of list */
while (atomic_bitsetandtest(l, 0)) cpu_relax();
/* Remove locked bit */
tail = (bitlistlock_t *) ((uintptr_t) *l & ~1LL);
/* Fast path, no waiters */
if (!tail)
{
/* Set to be a flag value */
*l = BLL_USED;
return;
}
if (tail == BLL_USED) tail = NULL;
me.next = tail;
me.spin = 0;
barrier();
/* Unlock, and add myself to the wait list */
*l = &me;
/* Wait for the go-ahead */
while (!me.spin) cpu_relax();
}
static void bitlistlock_unlock(bitlistlock *l)
{
bitlistlock_t *tail;
bitlistlock_t *tp;
/* Fast path - no wait list */
if (cmpxchg(l, BLL_USED, NULL) == BLL_USED) return;
/* Grab control of list */
while (atomic_bitsetandtest(l, 0)) cpu_relax();
tp = *l;
barrier();
/* Get end of list */
tail = (bitlistlock_t *) ((uintptr_t) tp & ~1LL);
/* Actually no users? */
if (tail == BLL_USED)
{
barrier();
*l = NULL;
return;
}
/* Only one entry on wait list? */
if (!tail->next)
{
barrier();
/* Unlock bitlock */
*l = BLL_USED;
barrier();
/* Unlock lock */
tail->spin = 1;
return;
}
barrier();
/* Unlock bitlock */
*l = tail;
barrier();
/* Scan wait list for start */
do
{
tp = tail;
tail = tail->next;
}
while (tail->next);
tp->next = NULL;
barrier();
/* Unlock */
tail->spin = 1;
}
static int bitlistlock_trylock(bitlistlock *l)
{
if (!*l && (cmpxchg(l, NULL, BLL_USED) == NULL)) return 0;
return EBUSY;
}

还可以再次改进,如下

/* Bit-lock for editing the wait block */
#define SLOCK_LOCK 1
#define SLOCK_LOCK_BIT 0
/* Has an active user */
#define SLOCK_USED 2
#define SLOCK_BITS 3
typedef struct slock slock;
struct slock
{
uintptr_t p;
};
typedef struct slock_wb slock_wb;
struct slock_wb
{
/*
* last points to the last wait block in the chain.
* The value is only valid when read from the first wait block.
*/
slock_wb *last;
/* next points to the next wait block in the chain. */
slock_wb *next;
/* Wake up? */
int wake;
};
/* Wait for control of wait block */
static slock_wb *slockwb(slock *s)
{
uintptr_t p;
/* Spin on the wait block bit lock */
while (atomic_bitsetandtest(&s->p, SLOCK_LOCK_BIT))
{
cpu_relax();
}
p = s->p;
if (p <= SLOCK_BITS)
{
/* Oops, looks like the wait block was removed. */
atomic_dec(&s->p);
return NULL;
}
return (slock_wb *)(p - SLOCK_LOCK);
}
static void slock_lock(slock *s)
{
slock_wb swblock;
/* Fastpath - no other readers or writers */
if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return;
/* Initialize wait block */
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = 0;
while (1)
{
uintptr_t p = s->p;
cpu_relax();
/* Fastpath - no other readers or writers */
if (!p)
{
if (cmpxchg(&s->p, 0, SLOCK_USED) == 0) return;
continue;
}
if (p > SLOCK_BITS)
{
slock_wb *first_wb, *last;
first_wb = slockwb(s);
if (!first_wb) continue;
last = first_wb->last;
last->next = &swblock;
first_wb->last = &swblock;
/* Unlock */
barrier();
s->p &= ~SLOCK_LOCK;
break;
}
/* Try to add the first wait block */
if (cmpxchg(&s->p, p, (uintptr_t)&swblock) == p) break;
}
/* Wait to acquire exclusive lock */
while (!swblock.wake) cpu_relax();
}
static void slock_unlock(slock *s)
{
slock_wb *next;
slock_wb *wb;
uintptr_t np;
while (1)
{
uintptr_t p = s->p;
/* This is the fast path, we can simply clear the SRWLOCK_USED bit. */
if (p == SLOCK_USED)
{
if (cmpxchg(&s->p, SLOCK_USED, 0) == SLOCK_USED) return;
continue;
}
/* There's a wait block, we need to wake the next pending user */
wb = slockwb(s);
if (wb) break;
cpu_relax();
}
next = wb->next;
if (next)
{
/*
* There's more blocks chained, we need to update the pointers
* in the next wait block and update the wait block pointer.
*/
np = (uintptr_t) next;
next->last = wb->last;
}
else
{
/* Convert the lock to a simple lock. */
np = SLOCK_USED;
}
barrier();
/* Also unlocks lock bit */
s->p = np;
barrier();
/* Notify the next waiter */
wb->wake = 1;
/* We released the lock */
}
static int slock_trylock(slock *s)
{
/* No other readers or writers? */
if (!s->p && (cmpxchg(&s->p, 0, SLOCK_USED) == 0)) return 0;
return EBUSY;
}

下面是另外一种实现方式,称为stack-lock算法,

typedef struct stlock_t stlock_t;
struct stlock_t
{
stlock_t *next;
};
typedef struct stlock_t *stlock;
static __attribute__((noinline)) void stlock_lock(stlock *l)
{
stlock_t *me = NULL;
barrier();
me = xchg_64(l, &me);
/* Wait until we get the lock */
while (me) cpu_relax();
}
#define MAX_STACK_SIZE (1<<12)
static __attribute__((noinline)) int on_stack(void *p)
{
int x;
uintptr_t u = (uintptr_t) &x;
return ((u - (uintptr_t)p + MAX_STACK_SIZE) 2);
}
static __attribute__((noinline)) void stlock_unlock(stlock *l)
{
stlock_t *tail = *l;
barrier();
/* Fast case */
if (on_stack(tail))
{
/* Try to remove the wait list */
if (cmpxchg(l, tail, NULL) == tail) return;
tail = *l;
}
/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (!tail->next) cpu_relax();
if (on_stack(tail->next)) break;
tail = tail->next;
}
barrier();
/* Unlock */
tail->next = NULL;
}
static int stlock_trylock(stlock *l)
{
stlock_t me;
if (!cmpxchg(l, NULL, &me)) return 0;
return EBUSY;
}

改进后变成,

typedef struct plock_t plock_t;
struct plock_t
{
plock_t *next;
};
typedef struct plock plock;
struct plock
{
plock_t *next;
plock_t *prev;
plock_t *last;
};
static void plock_lock(plock *l)
{
plock_t *me = NULL;
plock_t *prev;
barrier();
me = xchg_64(l, &me);
prev = NULL;
/* Wait until we get the lock */
while (me)
{
/* Scan wait list for my previous */
if (l->next != (plock_t *) &me)
{
plock_t *t = l->next;
while (me)
{
if (t->next == (plock_t *) &me)
{
prev = t;
while (me) cpu_relax();
goto done;
}
if (t->next) t = t->next;
cpu_relax();
}
}
cpu_relax();
}
done:
l->prev = prev;
l->last = (plock_t *) &me;
}
static void plock_unlock(plock *l)
{
plock_t *tail;
/* Do I know my previous? */
if (l->prev)
{
/* Unlock */
l->prev->next = NULL;
return;
}
tail = l->next;
barrier();
/* Fast case */
if (tail == l->last)
{
/* Try to remove the wait list */
if (cmpxchg(&l->next, tail, NULL) == tail) return;
tail = l->next;
}
/* Scan wait list */
while (1)
{
/* Wait for partially added waiter */
while (!tail->next) cpu_relax();
if (tail->next == l->last) break;
tail = tail->next;
}
barrier();
/* Unlock */
tail->next = NULL;
}
static int plock_trylock(plock *l)
{
plock_t me;
if (!cmpxchg(&l->next, NULL, &me))
{
l->last = &me;
return 0;
}
return EBUSY;
}

下面介绍另外一种算法,ticket lock算法,实际上,Linux内核正是采用了该算法,不过考虑到执行效率,人家是以汇编形式写的,

typedef union ticketlock ticketlock;
union ticketlock
{
unsigned u;
struct
{
unsigned short ticket;
unsigned short users;
} s;
};
static void ticket_lock(ticketlock *t)
{
unsigned short me = atomic_xadd(&t->s.users, 1);
while (t->s.ticket != me) cpu_relax();
}
static void ticket_unlock(ticketlock *t)
{
barrier();
t->s.ticket++;
}
static int ticket_trylock(ticketlock *t)
{
unsigned short me = t->s.users;
unsigned short menew = me + 1;
unsigned cmp = ((unsigned) me <<16) + me;
unsigned cmpnew = ((unsigned) menew <<16) + me;
if (cmpxchg(&t->u, cmp, cmpnew) == cmp) return 0;
return EBUSY;
}
static int ticket_lockable(ticketlock *t)
{
ticketlock u = *t;
barrier();
return (u.s.ticket == u.s.users);
}

至此,自旋锁各种不同的实现介绍完毕,亲,你明白了吗?:)

(全文完)



推荐阅读
  • 在Docker中,将主机目录挂载到容器中作为volume使用时,常常会遇到文件权限问题。这是因为容器内外的UID不同所导致的。本文介绍了解决这个问题的方法,包括使用gosu和suexec工具以及在Dockerfile中配置volume的权限。通过这些方法,可以避免在使用Docker时出现无写权限的情况。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • CSS3选择器的使用方法详解,提高Web开发效率和精准度
    本文详细介绍了CSS3新增的选择器方法,包括属性选择器的使用。通过CSS3选择器,可以提高Web开发的效率和精准度,使得查找元素更加方便和快捷。同时,本文还对属性选择器的各种用法进行了详细解释,并给出了相应的代码示例。通过学习本文,读者可以更好地掌握CSS3选择器的使用方法,提升自己的Web开发能力。 ... [详细]
  • 本文主要解析了Open judge C16H问题中涉及到的Magical Balls的快速幂和逆元算法,并给出了问题的解析和解决方法。详细介绍了问题的背景和规则,并给出了相应的算法解析和实现步骤。通过本文的解析,读者可以更好地理解和解决Open judge C16H问题中的Magical Balls部分。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • Python正则表达式学习记录及常用方法
    本文记录了学习Python正则表达式的过程,介绍了re模块的常用方法re.search,并解释了rawstring的作用。正则表达式是一种方便检查字符串匹配模式的工具,通过本文的学习可以掌握Python中使用正则表达式的基本方法。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 本文探讨了C语言中指针的应用与价值,指针在C语言中具有灵活性和可变性,通过指针可以操作系统内存和控制外部I/O端口。文章介绍了指针变量和指针的指向变量的含义和用法,以及判断变量数据类型和指向变量或成员变量的类型的方法。还讨论了指针访问数组元素和下标法数组元素的等价关系,以及指针作为函数参数可以改变主调函数变量的值的特点。此外,文章还提到了指针在动态存储分配、链表创建和相关操作中的应用,以及类成员指针与外部变量的区分方法。通过本文的阐述,读者可以更好地理解和应用C语言中的指针。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 开发笔记:实验7的文件读写操作
    本文介绍了使用C++的ofstream和ifstream类进行文件读写操作的方法,包括创建文件、写入文件和读取文件的过程。同时还介绍了如何判断文件是否成功打开和关闭文件的方法。通过本文的学习,读者可以了解如何在C++中进行文件读写操作。 ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
author-avatar
果粒仙子妹妹
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有