热门标签 | HotTags
当前位置:  开发笔记 > 前端 > 正文

Javalambda表达式实现FlinkWordCount过程解析

这篇文章主要介绍了Javalambda表达式实现FlinkWordCount过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

这篇文章主要介绍了Java lambda表达式实现Flink WordCount过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

本篇我们将使用Java语言来实现Flink的单词统计。

代码开发

环境准备

导入Flink 1.9 pom依赖


    
      org.apache.flink
      flink-java
      1.9.0
    
    
      org.apache.flink
      flink-streaming-java_2.11
      1.9.0
    
    
      org.apache.commons
      commons-lang3
      3.7
    
  

构建Flink流处理环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

自定义source

每秒生成一行文本

DataStreamSource wordLineDS = env.addSource(new RichSourceFunction() {
      private boolean isCanal = false;
      private String[] words = {
          "important oracle jdk license update",
          "the oracle jdk license has changed for releases starting april 16 2019",
          "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
          "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
          "downloading and using this product an faq is available here ",
          "commercial license and support is available with a low cost java se subscription",
          "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
      };

      @Override
      public void run(SourceContext ctx) throws Exception {
        // 每秒发送一行文本
        while (!isCanal) {
          int randomIndex = RandomUtils.nextInt(0, words.length);
          ctx.collect(words[randomIndex]);
          Thread.sleep(1000);
        }
      }

      @Override
      public void cancel() {
        isCanal = true;
      }
    });

单词计算

