热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

kafka0.9+消费者配置参数说明

ConsumerConfiguration在kafka0.9使用JavaConsumer替代了老版本的scalaConsumer。新版的配置如下:bootstrap.

Consumer Configuration在kafka 0.9+使用Java Consumer替代了老版本的scala Consumer。新版的配置如下:


  • bootstrap.servers
    在启动consumer时配置的broker地址的。不需要将cluster中所有的broker都配置上,因为启动后会自动的发现cluster所有的broker。
    它配置的格式是:host1:port1;host2:port2…

  • key.descrializer

      Message record 的key的反序列化类。

      一般都是StringDeserializer, org.apache.kafka.common.serialization.StringDeserializer


  • value.descrializer
    Message record 的value的反序列化类。

      一般都是StringDeserializer。org.apache.kafka.common.serialization.StringDeserializer


  • group.id
    用于表示该consumer想要加入到哪个group中。默认值是 “”。

  • heartbeat.interval.ms
    心跳间隔。心跳是在consumer与coordinator之间进行的。心跳是确定consumer存活,加入或者退出group的有效手段。这个值必须设置的小于session.timeout.ms,因为:
    当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。
    通常设置的值要低于session.timeout.ms的1/3。
    默认值是:3000 (3s)

  • session.timeout.ms
    Consumer session 过期时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
    其默认值是:10000 (10 s)

  • enable.auto.commit
    Consumer 在commit offset时有两种模式:自动提交,手动提交。手动提交在前面已经说过。自动提交:是Kafka Consumer会在后台周期性的去commit。
    默认值是true。

  • auto.commit.interval.ms
    自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)

  • auto.offset.reset
    这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:
    1) earliest:自动重置到最早的offset。
    2) latest:看上去重置到最晚的offset。
    3) none:如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。
    4) 如果不是上述3种,只抛出异常给consumer。
    默认值是latest。

  • connections.max.idle.ms
    连接空闲超时时间。因为consumer只与broker有连接(coordinator也是一个broker),所以这个配置的是consumer到broker之间的。
    默认值是:540000 (9 min)

  • fetch.max.wait.ms
    Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。

  • fetch.min.bytes
    当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。
    取值范围是:[0, Integer.Max],默认值是1。
    默认值设置为1的目的是:使得consumer的请求能够尽快的返回。

  • fetch.max.bytes
    一次fetch请求,从一个broker中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。
    broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。
    取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)

  • max.partition.fetch.bytes
    一次fetch请求,从一个partition中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。
    broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。

  • max.poll.interval.ms
    前面说过要求程序中不间断的调用poll()。如果长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了。

  • max.poll.records
    Consumer每次调用poll()时取到的records的最大数。

  • receive.buffer.byte
    Consumer receiver buffer (SO_RCVBUF)的大小。这个值在创建Socket连接时会用到。
    取值范围是:[-1, Integer.MAX]。默认值是:65536 (64 KB)
    如果值设置为-1,则会使用操作系统默认的值。

  • request.timeout.ms
    请求发起后,并不一定会很快接收到响应信息。这个配置就是来配置请求超时时间的。默认值是:305000 (305 s)

  • client.id
    Consumer进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。

  • interceptor.classes
    用户自定义interceptor。

  • metadata.max.age.ms
    Metadata数据的刷新间隔。即便没有任何的partition订阅关系变更也行执行。
    范围是:[0, Integer.MAX],默认值是:300000 (5 min)


Consumer配置说明

bootstrap.servers

参考生产者中的bootstrap.servers配置说明

key.descrializer、value.descrializer
Message record 的key, value的反序列化类。

group.id


A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.


用于表示该consumer想要加入到哪个group中。默认值是 “”。
heartbeat.interval.ms


The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.


心跳间隔。心跳是在consumer与coordinator之间进行的。心跳用来保持consumer的会话,并且在有consumer加入或者离开group时帮助进行rebalance。
这个值必须设置的小于session.timeout.ms,因为:当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。
通常设置的值要低于session.timeout.ms的1/3。默认值是:3000 (3s)

session.timeout.ms


The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.


Consumer session 过期时间。consumer会发送周期性的心跳表明该consumer是活着的。如果超过session.timeout.ms设定的值仍然没有收到心跳,zebroker会把这个consumer从group中移除,并且重新rebalance。
这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
其默认值是:10000 (10 s)

enable.auto.commit


If true the consumer’s offset will be periodically committed in the background.


