热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink(三)IDEA开发Flink环境搭建与测试

一.IDEA开发环境1.pom文件设置1.8

一.IDEA开发环境


1.pom文件设置


1.8
1.8
UTF-8
2.11.12
2.11
2.7.6
1.6.1



org.scala-lang
scala-library
${scala.version}


org.apache.flink
flink-java
${flink.version}


org.apache.flink
flink-streaming-java_${scala.binary.version}
${flink.version}


org.apache.flink
flink-scala_${scala.binary.version}
${flink.version}


org.apache.flink
flink-streaming-scala_${scala.binary.version}
${flink.version}


org.apache.flink
flink-table_${scala.binary.version}
${flink.version}


org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}


org.apache.flink
flink-connector-kafka-0.10_${scala.binary.version}
${flink.version}


org.apache.hadoop
hadoop-client
${hadoop.version}


mysql
mysql-connector-java
5.1.38


com.alibaba
fastjson
1.2.22



src/main/scala
src/test/scala



net.alchim31.maven
scala-maven-plugin
3.2.0



compile
testCompile




-dependencyfile
${project.build.directory}/.scala_dependencies






org.apache.maven.plugins
maven-surefire-plugin
2.18.1

false
true

**/*Test.*
**/*Suite.*




org.apache.maven.plugins
maven-shade-plugin
3.0.0


package

shade




*:*

META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA





org.apache.spark.WordCount








2.flink开发流程

Flink具有特殊类DataSetDataStream在程序中表示数据。您可以将它们视为可以包含重复项的不可变数据集合。在DataSet数据有限的情况下,对于一个DataStream元素的数量可以是无界的。

这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着一旦创建它们就无法添加或删除元素。你也不能简单地检查里面的元素。

集合最初通过在弗林克程序添加源创建和新的集合从这些通过将它们使用API方法如衍生mapfilter等等。

Flink程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:

1.获取execution environment,

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2.加载/创建初始化数据

DataStream text = env.readTextFile("file:///path/to/file");

3.指定此数据的转换

val mapped = input.map { x => x.toInt }

4.指定放置计算结果的位置

writeAsText(String path)

print()

5.触发程序执行

在local模式下执行程序

execute()

将程序达成jar运行在线上

./bin/flink run \

-m node21:8081 \

./examples/batch/WordCount.jar \

--input  hdfs:///user/admin/input/wc.txt \

--output  hdfs:///user/admin/output2  \


二. Wordcount案例


1.Scala代码

package com.xyg.streaming
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
* Author: Mr.Deng
* Date: 2018/10/15
* Desc:
*/
object SocketWindowWordCount {
def main(args: Array[String]) : Unit = {
// 定义一个数据类型保存单词出现的次数
case class WordWithCount(word: String, count: Long)
// port 表示需要连接的端口
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
} catch {
case e: Exception => {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port

'")
return
}
}
// 获取运行环境
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
// 连接此socket获取输入数据
val text = env.socketTextStream("node21", port, '\n')
//需要加上这一行隐式转换 否则在调用flatmap方法的时候会报错
import org.apache.flink.api.scala._
// 解析数据, 分组, 窗口化, 并且聚合求SUM
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.sum("count")
// 打印输出并设置使用一个并行度
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
}

2.Java代码

