如何使用Kafka(超过15MB)发送大量邮件?

 手机用户2602915241 发布于 2023-02-04 19:59

我使用Java Producer API将String-messages发送到Kafka V. 0.8.如果邮件大小约为15 MB,我会得到一个MessageSizeTooLargeException.我试图设置message.max.bytes为40 MB,但我仍然得到例外.小消息没有问题.

(例外情况出现在制作人中,我在此应用程序中没有使用者.)

我该怎么做才能摆脱这种异常?

我的示例生产者配置

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

错误日志:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)

laughing_man.. 158

您需要调整三(或四)个属性:

消费者方面:fetch.message.max.bytes- 这将确定消费者可以获取的消息的最大大小.

代理方:replica.fetch.max.bytes- 这将允许代理中的副本在群集内发送消息,并确保正确复制消息.如果这个太小,则永远不会复制该消息,因此,消费者永远不会看到该消息,因为该消息永远不会被提交(完全复制).

经纪人方面:message.max.bytes- 这是经纪人可以从生产者处收到的最大消息大小.

代理方(每个主题):max.message.bytes- 这是代理允许附加到主题的消息的最大大小.此大小经过预压缩验证.(默认为经纪人message.max.bytes.)

我发现了关于2号的困难方法 - 你没有从Kafka获得任何异常,消息或警告,所以在发送大量消息时一定要考虑这一点.

6 个回答
  • 您需要调整三(或四)个属性:

    消费者方面:fetch.message.max.bytes- 这将确定消费者可以获取的消息的最大大小.

    代理方:replica.fetch.max.bytes- 这将允许代理中的副本在群集内发送消息,并确保正确复制消息.如果这个太小,则永远不会复制该消息,因此,消费者永远不会看到该消息,因为该消息永远不会被提交(完全复制).

    经纪人方面:message.max.bytes- 这是经纪人可以从生产者处收到的最大消息大小.

    代理方(每个主题):max.message.bytes- 这是代理允许附加到主题的消息的最大大小.此大小经过预压缩验证.(默认为经纪人message.max.bytes.)

    我发现了关于2号的困难方法 - 你没有从Kafka获得任何异常,消息或警告,所以在发送大量消息时一定要考虑这一点.

    2023-02-04 20:01 回答
  • 这个想法是将相同大小的消息从Kafka Producer发送到Kafka Broker然后由Kafka Consumer接收,即

    Kafka生产商 - > Kafka Broker - > Kafka Consumer

    假设要求是发送15MB的消息,那么生产者,经纪人和消费者这三者都需要同步.

    Kafka Producer发送15 MB - > Kafka Broker允许/商店15 MB - > Kafka Consumer获得15 MB

    因此,设置应为A.)On Broker:message.max.bytes = 15728640 replica.fetch.max.bytes = 15728640

    B.)On Consumer:fetch.message.max.bytes = 15728640

    2023-02-04 20:02 回答
  • @laughing_man的答案非常准确。但是,我仍然想提出建议,这是我从Quora的Kafka专家Stephane Maarek那里学到的。

    Kafka并非要处理大型邮件。

    您的API应该使用云存储(Ex AWS S3),并且只需向Kafka或任何消息代理推送S3的引用即可。您必须找到某个地方来保存数据,也许是网络驱动器,也许是任何东西,但它不应该是消息代理。

    现在,如果您不想采用上述解决方案

    消息的最大大小为1MB(代理中的设置称为message.max.bytes)Apache Kafka。如果确实非常需要它,则可以增加该大小,并确保为生产者和消费者增加网络缓冲区。

    而且,如果您真的很想拆分邮件,请确保每个拆分的邮件都具有完全相同的密钥,以便将其推送到同一分区,并且邮件内容应报告“部件ID”,以便您的使用者可以完全重建邮件。

    如果您的消息是基于文本的(gzip,snappy,lz4压缩),则还可以探索压缩,这可能会减小数据大小,但并非神奇。

    同样,您必须使用外部系统来存储该数据,然后仅将外部引用推送到Kafka。这是一种非常常见的体系结构,您应该使用并被广泛接受的体系结构。

    请记住,仅当邮件数量巨大而不是大小时,Kafka才能发挥最佳作用。

    资料来源:https : //www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

    2023-02-04 20:02 回答
  • 要记住,message.max.bytes属性必须与消费者的财产同步fetch.message.max.bytes.获取大小必须至少与最大消息大小一样大,否则可能存在生成器可以发送大于消费者可以使用/获取的消息的情况.值得一看的是它.
    您使用的是哪个版本的Kafka?还提供了一些您将获得的更多详细信息跟踪.有什么东西像...... payload size of xxxx larger than 1000000出现在日志里吗?

    2023-02-04 20:02 回答
  • 与Laugh_man的答案相比,Kafka 0.10和新消费者需要进行微小的更改:

    经纪人:没有变化,你仍然需要增加属性message.max.bytesreplica.fetch.max.bytes.message.max.bytes必须等于或小于(*)replica.fetch.max.bytes.

    制作人:增加max.request.size发送更大的消息.

    消费者:增加max.partition.fetch.bytes以接收更大的消息.

    (*)阅读评论以了解有关message.max.bytes<=的更多信息replica.fetch.max.bytes

    2023-02-04 20:02 回答
  • 您需要覆盖以下属性:

    Broker Configs($ KAFKA_HOME/config/server.properties)

    replica.fetch.max.bytes

    message.max.bytes

    消费者配置($ KAFKA_HOME/config/consumer.properties)
    此步骤对我不起作用.我将它添加到消费者应用程序,它工作正常

    fetch.message.max.bytes

    重启服务器.

    请查看此文档以获取更多信息:http: //kafka.apache.org/08/configuration.html

    2023-02-04 20:02 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有