热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink自定义source和sink,获取kafka的key,输出指定key

Flink的FlinkKafkaConsumer、FlinkKafkaProducer,在消费、生成kafka数据的时候,不能指定key,又时候,我们又需要这个key。valk

  Flink 的  FlinkKafkaConsumer、FlinkKafkaProducer,在消费、生成kafka 数据的时候,不能指定key,又时候,我们又需要这个key。

val kafkaSource = new FlinkKafkaConsumer[ObjectNode]("kafka_demo", new JsonNodeDeserializationSchema(), Common.getProp)
    val sink = new FlinkKafkaProducer[String]("kafka_demo_out", new SimpleStringSchema(), Common.getProp)
    sink.setWriteTimestampToKafka(true)

    env.addSource(kafkaSource)
      .map(node => {
        node.put("token", System.currentTimeMillis())
        node.toString
      })
      .addSink(sink)

下面通过flink 的自定source、sink 实现,消费、生成kafka 数据的时候,获取数据的key ,和输出不同key的数据

思路: 使用kafka 原生的api,KafkaConsuemr和KafkaProducer 消费、生产kafka的数据,就可以获取到key值

kafka 生产者:

object KafkaKeyMaker {
  val topic = "kafka_key"
  def main(args: Array[String]): Unit = {

    val producer = new KafkaProducer[String, String](Common.getProp)
    while (true) {
      val map = Map("user"->"venn", "name"->"venn","pass"->System.currentTimeMillis())
      val jsonObject: JSONObject = new JSONObject(map)
      println(jsonObject.toString())
// key : msgKey + long val msg
= new ProducerRecord[String, String](topic, "msgKey" + System.currentTimeMillis(), jsonObject.toString()) producer.send(msg) producer.flush() Thread.sleep(3000) } } }

kafka 消费者:

object KafkaKeyReceive{
  val topic = "kafka_key"
  def main(args: Array[String]): Unit = {
    val consumer = new KafkaConsumer[String, String](Common.getProp)
    consumer.subscribe(util.Arrays.asList(topic + "_out"))
    while (true) {
      val records = consumer.poll(500)
      val tmp = records.iterator()
      while (tmp.hasNext){
        val record = tmp.next()
        val key = record.key()
        val value = record.value()
        println("receive -> key : " + key + ", value : " + value)
      }
      Thread.sleep(3000)
    }
  }
}

flink 代码,自定义source、sink

import com.venn.common.Common
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.collection.JavaConversions._
/**
  * Created by venn on 19-4-26.
  */
object KafkaSourceKey {

