热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

pythonweb并发性能_python并发与web

python并发与webpython并发主要方式有:Thread(线程)Process(进程)协程python因为GIL的存在使得python的并发无法利用

python并发与web

python并发主要方式有:

Thread(线程)

Process(进程)

协程

python因为GIL的存在使得python的并发无法利用CPU多核的优势以至于性能比较差,下面我们将通过几个例子来介绍python的并发。

线程

我们通过一个简单web server程序来观察python的线程,首先写一个耗时的小函数

def fib(n):

if n <&#61; 2:

return 1

else:

return fib(n - 1) &#43; fib(n - 2)

然后写一个fib web server&#xff0c;程序比较简单就不解释了。

from socket import *

from fib import fib

def fib_server(address):

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.bind(address)

sock.listen(5)

while True:

client, addr &#61; sock.accept()

print(&#39;Connection&#39;, addr)

fib_handle(client)

def fib_handler(client):

while True:

req &#61; client.recv(100)

if not req:

break

n &#61; int(req)

result &#61; fib(n)

resp &#61; str(result).encode(&#39;ascii&#39;) &#43; b&#39;\n&#39;

client.send(resp)

print(&#39;Closed&#39;)

fib_server((&#39;&#39;, 25002))

运行shell命令可以看到计算结果

nc localhost 25002

10

55

由于服务段是单线程的&#xff0c;如果另外启动一个连接将得不到计算结果

nc localhost 25002

10

为了能让我们的server支持多个请求&#xff0c;我们对服务端代码加入多线程支持

#sever.py

#服务端代码

from socket import *

from fib import fib

from threading import Thread

def fib_server(address):

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.bind(address)

sock.listen(5)

while True:

client, addr &#61; sock.accept()

print(&#39;Connection&#39;, addr)

#fib_handler(client)

Thread(target&#61;fib_handler, args&#61;(client,), daemon&#61;True).start() #需要在python3下运行

def fib_handler(client):

while True:

req &#61; client.recv(100)

if not req:

break

n &#61; int(req)

result &#61; fib(n)

resp &#61; str(result).encode(&#39;ascii&#39;) &#43; b&#39;\n&#39;

client.send(resp)

print(&#39;Closed&#39;)

fib_server((&#39;&#39;, 25002)) #在25002端口启动程序

运行shell命令可以看到计算结果

nc localhost 25002

10

55

由于服务端是多线程的&#xff0c;启动一个新连接将得到计算结果

nc localhost 25002

10

55

性能测试

我们加入一段性能测试代码

#perf1.py

from socket import *

from threading import Thread

import time

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.connect((&#39;localhost&#39;, 25002))

n &#61; 0

def monitor():

global n

while True:

time.sleep(1)

print(n, &#39;reqs/sec&#39;)

n &#61; 0

Thread(target&#61;monitor).start()

while True:

start &#61; time.time()

sock.send(b&#39;1&#39;)

resp &#61; sock.recv(100)

end &#61; time.time()

n &#43;&#61; 1

#代码非常简单&#xff0c;通过全局变量n来统计qps(req/sec 每秒请求数)

在shell中运行perf1.py可以看到结果如下:

106025 reqs/sec

109382 reqs/sec

98211 reqs/sec

105391 reqs/sec

108875 reqs/sec

平均每秒请求数大概是10w左右

如果我们另外启动一个进程来进行性能测试就会发现python的GIL对线程造成的影响

python3 perf1.py

74677 reqs/sec

78284 reqs/sec

72029 reqs/sec

81719 reqs/sec

82392 reqs/sec

84261 reqs/sec

并且原来的shell中的qps也是类似结果

96488 reqs/sec

99380 reqs/sec

84918 reqs/sec

87485 reqs/sec

85118 reqs/sec

78211 reqs/sec

如果我们再运行

nc localhost 25002

40

来完全占用服务器资源一段时间&#xff0c;就可以看到shell窗口内的rqs迅速下降到

99 reqs/sec

99 reqs/sec

这也反映了Python的GIL的一个特点&#xff0c;会优先处理占用CPU资源大的任务

具体原因我也不知道&#xff0c;可能需要阅读GIL实现源码才能知道。

线程池在web编程的应用

python有个库叫做cherrypy&#xff0c;最近用到&#xff0c;大致浏览了一下其源代码&#xff0c;其内核使用的是python线程池技术。

cherrypy通过Python线程安全的队列来维护线程池&#xff0c;具体实现为:

class ThreadPool(object):

"""A Request Queue for an HTTPServer which pools threads.

ThreadPool objects must provide min, get(), put(obj), start()

and stop(timeout) attributes.

"""

def __init__(self, server, min&#61;10, max&#61;-1,

accepted_queue_size&#61;-1, accepted_queue_timeout&#61;10):

self.server &#61; server

self.min &#61; min

self.max &#61; max

self._threads &#61; []

self._queue &#61; queue.Queue(maxsize&#61;accepted_queue_size)

self._queue_put_timeout &#61; accepted_queue_timeout

self.get &#61; self._queue.get

def start(self):

"""Start the pool of threads."""

for i in range(self.min):

self._threads.append(WorkerThread(self.server))

for worker in self._threads:

worker.setName(&#39;CP Server &#39; &#43; worker.getName())

worker.start()

for worker in self._threads:

while not worker.ready:

time.sleep(.1)

....

def put(self, obj):

self._queue.put(obj, block&#61;True, timeout&#61;self._queue_put_timeout)

if obj is _SHUTDOWNREQUEST:

return

def grow(self, amount):

"""Spawn new worker threads (not above self.max)."""

if self.max > 0:

budget &#61; max(self.max - len(self._threads), 0)

else:

# self.max <&#61; 0 indicates no maximum

budget &#61; float(&#39;inf&#39;)

n_new &#61; min(amount, budget)

workers &#61; [self._spawn_worker() for i in range(n_new)]

while not all(worker.ready for worker in workers):

time.sleep(.1)

self._threads.extend(workers)

....

def shrink(self, amount):

"""Kill off worker threads (not below self.min)."""

[...]

def stop(self, timeout&#61;5):

# Must shut down threads here so the code that calls

# this method can know when all threads are stopped.

[...]

可以看出来&#xff0c;cherrypy的线程池将大小初始化为10&#xff0c;每当有一个httpconnect进来时就将其放入任务队列中&#xff0c;然后WorkerThread会不断从任务队列中取出任务执行&#xff0c;可以看到这是一个非常标准的线程池模型。

进程

由于Python的thread无法利用多核&#xff0c;为了充分利用多核CPU&#xff0c;Python可以使用了多进程来模拟线程以提高并发的性能。Python的进程代价比较高可以看做是另外再启动一个python进程。

#server_pool.py

from socket import *

from fib import fib

from threading import Thread

from concurrent.futures import ProcessPoolExecutor as Pool #这里用的python3的线程池&#xff0c;对应python2的threadpool

pool &#61; Pool(4) #启动一个大小为4的进程池

def fib_server(address):

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.bind(address)

sock.listen(5)

while True:

client, addr &#61; sock.accept()

print(&#39;Connection&#39;, addr)

Thread(target&#61;fib_handler, args&#61;(client,), daemon&#61;True).start()

def fib_handler(client):

while True:

req &#61; client.recv(100)

if not req:

break

n &#61; int(req)

future &#61; pool.submit(fib, n)

result &#61; future.result()

resp &#61; str(result).encode(&#39;ascii&#39;) &#43; b&#39;\n&#39;

client.send(resp)

print(&#39;Closed&#39;)

fib_server((&#39;&#39;, 25002))

性能测试

可以看到新的server的qps为&#xff1a;

4613 reqs/sec

4764 reqs/sec

4619 reqs/sec

4393 reqs/sec

4768 reqs/sec

4846 reqs/sec

这个结果远低于前面的10w qps主要原因是进程启动速度较慢&#xff0c;进程池内部逻辑比较复杂&#xff0c;涉及到了数据传输&#xff0c;队列等问题。

但是通过多进程我们可以保证每一个链接相对独立&#xff0c;不会受其他请求太大的影响。

即使我们使用以下耗时的命令也不会影响到性能测试

nc localhost 25502

40

协程

协程简介

协程是一个古老的概念&#xff0c;最早出现在早期的os中&#xff0c;它出现的时间甚至比线程进程还要早。

协程也是一个比较难以理解和运用的并发方式&#xff0c;用协程写出来的代码比较难以理解。

python中使用yield和next来实现协程的控制。

def count(n):

while(n > 0):

yield n #yield起到的作用是blocking&#xff0c;将代码阻塞在这里&#xff0c;生成一个generator&#xff0c;然后通过next调用。

n -&#61; 1

for i in count(5):

print(i)

#可以看到运行结果&#xff1a;

5

4

3

2

1

下面我们通过例子来介绍如何书写协程代码。首先回到之前的代码。首先我们要想到我们为什么要用线程&#xff0c;当然是为了防止阻塞&#xff0c;

这里的阻塞来自socket的IO和cpu占用2个方面。协程的引入也是为了防止阻塞&#xff0c;因此我们先将代码中的阻塞点标记出来。

#sever.py

#服务端代码

from socket import *

from fib import fib

def fib_server(address):

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.bind(address)

sock.listen(5)

while True:

client, addr &#61; sock.accept() #blocking

print(&#39;Connection&#39;, addr)

fib_handler(client)

def fib_handler(client):

while True:

req &#61; client.recv(100) #blocking

if not req:

break

n &#61; int(req)

result &#61; fib(n)

resp &#61; str(result).encode(&#39;ascii&#39;) &#43; b&#39;\n&#39;

client.send(resp) #blocking

print(&#39;Closed&#39;)

fib_server((&#39;&#39;, 25002)) #在25002端口启动程序

上面标记了3个socket IO阻塞点&#xff0c;我们先忽略CPU占用。

首先我们在blocking点插入yield语句&#xff0c;这样做的原因就是&#xff0c;通过yield标记出blocking点以及blocking的原因&#xff0c;这样我们就可以在调度的时候实现noblocking&#xff0c;我们调度的时候遇到yield语句并且block之后就可以直接去执行其他的请求而不用阻塞在这里&#xff0c;这里我们也将实现一个简单的noblocking调度方法。

#sever.py

#服务端代码

from socket import *

from fib import fib

def fib_server(address):

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.bind(address)

sock.listen(5)

while True:

yield &#39;recv&#39;, sock

client, addr &#61; sock.accept() #blocking

print(&#39;Connection&#39;, addr)

fib_handler(client)

def fib_handler(client):

while True:

yield &#39;recv&#39;, client

req &#61; client.recv(100) #blocking

if not req:

break

n &#61; int(req)

result &#61; fib(n)

resp &#61; str(result).encode(&#39;ascii&#39;) &#43; b&#39;\n&#39;

yield &#39;send&#39;, client

client.send(resp) #blocking

print(&#39;Closed&#39;)

fib_server((&#39;&#39;, 25002)) #在25002端口启动程序

上述程序无法运行&#xff0c;因为我们还没有一个yield的调度器&#xff0c;程序只是单纯的阻塞在了yield所标记的地方&#xff0c;这也是协程的一个好处&#xff0c;可以人为来调度&#xff0c;不像thread一样乱序执行。下面是包含了调度器的代码。

from socket import *

from fib import fib

from threading import Thread

from collections import deque

from concurrent.futures import ProcessPoolExecutor as Pool

from select import select

tasks &#61; deque()

recv_wait &#61; {}

send_wait &#61; {}

def run():

while any([tasks, recv_wait, send_wait]):

while not tasks:

can_recv, can_send, _ &#61; select(recv_wait, send_wait, [])

for s in can_recv:

tasks.append(recv_wait.pop(s))

for s in can_send:

tasks.append(send_wait.pop(s))

task &#61; tasks.popleft()

try:

why, what &#61; next(task)

if why &#61;&#61; &#39;recv&#39;:

recv_wait[what] &#61; task

elif why &#61;&#61; &#39;send&#39;:

send_wait[what] &#61; task

else:

raise RuntimeError("ARG!")

except StopIteration:

print("task done")

def fib_server(address):

sock &#61; socket(AF_INET, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.bind(address)

sock.listen(5)

while True:

yield &#39;recv&#39;, sock

client, addr &#61; sock.accept()

print(&#39;Connection&#39;, addr)

tasks.append(fib_handler(client))

def fib_handler(client):

while True:

yield &#39;recv&#39;, client

req &#61; client.recv(100)

if not req:

break

n &#61; int(req)

result &#61; fib(n)

resp &#61; str(result).encode(&#39;ascii&#39;) &#43; b&#39;\n&#39;

yield &#39;send&#39;, client

client.send(resp)

print(&#39;Closed&#39;)

tasks.append(fib_server((&#39;&#39;, 25003)))

run()

我们通过轮询&#43;select来控制协程&#xff0c;核心是用一个task queue来维护程序运行的流水线&#xff0c;用recv_wait和send_wait两个字典来实现任务的分发。

性能测试

可以看到新的server的qps为&#xff1a;

(82262, &#39;reqs/sec&#39;)

(82915, &#39;reqs/sec&#39;)

(82128, &#39;reqs/sec&#39;)

(82867, &#39;reqs/sec&#39;)

(82284, &#39;reqs/sec&#39;)

(82363, &#39;reqs/sec&#39;)

(82954, &#39;reqs/sec&#39;)

与之前的thread模型性能比较接近&#xff0c;协程的好处是异步的&#xff0c;但是协程 仍然只能使用到一个CPU

当我们让服务器计算40的fib从而占满cpu时&#xff0c;qps迅速下降到了0。

tornado 基于协程的 python web框架

tornado是facebook出品的异步web框架,tornado中协程的使用比较简单&#xff0c;利用coroutine.gen装饰器可以将自己的异步函数注册进tornado的ioloop中&#xff0c;tornado异步方法一般的书写方式为:

&#64;gen.coroutime

def post(self):

resp &#61; yield GetUser()

self.write(resp)

tornado异步原理

def start(self):

"""Starts the I/O loop.

The loop will run until one of the I/O handlers calls stop(), which

will make the loop stop after the current event iteration completes.

"""

self._running &#61; True

while True:

[ ... ]

if not self._running:

break

[ ... ]

try:

event_pairs &#61; self._impl.poll(poll_timeout)

except Exception, e:

if e.args &#61;&#61; (4, "Interrupted system call"):

logging.warning("Interrupted system call", exc_info&#61;1)

continue

else:

raise

# Pop one fd at a time from the set of pending fds and run

# its handler. Since that handler may perform actions on

# other file descriptors, there may be reentrant calls to

# this IOLoop that update self._events

self._events.update(event_pairs)

while self._events:

fd, events &#61; self._events.popitem()

try:

self._handlers[fd](fd, events)

except KeyboardInterrupt:

raise

except OSError, e:

if e[0] &#61;&#61; errno.EPIPE:

# Happens when the client closes the connection

pass

else:

logging.error("Exception in I/O handler for fd %d",

fd, exc_info&#61;True)

except:

logging.error("Exception in I/O handler for fd %d",fd, exc_info&#61;True)

这是tornado异步调度的核心主循环&#xff0c;poll()方法返回一个形如(fd: events)的键值对&#xff0c;并赋值给event_pairs变量&#xff0c;在内部的while循环中&#xff0c;event_pairs中的内容被一个一个的取出&#xff0c;然后相应的处理器会被调用&#xff0c;tornado通过下面的函数讲socket注册进epoll中。tornado在linux默认选择epoll&#xff0c;在windows下默认选择select(只能选择select)。

def add_handler(self, fd, handler, events):

"""Registers the given handler to receive the given events for fd."""

self._handlers[fd] &#61; handler

self._impl.register(fd, events | self.ERROR)

cherrypy线程池与tornado协程的比较

我们通过最简单程序运行在单机上进行性能比较

测试的语句为&#xff1a;

ab -c 100 -n 1000 -k localhost:8080/ | grep "Time taken for tests:"

其中cherrypy的表现为&#xff1a;

Completed 100 requests

Completed 200 requests

Completed 300 requests

Completed 400 requests

Completed 500 requests

Completed 600 requests

Completed 700 requests

Completed 800 requests

Completed 900 requests

Completed 1000 requests

Finished 1000 requests

Time taken for tests: 10.773 seconds

tornado的表现为&#xff1a;

Completed 100 requests

Completed 200 requests

Completed 300 requests

Completed 400 requests

Completed 500 requests

Completed 600 requests

Completed 700 requests

Completed 800 requests

Completed 900 requests

Completed 1000 requests

Finished 1000 requests

Time taken for tests: 0.377 seconds

可以看出tornado的性能还是非常惊人的&#xff0c;当应用程序涉及到异步IO还是要尽量使用tornado

总结

本文主要介绍了python的线程、进程和协程以及其应用&#xff0c;并对这几种模型进行了简单的性能分析&#xff0c;python由于GIL的存在&#xff0c;不管是线程还是协程都不能利用到多核。

对于计算密集型的web app线程模型与协程模型的性能大致一样&#xff0c;线程由于调度受操作系统管理&#xff0c;其性能略好。

对于IO密集型的web app协程模型性能会有很大的优势。

参考文献



推荐阅读
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 本文介绍了在mac环境下使用nginx配置nodejs代理服务器的步骤,包括安装nginx、创建目录和文件、配置代理的域名和日志记录等。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • 本文介绍了Windows操作系统的版本及其特点,包括Windows 7系统的6个版本:Starter、Home Basic、Home Premium、Professional、Enterprise、Ultimate。Windows操作系统是微软公司研发的一套操作系统,具有人机操作性优异、支持的应用软件较多、对硬件支持良好等优点。Windows 7 Starter是功能最少的版本,缺乏Aero特效功能,没有64位支持,最初设计不能同时运行三个以上应用程序。 ... [详细]
  • OO第一单元自白:简单多项式导函数的设计与bug分析
    本文介绍了作者在学习OO的第一次作业中所遇到的问题及其解决方案。作者通过建立Multinomial和Monomial两个类来实现多项式和单项式,并通过append方法将单项式组合为多项式,并在此过程中合并同类项。作者还介绍了单项式和多项式的求导方法,并解释了如何利用正则表达式提取各个单项式并进行求导。同时,作者还对自己在输入合法性判断上的不足进行了bug分析,指出了自己在处理指数情况时出现的问题,并总结了被hack的原因。 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 本文详细介绍了git常用命令及其操作方法,包括查看、添加、提交、删除、找回等操作,以及如何重置修改文件、抛弃工作区修改、将工作文件提交到本地暂存区、从版本库中删除文件等。同时还介绍了如何从暂存区恢复到工作文件、恢复最近一次提交过的状态,以及如何合并多个操作等。 ... [详细]
author-avatar
奇国的雪儿
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有