Django,Celery,Redis,RabbitMQ:Fanout-On-Writes的链式任务

 手机用户2502883723 发布于 2023-02-05 08:44

我一直在观看Rick Branson的PyCon视频:Instagram上的消息传播.您可能希望观看视频以回答此问题.Rick Branson使用Celery,Redis和RabbitMQ.为了让您加快速度,每个用户都有一个redis列表供他们的主页提供.每个列表都包含由他们关注的人发布的照片​​的媒体ID.

例如贾斯汀比伯有150万粉丝.当他发布照片时,需要将该照片的ID插入每个粉丝的每个粉丝列表中.这称为Fanout-On-Write方法.但是,这种方法存在一些可靠性问题.它可以工作,但对于拥有数百万粉丝的Justin Bieber或Lady Gaga这样的人来说,在Web请求中执行此操作(您需要0-500毫秒才能完成请求)可能会出现问题.到那时,请求将超时.

所以Rick Branson决定使用Celery,这是一个基于分布式消息传递的异步任务队列/作业队列.任何繁重的工作,例如将媒体ID插入到关注者列表中,都可以在Web请求之外异步完成.请求将完成,芹菜将继续将ID插入所有列表.

这种方法有奇效.但同样,你不想把所有贾斯汀的追随者送到芹菜的一大块,因为它会捆绑一个芹菜工人.为什么不让多个工人同时工作,以便更快完成?卓见!你想把这个块拆分成更小的块,每个批次都有不同的工作人员.Rick Branson做了一批10,000名追随者,他使用一种叫做光标的东西,为所有Justin Bieber的粉丝插入媒体ID,直到完成为止.在视频中,他在3:56中谈到了这一点

我想知道是否有人可以解释这一点,并展示如何做到这一点的例子.我目前正在尝试尝试相同的设置.我使用Andy McCurdy的redis-py python客户端库与我的redis服务器进行通信.对于我服务的每个用户,我创建了一个redis关注者列表.

因此,ID为343的用户将拥有以下密钥的列表:

followers:343

我还为每个用户创建了一个主页提交列表.每个用户都有自己的列表.因此ID为1990的用户将拥有以下密钥的列表:

homefeed:1990

在"粉丝:343"redis列表中,它包含关注用户343的人的所有ID.用户343有20,007粉丝.下面,我正在检索列表中从索引0一直到结束-1的所有ID,只是为了向您展示它的样子.

>>> r_server.lrange("followers:343", 0, -1)
['8', '7', '5', '3', '65', '342', '42', etc...] ---> for the sake of example, assume this list has another 20,000 IDs.

您看到的是关注用户343的所有用户ID的列表.

这是我的proj/mydjangoapp/tasks.py,其中包含我的insert_into_homefeed函数:

from __future__ import absolute_import
from celery import shared_task
import redis
pool = redis.ConnectionPool(host='XX.XXX.XXX.X', port=6379, db=0, password='XXXXX')

@shared_task
def insert_into_homefeed(photo_id, user_id):
    # Grab the list of all follower IDs from Redis for user_id.
    r_server = redis.Redis(connection_pool=pool)

    followers_list = r_server.lrange("followers:%s" % (user_id), 0, -1)

    # Now for each follower_id in followers_list, find their homefeed key 
    # in Redis and insert the photo_id into that homefeed list.

    for follower_id in followers_list:
        homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id)
    return "Fan Out Completed for %s" % (user_id)

在此任务中,当从Django视图调用时,它将获取跟随用户343的人的所有ID,然后将照片ID插入其所有的主馈页列表中.

这是我在proj/mydjangoapp/views.py中的上传视图.我基本上调用celery的延迟方法并传递必要的变量,以便请求快速结束:

# Import the Celery Task Here
from mydjangoapp.tasks import insert_into_homefeed


@csrf_exempt
def Upload(request):
    if request.method == 'POST':
        data  = json.loads(request.body)
        newPhoto = Photo.objects.create(user_id = data['user_id'], description= data['description'], photo_url = data['photo_url'])
        newPhoto_ID = newPhoto.pk
        insert_into_homefeed.delay(newPhoto_ID, data['user_id'])
        return HttpResponse("Request Completed")

我怎么能这样做,它将被10,000批量?

1 个回答
  • 视频中描述的方法是任务"链接".

    要使您的任务方法作为链启动并运行,您需要在关注者列表中添加一个表示索引的额外参数.该任务不是在完整的关注者列表上工作,而是仅对固定的批量大小起作用,从它所提交的索引参数开始.完成后,任务应创建一个新任务并传递新索引.

    INSERT_INTO_HOMEFEED_BATCH = 10000
    
    @shared_task
    def insert_into_homefeed(photo_id, user_id, index=0):
        # Grab the list of all follower IDs from Redis for user_id.
        r_server = redis.Redis(connection_pool=pool)
    
        range_limit = index + INSERT_INTO_HOMEFEED_BATCH - 1 # adjust for zero-index
    
        followers_list_batch = r_server.lrange("followers:%s" % (user_id), index, range_limit)
    
        if not followers_list_batch:
            return # zero followers or no more batches
    
        # Now for each follower_id in followers_list_batch, find their homefeed key 
        # in Redis and insert the photo_id into that homefeed list.
        for follower_id in followers_list:
            homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id)
    
        insert_into_homefeed.delay(photo_id, user_id, range_limit + 1)
    

    这很有效,因为Redis 列表是有序的,而lrange命令不会在超出范围的输入上返回错误.

    2023-02-05 08:49 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有