热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

python多线程输出_Python多线程

多线程基础概念并行与并发并行:同时处理多个任务,必须在多核环境下一段时间内同时处理多个任务,单核也可以并发并发手段线程:内核

多线程基础概念

并行与并发

并行:同时处理多个任务,必须在多核环境下

一段时间内同时处理多个任务,单核也可以并发

并发手段

线程:内核空间的调度

进程:内核空间的调度

协程:用户空间的调度

线程可以允许程序在同一进程空间中并发运行多个操作。本次主要介绍Python标准库中的多线程模块threading。

threading模块

线程初始化

使用threading模块的Thread类初始化对象然后调用start方法启动线程。

import threading

import time

def worker(num):

time.sleep(1)

print('worker-{}'.format(num))

# 创建线程对象 target参数是一个函数, 这个函数即线程要执行的逻辑

threads = [threading.Thread(target=worker, args=(i, ))for i in range(5)]

for t in threads:

t.start()

# start 方法启动一个线程, 当这个线程的逻辑执行完毕的时候,线程自动退出, Python 没有提供主动退出线程的方法

# 输出以下结果

worker-0worker-1worker-2worker-3

worker-4

初始化的五个线程的执行逻辑中的print方法打印字符串及换行符出现了随机分布,即出现了资源竞争。

给线程传递参数

import threading

import time

def worker(*args, **kwargs):

time.sleep(1)

print(args)

print(kwargs)

threads = threading.Thread(target=worker, args=(1, 2, 3), kwargs={'a':'b'}).start()

# 输出

(1, 2, 3)

{'a': 'b'}

args传递位置参数,kwargs传递关键字参数。

Thread常用参数和方法

>>> help(threading.Thread)

可以看到Thread函数的初始化方法中的参数如下:

| __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)

| This constructor should always be called with keyword arguments. Arguments are:

|

| *group* should be None; reserved for future extension when a ThreadGroup

| class is implemented.

|

| *target* is the callable object to be invoked by the run()

| method. Defaults to None, meaning nothing is called.

|

| *name* is the thread name. By default, a unique name is constructed of

| the form "Thread-N" where N is a small decimal number.

|

| *args* is the argument tuple for the target invocation. Defaults to ().

|

| *kwargs* is a dictionary of keyword arguments for the target

| invocation. Defaults to {}.

name

表示线程名称,默认情况下,线程名称是Thread-N,N是一个较小的十进制数。我们可以传递name参数,控制线程名称。

以下会导入logging模块来显示线程的名称等详细信息

import threading

import time

import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(num):

time.sleep(1)

logging.info('worker-{}'.format(num))

threads = [threading.Thread(target=worker, args=(i, ), name='workerthread-{}'.format(i)) for i in range(5)]

for t in threads:

t.start()

# 输出

2017-03-20 21:39:29,339 INFO [workerthread-0] worker-0

2017-03-20 21:39:29,340 INFO [workerthread-1] worker-1

2017-03-20 21:39:29,340 INFO [workerthread-2] worker-2

2017-03-20 21:39:29,340 INFO [workerthread-3] worker-3

2017-03-20 21:39:29,346 INFO [workerthread-4] worker-4

其中logging模块的basicConfig函数的format中的%(threadName)s就是用来输出当前线程的名称的。

线程可以重名, 线程名并不是线程的唯一标识,但是通常应该避免线程重名,通常的处理手段是加前缀

daemon

Daemon:守护

和Daemon线程相对应的还有Non-Daemon线程,在此Thread初始化函数中的daemon参数即表示线程是否是Daemon线程。

Daemon线程:会伴随主线程结束而结束(可以理解为主线程结束,守护线程结束)

Non-Daemon线程:不会随着主线程结束而结束,主线程需要等待Non-Daemon结束

import logging

import time

import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker():

logging.info('starting')

time.sleep(2)

logging.info('stopping')

if __name__ == '__main__':

logging.info('starting')

t1 = threading.Thread(target=worker, name='worker1', daemon=False)

t1.start()

time.sleep(1)

t2 = threading.Thread(target=worker, name='worker2', daemon=True)

t2.start()

logging.info('stopping')

# 输出

2017-03-20 23:28:06,404 INFO [MainThread] starting

2017-03-20 23:28:06,436 INFO [worker1] starting

2017-03-20 23:28:07,492 INFO [worker2] starting

2017-03-20 23:28:07,492 INFO [MainThread] stopping # 主线程执行完成

2017-03-20 23:28:08,439 INFO [worker1] stopping # 主线程执行完成之后会等Non-Daemon线程执行完成,但是并不会等Daemon线程执行完成,即Daemon线程会随着主线程执行完成而释放

Thread.join()

如果想等Daemon线程执行完成之后主线程再退出,可以使用线程对象的join()方法

import logging

import time

import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker():

logging.info('starting')

time.sleep(2)

logging.info('stopping')

if __name__ == '__main__':

logging.info('starting')

t1 = threading.Thread(target=worker, name='worker1', daemon=False)

t1.start()

time.sleep(1)

t2 = threading.Thread(target=worker, name='worker2', daemon=True)

t2.start()

logging.info('stopping')

t1.join()

t2.join()

# 输出

