muduo库中的Thread类集合了所有线程的操作,其中还运用了线程安全的观察者模式。
运用shared_ptr和weak_ptr做到了生命周期的管理。
线程类的接口:
代码加注释如下:
Thread.h
#ifndef MUDUO_BASE_THREAD_H #define MUDUO_BASE_THREAD_H #includeThread.cc#include #include #include #include #include namespace muduo { class Thread : boost::noncopyable { public: typedef boost::function ThreadFunc; explicit Thread(const ThreadFunc&, const string& name = string()); #ifdef __GXX_EXPERIMENTAL_CXX0X__ explicit Thread(ThreadFunc&&, const string& name = string()); //右值引用,在对象返回的时候不会拷贝构造临时对象,而是和临时对象交换,提高了效率 #endif ~Thread(); void start(); int join(); // return pthread_join() bool started() const { return started_; } // pthread_t pthreadId() const { return pthreadId_; } pid_t tid() const { return *tid_; } //由于pthread_t的id号可能是一样的,所以需要用gettid() const string& name() const { return name_; } static int numCreated() { return numCreated_.get(); } private: void setDefaultName(); bool started_; bool joined_; pthread_t pthreadId_; boost::shared_ptr tid_; //用来管理生命期 ThreadFunc func_; //线程回调函数 string name_; static AtomicInt32 numCreated_; //原子操作 }; } #endif
#include#include #include #include #include #include #include #include #include #include #include #include #include #include namespace muduo { namespace CurrentThread { __thread int t_cachedTid = 0; //用来缓存的id //__thread修饰的变量是线程局部存储的,线程不共享,线程安全,解释见下2 __thread char t_tidString[32]; //tid的字符串表示形式 __thread int t_tidStringLength = 6; __thread const char* t_threadName = "unknown"; //每个线程的名称 const bool sameType = boost::is_same ::value; //boost::is_same用来检测参数是否是相同的类型,相同返回true。 BOOST_STATIC_ASSERT(sameType); //断言相同 } namespace detail { pid_t gettid() { return static_cast (::syscall(SYS_gettid)); //系统调用获取 //解释见下1 } void afterFork() //fork之后打扫战场,子进程中执行 { muduo::CurrentThread::t_cachedTid = 0; //当前为0 //1.先清零tid muduo::CurrentThread::t_threadName = "main";//为main//为什么要赋值为0和main,因为fork可能在主线程中调用,也可能在子线程中调用。fork得到一个新进程, CurrentThread::tid(); //2.此处再缓存tid //新进程只有一个执行序列,只有一个线程 // no need to call pthread_atfork(NULL, NULL, &afterFork); //实际上服务器要么多进程,要么多线程。如果都用,甚至可能死锁,见下5 } class ThreadNameInitializer //线程名初始化 { public: ThreadNameInitializer() { muduo::CurrentThread::t_threadName = "main"; //由下面的init全局对象先触发构造,主线程的名称为main CurrentThread::tid(); //获得tid pthread_atfork(NULL, NULL, &afterFork); //如果我们调用了fork函数,调用成功后子进程会调用afterfork() } }; ThreadNameInitializer init; //全部变量类,这个对象构造先于main函数,当我们的程序引入这个库时,这个全局函数直接构造,我们程序的main()函数还没有执行。 struct ThreadData //线程数据类,观察者模式 { typedef muduo::Thread::ThreadFunc ThreadFunc; ThreadFunc func_; string name_; boost::weak_ptr wkTid_; ThreadData(const ThreadFunc& func, const string& name, const boost::shared_ptr & tid) //保存有Thread类的shared_ptr : func_(func), name_(name), wkTid_(tid) { } void runInThread() //线程运行 { pid_t tid = muduo::CurrentThread::tid(); //得到线程tid boost::shared_ptr ptid = wkTid_.lock(); //尝试将weak_ptr提升为shared_ptr,thread_safe if (ptid) //如果成功 { *ptid = tid; ptid.reset(); //成功了之后把通过智能指针修改父类线程id,该临时shared_ptr销毁掉 } muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str(); ::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName); try { func_(); //运行线程运行函数 muduo::CurrentThread::t_threadName = "finished"; //运行玩的threadname } catch (const Exception& ex) //Exception异常 { muduo::CurrentThread::t_threadName = "crashed"; fprintf(stderr, "exception caught in Thread %s\n", name_.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); fprintf(stderr, "stack trace: %s\n", ex.stackTrace()); //打印函数调用栈 abort(); } catch (const std::exception& ex) //标准异常 { muduo::CurrentThread::t_threadName = "crashed"; fprintf(stderr, "exception caught in Thread %s\n", name_.c_str()); fprintf(stderr, "reason: %s\n", ex.what()); abort(); } catch (...) //其他 { muduo::CurrentThread::t_threadName = "crashed"; fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str()); throw; // rethrow //再次抛出 } } }; void* startThread(void* obj) //线程启动 { ThreadData* data = static_cast (obj); //派生类指针转化成基类指针,obj是派生类的this指针 data->runInThread(); delete data; return NULL; } } } using namespace muduo; void CurrentThread::cacheTid() //在这里第一次会缓存tid,并不会每次都systemcall,提高了效率 { if (t_cachedTid == 0) { t_cachedTid = detail::gettid(); t_tidStringLength = snprintf(t_tidString, sizeof t_tidString, "%5d ", t_cachedTid); } } bool CurrentThread::isMainThread() //是否是主线程 { return tid() == ::getpid(); } void CurrentThread::sleepUsec(int64_t usec) //休眠 { struct timespec ts = { 0, 0 }; ts.tv_sec = static_cast (usec / Timestamp::kMicroSecondsPerSecond); ts.tv_nsec = static_cast (usec % Timestamp::kMicroSecondsPerSecond * 1000); ::nanosleep(&ts, NULL); } AtomicInt32 Thread::numCreated_; Thread::Thread(const ThreadFunc& func, const string& n) : started_(false), joined_(false), pthreadId_(0), tid_(new pid_t(0)), func_(func), name_(n) { setDefaultName(); } #ifdef __GXX_EXPERIMENTAL_CXX0X__ //C++11标准 Thread::Thread(ThreadFunc&& func, const string& n) : started_(false), joined_(false), pthreadId_(0), tid_(new pid_t(0)), func_(std::move(func)), name_(n) { setDefaultName(); } #endif //pthread_join()和pthread_detach()都是防止现成资源泄露的途径,join()会阻塞等待。 Thread::~Thread() //这个析构函数是线程安全的。析构时确认thread没有join,才会执行析构。即线程的析构不会等待线程结束 { //如果thread对象的生命周期长于线程,那么可以通过join等待线程结束。否则thread对象析构时会自动detach线程,防止资源泄露 if (started_ && !joined_) //如果没有join,就detach,如果用过了,就不用了。 { pthread_detach(pthreadId_); } } void Thread::setDefaultName() //相当于给没有名字的线程起个名字 "Thread %d" { int num = numCreated_.incrementAndGet(); //原子操作 if (name_.empty()) { char buf[32]; snprintf(buf, sizeof buf, "Thread%d", num); name_ = buf; } } void Thread::start() //线程启动 { assert(!started_); started_ = true; // FIXME: move(func_) detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_); if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)) { started_ = false; delete data; // or no delete? LOG_SYSFATAL <<"Failed in pthread_create"; } } int Thread::join() //等待线程 { assert(started_); assert(!joined_); joined_ = true; return pthread_join(pthreadId_, NULL); }
需要注意的是
1.在linux系统中,每个进程都有一个pid,类型pid_t,由getpid()取得。Linux下的POSIX线程也有一个id,类型pthread_t,由pthread_self()取得,该线程由线程库维护,其id空间是各个进程独立的(即不同线程中可能拥有相同的id)。有时我们需要知道线程的真实id,就不能使用pid和pthread id,需要使用该线程真实的id,称为tid。(不是我们平常所用的pthread_t tid; tid=pthread_create()的那个tid)。
linux系统中有一个系统调用可以实现得到线程的真实id,我们可以实现一个函数,返回该系统调用的返回值,return syscall(SYS_gettid),但是频繁系统调用会造成性能降低,muduo库就使用了全局变量current_tid来缓存它,我们只需要调用一次,以后直接获取该缓存就可以了。
2.__thread修饰的变量是线程局部存储的,每个线程都有一份。__thread,gcc内置的线程局部存储设施。__thread只能修饰POD类型。如果类没有定义构造函数,也是POD类型。
3.boost::is_same用来判断是否是同一类型
比如typedef unsigned int u_int,就是同一类型。
4.pthread_atfork()
int pthread_atfork(void (*prepare)(void), void(*parent)(void), void(*child)(void));
调用fork时,内部创建子进程前在父进程中会调用prepare,内部创建子进程成功后,父进程会调用parent,子进程会调用child。
示例:
#include#include #include #include using namespace std; void prepare() { cout<<"pid = "< (getpid())<<" prepare"< (getpid())<<" parent"< (getpid()))<<" child"< (getpid()))<<" enntering main"< (getpid()))<<" exiting main"< 输出:
5.多线程程序中调用fork()造成死锁示例
见我另一篇博客,这部分是我追加的:http://blog.csdn.net/freeelinux/article/details/53426876