热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:Kafka踩坑记录

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Kafka踩坑记录相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Kafka踩坑记录相关的知识,希望对你有一定的参考价值。






1.生产者发送消息如果没有key值,要设置成null,不能设置成空字符串,否则会认为空字符串是key值,会把所有消息发送到一个分区上。

2.生产者设置消息批量发送,需要设置两个属性:batch.size和linger.ms。前者是消息积攒到多大时,发送给broker。后者是超过多少毫秒时,发送给broker。两者只要触发一个条件,就会把积攒的消息批量发送给broker。
需要注意的是,不管是否批量发送消息,produce都要显式调用close方法,只不过批量发送消息时,KafkaProducer会把消息存起来,触发条件后,才统一发送给broker。

当还没有到达batch.size的阈值,也没有到达linger.ms的阈值时,如果此时线程突然中断了,那么这批次的消息就会丢失,不会发送给broker。当这两个条件都没触发时,但是调用了produce的close方法,会把这个批次的消息提交至broker的。

3.batch.size的值,设置大小要合理,设置太大,会造成阻塞,效率反而更慢,下面复原一下问题。
本来想测试一下消费端的数据处理能力,所以在生产者端,我把batch.size的值设置成了21300000,不要问为什么设置成这个值,随便瞎写的。把linger.ms设置成了20000,即20秒发送一批数据。生产者通过循环产生数据。
运行的时候,产生了令人费解的一幕。生产者生产数据的速度非常慢,几秒钟,才会进入下一次循环。
然后经过了多次测试发现了更神奇的现象。当消息的key值设置为null或动态变化时,生产消息的速率非常慢。而当把消息的key值设置成固定的时,生产消息的速率又恢复了正常。并且,把batch.size的值设置小一些的时候(4096),生产速率也是正常的。
由上面的现象可知。batch.size设置很大,会影响生产者的效率,具体为什么影响,不得而知,需要追溯源码。 而batch.size设置很大时,效率也跟消息的key值有关系。key值固定不变时,效率快的原因猜测是因为不用进行分区的负载计算,只是一个往一个分区发,所以快。而key是null或者动态变化时,效率慢的原因猜测是因为需要计算不同key值的分区负载,所以影响了效率。那为什么当batch.size值小的时候,不会受key值的影响呢?这就说明batch.size和key值之间,有相互关系,但是还不是完全的影响关系。

带着上面的猜想和疑问,我们来看producer的源码。问题的根本,都集中在了producer的send方法里,所以,我们看send方法的源码。

这里,我们看源码的目的是找到为什么生产效率慢。所以,我采用了断点的方式,来定位到底是哪块代码,影响了速率。下面记录一下寻找过程。
由send方法经过一顿点入,进入了doSend方法,经过一番分析,初步猜测,是这个方法影响了效率,如下:
在这里插入图片描述
然后就在这里打个断点,然后再在它的下一行打个断点,用断点法验证是不是这里影响了性能。通过测试发现,从这个方法到下一个方法断点走的速度很快。而下一个断点到返回的send方法的时间,反而很慢。这就说明,我们猜测的这个断点,不是影响效率的地方。而是在这个断点的后面,某段代码,影响了效率。
索性把后面的代码,每行都打上断点,来测试到底哪行代码性能慢,如下图所示:
在这里插入图片描述
然后继续采用断点发,一个断点一个断点走,看哪个断点执行时间长。最终,发现是这个代码执行时间长:
在这里插入图片描述我们点进去这个方法,可以继续采用断点发一个一个排查哪个代码影响了性能,也可以大体看一遍方法,猜测一下可能哪里影响性能,缩小定位范围,最终,定位到是这里影响了性能:

在这里插入图片描述
这个方法就是最终的原因所在,我们把其全部代码贴出来:

