热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

学习笔记Kafka——KafkaConsumerAPI及开发实例

篇首语:本文由编程笔记#小编为大家整理,主要介绍了学习笔记Kafka—— Kafka Consumer API及开发实例相关的知识,希望对你有一定的参考价值。 一、Kafka Consumer API

篇首语:本文由编程笔记#小编为大家整理,主要介绍了学习笔记Kafka—— Kafka Consumer API及开发实例相关的知识,希望对你有一定的参考价值。



一、Kafka Consumer API

1.1、Consumer


1.2、KafkaConsumer


1.3、ConsumerRecords


1.4、ConsumerRecord


1.5、KafkaConsumer 实战

package demo02;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class SimpleConsumer
public static void main(String[] args)
String topic = "test_02_02";
String group = "test_group";
Map<String, Object> kafkaProperties &#61; new HashMap<>();
kafkaProperties.put("bootstrap.servers", "node100:9092,node101:9092,node102:9092");
kafkaProperties.put("group.id", group);
kafkaProperties.put("enable.auto.commit","true");
kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer &#61; new KafkaConsumer<>(kafkaProperties);
consumer.subscribe(Arrays.asList(topic));
while (true)
ConsumerRecords<String, String> records &#61; consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset &#61; %d, key &#61; %s, value &#61; %s%n", record.offset(), record.key(), record.value());



结果&#xff1a;


二、Producer & Consumer整合实战
  • 1、设计一个工具类可以返回随机字符串

WordUtil .java

package demo03;
import org.apache.kafka.common.protocol.types.Field;
import java.util.Random;
public class WordUtil
public static final String[] WORDS &#61; "A traditional queue retains records in-order on the server, and if multiple consumers consume from the queue then the server hands out records in the order they are stored. However, although the server hands out records in order, the records are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the records is lost in the presence of parallel consumption. Messaging systems often work around this by having a notion of \\"exclusive consumer\\" that allows only one process to consume from a queue, but of course this means that there is no parallelism in processing. Kafka does it better. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances in a consumer group than partitions.".split(" ");
static Random random &#61; new Random();
public static KV generateRandom()
int index &#61; random.nextInt(WORDS.length);
return new KV(String.valueOf(index),WORDS[index]);

public static void main(String[] args)
for(int i&#61;0;i<10;i&#43;&#43;)
KV kv &#61; generateRandom();
System.out.printf("key: %s, value: %s\\n",kv.getK(),kv.getV());



KV.java

package demo03;
import org.apache.kafka.common.protocol.types.Field;
public class KV
public String k;
public String v;
public KV(String k, String v)
this.k &#61; k;
this.v &#61; v;

public String getK()
return k;

public void setK(String k)
this.k &#61; k;

public String getV()
return v;

public void setV(String v)
this.v &#61; v;


执行WordUtil.java结果&#xff1a;


  • 2、设计Producer可以每秒发送数据

TimerProducer.java

package demo03;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
public class TimerProducer
public static void main(String[] args) throws InterruptedException
String topic &#61; "test_02_02";
Map<String,Object> kafkaProperties &#61; new HashMap<>();
kafkaProperties.put("bootstrap.servers", "node100:9092,node101:9092,node102:9092");
kafkaProperties.put("acks", "all");
kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer &#61; new KafkaProducer<>(kafkaProperties);
int size &#61; 60;
for (int i &#61; 0; i < size; i&#43;&#43;)
Thread.sleep(1000L);
KV kv &#61; WordUtil.generateRandom();
producer.send(new ProducerRecord<>(topic, kv.getK(), kv.getV()));

producer.close();


TimerConsumer.java

package demo03;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class TimerConsumer
public static void main(String[] args)
String topic &#61; "test_02_02";
String group &#61; "test_group";
Map<String, Object> kafkaProperties &#61; new HashMap<>();
kafkaProperties.put("bootstrap.servers", "node100:9092,node101:9092,node102:9092");
kafkaProperties.put("group.id", group);
kafkaProperties.put("enable.auto.commit", "true");
kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer &#61; new KafkaConsumer<>(kafkaProperties);
consumer.subscribe(Arrays.asList(topic));
while (true)
ConsumerRecords<String, String> records &#61; consumer.poll(Duration.ofMillis(3000));
System.out.printf("\\nTime: %s\\n",new Date());
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset &#61; %d, key &#61; %s, value &#61; %s%n", record.offset(), record.key(), record.value());



结果&#xff1a;


推荐阅读
  • Java自带的观察者模式及实现方法详解
    本文介绍了Java自带的观察者模式,包括Observer和Observable对象的定义和使用方法。通过添加观察者和设置内部标志位,当被观察者中的事件发生变化时,通知观察者对象并执行相应的操作。实现观察者模式非常简单,只需继承Observable类和实现Observer接口即可。详情请参考Java官方api文档。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 使用eclipse创建一个Java项目的步骤
    本文介绍了使用eclipse创建一个Java项目的步骤,包括启动eclipse、选择New Project命令、在对话框中输入项目名称等。同时还介绍了Java Settings对话框中的一些选项,以及如何修改Java程序的输出目录。 ... [详细]
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 如何用UE4制作2D游戏文档——计算篇
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何用UE4制作2D游戏文档——计算篇相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 数组的排序:数组本身有Arrays类中的sort()方法,这里写几种常见的排序方法。(1)冒泡排序法publicstaticvoidmain(String[]args ... [详细]
  • 面向对象之3:封装的总结及实现方法
    本文总结了面向对象中封装的概念和好处,以及在Java中如何实现封装。封装是将过程和数据用一个外壳隐藏起来,只能通过提供的接口进行访问。适当的封装可以提高程序的理解性和维护性,增强程序的安全性。在Java中,封装可以通过将属性私有化并使用权限修饰符来实现,同时可以通过方法来访问属性并加入限制条件。 ... [详细]
  • (三)多表代码生成的实现方法
    本文介绍了一种实现多表代码生成的方法,使用了java代码和org.jeecg框架中的相关类和接口。通过设置主表配置,可以生成父子表的数据模型。 ... [详细]
  • 本文介绍了RxJava在Android开发中的广泛应用以及其在事件总线(Event Bus)实现中的使用方法。RxJava是一种基于观察者模式的异步java库,可以提高开发效率、降低维护成本。通过RxJava,开发者可以实现事件的异步处理和链式操作。对于已经具备RxJava基础的开发者来说,本文将详细介绍如何利用RxJava实现事件总线,并提供了使用建议。 ... [详细]
author-avatar
水果jia
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有