热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:Kafka踩坑记录

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Kafka踩坑记录相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Kafka踩坑记录相关的知识,希望对你有一定的参考价值。






1.生产者发送消息如果没有key值,要设置成null,不能设置成空字符串,否则会认为空字符串是key值,会把所有消息发送到一个分区上。

2.生产者设置消息批量发送,需要设置两个属性:batch.size和linger.ms。前者是消息积攒到多大时,发送给broker。后者是超过多少毫秒时,发送给broker。两者只要触发一个条件,就会把积攒的消息批量发送给broker。
需要注意的是,不管是否批量发送消息,produce都要显式调用close方法,只不过批量发送消息时,KafkaProducer会把消息存起来,触发条件后,才统一发送给broker。

当还没有到达batch.size的阈值,也没有到达linger.ms的阈值时,如果此时线程突然中断了,那么这批次的消息就会丢失,不会发送给broker。当这两个条件都没触发时,但是调用了produce的close方法,会把这个批次的消息提交至broker的。

3.batch.size的值,设置大小要合理,设置太大,会造成阻塞,效率反而更慢,下面复原一下问题。
本来想测试一下消费端的数据处理能力,所以在生产者端,我把batch.size的值设置成了21300000,不要问为什么设置成这个值,随便瞎写的。把linger.ms设置成了20000,即20秒发送一批数据。生产者通过循环产生数据。
运行的时候,产生了令人费解的一幕。生产者生产数据的速度非常慢,几秒钟,才会进入下一次循环。
然后经过了多次测试发现了更神奇的现象。当消息的key值设置为null或动态变化时,生产消息的速率非常慢。而当把消息的key值设置成固定的时,生产消息的速率又恢复了正常。并且,把batch.size的值设置小一些的时候(4096),生产速率也是正常的。
由上面的现象可知。batch.size设置很大,会影响生产者的效率,具体为什么影响,不得而知,需要追溯源码。 而batch.size设置很大时,效率也跟消息的key值有关系。key值固定不变时,效率快的原因猜测是因为不用进行分区的负载计算,只是一个往一个分区发,所以快。而key是null或者动态变化时,效率慢的原因猜测是因为需要计算不同key值的分区负载,所以影响了效率。那为什么当batch.size值小的时候,不会受key值的影响呢?这就说明batch.size和key值之间,有相互关系,但是还不是完全的影响关系。

带着上面的猜想和疑问,我们来看producer的源码。问题的根本,都集中在了producer的send方法里,所以,我们看send方法的源码。

这里,我们看源码的目的是找到为什么生产效率慢。所以,我采用了断点的方式,来定位到底是哪块代码,影响了速率。下面记录一下寻找过程。
由send方法经过一顿点入,进入了doSend方法,经过一番分析,初步猜测,是这个方法影响了效率,如下:
在这里插入图片描述
然后就在这里打个断点,然后再在它的下一行打个断点,用断点法验证是不是这里影响了性能。通过测试发现,从这个方法到下一个方法断点走的速度很快。而下一个断点到返回的send方法的时间,反而很慢。这就说明,我们猜测的这个断点,不是影响效率的地方。而是在这个断点的后面,某段代码,影响了效率。
索性把后面的代码,每行都打上断点,来测试到底哪行代码性能慢,如下图所示:
在这里插入图片描述
然后继续采用断点发,一个断点一个断点走,看哪个断点执行时间长。最终,发现是这个代码执行时间长:
在这里插入图片描述我们点进去这个方法,可以继续采用断点发一个一个排查哪个代码影响了性能,也可以大体看一遍方法,猜测一下可能哪里影响性能,缩小定位范围,最终,定位到是这里影响了性能:

在这里插入图片描述
这个方法就是最终的原因所在,我们把其全部代码贴出来:

/**
* Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool
* is configured with blocking mode.
*
* @param size The buffer size to allocate in bytes
* @param maxTimeToBlockMs The maximum time in milliseconds to block for buffer memory to be available
* @return The buffer
* @throws InterruptedException If the thread is interrupted while blocked
* @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
* forever)
*/

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
this.lock.lock();
try {
// check if we have a free buffer of the right size pooled
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request, but need to allocate the buffer
freeUp(size);
this.nonPooledAvailableMemory -= size;
} else {
// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);
// loop over and over until we have a buffer or have reserved
// enough memory to allocate one
while (accumulated < size) {
long startWaitNs &#61; time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
waitingTimeElapsed &#61; !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} finally {
long endWaitNs &#61; time.nanoseconds();
timeNs &#61; Math.max(0L, endWaitNs - startWaitNs);
recordWaitTime(timeNs);
}
if (waitingTimeElapsed) {
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " &#43; maxTimeToBlockMs &#43; " ms.");
}
remainingTimeToBlockNs -&#61; timeNs;
// check if we can satisfy this request from the free list,
// otherwise allocate memory
if (accumulated &#61;&#61; 0 && size &#61;&#61; this.poolableSize && !this.free.isEmpty()) {
// just grab a buffer from the free list
buffer &#61; this.free.pollFirst();
accumulated &#61; size;
} else {
// we&#39;ll need to allocate memory, but we may only get
// part of what we need on this iteration
freeUp(size - accumulated);
int got &#61; (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
this.nonPooledAvailableMemory -&#61; got;
accumulated &#43;&#61; got;
}
}
// Don&#39;t reclaim memory on throwable since nothing was thrown
accumulated &#61; 0;
} finally {
// When this loop was not able to successfully terminate don&#39;t loose available memory
this.nonPooledAvailableMemory &#43;&#61; accumulated;
this.waiters.remove(moreMemory);
}
}
} finally {
// signal any additional waiters if there is more memory left
// over for them
try {
if (!(this.nonPooledAvailableMemory &#61;&#61; 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
// Another finally... otherwise find bugs complains
lock.unlock();
}
}
if (buffer &#61;&#61; null)
return safeAllocateByteBuffer(size);
else
return buffer;
}

结合注释看代码&#xff0c;可以得知&#xff0c;核心原因如下图:
在这里插入图片描述
当超出size时&#xff0c;会进入阻塞&#xff0c;这就解释了为什么batch.size很大时&#xff0c;效率反而慢了。

那么还有一个疑问没解决&#xff0c;为什么key值设置成固定值时&#xff0c;效率很快呢&#xff0c;它没有阻塞吗&#xff1f;

通过断点可知&#xff0c;当key值固定时&#xff0c;代码没走进阻塞方法&#xff0c;它走了其他分支&#xff0c;所以效率很快&#xff0c;如下图:
在这里插入图片描述


总结

batch.size的值需要合理设置&#xff0c;否则会进入阻塞&#xff0c;效率很慢。






推荐阅读
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • 欢乐的票圈重构之旅——RecyclerView的头尾布局增加
    项目重构的Git地址:https:github.comrazerdpFriendCircletreemain-dev项目同步更新的文集:http:www.jianshu.comno ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 本文介绍了MVP架构模式及其在国庆技术博客中的应用。MVP架构模式是一种演变自MVC架构的新模式,其中View和Model之间的通信通过Presenter进行。相比MVC架构,MVP架构将交互逻辑放在Presenter内部,而View直接从Model中读取数据而不是通过Controller。本文还探讨了MVP架构在国庆技术博客中的具体应用。 ... [详细]
author-avatar
坐着驴车追宝马
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有