快速将多个项目(1000 /秒)添加到sidekiq队列?

 ssl87 发布于 2023-02-09 13:00

我意识到sidekiq有一个push_bulk选项,但我目前受到redis延迟的限制,所以通过push_bulk传递多个项目仍然不够快(只有大约50/s).

我试图增加redis连接的数量,如下所示:

redis_conn = proc {
  Redis.new({ :url => Rails.configuration.redis.url })
}

Sidekiq.configure_client do |config|
  Sidekiq.configure_client do |config|
    config.redis = ConnectionPool.new(size: 50, &redis_conn)
  end
  config.client_middleware do |chain|
    chain.add Sidekiq::Status::ClientMiddleware
  end
end

然后触发单独的线程(Thread.new)以实际对各种对象执行perform_async.有趣的是,任何不是第一个线程的线程都不会被抛入sidekiq队列,就像它们被完全忽略一样.

有谁知道更好的方法吗?

编辑:这是我正在尝试的push_bulk方法实际上更慢:

  user_ids = User.need_scraping.pluck(:id)
  bar = ProgressBar.new(user_ids.count)
  user_ids.in_groups_of(10000, false).each do |user_id_group|
    Sidekiq::Client.push_bulk(
      'args'  => user_id_group.map{ |user_id| [user_id] },
      'class' => ScrapeUser,
      'queue' => 'scrape_user',
      'retry' => true
    )
  end

谢谢!

2 个回答
  • @Winfield的答案是正确的,并且他对延迟绝对正确。但是,正确的语法实际上如下:

    User.need_scraping.select('id').find_in_batches do |user_group|
      Sidekiq::Client.push_bulk({ 'class' => UserWorker, 'args' => user_group.map {|user| [user.id] } })
    end
    

    也许它在最新的Sidekiq中有所更改(我懒得检查),但是现在这是正确的语法。

    2023-02-09 13:03 回答
  • 你想要使用push_bulk.您受限于将元素写入redis队列支持sidekiq的延迟/往返时间.

    您应该使用多个线程/连接来克服网络速度慢的问题,而您应该真正删除额外的网络往返.

    假设您正在尝试将20k UserWorker个工作纳入考虑范围user_id:

    您可以通过以下方式将单个工作排入队列:

    UserWorker.perform_async(user_id)
    

    ...映射到:

    Sidekiq::Client.push('class' => UserWorker, 'args' => [user_id] )
    

    所以push_bulk20k user_ids 的版本是:

    # This example takes 20k user_ids in an array, chunks them into groups of 1000 ids,
    # and batch sends them to redis as a group.
    
    User.need_scraping.select('id').find_in_batches do |user_group|
    
      sidekiq_items = user_group.map {|user| { 'class' => UserWorker, 'args' => [user.id] } }
      Sidekiq::Client.push_bulk(sidekiq_items)
    end
    

    这将20k redis呼叫转换为20个redis呼叫,平均往返时间为5ms(乐观),即1秒对100秒.你的旅费可能会改变.

    编辑: 评论者似乎对Sidekiq/Redis客户端批量排队数据的行为感到困惑.

    Sidekiq::Client.push_bulk()方法将一系列作业排入队列.它将这些转换为Sidekiq作业有效负载哈希,然后调用SideKiq::Client.raw_push()将这些有效负载传递给redis.参见来源:https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L158

    SideKiq::Client.raw_push()获取Sidekiq哈希有效负载列表,将它们转换为JSON,然后执行MULTI组合两个redis命令的redis命令.首先,它将目标队列添加到活动队列列表(redis SADD),然后将所有作业有效负载推送到目标队列redis列表对象(redis LPUSH).这是一个redis命令,在一个redis原子组中一起执行.

    如果这仍然很慢,您可能还有其他问题(网络速度慢,redis服务器过载等).

    2023-02-09 13:04 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有