热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka自定义生产者

第一步:在终端启动一个消费都等待生产者生产出来的数据代码实现创建Maven项目添加依赖junit
第一步:在终端启动一个消费都等待生产者生产出来的数据

在这里插入图片描述

代码实现

创建Maven项目


  • 添加依赖

<dependency><groupId>junitgroupId><artifactId>junitartifactId><version>4.12version>dependency><dependency><groupId>org.apache.kafkagroupId><artifactId>kafka-clientsartifactId><version>2.4.1version>dependency><dependency><groupId>org.slf4jgroupId><artifactId>slf4j-apiartifactId><version>1.7.25version>dependency><dependency><groupId>org.slf4jgroupId><artifactId>slf4j-log4j12artifactId><version>1.7.25version>dependency><dependency><groupId>org.apache.kafkagroupId><artifactId>kafka_2.12artifactId><version>2.4.1version>dependency>

  • 在resources目录下添加log4j.properties

### 设置###
log4j.rootLogger &#61; debug,stdout,D,E### 输出信息到控制抬 ###
log4j.appender.stdout &#61; org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target &#61; System.out
log4j.appender.stdout.layout &#61; org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern &#61; [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n### 输出DEBUG 级别以上的日志到&#61;E://logs/error.log ###
log4j.appender.D &#61; org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File &#61; E://logs/log.log
log4j.appender.D.Append &#61; true
log4j.appender.D.Threshold &#61; DEBUG
log4j.appender.D.layout &#61; org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern &#61; %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n### 输出ERROR 级别以上的日志到&#61;E://logs/error.log ###
log4j.appender.E &#61; org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File &#61;E://logs/error.log
log4j.appender.E.Append &#61; true
log4j.appender.E.Threshold &#61; ERROR
log4j.appender.E.layout &#61; org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern &#61; %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n

情况一&#xff1a;创建生产者

public class CustomerProducer {// 配置信息来源&#xff1a;ProducerConfigpublic static void main(String[] args) {Properties props &#61; new Properties();// Kafka服务端的主机名和端口号props.put("bootstrap.servers", "hcmaster:9092");// 应答级别&#xff1a;等待所有副本节点的应答props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);//16k// 请求延时props.put("linger.ms", 1);// 发送缓存区内存大小props.put("buffer.memory", 33554432);//32M// key序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer &#61; new KafkaProducer<>(props);//循环发送数据for (int i &#61; 0; i < 8; i&#43;&#43;) {ProducerRecord<String, String> data &#61; new ProducerRecord<>("first", Integer.toString(i), "haha-" &#43; i);producer.send(data);}producer.close();}
}

运行程序&#xff0c;在Consumer终端上查看结果&#xff1a;
在这里插入图片描述

情况二&#xff1a;创建带回调的生产者

public class CallBackProducer {public static void main(String[] args) throws InterruptedException {Properties props &#61; new Properties();// Kafka服务端的主机名和端口号props.put("bootstrap.servers", "hcmaster:9092,hcslave1:9092,hcslave2:9092");// 等待所有副本节点的应答props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);// 增加服务端请求延时props.put("linger.ms", 1);// 发送缓存区内存大小props.put("buffer.memory", 33554432);// key序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> kafkaProducer &#61; new KafkaProducer<>(props);for (int i &#61; 0; i < 8; i&#43;&#43;) {Thread.sleep(500);ProducerRecord<String, String> pr &#61; new ProducerRecord<>("first", Integer.toString(i), "hehe-" &#43; i);kafkaProducer.send(pr, new Callback() {&#64;Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception !&#61; null){System.out.println("发送失败");}else {System.out.print("发送成功&#xff1a; ");if (metadata !&#61; null) {System.out.println(metadata.topic()&#43;" - "&#43;metadata.partition() &#43; " - " &#43; metadata.offset());}}}});}kafkaProducer.close();}}

在Intellij控制中结果&#xff1a;
在这里插入图片描述
在counsumer终端中查看结果&#xff1a;
在这里插入图片描述

情况三&#xff1a;创建自定义分区的生产者