package com.xyg.streaming;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Author: Mr.Deng
* Date: 2018/10/15
* Desc: 使用flink对指定窗口内的数据进行实时统计,最终把结果打印出来
* 先在node21机器上执行nc -l 9000
*/
public class StreamingWindowWordCountJava {
public static void main(String[] args) throws Exception {
//定义socket的端口号
int port;
try{
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("没有指定port参数,使用默认值9000");
port = 9000;
}
//获取运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入的数据
DataStreamSource text = env.socketTextStream("node21", port, "\n");
//计算数据
DataStream windowCount = text.flatMap(new FlatMapFunction() {
public void flatMap(String value, Collector out) throws Exception {
String[] splits = value.split("\\s");
for (String word:splits) {
out.collect(new WordWithCount(word,1L));
}
}
})//打平操作,把每行的单词转为类型的数据
//针对相同的word数据进行分组
.keyBy("word")
//指定计算数据的窗口大小和滑动窗口大小
.timeWindow(Time.seconds(2),Time.seconds(1))
.sum("count");
//把数据打印到控制台,使用一个并行度
windowCount.print().setParallelism(1);
//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
env.execute("streaming word count");
}
/**
* 主要为了存储单词以及单词出现的次数
*/
public static class WordWithCount{
public String word;
public long count;
public WordWithCount(){}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}

3.运行测试

首先,使用nc命令启动一个本地监听,命令是:

[admin@node21 ~]$ nc -l 9000

通过netstat命令观察9000端口。 netstat -anlp | grep 9000,启动监听如果报错:-bash: nc: command not found,请先安装nc,在线安装命令:yum -y install nc

然后,IDEA上运行flink官方案例程序

node21上输入

IDEA控制台输出如下


4.集群测试

这里单机测试官方案例

[admin@node21 flink-1.6.1]$ pwd
/opt/flink-1.6.1
[admin@node21 flink-1.6.1]$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node21.
Starting taskexecutor daemon on host node21.
[admin@node21 flink-1.6.1]$ jps
2100 StandaloneSessionClusterEntrypoint
2518 TaskManagerRunner
2584 Jps
[admin@node21 flink-1.6.1]$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

程序连接到套接字并等待输入。您可以检查Web界面以验证作业是否按预期运行:

单词在5秒的时间窗口(处理时间,翻滚窗口)中计算并打印到stdout。监视TaskManager的输出文件并写入一些文本nc(输入在点击后逐行发送到Flink):


三. 使用IDEA开发离线程序

Dataset是flink的常用程序,数据集通过source进行初始化,例如读取文件或者序列化集合,然后通过transformation(filtering、mapping、joining、grouping)将数据集转成,然后通过sink进行存储,既可以写入hdfs这种分布式文件系统,也可以打印控制台,flink可以有很多种运行方式,如local、flink集群、yarn等.


1. scala程序

import org.apache.flink.api.scala._
object WordCount {
def main(args: Array[String]) {
//初始化环境
val env = ExecutionEnvironment.getExecutionEnvironment
//从字符串中加载数据
val text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?")
//分割字符串、汇总tuple、按照key进行分组、统计分组后word个数
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
//打印
counts.print()
}
}

2. java程序

package com.xyg.batch;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* Author: Mr.Deng
* Date: 2018/10/19
* Desc:
*/
public class WordCount {
public static void main(String[] args) throws Exception {
//构建环境
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
//通过字符串构建数据集
DataSet text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
//分割字符串、按照key进行分组、统计相同的key个数
DataSet> wordCounts = text
.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
//打印
wordCounts.print();
}
//分割字符串的方法
public static class LineSplitter implements FlatMapFunction> {
@Override
public void flatMap(String line, Collector> out) {
for (String word : line.split(" ")) {
out.collect(new Tuple2(word, 1));
}
}
}
}

3.运行

 


推荐阅读
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 使用nodejs爬取b站番剧数据,计算最佳追番推荐
    本文介绍了如何使用nodejs爬取b站番剧数据,并通过计算得出最佳追番推荐。通过调用相关接口获取番剧数据和评分数据,以及使用相应的算法进行计算。该方法可以帮助用户找到适合自己的番剧进行观看。 ... [详细]
  • 本文介绍了使用kotlin实现动画效果的方法,包括上下移动、放大缩小、旋转等功能。通过代码示例演示了如何使用ObjectAnimator和AnimatorSet来实现动画效果,并提供了实现抖动效果的代码。同时还介绍了如何使用translationY和translationX来实现上下和左右移动的效果。最后还提供了一个anim_small.xml文件的代码示例,可以用来实现放大缩小的效果。 ... [详细]
  • 本文介绍了为什么要使用多进程处理TCP服务端,多进程的好处包括可靠性高和处理大量数据时速度快。然而,多进程不能共享进程空间,因此有一些变量不能共享。文章还提供了使用多进程实现TCP服务端的代码,并对代码进行了详细注释。 ... [详细]
  • 在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板
    本文介绍了在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板的方法和步骤,包括将ResourceDictionary添加到页面中以及在ResourceDictionary中实现模板的构建。通过本文的阅读,读者可以了解到在Xamarin XAML语言中构建控件模板的具体操作步骤和语法形式。 ... [详细]
  • 在Kubernetes上部署JupyterHub的步骤和实验依赖
    本文介绍了在Kubernetes上部署JupyterHub的步骤和实验所需的依赖,包括安装Docker和K8s,使用kubeadm进行安装,以及更新下载的镜像等。 ... [详细]
  • Commit1ced2a7433ea8937a1b260ea65d708f32ca7c95eintroduceda+Clonetraitboundtom ... [详细]
  • eclipse学习(第三章:ssh中的Hibernate)——11.Hibernate的缓存(2级缓存,get和load)
    本文介绍了eclipse学习中的第三章内容,主要讲解了ssh中的Hibernate的缓存,包括2级缓存和get方法、load方法的区别。文章还涉及了项目实践和相关知识点的讲解。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 开发笔记:实验7的文件读写操作
    本文介绍了使用C++的ofstream和ifstream类进行文件读写操作的方法,包括创建文件、写入文件和读取文件的过程。同时还介绍了如何判断文件是否成功打开和关闭文件的方法。通过本文的学习,读者可以了解如何在C++中进行文件读写操作。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 本文介绍了在Cpp中将字符串形式的数值转换为int或float等数值类型的方法,主要使用了strtol、strtod和strtoul函数。这些函数可以将以null结尾的字符串转换为long int、double或unsigned long类型的数值,且支持任意进制的字符串转换。相比之下,atoi函数只能转换十进制数值且没有错误返回。 ... [详细]
  • python3 nmap函数简介及使用方法
    本文介绍了python3 nmap函数的简介及使用方法,python-nmap是一个使用nmap进行端口扫描的python库,它可以生成nmap扫描报告,并帮助系统管理员进行自动化扫描任务和生成报告。同时,它也支持nmap脚本输出。文章详细介绍了python-nmap的几个py文件的功能和用途,包括__init__.py、nmap.py和test.py。__init__.py主要导入基本信息,nmap.py用于调用nmap的功能进行扫描,test.py用于测试是否可以利用nmap的扫描功能。 ... [详细]
author-avatar
手机用户2502854041
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有