我使用Java Producer API将String-messages发送到Kafka V. 0.8.如果邮件大小约为15 MB,我会得到一个MessageSizeTooLargeException
.我试图设置message.max.bytes
为40 MB,但我仍然得到例外.小消息没有问题.
(例外情况出现在制作人中,我在此应用程序中没有使用者.)
我该怎么做才能摆脱这种异常?
private ProducerConfig kafkaConfig() { Properties props = new Properties(); props.put("metadata.broker.list", BROKERS); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); props.put("message.max.bytes", "" + 1024 * 1024 * 40); return new ProducerConfig(props); }
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224] kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(Unknown Source) at kafka.producer.Producer.send(Unknown Source) at kafka.javaapi.producer.Producer.send(Unknown Source)
laughing_man.. 158
您需要调整三(或四)个属性:
消费者方面:fetch.message.max.bytes
- 这将确定消费者可以获取的消息的最大大小.
代理方:replica.fetch.max.bytes
- 这将允许代理中的副本在群集内发送消息,并确保正确复制消息.如果这个太小,则永远不会复制该消息,因此,消费者永远不会看到该消息,因为该消息永远不会被提交(完全复制).
经纪人方面:message.max.bytes
- 这是经纪人可以从生产者处收到的最大消息大小.
代理方(每个主题):max.message.bytes
- 这是代理允许附加到主题的消息的最大大小.此大小经过预压缩验证.(默认为经纪人message.max.bytes
.)
我发现了关于2号的困难方法 - 你没有从Kafka获得任何异常,消息或警告,所以在发送大量消息时一定要考虑这一点.
您需要调整三(或四)个属性:
消费者方面:fetch.message.max.bytes
- 这将确定消费者可以获取的消息的最大大小.
代理方:replica.fetch.max.bytes
- 这将允许代理中的副本在群集内发送消息,并确保正确复制消息.如果这个太小,则永远不会复制该消息,因此,消费者永远不会看到该消息,因为该消息永远不会被提交(完全复制).
经纪人方面:message.max.bytes
- 这是经纪人可以从生产者处收到的最大消息大小.
代理方(每个主题):max.message.bytes
- 这是代理允许附加到主题的消息的最大大小.此大小经过预压缩验证.(默认为经纪人message.max.bytes
.)
我发现了关于2号的困难方法 - 你没有从Kafka获得任何异常,消息或警告,所以在发送大量消息时一定要考虑这一点.
这个想法是将相同大小的消息从Kafka Producer发送到Kafka Broker然后由Kafka Consumer接收,即
Kafka生产商 - > Kafka Broker - > Kafka Consumer
假设要求是发送15MB的消息,那么生产者,经纪人和消费者这三者都需要同步.
Kafka Producer发送15 MB - > Kafka Broker允许/商店15 MB - > Kafka Consumer获得15 MB
因此,设置应为A.)On Broker:message.max.bytes = 15728640 replica.fetch.max.bytes = 15728640
B.)On Consumer:fetch.message.max.bytes = 15728640
@laughing_man的答案非常准确。但是,我仍然想提出建议,这是我从Quora的Kafka专家Stephane Maarek那里学到的。
Kafka并非要处理大型邮件。
您的API应该使用云存储(Ex AWS S3),并且只需向Kafka或任何消息代理推送S3的引用即可。您必须找到某个地方来保存数据,也许是网络驱动器,也许是任何东西,但它不应该是消息代理。
现在,如果您不想采用上述解决方案
消息的最大大小为1MB(代理中的设置称为message.max.bytes
)Apache Kafka。如果确实非常需要它,则可以增加该大小,并确保为生产者和消费者增加网络缓冲区。
而且,如果您真的很想拆分邮件,请确保每个拆分的邮件都具有完全相同的密钥,以便将其推送到同一分区,并且邮件内容应报告“部件ID”,以便您的使用者可以完全重建邮件。
如果您的消息是基于文本的(gzip,snappy,lz4压缩),则还可以探索压缩,这可能会减小数据大小,但并非神奇。
同样,您必须使用外部系统来存储该数据,然后仅将外部引用推送到Kafka。这是一种非常常见的体系结构,您应该使用并被广泛接受的体系结构。
请记住,仅当邮件数量巨大而不是大小时,Kafka才能发挥最佳作用。
资料来源:https : //www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
要记住,message.max.bytes
属性必须与消费者的财产同步fetch.message.max.bytes
.获取大小必须至少与最大消息大小一样大,否则可能存在生成器可以发送大于消费者可以使用/获取的消息的情况.值得一看的是它.
您使用的是哪个版本的Kafka?还提供了一些您将获得的更多详细信息跟踪.有什么东西像...... payload size of xxxx larger
than 1000000
出现在日志里吗?
与Laugh_man的答案相比,Kafka 0.10和新消费者需要进行微小的更改:
经纪人:没有变化,你仍然需要增加属性message.max.bytes
和replica.fetch.max.bytes
.message.max.bytes
必须等于或小于(*)replica.fetch.max.bytes
.
制作人:增加max.request.size
发送更大的消息.
消费者:增加max.partition.fetch.bytes
以接收更大的消息.
(*)阅读评论以了解有关message.max.bytes
<=的更多信息replica.fetch.max.bytes
您需要覆盖以下属性:
Broker Configs($ KAFKA_HOME/config/server.properties)
replica.fetch.max.bytes
message.max.bytes
消费者配置($ KAFKA_HOME/config/consumer.properties)
此步骤对我不起作用.我将它添加到消费者应用程序,它工作正常
fetch.message.max.bytes
重启服务器.
请查看此文档以获取更多信息:http: //kafka.apache.org/08/configuration.html