  • 自定义Partitioner

public class CustomPartitioner implements Partitioner {&#64;Overridepublic void configure(Map<String, ?> configs) {}&#64;Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 控制分区return 1;}&#64;Overridepublic void close() {}
}

  • 自定义Procuder

public class CallBackProducer {public static void main(String[] args) throws InterruptedException {Properties props &#61; new Properties();// Kafka服务端的主机名和端口号props.put("bootstrap.servers", "hcmaster:9092,hcslave1:9092,hcslave2:9092");// 等待所有副本节点的应答props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);// 增加服务端请求延时props.put("linger.ms", 1);// 发送缓存区内存大小props.put("buffer.memory", 33554432);// key序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 自定义分区props.put("partitioner.class", "com.hc.producer.customerparitioner.CustomPartitioner");KafkaProducer<String, String> kafkaProducer &#61; new KafkaProducer<>(props);for (int i &#61; 0; i < 8; i&#43;&#43;) {Thread.sleep(500);ProducerRecord<String, String> pr &#61; new ProducerRecord<>("first", Integer.toString(i), "hello world-" &#43; i);kafkaProducer.send(pr, (metadata, exception) -> {if (exception !&#61; null) {System.out.println("发送失败");} else {System.out.print("发送成功&#xff1a; ");if (metadata !&#61; null) {System.out.println(metadata.partition() &#43; " --- " &#43; metadata.offset());}}});}kafkaProducer.close();}}

  • 结果

在Intellij控制中结果&#xff1a;
在这里插入图片描述
在counsumer终端中查看结果&#xff1a;
在这里插入图片描述


推荐阅读
  • log4j相关
    Log4j的类图Logger-日志写出器,供程序员输出日志信息Appender-日志目的地,把格式化好的日志信息输出到指定的地方去ConsoleAppe ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
  • Java图形化计算器设计与实现
    本文介绍了使用Java编程语言设计和实现图形化计算器的方法。通过使用swing包和awt包中的组件,作者创建了一个具有按钮监听器和自定义界面尺寸和布局的计算器。文章还分享了在图形化界面设计中的一些心得体会。 ... [详细]
  • 我收到这个错误.我怎么能在我的情况下解决这个问题?Bitmapcannotberesolvedtoatype发生错误的行publicvoidonPageStart ... [详细]
  • flask json传输失败_GO小知识之实例演示 json 如何转化为 map 和 struct
    简单谈一些JSON数据处理的小知识。近期工作中,因为要把数据库数据实时更新到elasticsearch,在实践过程中遇到了一些JSON数据处理的问题。实 ... [详细]
  • springboot日志【转】【补】
     市面上的日志框架日志门面(日志的抽象层)日志实现JCL(JakartaCommonsLogging)(2014)SLF4j(SimpleLoggingFacadeforJava) ... [详细]
  • java日志框架详解
    Java日志框架详解1.常用日志框架1.1Java常用日志框架类别1.2Java常用日志框架历史1.3两大日志接口阵营1.3.1基于CommonsLogging接口实现的常用日志框 ... [详细]
  • 利用Dubbo的SPI扩展容器
    在实际开发中,很多Dubbo的服务提供者都会运行在web容器上,如果提供者服务上同时对外 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 本文介绍了UVALive6575题目Odd and Even Zeroes的解法,使用了数位dp和找规律的方法。阶乘的定义和性质被介绍,并给出了一些例子。其中,部分阶乘的尾零个数为奇数,部分为偶数。 ... [详细]
  • 1简介本文结合数字信号处理课程和Matlab程序设计课程的相关知识,给出了基于Matlab的音乐播放器的总体设计方案,介绍了播放器主要模块的功能,设计与实现方法.我们将该设 ... [详细]
  • 今天就跟大家聊聊有关怎么在Android应用中实现一个换肤功能,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根 ... [详细]
author-avatar
银仔-zxy
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有