Flink教程 DataStream 创建数据源 转换算子
- 1. 前言
- 2. 创建Flink项目
- 2.1 在cmd窗口创建
- 2.2 WordCount例子
- 2.3 分析Flink程序代码结构
- 3. 第1步:创建执行环境
- 4. 第2步:创建数据源
- 4.1 什么是流?
- 4.2 从指定的数据集合创建流(一般测试时用)
- 4.3 从文件里读取数据
- 4.4 从WebSocket读取数据
- 4.5 从Kafka读取数据
- 4.5.1 引入jar
- 4.5.2 编写Kafka数据源
- 5. 第3步:处理数据流
- 5.1 什么是DataStream
- 5.2 什么是元组(Tuple)
- 5.3 基本转换算子
- 5.3.1 map
- 5.3.2 flatMap
- 5.3.3 filter
- 5.3.4 keyBy
- 5.3.5 reduce
- 后续
1. 前言
从前年开始,就被公众号上Flink文章频繁的刷屏,看来是时候了解下Flink了。
Flink官网第一句话介绍是数据流上的有状态计算。
我第一眼看这句话感觉很拗口,什么是流上的计算?什么是有状态?
作为菜鸟,我觉的学习Flink最好方法是看官网并敲代码实践,不会的百度些博客学学。
2. 创建Flink项目
废话不多说,我们来创建一个Flink项目吧。关于“Flink是什么”,“Flink应用场景”,“Flink安装部署”,“Flink架构原理”等话题,我感觉网上好的博客很多了,我默认此时你至少了解过Flink并安装过Flink吧,不然怎么会搜到我这篇博客?
2.1 在cmd窗口创建
打开cmd命令窗口,输入如下命令
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.1 -DarchetypeCatalog=internal
如果你有强迫症,你看不惯一行命令这么长,你可以粘贴下面的。
我的是win10系统,命令以^换行,如果你是Linux系统,要以\换行。
mvn archetype:generate ^
-DarchetypeGroupId=org.apache.flink ^
-DarchetypeArtifactId=flink-quickstart-java ^
-DarchetypeVersion=1.10.1 ^
-DarchetypeCatalog=internal
执行中途,它会询问你输入groupId和artifactId,然后一路回车输入Y,项目就创建好了。
2.2 WordCount例子
如果你项目创建成功,会有如下代码结构,然后再创建个BatchWordCount类,贴入下面Java代码。
package com.pigg.test01;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class BatchWordCount {public static void main(String[] args) throws Exception {final ExecutionEnvironment env &#61; ExecutionEnvironment.getExecutionEnvironment();DataSource<String> lines &#61; env.fromElements("I love coding", "I love flink", "study flink");FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne &#61; lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {&#64;Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] words &#61; line.split(" ");for (String word : words) {collector.collect(Tuple2.of(word, 1));}}});AggregateOperator<Tuple2<String, Integer>> result &#61; wordAndOne.groupBy(0).sum(1);result.print();env.execute("BatchWordCount");}
}
不要深究代码的意思&#xff0c;我们只是先了解下写一个Flink程序的代码结构。运行main方法时&#xff0c;如果报如下错误&#xff1a;
NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction
你需要把如下选项勾上
执行结果为&#xff1a;
(flink,2)
(love,2)
(I,2)
(coding,1)
(study,1)
2.3 分析Flink程序代码结构
在上面程序中&#xff0c;我注释了5个步骤&#xff0c;Flink的Job程序也基本分5个步骤&#xff1a;
- 第1步&#xff1a;创建执行环境
- 第2步&#xff1a;创建数据源
- 第3步&#xff1a;处理数据流
- 第4步&#xff1a;输出结果到指定位置
- 第5步&#xff1a;触发执行Job
下面我们来逐步学习这5个步骤&#xff0c;其中第3步最为关键&#xff0c;是本博客的重点&#xff0c;更是平时开发的核心。
3. 第1步&#xff1a;创建执行环境
Flink程序最开始都是要创建执行环境&#xff0c;它会自动根据不同的运行场景创建对应的执行环境。
- 如果你在IDEA里运行main方法&#xff0c;Flink创建的是本地执行环境
- 如果你把程序打成jar包&#xff0c;提交到Flink集群上执行&#xff0c;Flink创建的是集群执行环境
创建执行环境很简单&#xff0c;就一句话&#xff1a;
ExecutionEnvironment env &#61; ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();
4. 第2步&#xff1a;创建数据源
创建数据源是指定义我们的Flink程序从哪里获取数据。
如果是实时计算&#xff0c;一般工作中我们都是从Kafka中获取数据。
如果是跑批处理&#xff0c;一般是从txt&#xff0c;csv&#xff0c;hdfs上读取数据。
还记得一开始说的Flink自我介绍数据流上的有状态计算这句话吗&#xff1f;
我认为很有必要先理解下什么是流&#xff1f;
4.1 什么是流&#xff1f;
对Flink而言&#xff0c;不管是不停采集新增的事件还是已经固定大小的数据集合&#xff0c;它们都是流数据&#xff0c;只不过根据它们是否有界限&#xff0c;分为无界流和有界流。
说明上面代码也可以写成下面这样&#xff0c;对流数据lines的操作&#xff0c;就是调用DataStream API操作它。
DataStream<String> lines &#61; env.addSource(kafkaSource)
有时间得看看DataStream.java这个类的代码。map&#xff0c;flatMap&#xff0c;filter等算子的定义都是在这个类里。
5.2 什么是元组&#xff08;Tuple&#xff09;
写Java的可能不知道元组&#xff0c;但是玩过Python的小朋友应该都知道。
元组与ArrayList类似&#xff0c;可以放一组数值&#xff0c;但是不同数据的类型可以不同。
从上面的WordCount代码里看&#xff0c;用到了Tuple2&#xff0c;这表示定义一个元组&#xff0c;它有2个值&#xff0c;
第一个值是String类型&#xff0c;第二个值是Integer类型。
查看Flink里Tuple2类&#xff0c;可以发现Flink定义了Tuple0一直到Tuple25。
5.3 基本转换算子
5.3.1 map
- 转换类型&#xff1a;DataStream → DataStream
- 说明&#xff1a;读取一个元素并生成一个新的元素&#xff0c;例如
- 举例&#xff1a;
输入 | map转换 | 输出 |
---|
1&#xff0c;2&#xff0c;3 | 乘以2 | 2&#xff0c;4&#xff0c;6 |
a&#xff0c;b&#xff0c;b | 添加一个元素1,组成Tuple2 | (a,1)&#xff0c;(b,1)&#xff0c;(b,1) |
下面举例a&#xff0c;b&#xff0c;b -> (a,1)&#xff0c;(b,1)&#xff0c;(b,1)
public class DataSourceTest {public static void main(String[] args) throws Exception {final ExecutionEnvironment env &#61; ExecutionEnvironment.getExecutionEnvironment();DataSource<String> dataSource3 &#61; env.fromCollection(Arrays.asList("a", "b", "b"));MapOperator<String, Tuple2<String, Integer>> maped &#61; dataSource3.map(new MapFunction<String, Tuple2<String, Integer>>() {&#64;Overridepublic Tuple2<String, Integer> map(String s) throws Exception {return new Tuple2<>(s, 1);}});maped.print();env.execute("BatchWordCount");}
}
5.3.2 flatMap
写Java8多的小朋友估计对flatMap不陌生&#xff0c;暂时叫它扁平map吧。
- 转换类型&#xff1a;DataStream → DataStream
- 说明&#xff1a;多组数据->生成多个流->合并成一个流
- 举例&#xff1a;
输入 | flatMap转换 | 输出 |
---|
“I love coding”, “I love flink” | 切分后,组成Tuple2 | (flink,1)2个(love,1)2个(I,1)(coding,1) |
DataSource<String> lines &#61; env.fromElements("I love coding", "I love flink");
FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne &#61; lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {&#64;Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] words &#61; line.split(" ");for (String word : words) {collector.collect(Tuple2.of(word, 1));}}});
5.3.3 filter
- 转换类型&#xff1a;DataStream → DataStream
- 说明&#xff1a;该算子将按照条件对输入数据集进行筛选操作&#xff0c;将符合条件的数据集输出
- 举例&#xff1a;
输入 | flatMap转换 | 输出 |
---|
1, 2, 3, 4, 5, 6 | 找到奇数 | 1&#xff0c;3&#xff0c;5 |
DataStreamSource<Integer> nums &#61; env.fromElements(1, 2, 3, 4, 5, 6);SingleOutputStreamOperator<Long> filterd &#61; nums.filter(new FilterFunction<Long>() {&#64;Overridepublic boolean filter(Long value) throws Exception {return value %2 !&#61; 0;}
}).setParallelism(1);filterd.print();
5.3.4 keyBy
- 转换类型&#xff1a;DataStream → KeyedStream
- 说明&#xff1a;具有相同key的所有记录会分配给到同一分区&#xff0c;类似SQL的group by&#xff0c;在内部&#xff0c;keyBy&#xff08;&#xff09;是使用hash分区实现
- 举例&#xff1a;
如果是DataSet用groupBy&#xff0c;是DataStream用keyBy
接着上面4.5.2编写Kafka节&#xff0c;把从Kafka读取的数据进行WordCount
DataStreamSource<String> lines &#61; env.addSource(kafkaSource);SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne &#61;lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {&#64;Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words &#61; line.split(" ");for (String word : words){Tuple2<String, Integer> tp &#61; Tuple2.of(word, 1);out.collect(tp);}}});
SingleOutputStreamOperator<Tuple2<String, Integer>> sum &#61; wordAndOne.keyBy(0).sum(1);
上面例子是在元组类型上keyBy&#xff0c;所以传的是数字&#xff0c;如果是POJO类型&#xff0c;可以传入字段名
dataStream.keyBy("someKey")
5.3.5 reduce
- 转换类型&#xff1a;KeyedStream→DataStream
- 说明&#xff1a;在分区的数据流上调用reduce函数&#xff1a;将当前元素与最后一个reduce的值合并生成新值。
reduce函数是将KeyedStream转换为DataStream&#xff0c;也就是reduce调用前必须进行分区&#xff0c;即得先调用keyBy()函数 - 举例&#xff1a;
keyedStream.reduce(new ReduceFunction<Integer>() {&#64;Overridepublic Integer reduce(Integer value1, Integer value2)throws Exception {return value1 &#43; value2;}
});
后续
写到这&#xff0c;发现把时间语义&#xff0c;窗口&#xff0c;聚合&#xff0c;分流也写在这一博客&#xff0c;会显的博客太长了&#xff0c;而且质量会更低&#xff08;我承认我基本是把IDEA的代码贴过来&#xff09;。而且时间语义和窗口特别重要的知识&#xff0c;我还是放到下一篇博客吧&#xff08;该贴的代码还是得贴&#xff09;。
Flink教程(二) DataStream聚合 keyBy sum min和minBy区别