热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink教程(1)FlinkDataStream创建数据源转换算子

Flink教程DataStream创建数据源转换算子1.前言2.创建Flink项目2.1在cmd窗口创建2.2WordCount例子2.3分析Flink程序代码结构3.第1步&#x

Flink教程 DataStream 创建数据源 转换算子

  • 1. 前言
  • 2. 创建Flink项目
    • 2.1 在cmd窗口创建
    • 2.2 WordCount例子
    • 2.3 分析Flink程序代码结构
  • 3. 第1步:创建执行环境
  • 4. 第2步:创建数据源
    • 4.1 什么是流?
      • 4.1.1 无界流
      • 4.1.2 有界流
    • 4.2 从指定的数据集合创建流(一般测试时用)
    • 4.3 从文件里读取数据
    • 4.4 从WebSocket读取数据
    • 4.5 从Kafka读取数据
      • 4.5.1 引入jar
      • 4.5.2 编写Kafka数据源
  • 5. 第3步:处理数据流
    • 5.1 什么是DataStream
    • 5.2 什么是元组(Tuple)
    • 5.3 基本转换算子
      • 5.3.1 map
      • 5.3.2 flatMap
      • 5.3.3 filter
      • 5.3.4 keyBy
      • 5.3.5 reduce
  • 后续


1. 前言

从前年开始,就被公众号上Flink文章频繁的刷屏,看来是时候了解下Flink了。
Flink官网第一句话介绍是数据流上的有状态计算
我第一眼看这句话感觉很拗口,什么是流上的计算?什么是有状态?
作为菜鸟,我觉的学习Flink最好方法是看官网并敲代码实践,不会的百度些博客学学。

2. 创建Flink项目

废话不多说,我们来创建一个Flink项目吧。关于“Flink是什么”,“Flink应用场景”,“Flink安装部署”,“Flink架构原理”等话题,我感觉网上好的博客很多了,我默认此时你至少了解过Flink并安装过Flink吧,不然怎么会搜到我这篇博客?

2.1 在cmd窗口创建

打开cmd命令窗口,输入如下命令

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.1 -DarchetypeCatalog=internal

如果你有强迫症,你看不惯一行命令这么长,你可以粘贴下面的。
我的是win10系统,命令以^换行,如果你是Linux系统,要以\换行。

mvn archetype:generate ^
-DarchetypeGroupId=org.apache.flink ^
-DarchetypeArtifactId=flink-quickstart-java ^
-DarchetypeVersion=1.10.1 ^
-DarchetypeCatalog=internal

执行中途,它会询问你输入groupId和artifactId,然后一路回车输入Y,项目就创建好了。
在这里插入图片描述

2.2 WordCount例子

如果你项目创建成功,会有如下代码结构,然后再创建个BatchWordCount类,贴入下面Java代码。
在这里插入图片描述

