热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka原理和集群测试

Kafka是一个消息系统,由LinkedIn贡献给Apache基金会,称为Apache的一个顶级项目。Kafka最初用作LinkedIn的活动流(activitystream)和运营数据处理管道(pipeline)的基

Kafka是一个消息系统,由LinkedIn贡献给Apache基金会,称为Apache的一个顶级项目。Kafka最初用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基

Kafka是一个消息系统,由LinkedIn贡献给Apache基金会,称为Apache的一个顶级项目。Kafka最初用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础。它具有可扩展、吞吐量大和可持久化等特征,以及非常好的分区、复制和容错特征。

Kafka的关键设计决策

1). Kafka在设计之时为就将持久化消息作为通常的使用情况进行了考虑。
2). Kafka主要的设计约束是吞吐量,而不是功能。
3). Kafka有关哪些数据已经被使用了的状态信息保存为数据使用者(consumer)的一部分,而不是保存在服务器之上。
4). Kafka是一种显式的分布式系统。它假设,数据生产者(producer)、代理(brokers)和数据使用者(consumer)分散于多台机器之上。
而相比而言,传统的消息队列不能很好的支持(如超长的未处理数据、不能有效持久化)。对于数据的可用性,Kafka提供了两个保证:
(1). 生产者发送到Topic的分区上消息将会按照它们发送的顺序,而消费者收到的消息也是此顺序
(2). 如果一个Topic配置了复制因子( replication facto)为N, 那么可以允许N-1服务器当掉而不丢失任何已经增加的消息

Kafka中几个关键术语

Topic:Kafka将消息种子(Feed)分门别类, 每一类的消息称之为话题(Topic).
Producer:发布消息的对象称之为话题生产者(Kafka topic producer)
Consumer:订阅消息并处理发布的消息的种子的对象称之为话题消费者(consumers)
Broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。

Kafka中的Topic

topic
Topic是发布的消息的类别或者种子Feed名。对于每一个Topic, Kafka集群维护这一个分区的log,就像下图中的示例:Kafka集群
每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分配了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。
Kafka集群保持所有的消息,直到它们过期,无论消息是否被消费了。
实际上消费者所持有的仅有的元数据就是这个偏移量,也就是消费者在这个log中的位置。 这个偏移量由消费者控制:正常情况当消费者消费消息的时候,偏移量也线性的的增加。但是实际偏移量由消费者控制,消费者可以将偏移量重置为更老的一个偏移量,重新读取消息。
可以看到这种设计对消费者来说操作自如, 一个消费者的操作不会影响其它消费者对此log的处理。
再说说分区。Kafka中采用分区的设计有几个目的。
、可以处理更多的消息,不受单台服务器的限制。Topic拥有多个分区意味着它可以进行扩展,并处理更多的数据。
、分区可以作为并行处理的单元。
Topic的分区Log被分布到集群中的多个服务器上。每个服务器处理它持有的分区。 根据配置每个分区还可以复制到其它服务器作为备份容错。
每个分区有一个leader,零或多个replica。Leader处理此分区的所有的读写请求而replica被动的复制数据。如果leader当机,其它的一个replica会被推举为新的leader。
一台服务器可能同时是一个分区的leader,另一个分区的replica。 这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理。
关于复制原理,参考下面官档翻译:
Kafka 的集群复制设计

Kafka的集群部署

Kafka中主要有三种模式,
单机broker模式
单机多broker模式(伪分布式)
多机多broker模式(集群)
和hadoop一样,前两种多用于开发测试。第三种才是实际生产中可用的部署模式,下面介绍一下三节点kafka集群的部署流程
软件的安装直接解压缩即可:

tar xzvf kafka_2.10-0.8.1.1.tgz
mkdir /var/kafka && mkdir /var/zookeeper

关键参数的解释,可以参考http://debugo.com/kafka-params/
vim kafka_2.10-0.8.1.1/config/server.properties

