我想确定我理解条件变量是如何工作的,所以我将使用我写的程序来问我的问题.
在我的程序中,我有一个"生产者"线程(一个)和"工作线程"(几个让我们假设3).
生产者线程"处理"一个FIFO链表,这意味着它所做的只是检查列表开头是否有一个项(在我的程序类型请求中调用Req
)(由一个名为front的全局指针指向)在我的程序中)如果是这样,将它分配到一个全局请求元素(称为globalReq
).
工作线程,在一个循环中运行,等待处理请求,通过提取全局请求变量,将它们变为自己的局部变量(对于它们中的每一个都是"私有"的,因为每个线程都有一个独立的堆栈 - 正确我,如果我错了),然后处理请求.
为了做到这一点,我使用互斥量和条件变量.
一个重要的注意事项是,一旦请求存在(暂时让我们假设只存在一个请求),那么工作线程中的哪一个将"关注"它(假设它们都是"自由的" - 睡觉并不重要在条件变量上).
在提取请求并将其分配到全局请求之后,生成器线程调用pthread_cond_signal
- 据我所知,解锁至少一个"阻塞"线程 - >因此它可以解除阻塞,例如2个线程.
所以我的问题是我现有的代码(如下):
1)我怎样才能保证只有一个线程(来自工作线程)将处理请求.我是否需要在所有通用"生产者消费者"实施中添加"while check loop"?
2)如何通过pthread_cond_broadcast
(或者如果一个pthread_cond_signal
未被阻塞的多个线程)通过互斥锁上的内容解除阻塞的线程,可能我还没有掌握它...
(每个)工作线程的代码是:
void *worker(void *arg) { while(1) { printf("\n BEFORE LOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid)); pthread_mutex_lock(&sthread_mutex); printf("\n AFTER UNLOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid)); printf("\n BEFORE WAITING ON cond_var with thread: %d \n", syscall(SYS_gettid)); pthread_cond_wait(&cond_var,&sthread_mutex); //wait on condition variable printf("\n AFTER WAITING ON cond_var with thread: %d \n", syscall(SYS_gettid)); printf("\n got signal for thread: %d \n",syscall(SYS_gettid)); // extract the current request into g local variable // within the "private stack" of this thread Req localReq = globalReq; pthread_mutex_unlock(&sthread_mutex); printf("\n AFTER UNLOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid)); // perform the desired task task(localReq); printf("\n BEFORE calling sem_post with thread: %d \n", syscall(SYS_gettid)); sem_post(&sem); } // end while (1) } // end of worker thread function
生产者线程的代码是:
void *producer(void *arg) { while(1) { if(front != NULL) // queue not empty { // go to sleep if all "worker threads" are occuipied // or decrement number of free "worker threads" threads by 1 printf(" schedualer thread BEFORE calling sem_wait on sem \n"); sem_wait(&sem); // lock the sthread mutex in order to "synchronize" with the // "worker threads"... printf(" schedualer thread BEFORE locking sthread_mutex \n"); pthread_mutex_lock(&sthread_mutex); printf(" schedualer thread AFTER locking sthread_mutex \n"); globalReq = extract_element(); // this is the global request variable // notify the worker threads that an "arriving request" needed to // be taking care of printf(" schedualer thread BEFORE calling signal on cond_var \n"); // pthread_cond_signal(&cond_var); pthread_cond_broadcast(&cond_var); printf(" schedualer thread AFTER calling signal on cond_var \n"); // unlock the smutex printf(" schedualer thread BEFORE UNLOCKING sthread_mutex \n"); pthread_mutex_unlock(&sthread_mutex); printf(" schedualer thread AFTER UNLOCKING sthread_mutex \n"); } // queue not empty else continue; } // end while (1) } // end of producer
另一个问题:
生产者线程调用sem_wait
的全球信号量(这是在与工作线程的数量开始初始化,在这种情况下,3,以指示本身)有多少工作线程处理目前的请求,并完成此"机制",工作线程,一旦完成处理他们"赢了"的请求(当争用条件变量时),调用sem_post
指示"另一个工作线程可用"
3)这是实现这种"发信号通知有多少可用工作线程"的正确(良好/有效)方式吗?
4)通过//*段落中提到的生产者和工人线程共享的全局变量 "传递"请求有什么好处和缺点?传递它是一种明智的方式,或者最好"只是"创建一个"新的请求变量"(在使用malloc的堆上),它将为每个工作线程和请求"专用"(并且还在其中释放它)每个工作线程一旦完成为请求服务)?
5)如果您对这段代码有任何其他意见(好的或坏的),请随时向我表明.
编辑:
大家好,
一些额外的问题:
除了生产者和工作者线程之外,还有另一个称为监听器的线程,它只是用于插入到达链表(FIFO队列)的请求,所以它实际上不是之前提到的生产者的任务.
所以我的新问题是:
8)关于我的程序的附加信息,我用信号量组成的"信令机制"是否有效?
9)由生产者和监听器线程管理的链表有两个全局指针 front
,分别rear
指向链表的头部和尾部(列表的头部是第一个要处理的请求).
下面是监听器线程执行的插入函数的实现,以及生产者线程执行的"提取"功能.
为了在"队列"(链表)上同步这两个线程,我使用了一个名为qmutex的共享互斥锁.
我的问题是,关于下面的两个代码,在每个函数中"放置"互斥锁(锁定和解锁)的"最佳"位置在哪里?
谢谢分配,
盖伊.
插入功能:
void insertion(void *toInsert) { struct getInfo *req = (struct getInfo *)toInsert; newNode = (N*)malloc(sizeof(N)); newNode->req = req; newNode->next = NULL; // WHERE SHULD I LOCK (AND UNLOCK) THE QUEUE MUTEX ???????????????????????? if(front == NULL) { front = newNode; printf("empty list - insert as head \n"); } else { rear->next = newNode; printf(" NOT AN EMPTY list - insert as last node \n"); } rear = newNode; } // end of insertion
提取功能:
Req extract_element() { if(front == NULL) printf("\n empty queue \n"); else { Req ret; tmpExtract = front; ret.socketNum = tmpExtract->req->socketNum; ret.type = tmpExtract->req->type; printf("\n extracting node with sockNum: %d \n",ret.socketNum); front = front->next; free(tmpExtract); return(ret); } } // end of extract_element
sonicwave.. 5
而不是直接回答您的问题,首先,这里是对典型方法的描述:
您有一种队列或列表,您可以在其中添加工作数据.每当您添加一组工作数据时,首先锁定互斥锁,添加数据,发出条件变量信号,然后解锁互斥锁.
然后,您的工作线程会锁定互斥锁,并在队列为空时等待循环中的条件.当发送信号时,一个或多个工作人员将被唤醒,但只有一个(一次)将获取互斥锁.锁定互斥锁后,"获胜者"会检查队列中是否存在某些内容,将其解压缩,解锁互斥锁,并执行必要的工作.在解锁互斥锁之后,其他线程也可能会被唤醒(并且如果条件被广播将会唤醒),并且将从队列中提取下一个工作,或者如果队列为空则返回等待.
在代码中,它看起来有点像这样:
#include#include #include #define WORKER_COUNT 3 pthread_mutex_t mutex; pthread_cond_t cond; pthread_t workers[WORKER_COUNT]; static int queueSize = 0; static void *workerFunc(void *arg) { printf("Starting worker %d\n", (int)arg); while(1) { pthread_mutex_lock(&mutex); while(queueSize < 1) { pthread_cond_wait(&cond, &mutex); } printf("Worker %d woke up, processing queue #%d\n", (int)arg, queueSize); //Extract work from queue --queueSize; pthread_mutex_unlock(&mutex); //Do work sleep(1); } } int main() { int i; pthread_mutex_init(&mutex, 0); pthread_cond_init(&cond, 0); for(i=0; i (我在线程之后省略了清理工作,并且将工作人员号码传递给线程很快且很脏,但在这种情况下可以正常工作).
在这里,工作人员将被唤醒
pthread_cond_broadcast()
,并且只要队列中存在某些东西就会运行(直到queueSize
回到0 - 想象还有一个实际的队列),然后再回去等待.回到问题:
1:互斥锁和保护变量(这里是它
queueSize
)负责这一点.您还需要保护变量,因为您的线程也可能因其他原因而被唤醒(所谓的虚假唤醒,请参阅http://linux.die.net/man/3/pthread_cond_wait).2:如果你打电话,唤醒线程就像任何其他线程一样攻击互斥锁
pthread_mutex_lock()
.3:我不确定为什么你需要向生产者发出可用工作线程数量的信号?
4:队列需要可以从生产者和使用者那里访问 - 但是仍然可以用各种方式用函数(或类,如果你使用的是C++)封装.
5:我希望以上就足够了?
6:事情
pthread_cond_wait()
是,它可以有虚假的唤醒.也就是说,即使你没有发出信号,它也可能会醒来.因此,你需要保护变量(while()
周围循环pthread_cond_wait()
在我的代码示例),以确保有实际上是有原因的苏醒,一次pthread_cond_wait()
返回.然后,您使用与条件使用的相同的互斥锁保护保护变量(以及您需要提取的任何工作数据),然后您可以确定只有一个线程将执行每项工作.7:我没有让生产者进入睡眠状态,而是让它添加它可以提取到工作队列的任何数据.如果队列已满,那么它应该进入休眠状态,否则它应该继续添加内容.
8:使用Listener线程,我真的不明白为什么你甚至需要你的Producer线程.为什么不让工人
extract_element()
自言自语?9:您需要保护对列表变量的所有访问.也就是说,在
insertion()
第一次访问之前锁定互斥锁front
,并在上次访问之后将其解锁rear
.同样的事情extract_element()
- 虽然你需要重写函数,以便在队列为空时也有一个有效的返回值.
而不是直接回答您的问题,首先,这里是对典型方法的描述:
您有一种队列或列表,您可以在其中添加工作数据.每当您添加一组工作数据时,首先锁定互斥锁,添加数据,发出条件变量信号,然后解锁互斥锁.
然后,您的工作线程会锁定互斥锁,并在队列为空时等待循环中的条件.当发送信号时,一个或多个工作人员将被唤醒,但只有一个(一次)将获取互斥锁.锁定互斥锁后,"获胜者"会检查队列中是否存在某些内容,将其解压缩,解锁互斥锁,并执行必要的工作.在解锁互斥锁之后,其他线程也可能会被唤醒(并且如果条件被广播将会唤醒),并且将从队列中提取下一个工作,或者如果队列为空则返回等待.
在代码中,它看起来有点像这样:
#include <pthread.h> #include <unistd.h> #include <stdio.h> #define WORKER_COUNT 3 pthread_mutex_t mutex; pthread_cond_t cond; pthread_t workers[WORKER_COUNT]; static int queueSize = 0; static void *workerFunc(void *arg) { printf("Starting worker %d\n", (int)arg); while(1) { pthread_mutex_lock(&mutex); while(queueSize < 1) { pthread_cond_wait(&cond, &mutex); } printf("Worker %d woke up, processing queue #%d\n", (int)arg, queueSize); //Extract work from queue --queueSize; pthread_mutex_unlock(&mutex); //Do work sleep(1); } } int main() { int i; pthread_mutex_init(&mutex, 0); pthread_cond_init(&cond, 0); for(i=0; i<WORKER_COUNT; ++i) { pthread_create(&(workers[i]), 0, workerFunc, (void*)(i+1)); } sleep(1); pthread_mutex_lock(&mutex); //Add work to queue queueSize = 5; pthread_cond_broadcast(&cond); pthread_mutex_unlock(&mutex); sleep(10); return 0; }
(我在线程之后省略了清理工作,并且将工作人员号码传递给线程很快且很脏,但在这种情况下可以正常工作).
在这里,工作人员将被唤醒pthread_cond_broadcast()
,并且只要队列中存在某些东西就会运行(直到queueSize
回到0 - 想象还有一个实际的队列),然后再回去等待.
回到问题:
1:互斥锁和保护变量(这里是它queueSize
)负责这一点.您还需要保护变量,因为您的线程也可能因其他原因而被唤醒(所谓的虚假唤醒,请参阅http://linux.die.net/man/3/pthread_cond_wait).
2:如果你打电话,唤醒线程就像任何其他线程一样攻击互斥锁pthread_mutex_lock()
.
3:我不确定为什么你需要向生产者发出可用工作线程数量的信号?
4:队列需要可以从生产者和使用者那里访问 - 但是仍然可以用各种方式用函数(或类,如果你使用的是C++)封装.
5:我希望以上就足够了?
6:事情pthread_cond_wait()
是,它可以有虚假的唤醒.也就是说,即使你没有发出信号,它也可能会醒来.因此,你需要保护变量(while()
周围循环pthread_cond_wait()
在我的代码示例),以确保有实际上是有原因的苏醒,一次pthread_cond_wait()
返回.然后,您使用与条件使用的相同的互斥锁保护保护变量(以及您需要提取的任何工作数据),然后您可以确定只有一个线程将执行每项工作.
7:我没有让生产者进入睡眠状态,而是让它添加它可以提取到工作队列的任何数据.如果队列已满,那么它应该进入休眠状态,否则它应该继续添加内容.
8:使用Listener线程,我真的不明白为什么你甚至需要你的Producer线程.为什么不让工人extract_element()
自言自语?
9:您需要保护对列表变量的所有访问.也就是说,在insertion()
第一次访问之前锁定互斥锁front
,并在上次访问之后将其解锁rear
.同样的事情extract_element()
- 虽然你需要重写函数,以便在队列为空时也有一个有效的返回值.