package com.pigg.test01;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class BatchWordCount {public static void main(String[] args) throws Exception {//第1步&#xff1a;创建执行环境final ExecutionEnvironment env &#61; ExecutionEnvironment.getExecutionEnvironment();//第2步&#xff1a;创建数据源DataSource<String> lines &#61; env.fromElements("I love coding", "I love flink", "study flink");//第3步&#xff1a;处理数据// 3.1&#xff1a;将每一行按照空格切分&#xff0c;并组成(word, 1)FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne &#61; lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {&#64;Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] words &#61; line.split(" ");for (String word : words) {collector.collect(Tuple2.of(word, 1));}}});// 3.2&#xff1a;按照第一个字段分组&#xff0c;并按照第二个字段求和AggregateOperator<Tuple2<String, Integer>> result &#61; wordAndOne.groupBy(0).sum(1);//第4步&#xff1a;输出打印到控制台result.print();//第5步&#xff1a;触发执行job&#xff0c;如果是实时流计算这是要的&#xff0c;这里是批处理&#xff0c;也可以不加env.execute("BatchWordCount");}
}

不要深究代码的意思&#xff0c;我们只是先了解下写一个Flink程序的代码结构。运行main方法时&#xff0c;如果报如下错误&#xff1a;

NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction

你需要把如下选项勾上
在这里插入图片描述
执行结果为&#xff1a;

(flink,2)
(love,2)
(I,2)
(coding,1)
(study,1)

2.3 分析Flink程序代码结构

在上面程序中&#xff0c;我注释了5个步骤&#xff0c;Flink的Job程序也基本分5个步骤&#xff1a;

  • 第1步&#xff1a;创建执行环境
  • 第2步&#xff1a;创建数据源
  • 第3步&#xff1a;处理数据流
  • 第4步&#xff1a;输出结果到指定位置
  • 第5步&#xff1a;触发执行Job

下面我们来逐步学习这5个步骤&#xff0c;其中第3步最为关键&#xff0c;是本博客的重点&#xff0c;更是平时开发的核心。

3. 第1步&#xff1a;创建执行环境

Flink程序最开始都是要创建执行环境&#xff0c;它会自动根据不同的运行场景创建对应的执行环境。

  • 如果你在IDEA里运行main方法&#xff0c;Flink创建的是本地执行环境
  • 如果你把程序打成jar包&#xff0c;提交到Flink集群上执行&#xff0c;Flink创建的是集群执行环境

创建执行环境很简单&#xff0c;就一句话&#xff1a;

ExecutionEnvironment env &#61; ExecutionEnvironment.getExecutionEnvironment();//流数据源
StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();

4. 第2步&#xff1a;创建数据源

创建数据源是指定义我们的Flink程序从哪里获取数据。
如果是实时计算&#xff0c;一般工作中我们都是从Kafka中获取数据。
如果是跑批处理&#xff0c;一般是从txt&#xff0c;csv&#xff0c;hdfs上读取数据。
还记得一开始说的Flink自我介绍数据流上的有状态计算这句话吗&#xff1f;
我认为很有必要先理解下什么是流&#xff1f;

4.1 什么是流&#xff1f;

对Flink而言&#xff0c;不管是不停采集新增的事件还是已经固定大小的数据集合&#xff0c;它们都是流数据&#xff0c;只不过根据它们是否有界限&#xff0c;分为无界流和有界流。
在这里插入图片描述
说明上面代码也可以写成下面这样&#xff0c;对流数据lines的操作&#xff0c;就是调用DataStream API操作它。

DataStream<String> lines &#61; env.addSource(kafkaSource)

有时间得看看DataStream.java这个类的代码。map&#xff0c;flatMap&#xff0c;filter等算子的定义都是在这个类里。

5.2 什么是元组&#xff08;Tuple&#xff09;

写Java的可能不知道元组&#xff0c;但是玩过Python的小朋友应该都知道。
元组与ArrayList类似&#xff0c;可以放一组数值&#xff0c;但是不同数据的类型可以不同
从上面的WordCount代码里看&#xff0c;用到了Tuple2&#xff0c;这表示定义一个元组&#xff0c;它有2个值&#xff0c;
第一个值是String类型&#xff0c;第二个值是Integer类型。
查看Flink里Tuple2类&#xff0c;可以发现Flink定义了Tuple0一直到Tuple25。

5.3 基本转换算子


5.3.1 map


  • 转换类型&#xff1a;DataStream → DataStream
  • 说明&#xff1a;读取一个元素并生成一个新的元素&#xff0c;例如
  • 举例&#xff1a;

输入map转换输出
1&#xff0c;2&#xff0c;3乘以22&#xff0c;4&#xff0c;6
a&#xff0c;b&#xff0c;b添加一个元素1,组成Tuple2(a,1)&#xff0c;(b,1)&#xff0c;(b,1)

下面举例a&#xff0c;b&#xff0c;b -> (a,1)&#xff0c;(b,1)&#xff0c;(b,1)

public class DataSourceTest {public static void main(String[] args) throws Exception {//第1步&#xff1a;创建执行环境final ExecutionEnvironment env &#61; ExecutionEnvironment.getExecutionEnvironment();//第2步&#xff1a;创建数据源DataSource<String> dataSource3 &#61; env.fromCollection(Arrays.asList("a", "b", "b"));//第3步&#xff1a;执行转换算子MapOperator<String, Tuple2<String, Integer>> maped &#61; dataSource3.map(new MapFunction<String, Tuple2<String, Integer>>() {&#64;Overridepublic Tuple2<String, Integer> map(String s) throws Exception {return new Tuple2<>(s, 1);}});//第4步&#xff1a;输出打印到控制台maped.print();//第5步&#xff1a;执行job&#xff0c;如果是实时流计算这是要的&#xff0c;这里是批处理&#xff0c;也可以不加env.execute("BatchWordCount");}
}

5.3.2 flatMap

写Java8多的小朋友估计对flatMap不陌生&#xff0c;暂时叫它扁平map吧。

  • 转换类型&#xff1a;DataStream → DataStream
  • 说明&#xff1a;多组数据->生成多个流->合并成一个流
  • 举例&#xff1a;

输入flatMap转换输出
“I love coding”, “I love flink”切分后,组成Tuple2(flink,1)2个(love,1)2个(I,1)(coding,1)

DataSource<String> lines &#61; env.fromElements("I love coding", "I love flink");//将每一行按照空格切分&#xff0c;并组成(word, 1)
FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne &#61; lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {&#64;Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] words &#61; line.split(" ");for (String word : words) {collector.collect(Tuple2.of(word, 1));}}});

5.3.3 filter


  • 转换类型&#xff1a;DataStream → DataStream
  • 说明&#xff1a;该算子将按照条件对输入数据集进行筛选操作&#xff0c;将符合条件的数据集输出
  • 举例&#xff1a;

输入flatMap转换输出
1, 2, 3, 4, 5, 6找到奇数1&#xff0c;3&#xff0c;5

DataStreamSource<Integer> nums &#61; env.fromElements(1, 2, 3, 4, 5, 6);SingleOutputStreamOperator<Long> filterd &#61; nums.filter(new FilterFunction<Long>() {&#64;Overridepublic boolean filter(Long value) throws Exception {return value %2 !&#61; 0;}
}).setParallelism(1);filterd.print();