#在默认的配置上,我只修改了3个地方。三个主机debugo01,debugo02,debugo03分别对应id为1,2,3
broker.id=3
log.dirs=/var/kafka
zookeeper.cOnnect=debugo01:2181,debugo02:2181,debugo03:2181
配置zookeeper,修改DataDir并加入集群参数
vim kafka_2.10-0.8.1.1/config/zookeeper.properties
initLimit=5
syncLimit=2
server.1=debugo01:2888:3888  
server.2=debugo02:2888:3888  
server.3=debugo03:2888:3888
dataDir=/var/zookeeper
#分别将1,2,3写入三个主机的myid文件
echo "1" >> /var/zookeeper/myid

在debugo01,debugo02,debugo03上分别启动zookeeper和kafka Server

bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka Server
bin/kafka-server-start.sh config/server.properties

这时可以在log中找到,新的broker已经将数据注册到znode中。

#####debugo01#####
[2014-12-07 20:54:20,506] INFO Awaiting socket connections on debugo01:9092. (kafka.network.Acceptor)
[2014-12-07 20:54:20,521] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer)
[2014-12-07 20:54:20,649] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-12-07 20:54:20,725] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-12-07 20:54:20,876] INFO Registered broker 1 at path /brokers/ids/1 with address debugo01:9092. (kafka.utils.ZkUtils$)
[2014-12-07 20:54:20,907] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
[2014-12-07 20:54:20,993] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
#####debugo02#####
[2014-12-07 20:54:35,896] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2014-12-07 20:54:35,913] INFO [Socket Server on Broker 2], Started (kafka.network.SocketServer)
[2014-12-07 20:54:36,073] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-12-07 20:54:36,179] INFO conflict in /controller data: {"version":1,"brokerid":2,"timestamp":"1417956876081"} stored data: {"version":1,"brokerid":1,"timestamp":"1417956860689"} (kafka.utils.ZkUtils$)
[2014-12-07 20:54:36,398] INFO Registered broker 2 at path /brokers/ids/2 with address debugo02:9092. (kafka.utils.ZkUtils$)
[2014-12-07 20:54:36,420] INFO [Kafka Server 2], started (kafka.server.KafkaServer)
#####debugo03#####
[2014-12-07 20:54:43,535] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2014-12-07 20:54:43,549] INFO [Socket Server on Broker 3], Started (kafka.network.SocketServer)
[2014-12-07 20:54:43,728] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-12-07 20:54:43,783] INFO conflict in /controller data: {"version":1,"brokerid":3,"timestamp":"1417956883737"} stored data: {"version":1,"brokerid":1,"timestamp":"1417956860689"} (kafka.utils.ZkUtils$)
[2014-12-07 20:54:43,999] INFO Registered broker 3 at path /brokers/ids/3 with address debugo03:9092. (kafka.utils.ZkUtils$)
[2014-12-07 20:54:44,018] INFO [Kafka Server 3], started (kafka.server.KafkaServer)

Topic的分区和复制

1. 创建debugo01,这个topic分区数为3,复制为1(不复制)。该topic跨越全部broker。下面管理命令在任意kafka节点上执行即可

bin/kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 1 --partitions 3 --topic debugo01
Created topic "debugo01".

2. 创建debugo02,这个topic分区数为1,复制为3(每个主机都有一份)。该topic跨越全部broker。下面管理命令在任意kafka节点上执行即可

bin/kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 3 --partitions 1 --topic debugo02

3. 列出topic信息

