我意识到sidekiq有一个push_bulk选项,但我目前受到redis延迟的限制,所以通过push_bulk传递多个项目仍然不够快(只有大约50/s).
我试图增加redis连接的数量,如下所示:
redis_conn = proc { Redis.new({ :url => Rails.configuration.redis.url }) } Sidekiq.configure_client do |config| Sidekiq.configure_client do |config| config.redis = ConnectionPool.new(size: 50, &redis_conn) end config.client_middleware do |chain| chain.add Sidekiq::Status::ClientMiddleware end end
然后触发单独的线程(Thread.new)以实际对各种对象执行perform_async.有趣的是,任何不是第一个线程的线程都不会被抛入sidekiq队列,就像它们被完全忽略一样.
有谁知道更好的方法吗?
编辑:这是我正在尝试的push_bulk方法实际上更慢:
user_ids = User.need_scraping.pluck(:id) bar = ProgressBar.new(user_ids.count) user_ids.in_groups_of(10000, false).each do |user_id_group| Sidekiq::Client.push_bulk( 'args' => user_id_group.map{ |user_id| [user_id] }, 'class' => ScrapeUser, 'queue' => 'scrape_user', 'retry' => true ) end
谢谢!
@Winfield的答案是正确的,并且他对延迟绝对正确。但是,正确的语法实际上如下:
User.need_scraping.select('id').find_in_batches do |user_group| Sidekiq::Client.push_bulk({ 'class' => UserWorker, 'args' => user_group.map {|user| [user.id] } }) end
也许它在最新的Sidekiq中有所更改(我懒得检查),但是现在这是正确的语法。
你想要使用push_bulk
.您受限于将元素写入redis队列支持sidekiq的延迟/往返时间.
您应该使用多个线程/连接来克服网络速度慢的问题,而您应该真正删除额外的网络往返.
假设您正在尝试将20k UserWorker
个工作纳入考虑范围user_id
:
您可以通过以下方式将单个工作排入队列:
UserWorker.perform_async(user_id)
...映射到:
Sidekiq::Client.push('class' => UserWorker, 'args' => [user_id] )
所以push_bulk
20k user_ids 的版本是:
# This example takes 20k user_ids in an array, chunks them into groups of 1000 ids, # and batch sends them to redis as a group. User.need_scraping.select('id').find_in_batches do |user_group| sidekiq_items = user_group.map {|user| { 'class' => UserWorker, 'args' => [user.id] } } Sidekiq::Client.push_bulk(sidekiq_items) end
这将20k redis呼叫转换为20个redis呼叫,平均往返时间为5ms(乐观),即1秒对100秒.你的旅费可能会改变.
编辑: 评论者似乎对Sidekiq/Redis客户端批量排队数据的行为感到困惑.
该Sidekiq::Client.push_bulk()
方法将一系列作业排入队列.它将这些转换为Sidekiq作业有效负载哈希,然后调用SideKiq::Client.raw_push()
将这些有效负载传递给redis.参见来源:https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L158
SideKiq::Client.raw_push()
获取Sidekiq哈希有效负载列表,将它们转换为JSON,然后执行MULTI
组合两个redis命令的redis命令.首先,它将目标队列添加到活动队列列表(redis SADD
),然后将所有作业有效负载推送到目标队列redis列表对象(redis LPUSH
).这是一个redis命令,在一个redis原子组中一起执行.
如果这仍然很慢,您可能还有其他问题(网络速度慢,redis服务器过载等).