  def main(args: Array[String]): Unit = {
    // environment
    val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment

    
    env.addSource(new RichSourceFunction[String] { // kafka consumer 对象 var consumer: KafkaConsumer[String, String] = null // 初始化方法 override def open(parameters: Configuration): Unit = { cOnsumer= new KafkaConsumer[String, String](Common.getProp) // 订阅topic val list = List("kafka_key") consumer.subscribe(list) } // 执行方法,拉取数据,获取到的数据,会放到source 的缓冲区 override def run(ctx: SourceFunction.SourceContext[String]): Unit = { println("run") while (true) { val records = consumer.poll(500) val tmp = records.iterator() while (tmp.hasNext) { val record = tmp.next() val key = record.key() val value = record.value() ctx.collect("key : " + key + ", value " + value) } } } override def cancel(): Unit = { println("cancel") } }).map(s => s + "map")
      .addSink(new RichSinkFunction[String] { // kafka producer 对象 var producer: KafkaProducer[String, String] = null // 初始化 override def open(parameters: Configuration): Unit = { producer = new KafkaProducer[String, String](Common.getProp) } override def close(): Unit = { if (producer == null) { producer.flush() producer.close() } } // 输出数据,每条结果都会执行一次,并发高的时候,可以按需做flush override def invoke(value: String, context: SinkFunction.Context[_]): Unit = { println("flink : " + value) val msg = new ProducerRecord[String, String]( "kafka_key_out", "key" + System.currentTimeMillis(), value) producer.send(msg) producer.flush() } }) // execute job
    env.execute("KafkaToKafka")
  }

}

kafka 生产者数据:

{"user" : "venn", "name" : "venn", "pass" : 1561355358148}
{"user" : "venn", "name" : "venn", "pass" : 1561355361271}
{"user" : "venn", "name" : "venn", "pass" : 1561355364276}
{"user" : "venn", "name" : "venn", "pass" : 1561355367279}
{"user" : "venn", "name" : "venn", "pass" : 1561355370283}

flink 输出数据:

run
flink : key : msgKey1561355358180, value {"user" : "venn", "name" : "venn", "pass" : 1561355358148}map
flink : key : msgKey1561355361271, value {"user" : "venn", "name" : "venn", "pass" : 1561355361271}map
flink : key : msgKey1561355364276, value {"user" : "venn", "name" : "venn", "pass" : 1561355364276}map
flink : key : msgKey1561355367279, value {"user" : "venn", "name" : "venn", "pass" : 1561355367279}map
flink : key : msgKey1561355370283, value {"user" : "venn", "name" : "venn", "pass" : 1561355370283}map
flink : key : msgKey1561355373289, value {"user" : "venn", "name" : "venn", "pass" : 1561355373289}map
flink : key : msgKey1561355376293, value {"user" : "venn", "name" : "venn", "pass" : 1561355376293}map

kafka 消费者:

receive -> key : key1561355430411, value : key : msgKey1561355430356, value {"user" : "venn", "name" : "venn", "pass" : 1561355430356}map
receive -> key : key1561355433427, value : key : msgKey1561355433359, value {"user" : "venn", "name" : "venn", "pass" : 1561355433359}map
receive -> key : key1561355436441, value : key : msgKey1561355436364, value {"user" : "venn", "name" : "venn", "pass" : 1561355436364}map
receive -> key : key1561355439456, value : key : msgKey1561355439367, value {"user" : "venn", "name" : "venn", "pass" : 1561355439367}map
receive -> key : key1561355442473, value : key : msgKey1561355442370, value {"user" : "venn", "name" : "venn", "pass" : 1561355442370}map
receive -> key : key1561355445391, value : key : msgKey1561355445374, value {"user" : "venn", "name" : "venn", "pass" : 1561355445374}map

 

注:这样设计有个问题,没办法做到精确一次:

  1、source 的精确一次可以使用kafka 的低级api,每次从指定的offset 读取数据,提交新的offset,然后将当前的offset 存到状态中,这样即使程序失败,重启到上一个checkpoint状态,数据也不会重复。

  2、sink 的处理比较麻烦,以官网介绍的 “两段提交”的方法,提交生产者的数据。简单来说,就是每次数据处理完后,需要提交数据到kafka,不做真正的提交,仅写入一些已定义的状态变量,当chckpoint成功时Flink负责提交这些写入,否则就终止取消掉。

 

参考zhisheng 大佬的 博客 : 《从0到1学习Flink》—— 如何自定义 Data Source ?

《从0到1学习Flink》—— 如何自定义 Data Sink ?

两段提交的一篇翻译: 【译】Flink + Kafka 0.11端到端精确一次处理语义的实现


推荐阅读
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 展开全部下面的代码是创建一个立方体Thisexamplescreatesanddisplaysasimplebox.#Thefirstlineloadstheinit_disp ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • Android开发实现的计时器功能示例
    本文分享了Android开发实现的计时器功能示例,包括效果图、布局和按钮的使用。通过使用Chronometer控件,可以实现计时器功能。该示例适用于Android平台,供开发者参考。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • GreenDAO快速入门
    前言之前在自己做项目的时候,用到了GreenDAO数据库,其实对于数据库辅助工具库从OrmLite,到litePal再到GreenDAO,总是在不停的切换,但是没有真正去了解他们的 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • 开发笔记:(002)spring容器中bean初始化销毁时执行的方法及其3种实现方式
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了(002)spring容器中bean初始化销毁时执行的方法及其3种实现方式相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 起因由于我录制过一个小程序的课程,里面有消息模板的讲解。最近有几位同学反馈官方要取消消息模板,使用订阅消息。为了方便大家容易学 PythonFlask构建微信小程序订餐系统 课程。 ... [详细]
  • 本文整理了Java中org.apache.pig.backend.executionengine.ExecException.<init>()方法的一些代码 ... [详细]
author-avatar
赵小锅2502889451
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有