我一直在观看Rick Branson的PyCon视频:Instagram上的消息传播.您可能希望观看视频以回答此问题.Rick Branson使用Celery,Redis和RabbitMQ.为了让您加快速度,每个用户都有一个redis列表供他们的主页提供.每个列表都包含由他们关注的人发布的照片的媒体ID.
例如贾斯汀比伯有150万粉丝.当他发布照片时,需要将该照片的ID插入每个粉丝的每个粉丝列表中.这称为Fanout-On-Write方法.但是,这种方法存在一些可靠性问题.它可以工作,但对于拥有数百万粉丝的Justin Bieber或Lady Gaga这样的人来说,在Web请求中执行此操作(您需要0-500毫秒才能完成请求)可能会出现问题.到那时,请求将超时.
所以Rick Branson决定使用Celery,这是一个基于分布式消息传递的异步任务队列/作业队列.任何繁重的工作,例如将媒体ID插入到关注者列表中,都可以在Web请求之外异步完成.请求将完成,芹菜将继续将ID插入所有列表.
这种方法有奇效.但同样,你不想把所有贾斯汀的追随者送到芹菜的一大块,因为它会捆绑一个芹菜工人.为什么不让多个工人同时工作,以便更快完成?卓见!你想把这个块拆分成更小的块,每个批次都有不同的工作人员.Rick Branson做了一批10,000名追随者,他使用一种叫做光标的东西,为所有Justin Bieber的粉丝插入媒体ID,直到完成为止.在视频中,他在3:56中谈到了这一点
我想知道是否有人可以解释这一点,并展示如何做到这一点的例子.我目前正在尝试尝试相同的设置.我使用Andy McCurdy的redis-py python客户端库与我的redis服务器进行通信.对于我服务的每个用户,我创建了一个redis关注者列表.
因此,ID为343的用户将拥有以下密钥的列表:
followers:343
我还为每个用户创建了一个主页提交列表.每个用户都有自己的列表.因此ID为1990的用户将拥有以下密钥的列表:
homefeed:1990
在"粉丝:343"redis列表中,它包含关注用户343的人的所有ID.用户343有20,007粉丝.下面,我正在检索列表中从索引0一直到结束-1的所有ID,只是为了向您展示它的样子.
>>> r_server.lrange("followers:343", 0, -1) ['8', '7', '5', '3', '65', '342', '42', etc...] ---> for the sake of example, assume this list has another 20,000 IDs.
您看到的是关注用户343的所有用户ID的列表.
这是我的proj/mydjangoapp/tasks.py,其中包含我的insert_into_homefeed函数:
from __future__ import absolute_import from celery import shared_task import redis pool = redis.ConnectionPool(host='XX.XXX.XXX.X', port=6379, db=0, password='XXXXX') @shared_task def insert_into_homefeed(photo_id, user_id): # Grab the list of all follower IDs from Redis for user_id. r_server = redis.Redis(connection_pool=pool) followers_list = r_server.lrange("followers:%s" % (user_id), 0, -1) # Now for each follower_id in followers_list, find their homefeed key # in Redis and insert the photo_id into that homefeed list. for follower_id in followers_list: homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id) return "Fan Out Completed for %s" % (user_id)
在此任务中,当从Django视图调用时,它将获取跟随用户343的人的所有ID,然后将照片ID插入其所有的主馈页列表中.
这是我在proj/mydjangoapp/views.py中的上传视图.我基本上调用celery的延迟方法并传递必要的变量,以便请求快速结束:
# Import the Celery Task Here from mydjangoapp.tasks import insert_into_homefeed @csrf_exempt def Upload(request): if request.method == 'POST': data = json.loads(request.body) newPhoto = Photo.objects.create(user_id = data['user_id'], description= data['description'], photo_url = data['photo_url']) newPhoto_ID = newPhoto.pk insert_into_homefeed.delay(newPhoto_ID, data['user_id']) return HttpResponse("Request Completed")
我怎么能这样做,它将被10,000批量?
视频中描述的方法是任务"链接".
要使您的任务方法作为链启动并运行,您需要在关注者列表中添加一个表示索引的额外参数.该任务不是在完整的关注者列表上工作,而是仅对固定的批量大小起作用,从它所提交的索引参数开始.完成后,任务应创建一个新任务并传递新索引.
INSERT_INTO_HOMEFEED_BATCH = 10000 @shared_task def insert_into_homefeed(photo_id, user_id, index=0): # Grab the list of all follower IDs from Redis for user_id. r_server = redis.Redis(connection_pool=pool) range_limit = index + INSERT_INTO_HOMEFEED_BATCH - 1 # adjust for zero-index followers_list_batch = r_server.lrange("followers:%s" % (user_id), index, range_limit) if not followers_list_batch: return # zero followers or no more batches # Now for each follower_id in followers_list_batch, find their homefeed key # in Redis and insert the photo_id into that homefeed list. for follower_id in followers_list: homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id) insert_into_homefeed.delay(photo_id, user_id, range_limit + 1)
这很有效,因为Redis 列表是有序的,而lrange命令不会在超出范围的输入上返回错误.