2017-03-20 23:41:07,217 INFO [MainThread] starting

2017-03-20 23:41:07,243 INFO [worker1] starting

2017-03-20 23:41:08,245 INFO [worker2] starting

2017-03-20 23:41:08,246 INFO [MainThread] stopping

2017-03-20 23:41:09,243 INFO [worker1] stopping

2017-03-20 23:41:10,248 INFO [worker2] stopping

使用join函数只有主线程就需要等待Daemon线程执行完成在推出。

join函数的原型:join(self, timeout=None)

join方法会阻塞直到线程退出或者超时, timeout 是可选的,如果不设置timeout, 会一直等待线程退出。如果设置了timeout,会在超时之后退出或者线程执行完成退出。

因为join函数总是返回None,因此在超时时间到达之后如果要知道线程是否还是存活的,可以调用is_alive()方法判断线程是否存活。

threading常用方法

enumerate()

列出当前所有的存活的线程

>>> threading.enumerate()

[<_mainthread started>, , ]

local()

import logging

import threading

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

ctx &#61; threading.local()

ctx.data &#61; 5

data &#61; &#39;a&#39;

def worker():

logging.info(data)

logging.info(ctx.data)

worker()

threading.Thread(target&#61;worker).start()

# 输出

2017-03-21 00:02:08,102 INFO [MainThread] a

2017-03-21 00:02:08,113 INFO [MainThread] 5

2017-03-21 00:02:08,119 INFO [Thread-34] a

Exception in thread Thread-34:

Traceback (most recent call last):

File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 914, in _bootstrap_inner

self.run()

File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 862, in run

self._target(*self._args, **self._kwargs)

File "", line 7, in worker

logging.info(ctx.data)

AttributeError: &#39;_thread._local&#39; object has no attribute &#39;data&#39;

线程共享内存、状态和资源。但是thread模块的local类的对象的属性&#xff0c; 只在当前线程可见。

Thread类的派生

Python中可以通过继承 Thread 类并重写 run 方法来编写多线程的逻辑&#xff0c;此时逻辑函数就是run。

class mythread(threading.Thread):

def run(self):

print(&#39;mythread run&#39;)

t &#61; mythread()

t.run() # 输出mythread run

t.start() # 输出mythread run

通过继承方式派生而来的子类对象可以同时执行start方法和run方法&#xff0c;结果是一样的&#xff0c;都是执行子类的run方法。但是非继承的方式不能同时使用start方法和run方法&#xff0c;会报错。

派生时逻辑函数的参数传递

class mythread(threading.Thread):

def __init__(self, *args, **kwargs):

super().__init__() # 需要调用父类的初始化方法初始化

self.args &#61; args

self.kwargs &#61; kwargs

def run(self):

print(&#39;mythread run&#39;, self.args, self.kwargs)

t &#61; mythread(1, 2, 3, a&#61;&#39;b&#39;)

t.start() # 输出mythread run (1, 2, 3) {&#39;a&#39;: &#39;b&#39;}

Timer类

Timer类&#xff1a;Thread类的派生类&#xff0c;也在threading模块中。意为定时器&#xff0c;用作线程的延迟执行。

>>> help(threading.Timer)

Timer类的初始化方法&#xff1a;__init__(self, interval, function, args&#61;None, kwargs&#61;None)

interval&#xff1a;时间间隔&#xff0c;即几秒之后开始执行function

function&#xff1a;线程执行的逻辑函数

args&#xff1a;位置参数

kwargs&#xff1a;关键字参数

代码

import threading

import time

import logging

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

def worker():

logging.info(&#39;worker running&#39;)

t1 &#61; threading.Timer(interval&#61;3, function&#61;worker)

t2 &#61; threading.Timer(interval&#61;3, function&#61;worker)

t1.setName(&#39;t1&#39;)

t2.setName(&#39;t2&#39;)

logging.info(&#39;start&#39;)

t1.start()

t2.start()

time.sleep(2)

logging.info(&#39;canceling {}&#39;.format(t1.name))

t1.cancel() # 2s之后仍然可以取消t1

logging.info(&#39;end&#39;)

# 输出

2017-03-21 19:28:52,801 INFO [MainThread] start

2017-03-21 19:28:54,811 INFO [MainThread] canceling t1

2017-03-21 19:28:54,819 INFO [MainThread] end

2017-03-21 19:28:55,808 INFO [t2] worker running

Timer.cancel()&#xff1a;取消仍然存活的定时器&#xff0c;如果定时器已经开始执行function&#xff0c;则无法取消。

Timer.setDaemon(True)&#xff1a;设置定时器为守护线程

线程同步

当使用多个线程来访问同一个数据时&#xff0c;会经常出现资源争用等线程安全问题(比如多个线程都在操作同一数据导致数据不一致)&#xff0c;这时候我们就可以使用一些同步技术来解决这类问题。比如Event&#xff0c;Lock&#xff0c;Condition&#xff0c;Barrier&#xff0c;Semaphore等等。

Event

>>> help(threading.Event)

Event对象内置一个标志&#xff0c;这个标志可以由set()方法和clear()方法设定。线程可以使用wait()方法进行阻塞等待&#xff0c;知道Event对象内置标志被set。

clear(self)&#xff1a;设置内置标志为False

set(self)&#xff1a;设置内置标志为True

wait(self, timeout&#61;None)&#xff1a;开始阻塞&#xff0c;直到内置标志被设置为True(即wait会阻塞线程直到set方法被调用或者超时)

is_set(self)&#xff1a;当且仅当内置标志为True的时候返回True

代码

以下代码实现的逻辑是&#xff1a;一个boss和五个睡觉工人&#xff0c;只要有一个工人完成了睡觉任务&#xff0c;那么就唤醒boss和其他工人。

import datetime

import threading

import logging

import random

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

def worker(event: threading.Event):

s &#61; random.randint(1, 5)

event.wait(s) # wait方法而不使用sleep方法&#xff0c;可以让其他工人收到通知后不再等待

logging.info(&#39;sleep {}&#39;.format(s))

event.set()

def boss(event:threading.Event):

start &#61; datetime.datetime.now()

event.wait()

end &#61; datetime.datetime.now()

logging.info(&#39;that boss exit takes {}s&#39;.format(end - start))

def start():

event &#61; threading.Event()

b &#61; threading.Thread(target&#61;boss, args&#61;(event, ), name&#61;&#39;boss&#39;)

b.start()

for i in range(5):

t &#61; threading.Thread(target&#61;worker, args&#61;(event, ), name&#61;&#39;worker-{}&#39;.format(i))

t.start()

执行start()方法&#xff0c;测试结果

>>> start()

2017-03-21 21:20:17,195 INFO [worker-2] sleep 1

2017-03-21 21:20:17,198 INFO [boss] that boss exit takes 0:00:01.004954s

2017-03-21 21:20:17,199 INFO [worker-0] sleep 2

2017-03-21 21:20:17,199 INFO [worker-1] sleep 3

2017-03-21 21:20:17,199 INFO [worker-3] sleep 2

2017-03-21 21:20:17,198 INFO [worker-4] sleep 1

可以看到&#xff1a;worker-2退出之后&#xff0c;boss和另外四个worker也瞬间就退出了。所以event对象的内置状态被set之后&#xff0c;相关线程就不再wait了。

event&#xff1a;在线程之间发送信号&#xff0c;通常用于某个线程需要等待其他线程处理完成某些动作之后才能启动

wait()方法的timeout参数

def worker(event: threading.Event):

while not event.wait(3):

logging.info(&#39;run run run&#39;)

event &#61; threading.Event()

threading.Thread(target&#61;worker, args&#61;(event, )).start()

# 输出

2017-03-21 21:32:47,275 INFO [Thread-8] run run run

2017-03-21 21:32:50,277 INFO [Thread-8] run run run

2017-03-21 21:32:53,281 INFO [Thread-8] run run run

2017-03-21 21:32:56,284 INFO [Thread-8] run run run

...

程序每隔3s就会输出一次结果&#xff0c;直到执行set()方法才会停止。因此我们可以写一个定时器(类似于Thread类的派生类Timer)。

代码

class Timer:

def __init__(self, interval, function, *args, **kwargs):

self.interval &#61; interval

self.function &#61; function

self.args &#61; args

self.kwargs &#61; kwargs

self.event &#61; threading.Event()

self.thread &#61; threading.Thread(target&#61;self.__target(), args&#61;args, kwargs&#61;kwargs)

def __target(self):

if not self.event.wait(self.interval):

return self.function

def start(self):

self.thread.start()

def cancel(self):

self.event.set()

def worker(act):

logging.info(&#39;run-{}&#39;.format(act))

t &#61; Timer(5, worker, &#39;hahaha&#39;)

t.start() # 输出2017-03-21 22:14:59,645 INFO [Thread-20] run-hahaha

延迟5s之后执行了逻辑函数&#xff0c;也可以使用cancel函数取消。(要注意参数的传递&#xff0c;此处Timer初始化不能使用关键字参数)

Lock

event是用来同步线程之间的操作的&#xff0c;但是如果要控制共享资源的访问那就需要用到锁机制了&#xff0c;在Python标准库中的实现就是内置的lock类。

>>> help(threading.Lock)

threading.Lock()函数会创建一个lock类的对象。

>>> help(threading.Lock())

锁对象是一个同步原语(synchronization primitive)&#xff0c;lock对象主要有以下三个方法&#xff1a;

acquire()&#xff1a; acquire(blocking&#61;True, timeout&#61;-1) -> bool 获得锁(即锁定锁)。成功获得锁返回True&#xff0c;没有获得锁则返回False。

release()&#xff1a; release() 释放锁

locked()&#xff1a; locked() -> bool 检查锁是否被锁住

代码

以下代码实现了在多个进程同时对资源进行访问时&#xff0c;进行加锁和解锁的操作&#xff0c;保证加减操作和赋值操作组合之后的原子性。

class Counter: # 计时器有加减方法&#xff0c;都会修改value值&#xff0c;因此都需要加锁处理

def __init__(self, start&#61;0):

self.value &#61; start

self.lock &#61; threading.Lock()

def inc(self):

self.lock.acquire()

try:

self.value &#43;&#61; 1

finally:

self.lock.release() # 需要用finally语句保证锁一定会被释放&#xff0c;否则资源永远不可访问

def dec(self):

self.lock.acquire()

try:

self.value -&#61; 1

finally:

self.lock.release()

def inc_worker(c: Counter):

pause &#61; random.random()

logging.info(&#39;sleeping-{}&#39;.format(pause))

time.sleep(pause)

c.inc()

logging.info(&#39;cur_value:{}&#39;.format(c.value))

def dec_worker(c: Counter):

pause &#61; random.random()

logging.info(&#39;sleeping-{}&#39;.format(pause))

time.sleep(pause)

c.dec()

logging.info(&#39;cur_value:{}&#39;.format(c.value))

c &#61; Counter()

for i in range(2):

threading.Thread(target&#61;inc_worker, args&#61;(c, ), name&#61;&#39;inc_worker-{}&#39;.format(i)).start()

for i in range(3):

threading.Thread(target&#61;dec_worker, args&#61;(c, ), name&#61;&#39;dec_worker-{}&#39;.format(i)).start()

测试输出

2017-03-21 23:17:44,761 INFO [inc_worker-0] sleeping-0.6542416949220327

2017-03-21 23:17:44,766 INFO [inc_worker-1] sleeping-0.48615543229897873

2017-03-21 23:17:44,771 INFO [dec_worker-0] sleeping-0.12355589507242459

2017-03-21 23:17:44,776 INFO [dec_worker-1] sleeping-0.5276710391905681

2017-03-21 23:17:44,784 INFO [dec_worker-2] sleeping-0.5546251407611247

2017-03-21 23:17:44,900 INFO [dec_worker-0] cur_value:-1

2017-03-21 23:17:45,258 INFO [inc_worker-1] cur_value:0

2017-03-21 23:17:45,312 INFO [dec_worker-1] cur_value:-1

2017-03-21 23:17:45,351 INFO [dec_worker-2] cur_value:-2

2017-03-21 23:17:45,421 INFO [inc_worker-0] cur_value:-1

可见&#xff0c;各项操作之间保持相互原子性&#xff0c;没有出现干扰。

因为lock类实现了__enter__和__exit__两个魔术方法&#xff0c;因此支持上下文管理器&#xff0c;可以修改以上Counter类的实现方法如下&#xff1a;

class Counter:

def __init__(self, start&#61;0):

self.value &#61; start

self.lock &#61; threading.Lock()

def inc(self):

self.lock.acquire()

with self.lock:

self.value &#43;&#61; 1

def dec(self):

self.lock.acquire()

with self.lock:

self.value -&#61; 1

即使用上下文管理器来代替try...finally...语句&#xff0c;测试输出应该以以上结果一致。

acquire方法的blocking参数

当blocking&#61;True时&#xff0c;A线程中执行了lock.acquire()方法之后并且没有执行到lock.release()方法&#xff0c;如果在B线程中再次执行lock.acquire()方法&#xff0c;则B线程阻塞。

正如以上代码实现&#xff0c;当有n个线程需要修改一个共享资源的时候&#xff0c;其他线程在获取锁之前都处于阻塞状态。(python的阻塞都会让出cpu的时间片&#xff0c;因此不是忙等待)

当blocking&#61;Fasle时&#xff0c;A线程中执行了lock.acquire()方法之后并且没有执行到lock.release()方法&#xff0c;如果在B线程中再次执行lock.acquire()方法&#xff0c;则B线程不会阻塞&#xff0c;并且acquire函数返回False。

acquire方法的timeout参数

当blocking&#61;True并且timeout>0时&#xff0c;acquire会一直阻塞到超时或者锁被释放。

acquire(0)的参数传递

模拟acquire方法的默认参数&#xff0c;编写一下函数进行模拟参数传递的过程&#xff1a;

def print1(blocking&#61;True, timeout&#61;-1):

print(blocking, timeout)

print1(0) # 输出0 -1

print1(10) # 输出10 -1

可见第一个位置参数&#xff0c;替代了blocking。也就是说lock.acquire(0)等效于lock.acquire(blocking&#61;False)

RLock

正常的lock对象是不能多次调用acquire的&#xff0c;但是可重用锁RLock可以多次调用 acquire 而不阻塞&#xff0c;而且 release 时也要执行和 acquire 一样的次数。

Condition

除了Event对象之外&#xff0c;线程同步还可以使用条件同步机制Condition。一类线程等待特定条件&#xff0c;而另一类线程发出特定条件满足的信号。

>>> help(threading.Condition)

在Condition的帮助中有以下几个方法&#xff1a;

初始化方法&#xff1a;init(self, lock&#61;None)。如果给定了lock参数&#xff0c;那么必须是Lock或者Rlock对象&#xff0c;并且被当做底层锁来使用。如果没有指定&#xff0c;那么会创建一个RLock对象的锁&#xff0c;也被当做底层锁来使用。

实现了__enter__和__exit__方法&#xff0c;支持上下文管理器。

notify(self, n&#61;1)&#xff1a;唤醒一个或多个在当前Condition上等待的其他线程&#xff0c;如果此方法的调用线程没有获得锁&#xff0c;那么在调用的时候就会报错RuntimeError

notify_all(self)&#xff1a;唤醒所有线程

wait(self, timeout&#61;None)&#xff1a;一直等待着知道被notifyed或者发生超时

实例代码

以下代码实现的是&#xff1a;有一个生产者线程&#xff0c;会生产若干次&#xff0c;每次生产结束后需要通知所有的消费者线程来消费&#xff0c;因此下面代码使用的是notify_all方法。

import threading

import time

import logging

import random

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

class Producer_Consumer_Model:

def __init__(self):

self.data &#61; None

self.event &#61; threading.Event() # 用来控制消费者退出

self.condition &#61; threading.Condition()

def Consumer(self):

while not self.event.is_set():

with self.condition:

self.condition.wait() # 一直等待直到收到生产者通知notify_all

logging.info(self.data) # 收到通知之后&#xff0c;开始执行消费者的业务逻辑部分

def Producer(self):

for _ in range(4): # 每个生产者生产4次

data &#61; random.randint(0, 100)

logging.info(data)

with self.condition:

self.data &#61; data # 写入成功就表示生产成功&#xff0c;因此需要在此加锁并且能够通知消费者线程去消费&#xff0c;因此选择使用condition来处理

self.condition.notify_all() # 生产成功之后通知所有的消费者去消费

self.event.wait(1) # 没生产一次等待1s

self.event.set() # 所有的生产完成之后通知消费者退出

m &#61; Producer_Consumer_Model()

for i in range(3):

threading.Thread(target&#61;m.Consumer, name&#61;&#39;Consumer-{}&#39;.format(i)).start()

p &#61; threading.Thread(target&#61;m.Producer, name&#61;&#39;Producer&#39;)

p.start()

测试结果(一个生产者&#xff0c;三个消费者)

2017-03-22 22:07:42,875 INFO [Producer] 16

2017-03-22 22:07:42,883 INFO [Consumer-0] 16

2017-03-22 22:07:42,890 INFO [Consumer-2] 16

2017-03-22 22:07:42,894 INFO [Consumer-1] 16

2017-03-22 22:07:43,884 INFO [Producer] 76

2017-03-22 22:07:43,888 INFO [Consumer-0] 76

2017-03-22 22:07:43,895 INFO [Consumer-2] 76

2017-03-22 22:07:43,898 INFO [Consumer-1] 76

2017-03-22 22:07:44,889 INFO [Producer] 31

2017-03-22 22:07:44,891 INFO [Consumer-0] 31

2017-03-22 22:07:44,911 INFO [Consumer-2] 31

2017-03-22 22:07:44,913 INFO [Consumer-1] 31

2017-03-22 22:07:45,892 INFO [Producer] 17

2017-03-22 22:07:45,894 INFO [Consumer-0] 17

2017-03-22 22:07:45,907 INFO [Consumer-2] 17

2017-03-22 22:07:45,910 INFO [Consumer-1] 17

可见&#xff0c;生产者每生产一次&#xff0c;所有的消费者就会去消费。如果想控制每次生产之后通知几个消费者来消费&#xff0c;那么就可以使用notify方法&#xff0c;指定消费者线程个数。

代码如下

import threading

import time

import logging

import random

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

class Producer_Consumer_Model:

def __init__(self):

self.data &#61; None

self.event &#61; threading.Event() # 用来控制消费者退出

self.condition &#61; threading.Condition()

def Consumer(self):

while not self.event.is_set():

with self.condition:

self.condition.wait() # 一直等待直到收到生产者通知notify_all

logging.info(self.data) # 收到通知之后&#xff0c;开始执行消费者的业务逻辑部分

def Producer(self):

for _ in range(4): # 每个生产者生产4次

data &#61; random.randint(0, 100)

logging.info(data)

with self.condition:

self.data &#61; data # 写入成功就表示生产成功&#xff0c;因此需要在此加锁并且能够通知消费者线程去消费&#xff0c;因此选择使用condition来处理

self.condition.notify(1) # 生产成功之后通知所有的消费者去消费

self.event.wait(1) # 没生产一次等待1s

self.event.set() # 所有的生产完成之后通知消费者退出

m &#61; Producer_Consumer_Model()

for i in range(3):

threading.Thread(target&#61;m.Consumer, name&#61;&#39;Consumer-{}&#39;.format(i)).start()

p &#61; threading.Thread(target&#61;m.Producer, name&#61;&#39;Producer&#39;)

p.start()

测试结果(一个生产者&#xff0c;三个消费者&#xff0c;每次生产之后只通知一个消费者去消费)

2017-03-22 22:24:52,933 INFO [Producer] 11

2017-03-22 22:24:52,948 INFO [Consumer-0] 11

2017-03-22 22:24:53,949 INFO [Producer] 47

2017-03-22 22:24:53,967 INFO [Consumer-1] 47

2017-03-22 22:24:54,967 INFO [Producer] 14

2017-03-22 22:24:54,983 INFO [Consumer-2] 14

2017-03-22 22:24:55,986 INFO [Producer] 54

2017-03-22 22:24:55,993 INFO [Consumer-0] 54

Condition 通常用于生产者消费者模式&#xff0c; 生产者生产消息之后&#xff0c; 使用notify 或者 notify_all 通知消费者消费。

消费者使用wait方法阻塞等待生产者通知

notify通知指定个wait的线程&#xff0c; notify_all通知所有的wait线程

无论notify/notify_all还是wait 都必须先acqurie&#xff0c; 完成后必须确保release&#xff0c; 通常使用with语法

Barrier

Barrier类存在于threading模块中&#xff0c;中文可以翻译成栅栏

>>> help(threading.Barrier)

可以看到Barrier的主要方法和属性&#xff1a;

__init__(self, parties, action&#61;None, timeout&#61;None)&#xff1a;初始化方法&#xff0c;创建一个Barrier

parties&#xff1a;所有参与的线程的数量

action&#xff1a;所有的线程都wait之后并且在线程释放之前就会执行这个action函数&#xff0c;相当于集结之后要做的事情。

timeout&#xff1a;相当于给需要等待的每个线程的wait方法加上timeout参数&#xff0c;超时则barrier不再生效

abort(self)&#xff1a;将Barrier设置成broken状态

reset(self)&#xff1a;将Barrier重置为最初状态

wait(self, timeout&#61;None)&#xff1a;在Barrier前等待&#xff0c;返回在Barrier前等待的下标&#xff0c;从0到parties-1

broken&#xff1a;如果Barrier处于broken状态则返回True

n_waiting&#xff1a;当前已经在Barrier处等待的线程的数量

parties&#xff1a;需要在Barrier处等待的线程的数量

示例代码

import threading

import logging

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

barrier &#61; threading.Barrier(parties&#61;3)

def worker(barrier: threading.Barrier):

logging.info(&#39;waiting for barrier with {} others&#39;.format(barrier.n_waiting))

worker_id &#61; barrier.wait()

logging.info(&#39;after barrier {}&#39;.format(worker_id))

for i in range(3):

threading.Thread(target&#61;worker, args&#61;(barrier, ), name&#61;&#39;worker-{}&#39;.format(i)).start()

测试结果

2017-03-22 23:25:03,992 INFO [worker-0] waiting for barrier with 0 others

2017-03-22 23:25:03,995 INFO [worker-1] waiting for barrier with 1 others

2017-03-22 23:25:03,998 INFO [worker-2] waiting for barrier with 2 others

2017-03-22 23:25:04,001 INFO [worker-2] after barrier 2

2017-03-22 23:25:04,001 INFO [worker-0] after barrier 0

2017-03-22 23:25:04,001 INFO [worker-1] after barrier 1

可见&#xff0c;所有的线程都会一直等待&#xff0c;知道所有的线程都到期了&#xff0c;然后就通过barrier&#xff0c;继续执行后续操作。

Barrier会建立一个控制点&#xff0c;所有参与的线程都会阻塞&#xff0c;直到所有参与的“各方”达到这一点。 它让线程分开启动&#xff0c;然后暂停&#xff0c;直到它们都准备好再继续。因此&#xff0c;这一点可以理解为各个线程的一个集结点。

abort函数的使用

将Barrier设置成broken状态。所有线程在参与集结过程中&#xff0c;只要执行了barrier.abort方法&#xff0c;那么正在等待的线程都会抛出threading.BrokenBarrierError异常。可以理解为&#xff0c;只要有一个线程确定已经到不了Barrier并且通知了Barrier&#xff0c;那么Barrier就会执行abort方法&#xff0c;通知所有正在wait的线程放弃集结。

实例代码

import threading

import logging

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

def worker(barrier: threading.Barrier):

logging.info(&#39;waiting for barrier with {} others&#39;.format(barrier.n_waiting))

try:

worker_id &#61; barrier.wait()

except threading.BrokenBarrierError:

logging.info(&#39;aborting&#39;)

else:

logging.info(&#39;after barrier {}&#39;.format(worker_id))

barrier &#61; threading.Barrier(4) # 需要等待4个线程

for i in range(3):

threading.Thread(target&#61;worker, args&#61;(barrier, ), name&#61;&#39;worker-{}&#39;.format(i)).start() # 3个线程都开始wait

barrier.abort() # 还有一个线程没有到wait&#xff0c;此时执行abort方法&#xff0c;则所有正在wait的线程都抛出异常

测试结果

2017-03-22 23:47:43,184 INFO [worker-0] waiting for barrier with 0 others

2017-03-22 23:47:43,192 INFO [worker-1] waiting for barrier with 1 others

2017-03-22 23:47:43,201 INFO [worker-2] waiting for barrier with 2 others

2017-03-22 23:47:43,211 INFO [worker-2] aborting

2017-03-22 23:47:43,207 INFO [worker-1] aborting

2017-03-22 23:47:43,207 INFO [worker-0] aborting

Semaphore

Semaphore类存在于threading模块中

help(threading.Semaphore)

信号量内部管理者一个计数器&#xff0c;这个计数器的值等于release()方法调用的次数减去acquire()方法调用的次数然后再加上初始值value&#xff0c;value默认为1。

可以看到Semaphore的主要方法&#xff1a;

__init__(self, value&#61;1)&#xff1a;初始化一个信号量&#xff0c;value为内部计数器赋初值&#xff0c;默认为1

acquire(self, blocking&#61;True, timeout&#61;None)&#xff1a;获取信号量&#xff0c;内部计数器减一

release(self)&#xff1a;释放信号量&#xff0c;内部计数器加一

示例代码

import threading

import time

import logging

import random

logging.basicConfig(level&#61;logging.DEBUG, format&#61;&#39;%(asctime)s %(levelname)s [%(threadName)s] %(message)s&#39;)

class Pool:

def __init__(self, num):

self.num &#61; num

self.conns &#61; [self._make_connect(i) for i in range(num)] # 存放连接

self.sem &#61; threading.Semaphore(num) # 信号量内部计数器初始为连接数

def _make_connect(self, name): # 根据连接名称创建连接

conn &#61; &#39;connect-{}&#39;.format(name)

return conn

def get_connect(self): # 从连接池获取连接

self.sem.acquire()

return self.conns.pop()

def return_connect(self, conn): # 将连接conn返还到连接池中

self.conns.insert(0, conn)

self.sem.release()

def worker(pool: Pool):

logging.info(&#39;starting&#39;)

conn &#61; pool.get_connect() # 如果获取不到则会阻塞在acquire处

logging.info(&#39;get connect {}&#39;.format(conn))

t &#61; random.randint(1, 3)

time.sleep(t)

logging.info(&#39;takes {}s&#39;.format(t))

pool.return_connect(conn)

logging.info(&#39;return connect {}&#39;.format(conn))

pool &#61; Pool(2) # 连接池中有两个连接可以使用

for i in range(3): # 三个线程使用两个连接开始任务

threading.Thread(target&#61;worker, args&#61;(pool, ), name&#61;&#39;worker-{}&#39;.format(i)).start()

测试结果

2017-03-23 00:54:36,056 INFO [worker-0] starting

2017-03-23 00:54:36,062 INFO [worker-0] get connect connect-1

2017-03-23 00:54:36,074 INFO [worker-1] starting

2017-03-23 00:54:36,079 INFO [worker-1] get connect connect-0

2017-03-23 00:54:36,089 INFO [worker-2] starting

2017-03-23 00:54:39,074 INFO [worker-0] takes 3s

2017-03-23 00:54:39,076 INFO [worker-0] return connect connect-1

2017-03-23 00:54:39,076 INFO [worker-2] get connect connect-1

2017-03-23 00:54:39,093 INFO [worker-1] takes 3s

2017-03-23 00:54:39,097 INFO [worker-1] return connect connect-0

2017-03-23 00:54:40,093 INFO [worker-2] takes 1s

2017-03-23 00:54:40,107 INFO [worker-2] return connect connect-1

这个测试结果显示&#xff1a;三个线程获取连接池中的两个连接&#xff0c;结果出现了一个线程等待其他线程执行完成之后再获取连接的过程。

Queue

Condition线程同步部分用来传递数据的是一个封装在生产者消费者模型中的元素data(正常使用情况下一般封装的都是一个列表&#xff0c;类似与Barrier部分的连接池中的conns列表)。

Python的queue模块中提供了同步的、线程安全的队列类&#xff0c;包括三种队列&#xff1a;

FIFO(先入先出)队列Queue

LIFO(后入先出)队列LifoQueue

优先级队列PriorityQueue

这些队列都实现了锁原语&#xff0c;能够在多线程中直接使用。可以使用队列来实现线程间的同步。因此我们可以使用queue模块来替换掉生产者消费者中的全局元素&#xff0c;代码如下&#xff1a;

import random

import queue

import threading

class Producer_Consumer_Model:

def __init__(self):

self.q &#61; queue.Queue()

self.event &#61; threading.Event()

def Consumer(self):

while not self.event.is_set():

logging.info(self.q.get())

def Producer(self):

while not self.event.wait(3):

data &#61; random.randint(1, 100)

logging.info(data)

self.q.put(data)

m &#61; Producer_Consumer_Model()

threading.Thread(target&#61;m.Consumer, name&#61;&#39;Consumer&#39;).start()

threading.Thread(target&#61;m.Producer, name&#61;&#39;Producer&#39;).start()

测试结果

2017-03-23 10:11:22,990 INFO [Producer] 26

2017-03-23 10:11:22,993 INFO [Consumer] 26

2017-03-23 10:11:25,993 INFO [Producer] 89

2017-03-23 10:11:26,003 INFO [Consumer] 89

2017-03-23 10:11:29,004 INFO [Producer] 14

2017-03-23 10:11:29,006 INFO [Consumer] 14

2017-03-23 10:11:32,007 INFO [Producer] 17

2017-03-23 10:11:32,009 INFO [Consumer] 17

每生产一次&#xff0c;消费者就会消费一次。当消费者线程&#xff0c;读取Queue则调用Queue.get()方法&#xff0c;若Queue为空时消费者线程获取不到内容&#xff0c;就会阻塞在这里&#xff0c;直到成功获取内容。

线程同步总结

Event&#xff1a;主要用于线程之间的事件通知

Lock,Rlock&#xff1a;主要用于保护共享资源

Condition&#xff1a;主要用于生产者消费者模型&#xff0c;可以理解为Event和Lock的结合体

Barrier&#xff1a;同步指定个等待的线程

Semaphore&#xff1a;主要用于保护资源&#xff0c;和Lock的区别在于可以多个线程访问共享资源&#xff0c;而锁一次只能一个线程访问到共享资源&#xff0c;即锁是value&#61;1的信号量

Queue&#xff1a;使用FIFO队列进行同步&#xff0c;适用于生产者消费者模型

GIL

GIL(Global Interpreter Lock)&#xff1a;全局解释器锁

Python代码的执行由Python 主循环来控制&#xff0c;Python 在设计之初就考虑到要在解释器的主循环中&#xff0c;同时只有一个线程在执行&#xff0c;即在任意时刻&#xff0c;只有一个线程在解释器中运行。对Python 主循环的访问由全局解释器锁(GIL)来控制&#xff0c;正是这个锁能保证同一时刻只有一个线程在运行。

因此Python多线程程序的执行顺序如下&#xff1a;

设置GIL

切换到一个线程去运行

运行

结束线程

解锁GIL

重复以上步骤

因此&#xff0c;Python的多线程并没有实现并行&#xff0c;只是实现了并发而已。如果要实现真正的并行&#xff0c;那就需要使用Python的多进程模块multiprocessing(multiprocessing模块的宗旨是像管理线程一样来管理进程)。

参考资料

记得帮我点赞哦&#xff01;

精心整理了计算机各个方向的从入门、进阶、实战的视频课程和电子书&#xff0c;按照目录合理分类&#xff0c;总能找到你需要的学习资料&#xff0c;还在等什么&#xff1f;快去关注下载吧&#xff01;&#xff01;&#xff01;

念念不忘&#xff0c;必有回响&#xff0c;小伙伴们帮我点个赞吧&#xff0c;非常感谢。

我是职场亮哥&#xff0c;YY高级软件工程师、四年工作经验&#xff0c;拒绝咸鱼争当龙头的斜杠程序员。

听我说&#xff0c;进步多&#xff0c;程序人生一把梭

如果有幸能帮到你&#xff0c;请帮我点个【赞】&#xff0c;给个关注&#xff0c;如果能顺带评论给个鼓励&#xff0c;将不胜感激。

职场亮哥文章列表&#xff1a;更多文章

本人所有文章、回答都与版权保护平台有合作&#xff0c;著作权归职场亮哥所有&#xff0c;未经授权&#xff0c;转载必究&#xff01;



推荐阅读
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 预备知识可参考我整理的博客Windows编程之线程:https:www.cnblogs.comZhuSenlinp16662075.htmlWindows编程之线程同步:https ... [详细]
  • 基于dlib的人脸68特征点提取(眨眼张嘴检测)python版本
    文章目录引言开发环境和库流程设计张嘴和闭眼的检测引言(1)利用Dlib官方训练好的模型“shape_predictor_68_face_landmarks.dat”进行68个点标定 ... [详细]
  • 电话号码的字母组合解题思路和代码示例
    本文介绍了力扣题目《电话号码的字母组合》的解题思路和代码示例。通过使用哈希表和递归求解的方法,可以将给定的电话号码转换为对应的字母组合。详细的解题思路和代码示例可以帮助读者更好地理解和实现该题目。 ... [详细]
  • 本文介绍了在rhel5.5操作系统下搭建网关+LAMP+postfix+dhcp的步骤和配置方法。通过配置dhcp自动分配ip、实现外网访问公司网站、内网收发邮件、内网上网以及SNAT转换等功能。详细介绍了安装dhcp和配置相关文件的步骤,并提供了相关的命令和配置示例。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文介绍了[从头学数学]中第101节关于比例的相关问题的研究和修炼过程。主要内容包括[机器小伟]和[工程师阿伟]一起研究比例的相关问题,并给出了一个求比例的函数scale的实现。 ... [详细]
  • 本文讨论了使用差分约束系统求解House Man跳跃问题的思路与方法。给定一组不同高度,要求从最低点跳跃到最高点,每次跳跃的距离不超过D,并且不能改变给定的顺序。通过建立差分约束系统,将问题转化为图的建立和查询距离的问题。文章详细介绍了建立约束条件的方法,并使用SPFA算法判环并输出结果。同时还讨论了建边方向和跳跃顺序的关系。 ... [详细]
  • 本文介绍了P1651题目的描述和要求,以及计算能搭建的塔的最大高度的方法。通过动态规划和状压技术,将问题转化为求解差值的问题,并定义了相应的状态。最终得出了计算最大高度的解法。 ... [详细]
  • 本文介绍了游标的使用方法,并以一个水果供应商数据库为例进行了说明。首先创建了一个名为fruits的表,包含了水果的id、供应商id、名称和价格等字段。然后使用游标查询了水果的名称和价格,并将结果输出。最后对游标进行了关闭操作。通过本文可以了解到游标在数据库操作中的应用。 ... [详细]
  • ALTERTABLE通过更改、添加、除去列和约束,或者通过启用或禁用约束和触发器来更改表的定义。语法ALTERTABLEtable{[ALTERCOLUMNcolu ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 本文介绍了最长上升子序列问题的一个变种解法,通过记录拐点的位置,将问题拆分为左右两个LIS问题。详细讲解了算法的实现过程,并给出了相应的代码。 ... [详细]
author-avatar
烧饼来一个则_815
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有