热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

学习笔记Kafka——KafkaConsumerAPI及开发实例

篇首语:本文由编程笔记#小编为大家整理,主要介绍了学习笔记Kafka—— Kafka Consumer API及开发实例相关的知识,希望对你有一定的参考价值。 一、Kafka Consumer API

篇首语:本文由编程笔记#小编为大家整理,主要介绍了学习笔记Kafka—— Kafka Consumer API及开发实例相关的知识,希望对你有一定的参考价值。



一、Kafka Consumer API

1.1、Consumer


1.2、KafkaConsumer


1.3、ConsumerRecords


1.4、ConsumerRecord


1.5、KafkaConsumer 实战

package demo02;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class SimpleConsumer
public static void main(String[] args)
String topic = "test_02_02";
String group = "test_group";
Map<String, Object> kafkaProperties &#61; new HashMap<>();
kafkaProperties.put("bootstrap.servers", "node100:9092,node101:9092,node102:9092");
kafkaProperties.put("group.id", group);
kafkaProperties.put("enable.auto.commit","true");
kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer &#61; new KafkaConsumer<>(kafkaProperties);
consumer.subscribe(Arrays.asList(topic));
while (true)
ConsumerRecords<String, String> records &#61; consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset &#61; %d, key &#61; %s, value &#61; %s%n", record.offset(), record.key(), record.value());



结果&#xff1a;


二、Producer & Consumer整合实战
  • 1、设计一个工具类可以返回随机字符串

WordUtil .java

package demo03;
import org.apache.kafka.common.protocol.types.Field;
import java.util.Random;
public class WordUtil
public static final String[] WORDS &#61; "A traditional queue retains records in-order on the server, and if multiple consumers consume from the queue then the server hands out records in the order they are stored. However, although the server hands out records in order, the records are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the records is lost in the presence of parallel consumption. Messaging systems often work around this by having a notion of \\"exclusive consumer\\" that allows only one process to consume from a queue, but of course this means that there is no parallelism in processing. Kafka does it better. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances in a consumer group than partitions.".split(" ");
static Random random &#61; new Random();
public static KV generateRandom()
int index &#61; random.nextInt(WORDS.length);
return new KV(String.valueOf(index),WORDS[index]);

public static void main(String[] args)
for(int i&#61;0;i<10;i&#43;&#43;)
KV kv &#61; generateRandom();
System.out.printf("key: %s, value: %s\\n",kv.getK(),kv.getV());



KV.java

package demo03;
import org.apache.kafka.common.protocol.types.Field;
public class KV
public String k;
public String v;
public KV(String k, String v)
this.k &#61; k;
this.v &#61; v;

public String getK()
return k;

public void setK(String k)
this.k &#61; k;

public String getV()
return v;

public void setV(String v)
this.v &#61; v;


执行WordUtil.java结果&#xff1a;


  • 2、设计Producer可以每秒发送数据

TimerProducer.java

package demo03;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
public class TimerProducer
public static void main(String[] args) throws InterruptedException
String topic &#61; "test_02_02";
Map<String,Object> kafkaProperties &#61; new HashMap<>();
kafkaProperties.put("bootstrap.servers", "node100:9092,node101:9092,node102:9092");
kafkaProperties.put("acks", "all");
kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer &#61; new KafkaProducer<>(kafkaProperties);
int size &#61; 60;
for (int i &#61; 0; i < size; i&#43;&#43;)
Thread.sleep(1000L);
KV kv &#61; WordUtil.generateRandom();
producer.send(new ProducerRecord<>(topic, kv.getK(), kv.getV()));

producer.close();


TimerConsumer.java

package demo03;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class TimerConsumer
public static void main(String[] args)
String topic &#61; "test_02_02";
String group &#61; "test_group";
Map<String, Object> kafkaProperties &#61; new HashMap<>();
kafkaProperties.put("bootstrap.servers", "node100:9092,node101:9092,node102:9092");
kafkaProperties.put("group.id", group);
kafkaProperties.put("enable.auto.commit", "true");
kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer &#61; new KafkaConsumer<>(kafkaProperties);
consumer.subscribe(Arrays.asList(topic));
while (true)
ConsumerRecords<String, String> records &#61; consumer.poll(Duration.ofMillis(3000));
System.out.printf("\\nTime: %s\\n",new Date());
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset &#61; %d, key &#61; %s, value &#61; %s%n", record.offset(), record.key(), record.value());



结果&#xff1a;


推荐阅读
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了一个Java猜拳小游戏的代码,通过使用Scanner类获取用户输入的拳的数字,并随机生成计算机的拳,然后判断胜负。该游戏可以选择剪刀、石头、布三种拳,通过比较两者的拳来决定胜负。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
  • C# 7.0 新特性:基于Tuple的“多”返回值方法
    本文介绍了C# 7.0中基于Tuple的“多”返回值方法的使用。通过对C# 6.0及更早版本的做法进行回顾,提出了问题:如何使一个方法可返回多个返回值。然后详细介绍了C# 7.0中使用Tuple的写法,并给出了示例代码。最后,总结了该新特性的优点。 ... [详细]
  • 本文介绍了在Java中gt、gtgt、gtgtgt和lt之间的区别。通过解释符号的含义和使用例子,帮助读者理解这些符号在二进制表示和移位操作中的作用。同时,文章还提到了负数的补码表示和移位操作的限制。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
author-avatar
水果jia
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有