热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

查看kafka版本_熬夜码了万字的kafka架构分析与集群搭建使用

点关注,不迷路!1.kafka简介Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(r

点关注,不迷路!

1.kafka简介

1de9b0559eb43a6a618167c9de7c2209.png

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

优势:

高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;

可扩展性:kafka集群支持热扩展;

持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;

容错性:允许集群中节点故障(若副本数量为n,则允许n-1个节点故障);

高并发:支持数千个客户端同时读写。

版本演进

  • Kafka 0.8 之后引入了副本机制, Kafka 成为了一个分布式高可靠消息队列解决方案。

  • 0.8.2.0 版本社区引入了新版本 Producer API , 需要指定 Broker 地址,但是bug比较多

  • 0.9.0.0 版本增加了基础的安全认证 / 权限功能,同时使用 Java 重写了新版本 Consumer API ,还引入了 Kafka Connect 组件用于实现高性能的数据抽取,同样也是bug多

  • 0.10.0.0 是里程碑式的大版本,因为该版本引入了 Kafka Streams。从这个版本起,Kafka 正式升级成分布式流处理平台

  • 0.11.0.0 版本,引入了两个重量级的功能变更:一个是提供幂等性 Producer API 以及事务(Transaction) API ;另一个是对 Kafka 消息格式做了重构。

  • 1.0和2.0 主要是对Kafka Streams 的各种改进,在消息引擎方面并未引入太多的重大功能特性

1.0中文文档:http://kafka.apachecn.org/documentation.html#introduction

1.1文档:http://kafka.apache.org/11/documentation.html

术语

  • Record(消息):Kafka 处理的主要对象。

  • Topic(主题):主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。

  • Partition(分区):一个有序不变的消息序列。每个Topic下可以有多个分区。

  • Offset(消息位移):表示分区中每条消息的位置信息,是一个单调递增且不变的值。

  • Replica(副本):Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者(leader)副本和追随者(follower)副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。

  • Broker(代理):Kafka以集群的方式运行,集群中的每一台服务器称之为一个代理(broker)

  • Producer(生产者):消息生产者,向Broker发送消息的客户端。

  • Consumer(消费者):消息消费者,从Broker读取消息的客户端。

  • Consumer Offset(消费者位移):表征消费者消费进度,每个消费者都有自己的消费者位移。

  • Consumer Group(消费者组):每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的 Consumer Group消费,但是一个 Consumer Group中只能有一个Consumer 能够消费该消息。

每一个Topic,下面可以有多个分区(Partition)日志文件 。Partition是一个有序的message序列,这些message按顺序添加到一个叫做commit log的文件中。每个partition中的消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message。每个consumer是基于自己在commit log中的消费进度(offset)来进行工作的。在kafka中,消费offset由consumer自己来维护;一般情况下我们按照顺序逐条消费commit log中的消息,当然我可以通过指定offset来重复消费某些消息,或者跳过某些消息。

a028ed474fe8d3f9c6cf77b1d85a20dc.png

cfb2830d9610c4d92b1dd49163a0a650.png

传统的消息传递模式有2种:队列( queue) 和(publish-subscribe)

  • queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer。

  • publish-subscribe模式:消息会被广播给所有的consumer。

Kafka基于这2种模式提供了一种consumer的抽象概念:consumer group。

  • queue模式:所有的consumer都位于同一个consumer group 下。

  • publish-subscribe模式:所有的consumer都有着自己唯一的consumer group。

d544d19ac1faf503aeaa2a16eb9f8861.png

架构图

7ed81d111ee425b1c72ac86b60303691.png

fc53e5348db51000ab31b0e2899c011c.png

应用场景

它可以用于两大类别的应用:

  1. 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。(相当于message queue)

  2. 构建实时流式应用程序,对这些流数据进行转换或者影响。(就是流处理,通过kafka stream topic和topic之间内部进行变化)

42f4bd2e98fe1dd4f54ab3c9d44deee3.png

日志收集:用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer;

消息系统:解耦生产者和消费者、缓存消息等;

用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后消费者通过订阅这些topic来做实时的监控分析,亦可保存到数据库;

运营指标:kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;

流式处理:比如spark streaming和storm。

2.kafka使用与集群搭建

环境准备

Kafka是用Scala语言开发的,运行在JVM上,在安装Kafka之前需要先安装JDK

kafka依赖zookeeper,需要先安装zookeeper,下载地址

wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gztar ‐zxvf zookeeper‐3.4.9.tar.gzcd zookeeper‐3.4.9cp conf/zoo_sample.cfg conf/zoo.cfg#启动zookeeper服务bin/zkServer.sh start#启动客户端bin/zkCli.sh

下载kafka安装包

参考 http://kafka.apache.org/11/documentation.html#quickstart

wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgztar ‐zxvf kafka_2.11-1.1.1.tgzcd kafka_2.11-1.1.1.tgz

启动kafka服务

bin/kafka-server-start.sh config/server.properties

server.properties是kafka核心配置文件

http://kafka.apache.org/11/documentation.html#brokerconfigs

Property

Default

Description

broker.id

0

每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。

log.dirs

/tmp/kafka-logs

kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;

每当创建新partition时,都会选择在包含最少partitions的路径下进行。

listeners

9092

server接受客户端连接的端口

zookeeper.connect

localhost:2181

zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为 hostname1:port1, hostname2:port2, hostname3:port3

log.retention.hours

168

每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样。

min.insync.replicas

1

当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常

delete.topic.enable

false

是否允许删除主题

创建主题

# 创建分区数是1,副本数是1的主题为test的topicbin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test# 查看topic列表bin/kafka-topics.sh --list --zookeeper localhost:2181# 查看命令帮助bin/kafka-topics.sh

启动Producer发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

启动Consumer消费消息

# --group 指定消费组 --from-beginning 从头开始消费 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group consumer1 --from-beginning# 查看消费组bin/kafka‐consumer‐groups.sh ‐‐bootstrap‐server localhost:9092 ‐‐list

  • 单播消息:kafka中,在同一个消费组里 ,一条消息只能被某一个消费者消费

  • 多播消息:针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组

集群配置

配置3个broker

> cp config/server.properties config/server-1.properties> cp config/server.properties config/server-2.propertiesconfig/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2#启动brokerbin/kafka‐server‐start.sh ‐daemon config/server‐1.propertiesbin/kafka‐server‐start.sh ‐daemon config/server‐2.properties#创建一个 副本为3,分区为3的topicbin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-replicated-topic# 查看topic的情况bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

  • leader节点负责给定partition的所有读写请求。

  • replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。

  • isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。

3.Java中kafka‐clients 应用

Java中使用kafka,引入maven依赖

org.apache.kafka kafka-clients 1.1.1

Producer Configs

kafka不同版本的配置可能存在一些差异,以官方文档参考为准

参考:http://kafka.apachecn.org/documentation.html#producerapi

Consumer Configs

Producer代码

public class Producer { public static void main(String[] args) { Properties properties &#61; new Properties(); // 配置 broker 地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.14:9092"); /** 发出消息持久化机制参数 (1)acks&#61;0&#xff1a;表示producer不需要等待任何broker确认收到消息的回复&#xff0c;就可以继续发送下一条消息。 性能最高&#xff0c;但是最容易丢消息。 (2)acks&#61;1&#xff1a;至少要等待leader已经成功将数据写入本地log&#xff0c;但是不需要等待所有follower 是否成功写入。就可以继续发送下一条消息。这种情况下&#xff0c;如果follower没有成功备份数据&#xff0c; 而此时leader又挂掉&#xff0c;则消息会丢失。 (3)acks&#61;-1或all&#xff1a;这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数) 都成功写入日志&#xff0c;这种策略会保证只要有一个备份存活就不会丢失数据。 这是最强的数据保证。一般除非是金融级别&#xff0c;或跟钱打交道的场景才会使用这种配置。 */ properties.put(ProducerConfig.ACKS_CONFIG,"1"); //发送失败会重试&#xff0c;默认重试间隔100ms&#xff0c;重试能保证消息发送的可靠性&#xff0c;但是也可能造成消息 // 重复发送&#xff0c;比如网络抖动&#xff0c;所以需要在接收者那边做好消息接收的幂等性处理 properties.put(ProducerConfig.RETRIES_CONFIG, 3); //重试间隔设置 properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); //设置发送消息的本地缓冲区&#xff0c;如果设置了该缓冲区&#xff0c;消息会先发送到本地缓冲区&#xff0c;可以提高消息发送性能&#xff0c; // 默认值是33554432&#xff0c;即32MB properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); //kafka本地线程会从缓冲区取数据&#xff0c;批量发送到broker&#xff0c; //设置批量发送消息的大小&#xff0c;默认值是16384&#xff0c;即16kb&#xff0c;就是说一个batch满了16kb就发送出去 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //默认值是0&#xff0c;意思就是消息必须立即被发送&#xff0c;但这样会影响性能 //一般设置100毫秒左右&#xff0c;就是说这个消息发送完后会进入本地的一个batch&#xff0c;如果100毫秒内&#xff0c; // 这个batch满了16kb就会随batch一起被发送出去 //如果100毫秒内&#xff0c;batch没满&#xff0c;那么也必须把消息发送出去&#xff0c;不能让消息的发送延迟时间太长 properties.put(ProducerConfig.LINGER_MS_CONFIG, 100); //把发送的key从字符串序列化为字节数组 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把发送消息value从字符串序列化为字节数组 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //获取到producer KafkaProducer producer &#61; new KafkaProducer<>(properties); int messageNo &#61; 1; boolean isAsync &#61; false; while (true){ String messageStr &#61; "Message_" &#43; messageNo; long startTime &#61; System.currentTimeMillis(); //指定发送分区 ProducerRecord record &#61; new ProducerRecord<>("topicTest1",0,messageNo&#43;"",messageStr); if (isAsync) { // Send asynchronously producer.send(record, new DemoCallBack(startTime, messageNo, messageStr)); } else { // Send synchronously try { producer.send(record).get(); System.out.println("Sent message: (" &#43; messageNo &#43; ", " &#43; messageStr &#43; ")"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } if (messageNo&#61;&#61;10){ break; } &#43;&#43;messageNo; } }}class DemoCallBack implements Callback { private final long startTime; private final int key; private final String message; public DemoCallBack(long startTime, int key, String message) { this.startTime &#61; startTime; this.key &#61; key; this.message &#61; message; } /** * A callback method the user can implement to provide asynchronous handling of request completion. This method will * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be * non-null. * * &#64;param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error * occurred. * &#64;param exception The exception thrown during processing of this record. Null if no error occurred. */ &#64;Override public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime &#61; System.currentTimeMillis() - startTime; if (metadata !&#61; null) { System.out.println( "message(" &#43; key &#43; ", " &#43; message &#43; ") sent to partition(" &#43; metadata.partition() &#43; "), " &#43; "offset(" &#43; metadata.offset() &#43; ") in " &#43; elapsedTime &#43; " ms"); } else { exception.printStackTrace(); } }}

Consumer代码

public class Consumer { public static void main(String[] args) { Properties props &#61; new Properties(); // 配置broker地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.3.14:9092"); // 消费组 props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); // 是否自动提交offset props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交offset的间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); /* 心跳时间&#xff0c;服务端broker通过心跳确认consumer是否故障&#xff0c;如果发现故障&#xff0c;就会通过心跳下发 rebalance的指令给其他的consumer通知他们进行rebalance操作&#xff0c;这个时间可以稍微短一点 */ props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000"); //服务端broker多久感知不到一个consumer心跳就认为他故障了&#xff0c;默认是10秒 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 获取consumer KafkaConsumer consumer &#61; new KafkaConsumer<>(props); String topicName &#61; "topicTest1"; // 指定分区 consumer.assign(Arrays.asList(new TopicPartition(topicName, 0))); //消息回溯消费 // consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0))); //指定offset消费 // consumer.seek(new TopicPartition(topicName, 0), 10); while(true){ /* * poll() API 是拉取消息的长轮询&#xff0c;主要是判断consumer是否还活着&#xff0c;只要我们持续调用poll()&#xff0c; * 消费者就会存活在自己所在的group中&#xff0c;并且持续的消费指定partition的消息。 * 底层是这么做的&#xff1a;消费者向server持续发送心跳&#xff0c;如果一个时间段(session. * timeout.ms)consumer挂掉或是不能发送心跳&#xff0c;这个消费者会被认为是挂掉了&#xff0c; * 这个Partition也会被重新分配给其他consumer */ ConsumerRecords records &#61; consumer.poll(1000); for (ConsumerRecord record : records) { System.out.println("Received message: (" &#43; record.key() &#43; ", " &#43; record.value() &#43; ") at offset " &#43; record.offset()); } if (records.count() > 0) { // 提交offset consumer.commitSync(); } } }}




推荐阅读
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • 如何在服务器主机上实现文件共享的方法和工具
    本文介绍了在服务器主机上实现文件共享的方法和工具,包括Linux主机和Windows主机的文件传输方式,Web运维和FTP/SFTP客户端运维两种方式,以及使用WinSCP工具将文件上传至Linux云服务器的操作方法。此外,还介绍了在迁移过程中需要安装迁移Agent并输入目的端服务器所在华为云的AK/SK,以及主机迁移服务会收集的源端服务器信息。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • Google Play推出全新的应用内评价API,帮助开发者获取更多优质用户反馈。用户每天在Google Play上发表数百万条评论,这有助于开发者了解用户喜好和改进需求。开发者可以选择在适当的时间请求用户撰写评论,以获得全面而有用的反馈。全新应用内评价功能让用户无需返回应用详情页面即可发表评论,提升用户体验。 ... [详细]
  • 本文介绍了Java高并发程序设计中线程安全的概念与synchronized关键字的使用。通过一个计数器的例子,演示了多线程同时对变量进行累加操作时可能出现的问题。最终值会小于预期的原因是因为两个线程同时对变量进行写入时,其中一个线程的结果会覆盖另一个线程的结果。为了解决这个问题,可以使用synchronized关键字来保证线程安全。 ... [详细]
  • 网络请求模块选择——axios框架的基本使用和封装
    本文介绍了选择网络请求模块axios的原因,以及axios框架的基本使用和封装方法。包括发送并发请求的演示,全局配置的设置,创建axios实例的方法,拦截器的使用,以及如何封装和请求响应劫持等内容。 ... [详细]
  • 目录浏览漏洞与目录遍历漏洞的危害及修复方法
    本文讨论了目录浏览漏洞与目录遍历漏洞的危害,包括网站结构暴露、隐秘文件访问等。同时介绍了检测方法,如使用漏洞扫描器和搜索关键词。最后提供了针对常见中间件的修复方式,包括关闭目录浏览功能。对于保护网站安全具有一定的参考价值。 ... [详细]
  • ZABBIX 3.0 配置监控NGINX性能【OK】
    1.在agent端查看配置:nginx-V查看编辑时是否加入状态监控模块:--with-http_stub_status_module--with-http_gzip_stat ... [详细]
  • 有意向可以发简历到邮箱内推.简历直达组内Leader.能做同事的话,内推奖励全给你. ... [详细]
  • 构建LNMP架构平台
    LNMP架构的组成:Linux、Nginx、MySQL、PHP关于NginxNginx与apache的作用一样,都是为了搭建网站服务器,由俄罗斯人lgorsysoev开发,其特点是 ... [详细]
  • 【转】腾讯分析系统架构解析
    TA(TencentAnalytics,腾讯分析)是一款面向第三方站长的免费网站分析系统,在数据稳定性、及时性方面广受站长好评,其秒级的实时数据更新频率也获得业界的认可。本文将从实 ... [详细]
author-avatar
mobiledu2502884243
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有