设置Consumer 在 commit 方式是否是自动调焦。默认值是true。

auto.commit.interval.ms


The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.


自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)

auto.offset.reset


What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):



  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group
  • anything else: throw exception to the consumer.

这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:


  • earliest:自动重置到最早的offset。
  • latest:看上去重置到最晚的offset。
  • none:如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。
  • 如果不是上述3种,只抛出异常给consumer。
    默认值是latest。

fetch.max.wait.ms


The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes.


Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。

fetch.min.bytes


The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.


当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。
取值范围是:[0, Integer.Max],默认值是1。默认值设置为1的目的是:使得consumer的请求能够尽快的返回。

fetch.max.bytes


The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.
一次fetch请求,从一个broker中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes和topic的max.message.bytes的配置。
取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)


max.partition.fetch.bytes
一次fetch请求,从一个partition中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。 broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker 的message.max.bytes和 topic 的max.message.bytes的配置。

max.poll.interval.ms


The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.


前面说过要求程序中不间断的调用poll()。如果长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了。

max.poll.records


The maximum number of records returned in a single call to poll().


Consumer每次调用poll()时取到的records的最大数。

 

转载处:https://blog.csdn.net/lixiang987654321/article/details/99437168


推荐阅读
  • springboot基于redis配置session共享项目环境配置pom.xml引入依赖application.properties配置Cookie序列化(高版本不需要)测试启 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 本文介绍了RPC框架Thrift的安装环境变量配置与第一个实例,讲解了RPC的概念以及如何解决跨语言、c++客户端、web服务端、远程调用等需求。Thrift开发方便上手快,性能和稳定性也不错,适合初学者学习和使用。 ... [详细]
  • 标题: ... [详细]
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • MySQL数据库锁机制及其应用(数据库锁的概念)
    本文介绍了MySQL数据库锁机制及其应用。数据库锁是计算机协调多个进程或线程并发访问某一资源的机制,在数据库中,数据是一种供许多用户共享的资源,如何保证数据并发访问的一致性和有效性是数据库必须解决的问题。MySQL的锁机制相对简单,不同的存储引擎支持不同的锁机制,主要包括表级锁、行级锁和页面锁。本文详细介绍了MySQL表级锁的锁模式和特点,以及行级锁和页面锁的特点和应用场景。同时还讨论了锁冲突对数据库并发访问性能的影响。 ... [详细]
  • Hibernate延迟加载深入分析-集合属性的延迟加载策略
    本文深入分析了Hibernate延迟加载的机制,特别是集合属性的延迟加载策略。通过延迟加载,可以降低系统的内存开销,提高Hibernate的运行性能。对于集合属性,推荐使用延迟加载策略,即在系统需要使用集合属性时才从数据库装载关联的数据,避免一次加载所有集合属性导致性能下降。 ... [详细]
  • Servlet多用户登录时HttpSession会话信息覆盖问题的解决方案
    本文讨论了在Servlet多用户登录时可能出现的HttpSession会话信息覆盖问题,并提供了解决方案。通过分析JSESSIONID的作用机制和编码方式,我们可以得出每个HttpSession对象都是通过客户端发送的唯一JSESSIONID来识别的,因此无需担心会话信息被覆盖的问题。需要注意的是,本文讨论的是多个客户端级别上的多用户登录,而非同一个浏览器级别上的多用户登录。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 本文讨论了在shiro java配置中加入Shiro listener后启动失败的问题。作者引入了一系列jar包,并在web.xml中配置了相关内容,但启动后却无法正常运行。文章提供了具体引入的jar包和web.xml的配置内容,并指出可能的错误原因。该问题可能与jar包版本不兼容、web.xml配置错误等有关。 ... [详细]
  • Java如何导入和导出Excel文件的方法和步骤详解
    本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • PHP连接MySQL的2种方法小结以及防止乱码【PHP】
    后端开发|php教程PHP,MySQL,乱码后端开发-php教程PHP的MySQL配置报错信息:ClassmysqlinotfoundinAnswer:1.在confphp.ini ... [详细]
  • 我正在使用sql-serverkafka-connect和debezium监视sqlserver数据库,但是当我发布并运行我的wo ... [详细]
  • druid接入kafka indexing service整个流程
    先介绍下我们的druid集群配置Overload1台Coordinator1台Middlemanager3台Broker3台Historical一共12台,其中cold6台,hot ... [详细]
author-avatar
手机用户2702934324
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有