我有一个公共交通消费者服务设置与RabbitMQ一起工作,我无法弄清楚如何提高消费者的速度 - 似乎很难限制每秒收到的10条消息.
我已尝试过此处列出的步骤:https://groups.google.com/forum/#! msg/masstransit-discuss/plP4n2sixrY/xfORgTPqcwsJ,但没有成功 - 将预取和并发使用者设置为25只会增加已确认的消息,但它不会增加消息的下载速率.
我的配置如下:
ServiceBusFactory.ConfigureDefaultSettings(x => { x.SetConcurrentReceiverLimit(25); x.SetConcurrentConsumerLimit(25); }); _bus = ServiceBusFactory.New( sbc => { sbc.UseRabbitMq(x => x.ConfigureHost( "rabbitmq://localhost/Dev/consume?prefetch=25", y => { y.SetUsername(config.Username); y.SetPassword(config.Password); })); sbc.UseLog4Net(); sbc.ReceiveFrom("rabbitmq://localhost/Dev/consume?prefetch=25"); sbc.Subscribe(x => RegisterConsumers(x, container)); sbc.UseJsonSerializer(); sbc.SetConcurrentConsumerLimit(25); });
我在两个地方设置并发消费者限制,因为我不确定我是否需要在默认或总线配置中设置它,并且消费者通过统一注册 - 我省略了消费者订阅,因为所有订阅者都是接收.
我有点困惑,我是否需要设置其他任何东西,或者我是否需要更改我设置配置的顺序.
任何帮助非常感谢.
在花了一个浪漫的夜晚解决问题并尝试克里斯提出的不同的事情之后,我发现还有另一件事要做才能让它像它应该的那样工作.
具体来说,是的,您需要在使用者队列地址上设置预取:
sbc.UseRabbitMq( f => f.ConfigureHost( new Uri( "rabbitmq://guest:guest@localhost/masstransit_consumer" ), c => { } ) ); int pf = 20; // prefetch // set consumer prefetch (required!) sbc.ReceiveFrom( string.Format( "rabbitmq://guest:guest@localhost/masstransit_consumer?prefetch={0}", pf ) );
但这仍然不够.
关键在mtstress
克里斯提到的工具代码中可以在他的回答下面的评论中提到.结果是工具调用:
int _t, _ct; ThreadPool.GetMinThreads( out _t, out _ct ); ThreadPool.SetMinThreads( pf, _ct );
将其添加到我的代码可以解决问题.我想知道为什么这不是MSMQ传输所必需的,尽管......
更新#1
经过进一步调查,我发现了一个可能的罪魁祸首.它在ServiceBusBuilderImpl
.
有一种提高极限的方法,ConfigureThreadPool
.
这里的问题是它调用CalculateRequiredThreads
哪些应返回所需线程的数量.不幸的是,后者在我的客户端Windows 7和Windows Server上都返回负值.因此,ConfigureThreadPool
实际上没有任何作用,因为在调用时会忽略负值ThreadPool.SetMin/MaxThreads
.
这个负面价值怎么样?似乎CalculateRequiredThreads
调用ThreadPool.GetMinThreads
和ThreadPool.GetAvailableThreads
使用公式来提出所需线程的数量:
var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads);
这里的问题是在我的机器上这有效地做到了:
40 (my limit) + 8 (workerThreads) - 1023 (availableThreads)
当然会返回
-975
结论是:上述代码来自公共交通内部似乎是错误的.当我提前手动提高限制时,它会ConfigureMinThreads
尊重它(因为只有当它高于读取值时才设置限制).
如果不事先手动设置限制,则无法设置限制,因此代码执行的线程数与默认线程池限制(在我的计算机上似乎为8)一样多.
显然有人认为这个公式会产生
40 + 8 - 8
在默认情况下.为什么GetMinThreads
和GetAvailableThreads
返回这些无关的价值还有待确定......
更新#2
更改
static int CalculateRequiredThreads( int consumerThreads ) { int workerThreads; int completionPortThreads; ThreadPool.GetMinThreads( out workerThreads, out completionPortThreads ); int availableWorkerThreads; int availableCompletionPortThreads; ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads ); var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads ); return requiredThreads; }
至
static int CalculateRequiredThreads( int consumerThreads ) { int workerThreads; int completionPortThreads; ThreadPool.GetMaxThreads( out workerThreads, out completionPortThreads ); int availableWorkerThreads; int availableCompletionPortThreads; ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads ); var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads ); return requiredThreads; }
解决了这个问题.两者都返回1023,公式的输出是正确数量的预期线程.