[root@debugo01 kafka_2.10-0.8.1.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181
debugo01
debugo02

4. 列出topic描述信息

[root@debugo01 kafka_2.10-0.8.1.1]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic debugo01
Topic:debugo01	PartitionCount:3	ReplicationFactor:1	Configs:
	Topic: debugo01	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: debugo01	Partition: 1	Leader: 2	Replicas: 2	Isr: 2
	Topic: debugo01	Partition: 2	Leader: 3	Replicas: 3	Isr: 3

5. 检查log目录,对于topic debugo01,debugo01为0号分区,debugo02为1号分区。而topic debugo02则复制了3份,都为0号分区

[root@debugo01 kafka]# ll
total 24
drwxr-xr-x 2 root root 4096 Dec  7 21:15 debugo01-0
drwxr-xr-x 2 root root 4096 Dec  7 21:16 debugo02-0
[root@debugo02 kafka]# ll
total 24
drwxr-xr-x 2 root root 4096 Dec  7 21:15 debugo01-1
drwxr-xr-x 2 root root 4096 Dec  7 21:16 debugo02-0
#而每个分区下面都生成了index和log文件
[root@debugo01 debugo01-0]# ls
00000000000000000000.index  00000000000000000000.log

6. 下面topic debugo03,replication-factor为2,partition为3.那么broker id为1的debugo01会如下面describe所示,保存0号分区和1号分区。
而0号分区的repica leader为broker id = 3,包含3和1两个replicas。

bin/kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 2 --partitions 3 --topic debugo03
Created topic "debugo03".
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic debugo03
[root@debugo01 kafka_2.10-0.8.1.1]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic debugo03
Topic:debugo03	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: debugo03	Partition: 0	Leader: 3	Replicas: 3,1	Isr: 3,1
	Topic: debugo03	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: debugo03	Partition: 2	Leader: 2	Replicas: 2,3	Isr: 2,3
[root@debugo01 kafka_2.10-0.8.1.1]# ll /var/kafka/debugo03*
/var/kafka/debugo03-0:
total 0
-rw-r--r-- 1 root root 10485760 Dec  7 21:34 00000000000000000000.index
-rw-r--r-- 1 root root        0 Dec  7 21:34 00000000000000000000.log
/var/kafka/debugo03-1:
total 0
-rw-r--r-- 1 root root 10485760 Dec  7 21:34 00000000000000000000.index
-rw-r--r-- 1 root root        0 Dec  7 21:34 00000000000000000000.log

消息的产生和消费

两个终端分别打开producer和consumer进行测试


bin/kafka-console-producer.sh --broker-list debugo01:9092 --topic debugo03
hello kafka
hello debugo

bin/kafka-console-consumer.sh --zookeeper debugo01:2181 --from-beginning --topic debugo03
hello kafka
hello debugo

下面使用perf命令来测试几个topic的性能,需要先下载kafka-perf_2.10-0.8.1.1.jar,并拷贝到kafka/libs下面。
50W条消息,每条1000字节,batch大小1000,topic为debugo01,4个线程(message size设置太大需要调整相关参数,否则容易OOM)。只用了13秒完成,kafka在多分区支持下吞吐量是非常给力的。

bin/kafka-producer-perf-test.sh --messages 500000 --message-size 1000  --batch-size 1000 --topics debugo01 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092
start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
2014-12-07 22:07:56:038, 2014-12-07 22:08:09:413, 0, 1000, 1000, 476.84, 35.6514, 500000, 37383.1776

同样的参数测试debugo02, 由于但分区加复制(replicas-factor=3),用时39秒。所以,适当加大partition数量和broker相关线程数量会极大的提高性能。

bin/kafka-producer-perf-test.sh --messages 500000 --message-size 1000  --batch-size 1000 --topics debugo02 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092
start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
2014-12-07 22:13:28:840, 2014-12-07 22:14:07:819, 0, 1000, 1000, 476.84, 12.2332, 500000, 12827.4199

同样的参数测试debugo03,用时30秒。

bin/kafka-producer-perf-test.sh --messages 500000 --message-size 1000  --batch-size 1000 --topics debugo03 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092
start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
2014-12-07 22:16:04:895, 2014-12-07 22:16:34:715, 0, 1000, 1000, 476.84, 15.9905, 500000, 16767.2703

同理,测试comsumer的性能。

bin/kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo01 --threads 3
start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2014-12-07 22:19:04:527, 2014-12-07 22:19:17:184, 1048576, 476.8372, 62.2747, 500000, 65299.7257
bin/kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo02 --threads 3
start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
[2014-12-07 22:19:59,938] WARN [perf-consumer-78853_debugo01-1417961999315-4a5941ef], No broker partitions consumed by consumer thread perf-consumer-78853_debugo01-1417961999315-4a5941ef-1 for topic debugo02 (kafka.consumer.ZookeeperConsumerConnector)
[2014-12-07 22:19:59,938] WARN [perf-consumer-78853_debugo01-1417961999315-4a5941ef], No broker partitions consumed by consumer thread perf-consumer-78853_debugo01-1417961999315-4a5941ef-2 for topic debugo02 (kafka.consumer.ZookeeperConsumerConnector)
2014-12-07 22:20:01:008, 2014-12-07 22:20:08:971, 1048576, 476.8372, 160.9305, 500000, 168747.8907
bin/kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo03 --threads 3
start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
?2014-12-07 22:21:27:421, 2014-12-07 22:21:39:918, 1048576, 476.8372, 63.6037, 500002, 66693.6108

^^

参考

http://blog.csdn.net/smallnest/article/details/38491483

http://www.350351.com/jiagoucunchu/xiaoxixitong/46720.html

http://kafka.apache.org/documentation.html

http://backend.blog.163.com/blog/static/202294126201431723734212/

http://www.inter12.org/archives/842

推荐阅读
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 如何实现织梦DedeCms全站伪静态
    本文介绍了如何通过修改织梦DedeCms源代码来实现全站伪静态,以提高管理和SEO效果。全站伪静态可以避免重复URL的问题,同时通过使用mod_rewrite伪静态模块和.htaccess正则表达式,可以更好地适应搜索引擎的需求。文章还提到了一些相关的技术和工具,如Ubuntu、qt编程、tomcat端口、爬虫、php request根目录等。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 这是原文链接:sendingformdata许多情况下,我们使用表单发送数据到服务器。服务器处理数据并返回响应给用户。这看起来很简单,但是 ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • 本文介绍了一些Java开发项目管理工具及其配置教程,包括团队协同工具worktil,版本管理工具GitLab,自动化构建工具Jenkins,项目管理工具Maven和Maven私服Nexus,以及Mybatis的安装和代码自动生成工具。提供了相关链接供读者参考。 ... [详细]
  • REVERT权限切换的操作步骤和注意事项
    本文介绍了在SQL Server中进行REVERT权限切换的操作步骤和注意事项。首先登录到SQL Server,其中包括一个具有很小权限的普通用户和一个系统管理员角色中的成员。然后通过添加Windows登录到SQL Server,并将其添加到AdventureWorks数据库中的用户列表中。最后通过REVERT命令切换权限。在操作过程中需要注意的是,确保登录名和数据库名的正确性,并遵循安全措施,以防止权限泄露和数据损坏。 ... [详细]
  • 本文分析了Wince程序内存和存储内存的分布及作用。Wince内存包括系统内存、对象存储和程序内存,其中系统内存占用了一部分SDRAM,而剩下的30M为程序内存和存储内存。对象存储是嵌入式wince操作系统中的一个新概念,常用于消费电子设备中。此外,文章还介绍了主电源和后备电池在操作系统中的作用。 ... [详细]
  • 解决Sharepoint 2013运行状况分析出现的“一个或多个服务器未响应”问题的方法
    本文介绍了解决Sharepoint 2013运行状况分析中出现的“一个或多个服务器未响应”问题的方法。对于有高要求的客户来说,系统检测问题的存在是不可接受的。文章详细描述了解决该问题的步骤,包括删除服务器、处理分布式缓存留下的记录以及使用代码等方法。同时还提供了相关关键词和错误提示信息,以帮助读者更好地理解和解决该问题。 ... [详细]
  • 目录浏览漏洞与目录遍历漏洞的危害及修复方法
    本文讨论了目录浏览漏洞与目录遍历漏洞的危害,包括网站结构暴露、隐秘文件访问等。同时介绍了检测方法,如使用漏洞扫描器和搜索关键词。最后提供了针对常见中间件的修复方式,包括关闭目录浏览功能。对于保护网站安全具有一定的参考价值。 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
  • 本文总结了初学者在使用dubbo设计架构过程中遇到的问题,并提供了相应的解决方法。问题包括传输字节流限制、分布式事务、序列化、多点部署、zk端口冲突、服务失败请求3次机制以及启动时检查。通过解决这些问题,初学者能够更好地理解和应用dubbo设计架构。 ... [详细]
author-avatar
齐鲁墨_931
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有