今天来说说,Python 中的任务切分。以爬虫为例,从一个存 url 的 txt 文件中,读取其内容,我们会获取一个 url 列表。我们把这一个 url 列表称为大任务。
列表切分
在不考虑内存占用的情况下,我们对上面的大任务进行一个切分。比如我们将大任务切分成的小任务是每秒最多只访问5个URL。
import os
import timeCURRENT_DIR &#61; os.path.dirname(os.path.abspath(__file__))def read_file():file_path &#61; os.path.join(CURRENT_DIR, "url_list.txt")with open(file_path, "r", encoding&#61;"utf-8") as fs:result &#61; [i.strip() for i in fs.readlines()]return resultdef fetch(url):print(url)def run():max_count &#61; 5url_list &#61; read_file()for index in range(0, len(url_list), max_count):start &#61; time.time()fetch(url_list[index:index &#43; max_count])end &#61; time.time() - startif end < 1:time.sleep(1 - end)if __name__ &#61;&#61; &#39;__main__&#39;:run()
关键代码都在for循环里&#xff0c;首先我们通过声明range的第三个参数&#xff0c;该参数指定迭代的步长为5,这样每次index增加都是以5为基数&#xff0c;即0&#xff0c;5&#xff0c;10。。。
然后我们对url_list做切片&#xff0c;每次取其五个元素&#xff0c;这五个元素会随着index的增加不断的在改变&#xff0c;如果最后不够五个了&#xff0c;按照切片的特性这个时候就会有多少取多少了&#xff0c;不会造成索引超下标的问题。
随着url列表的增加&#xff0c;我们会发现内存的占用也在提高了。这个时候我们就需要对代码进行修改了&#xff0c;我们知道生成器是比较节省内存的空间的&#xff0c;修改之后代码变成&#xff0c;下面的这样。
生成器切分
import os
import time
from itertools import isliceCURRENT_DIR &#61; os.path.dirname(os.path.abspath(__file__))def read_file():file_path &#61; os.path.join(CURRENT_DIR, "url_list.txt")with open(file_path, "r", encoding&#61;"utf-8") as fs:for i in fs:yield i.strip()def fetch(url):print(url)def run():max_count &#61; 5url_gen &#61; read_file()while True:url_list &#61; list(islice(url_gen, 0, max_count))if not url_list:breakstart &#61; time.time()fetch(url_list)end &#61; time.time() - startif end < 1:time.sleep(1 - end)if __name__ &#61;&#61; &#39;__main__&#39;:run()
首先&#xff0c;我们修改了文件读取的方式&#xff0c;把原来读列表的形式&#xff0c;改为了生成器的形式。这样我们在调用该文件读取方法的时候大大节省了内存。
然后就是对上面for循环进行改造&#xff0c;因为生成器的特性&#xff0c;这里不适合使用for进行迭代&#xff0c;因为每一次的迭代都会消耗生成器的元素&#xff0c;通过使用itertools的islice对url_gen进行切分,islice是生成器的切片,这里我们每次切分出含有5个元素的生成器&#xff0c;因为生成器没有__len__方法所以&#xff0c;我们将其转为列表&#xff0c;然后判断列表是否为空&#xff0c;就可以知道迭代是否该结束了。
修改之后的代码&#xff0c;不管是性能还是节省内存上都大大的提高。读取千万级的文件不是问题。
除此之外&#xff0c;在使用异步爬虫的时候&#xff0c;也许会用到异步生成器切片。下面就和大家讨论&#xff0c;异步生成器切分的问题
异步生成器切分
首先先来看一个简单的异步生成器。
我们知道调用下面的代码会得到一个生成器
def foo():for i in range(20):yield i
如果在def前面加一个async&#xff0c;那么在调用的时候它就是个异步生成器了。
完整示例代码如下
import asyncio
async def foo():for i in range(20):yield iasync def run():async_gen &#61; foo()async for i in async_gen:print(i)if __name__ &#61;&#61; &#39;__main__&#39;:asyncio.run(run())
关于async for的切分有点复杂&#xff0c;这里推荐使用aiostream模块&#xff0c;使用之后代码改为下面这样
import asyncio
from aiostream import streamasync def foo():for i in range(22):yield iasync def run():index &#61; 0limit &#61; 5while True:xs &#61; stream.iterate(foo())ys &#61; xs[index:index &#43; limit]t &#61; await stream.list(ys)if not t:breakprint(t)index &#43;&#61; limitif __name__ &#61;&#61; &#39;__main__&#39;:asyncio.run(run())