5.3.4 keyBy


  • 转换类型&#xff1a;DataStream → KeyedStream
  • 说明&#xff1a;具有相同key的所有记录会分配给到同一分区&#xff0c;类似SQL的group by&#xff0c;在内部&#xff0c;keyBy&#xff08;&#xff09;是使用hash分区实现
  • 举例&#xff1a;
    如果是DataSet用groupBy&#xff0c;是DataStream用keyBy
    接着上面4.5.2编写Kafka节&#xff0c;把从Kafka读取的数据进行WordCount

DataStreamSource<String> lines &#61; env.addSource(kafkaSource);SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne &#61;lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {&#64;Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words &#61; line.split(" ");for (String word : words){Tuple2<String, Integer> tp &#61; Tuple2.of(word, 1);out.collect(tp);}}});
//注意下面的.keyBy(0).sum(1)&#xff0c;说按照第一值分组&#xff0c;再把同组里第二个值求和
//聚合函数&#xff08;min&#xff0c;max&#xff0c;sum&#xff09;都是加在keyBy&#xff08;DataSet时是groupBy&#xff09;后面
SingleOutputStreamOperator<Tuple2<String, Integer>> sum &#61; wordAndOne.keyBy(0).sum(1);

上面例子是在元组类型上keyBy&#xff0c;所以传的是数字&#xff0c;如果是POJO类型&#xff0c;可以传入字段名

dataStream.keyBy("someKey")

5.3.5 reduce


  • 转换类型&#xff1a;KeyedStream→DataStream
  • 说明&#xff1a;在分区的数据流上调用reduce函数&#xff1a;将当前元素与最后一个reduce的值合并生成新值。
    reduce函数是将KeyedStream转换为DataStream&#xff0c;也就是reduce调用前必须进行分区&#xff0c;即得先调用keyBy()函数
  • 举例&#xff1a;

keyedStream.reduce(new ReduceFunction<Integer>() {&#64;Overridepublic Integer reduce(Integer value1, Integer value2)throws Exception {return value1 &#43; value2;}
});

后续

写到这&#xff0c;发现把时间语义&#xff0c;窗口&#xff0c;聚合&#xff0c;分流也写在这一博客&#xff0c;会显的博客太长了&#xff0c;而且质量会更低&#xff08;我承认我基本是把IDEA的代码贴过来&#xff09;。而且时间语义和窗口特别重要的知识&#xff0c;我还是放到下一篇博客吧&#xff08;该贴的代码还是得贴&#xff09;。
Flink教程(二) DataStream聚合 keyBy sum min和minBy区别


推荐阅读
author-avatar
zdl
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有