// 3. 单词统计
    // 3.1 将文本行切分成一个个的单词
    SingleOutputStreamOperator wordsDS = wordLineDS.flatMap((String line, Collector ctx) -> {
      // 切分单词
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

    //3.2 将单词转换为一个个的元组
    SingleOutputStreamOperator> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

    // 3.3 按照单词进行分组
    KeyedStream, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

    // 3.4 对每组单词数量进行累加
    SingleOutputStreamOperator> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

    resultDS.print();

参考代码

public class WordCount {
  public static void main(String[] args) throws Exception {
    // 1. 构建Flink流式初始化环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 2. 自定义source - 每秒发送一行文本
    DataStreamSource wordLineDS = env.addSource(new RichSourceFunction() {
      private boolean isCanal = false;
      private String[] words = {
          "important oracle jdk license update",
          "the oracle jdk license has changed for releases starting april 16 2019",
          "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
          "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
          "downloading and using this product an faq is available here ",
          "commercial license and support is available with a low cost java se subscription",
          "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
      };

      @Override
      public void run(SourceContext ctx) throws Exception {
        // 每秒发送一行文本
        while (!isCanal) {
          int randomIndex = RandomUtils.nextInt(0, words.length);
          ctx.collect(words[randomIndex]);
          Thread.sleep(1000);
        }
      }

      @Override
      public void cancel() {
        isCanal = true;
      }
    });

    // 3. 单词统计
    // 3.1 将文本行切分成一个个的单词
    SingleOutputStreamOperator wordsDS = wordLineDS.flatMap((String line, Collector ctx) -> {
      // 切分单词
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

    //3.2 将单词转换为一个个的元组
    SingleOutputStreamOperator> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

    // 3.3 按照单词进行分组
    KeyedStream, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

    // 3.4 对每组单词数量进行累加
    SingleOutputStreamOperator> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

    resultDS.print();

    env.execute("app");
  }
}

Flink对Java Lambda表达式支持情况

Flink支持Java API所有操作符使用Lambda表达式。但是,但Lambda表达式使用Java泛型时,就需要声明类型信息。

我们来看下上述的这段代码:

SingleOutputStreamOperator wordsDS = wordLineDS.flatMap((String line, Collector ctx) -> {
      // 切分单词
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

之所以这里将所有的类型信息,因为Flink无法正确自动推断出来Collector中带的泛型。我们来看一下FlatMapFuntion的源代码

@Public
@FunctionalInterface
public interface FlatMapFunction extends Function, Serializable {

  /**
  * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
  * it into zero, one, or more elements.
  *
  * @param value The input value.
  * @param out The collector for returning result values.
  *
  * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
  *          to fail and may trigger recovery.
  */
  void flatMap(T value, Collector out) throws Exception;
}

我们发现 flatMap的第二个参数是Collector,是一个带参数的泛型。Java编译器编译该代码时会进行参数类型擦除,所以Java编译器会变成成:

void flatMap(T value, Collector out)

这种情况,Flink将无法自动推断类型信息。如果我们没有显示地提供类型信息,将会出现以下错误:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
  In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
  An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
  Otherwise the type has to be specified explicitly using type information.

这种情况下,必须要显示指定类型信息,否则输出将返回值视为Object类型,这将导致Flink无法正确序列化。

所以,我们需要显示地指定Lambda表达式的参数类型信息,并通过returns方法显示指定输出的类型信息

我们再看一段代码:

SingleOutputStreamOperator> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

为什么map后面也需要指定类型呢?

因为此处map返回的是Tuple2类型,Tuple2是带有泛型参数,在编译的时候同样会被查出泛型参数信息,导致Flink无法正确推断。

更多关于对Java Lambda表达式的支持请参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


推荐阅读
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 这是原文链接:sendingformdata许多情况下,我们使用表单发送数据到服务器。服务器处理数据并返回响应给用户。这看起来很简单,但是 ... [详细]
  • 本文介绍了一些Java开发项目管理工具及其配置教程,包括团队协同工具worktil,版本管理工具GitLab,自动化构建工具Jenkins,项目管理工具Maven和Maven私服Nexus,以及Mybatis的安装和代码自动生成工具。提供了相关链接供读者参考。 ... [详细]
  • EPICS Archiver Appliance存储waveform记录的尝试及资源需求分析
    本文介绍了EPICS Archiver Appliance存储waveform记录的尝试过程,并分析了其所需的资源容量。通过解决错误提示和调整内存大小,成功存储了波形数据。然后,讨论了储存环逐束团信号的意义,以及通过记录多圈的束团信号进行参数分析的可能性。波形数据的存储需求巨大,每天需要近250G,一年需要90T。然而,储存环逐束团信号具有重要意义,可以揭示出每个束团的纵向振荡频率和模式。 ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • 本文介绍了九度OnlineJudge中的1002题目“Grading”的解决方法。该题目要求设计一个公平的评分过程,将每个考题分配给3个独立的专家,如果他们的评分不一致,则需要请一位裁判做出最终决定。文章详细描述了评分规则,并给出了解决该问题的程序。 ... [详细]
  • 原文地址:https:www.cnblogs.combaoyipSpringBoot_YML.html1.在springboot中,有两种配置文件,一种 ... [详细]
  • 本文介绍了Svn和Maven的使用说明,包括版本控制和构建工具的功能和优势。同时提供了一个相关链接,链接中详细介绍了SvnMaven的使用方法和注意事项。通过学习和使用SvnMaven,开发人员可以更好地进行代码管理、软件开发和协作开发,提高项目管理的效率和质量。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • 本文介绍了在Windows环境下如何配置php+apache环境,包括下载php7和apache2.4、安装vc2015运行时环境、启动php7和apache2.4等步骤。希望对需要搭建php7环境的读者有一定的参考价值。摘要长度为169字。 ... [详细]
  • 本文由编程笔记#小编为大家整理,主要介绍了StartingzookeeperFAILEDTOSTART相关的知识,希望对你有一定的参考价值。下载路径:https://ar ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • Java如何导入和导出Excel文件的方法和步骤详解
    本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
  • Apache Shiro 身份验证绕过漏洞 (CVE202011989) 详细解析及防范措施
    本文详细解析了Apache Shiro 身份验证绕过漏洞 (CVE202011989) 的原理和影响,并提供了相应的防范措施。Apache Shiro 是一个强大且易用的Java安全框架,常用于执行身份验证、授权、密码和会话管理。在Apache Shiro 1.5.3之前的版本中,与Spring控制器一起使用时,存在特制请求可能导致身份验证绕过的漏洞。本文还介绍了该漏洞的具体细节,并给出了防范该漏洞的建议措施。 ... [详细]
author-avatar
liuluoyu
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有