/**
* Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool
* is configured with blocking mode.
*
* @param size The buffer size to allocate in bytes
* @param maxTimeToBlockMs The maximum time in milliseconds to block for buffer memory to be available
* @return The buffer
* @throws InterruptedException If the thread is interrupted while blocked
* @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
* forever)
*/

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
this.lock.lock();
try {
// check if we have a free buffer of the right size pooled
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request, but need to allocate the buffer
freeUp(size);
this.nonPooledAvailableMemory -= size;
} else {
// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);
// loop over and over until we have a buffer or have reserved
// enough memory to allocate one
while (accumulated < size) {
long startWaitNs &#61; time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
waitingTimeElapsed &#61; !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} finally {
long endWaitNs &#61; time.nanoseconds();
timeNs &#61; Math.max(0L, endWaitNs - startWaitNs);
recordWaitTime(timeNs);
}
if (waitingTimeElapsed) {
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " &#43; maxTimeToBlockMs &#43; " ms.");
}
remainingTimeToBlockNs -&#61; timeNs;
// check if we can satisfy this request from the free list,
// otherwise allocate memory
if (accumulated &#61;&#61; 0 && size &#61;&#61; this.poolableSize && !this.free.isEmpty()) {
// just grab a buffer from the free list
buffer &#61; this.free.pollFirst();
accumulated &#61; size;
} else {
// we&#39;ll need to allocate memory, but we may only get
// part of what we need on this iteration
freeUp(size - accumulated);
int got &#61; (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
this.nonPooledAvailableMemory -&#61; got;
accumulated &#43;&#61; got;
}
}
// Don&#39;t reclaim memory on throwable since nothing was thrown
accumulated &#61; 0;
} finally {
// When this loop was not able to successfully terminate don&#39;t loose available memory
this.nonPooledAvailableMemory &#43;&#61; accumulated;
this.waiters.remove(moreMemory);
}
}
} finally {
// signal any additional waiters if there is more memory left
// over for them
try {
if (!(this.nonPooledAvailableMemory &#61;&#61; 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
// Another finally... otherwise find bugs complains
lock.unlock();
}
}
if (buffer &#61;&#61; null)
return safeAllocateByteBuffer(size);
else
return buffer;
}

结合注释看代码&#xff0c;可以得知&#xff0c;核心原因如下图:
在这里插入图片描述
当超出size时&#xff0c;会进入阻塞&#xff0c;这就解释了为什么batch.size很大时&#xff0c;效率反而慢了。

那么还有一个疑问没解决&#xff0c;为什么key值设置成固定值时&#xff0c;效率很快呢&#xff0c;它没有阻塞吗&#xff1f;

通过断点可知&#xff0c;当key值固定时&#xff0c;代码没走进阻塞方法&#xff0c;它走了其他分支&#xff0c;所以效率很快&#xff0c;如下图:
在这里插入图片描述


总结

batch.size的值需要合理设置&#xff0c;否则会进入阻塞&#xff0c;效率很慢。






推荐阅读
  • 使用C++编写程序实现增加或删除桌面的右键列表项
    本文介绍了使用C++编写程序实现增加或删除桌面的右键列表项的方法。首先通过操作注册表来实现增加或删除右键列表项的目的,然后使用管理注册表的函数来编写程序。文章详细介绍了使用的五种函数:RegCreateKey、RegSetValueEx、RegOpenKeyEx、RegDeleteKey和RegCloseKey,并给出了增加一项的函数写法。通过本文的方法,可以方便地自定义桌面的右键列表项。 ... [详细]
  • 本文介绍了数据库的存储结构及其重要性,强调了关系数据库范例中将逻辑存储与物理存储分开的必要性。通过逻辑结构和物理结构的分离,可以实现对物理存储的重新组织和数据库的迁移,而应用程序不会察觉到任何更改。文章还展示了Oracle数据库的逻辑结构和物理结构,并介绍了表空间的概念和作用。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 本文介绍了C#中生成随机数的三种方法,并分析了其中存在的问题。首先介绍了使用Random类生成随机数的默认方法,但在高并发情况下可能会出现重复的情况。接着通过循环生成了一系列随机数,进一步突显了这个问题。文章指出,随机数生成在任何编程语言中都是必备的功能,但Random类生成的随机数并不可靠。最后,提出了需要寻找其他可靠的随机数生成方法的建议。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 预备知识可参考我整理的博客Windows编程之线程:https:www.cnblogs.comZhuSenlinp16662075.htmlWindows编程之线程同步:https ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • 合并列值-合并为一列问题需求:createtabletab(Aint,Bint,Cint)inserttabselect1,2,3unionallsel ... [详细]
  • 超级简单加解密工具的方案和功能
    本文介绍了一个超级简单的加解密工具的方案和功能。该工具可以读取文件头,并根据特定长度进行加密,加密后将加密部分写入源文件。同时,该工具也支持解密操作。加密和解密过程是可逆的。本文还提到了一些相关的功能和使用方法,并给出了Python代码示例。 ... [详细]
  • 本文分析了Wince程序内存和存储内存的分布及作用。Wince内存包括系统内存、对象存储和程序内存,其中系统内存占用了一部分SDRAM,而剩下的30M为程序内存和存储内存。对象存储是嵌入式wince操作系统中的一个新概念,常用于消费电子设备中。此外,文章还介绍了主电源和后备电池在操作系统中的作用。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
author-avatar
坐着驴车追宝马
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有