热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkBroadcast广播变量应用案例实战Flink牛刀小试

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,如有任何问题,可随时联系。

1.1 Broadcast 真假曹操

  • DataStreaming Broadcast (元素广播):元素广播,重复处理

    • 把元素广播给所有的分区,数据会被重复处理,类似于storm中的allGrouping
    • 使用技巧:dataStream.broadcast()
  • Flink Broadcast(广播变量)

    • 广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。 另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。

    • 一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存 在一份。

    • 如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。

    • 用法如下:

      1:初始化数据DataSet toBroadcast = env.fromElements(1, 2, 3)2:广播数据.withBroadcastSet(toBroadcast, "broadcastSetName");3:获取数据Collection broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");注意:1:广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束2:广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。

2 元素广播案例实战

2.1 实现元素的重复广播,设置source的并行度为1

public class StreamingDemoWithMyNoPralalleSourceBroadcast {public static void main(String[] args) throws Exception {//获取Flink的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);//获取数据源DataStreamSource text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1DataStream num = text.broadcast().map(new MapFunction() {@Overridepublic Long map(Long value) throws Exception {long id = Thread.currentThread().getId();System.out.println("线程id:"+id+",接收到数据:" + value);return value;}});//每2秒钟处理一次数据DataStream sum = num.timeWindowAll(Time.seconds(2)).sum(0);//打印结果sum.print().setParallelism(1);String jobName = StreamingDemoWithMyNoPralalleSourceBroadcast.class.getSimpleName();env.execute(jobName);}
}

2.2 自定义接收器MyNoParalleSource

public class MyNoParalleSource implements SourceFunction{private long count = 1L;private boolean isRunning = true;/*** 主要的方法* 启动一个source* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了** @param ctx* @throws Exception*/@Overridepublic void run(SourceContext ctx) throws Exception {while(isRunning){ctx.collect(count);count++;//每秒产生一条数据Thread.sleep(1000);}}/*** 取消一个cancel的时候会调用的方法**/@Overridepublic void cancel() {isRunning = false;}
}

2.3 结果展示

发现整个Map元素别处理了4次:

线程id:44,接收到数据:1
线程id:46,接收到数据:1
线程id:42,接收到数据:1
线程id:48,接收到数据:1
4

3 广播变量

3.1 第一步:封装DataSet,调用withBroadcastSet。

3.2 第二步:getRuntimeContext().getBroadcastVariable,获得广播变量

3.3 第三步:RichMapFunction中执行获得广播变量的逻辑

public class BatchDemoBroadcast {public static void main(String[] args) throws Exception{//获取运行环境ExecutionEnvironment env &#61; ExecutionEnvironment.getExecutionEnvironment();//1&#xff1a;准备需要广播的数据ArrayList> broadData &#61; new ArrayList<>();broadData.add(new Tuple2<>("zs",18));broadData.add(new Tuple2<>("ls",20));broadData.add(new Tuple2<>("ww",17));DataSet> tupleData &#61; env.fromCollection(broadData);//1.1:处理需要广播的数据,把数据集转换成map类型&#xff0c;map中的key就是用户姓名&#xff0c;value就是用户年龄DataSet> toBroadcast &#61; tupleData.map(new MapFunction, HashMap>() {&#64;Overridepublic HashMap map(Tuple2 value) throws Exception {HashMap res &#61; new HashMap<>();res.put(value.f0, value.f1);return res;}});//源数据DataSource data &#61; env.fromElements("zs", "ls", "ww");//注意&#xff1a;在这里需要使用到RichMapFunction获取广播变量DataSet result &#61; data.map(new RichMapFunction() {List> broadCastMap &#61; new ArrayList>();HashMap allMap &#61; new HashMap();/*** 这个方法只会执行一次* 可以在这里实现一些初始化的功能** 所以&#xff0c;就可以在open方法中获取广播变量数据**/&#64;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//3:获取广播数据this.broadCastMap &#61; getRuntimeContext().getBroadcastVariable("broadCastMapName");for (HashMap map : broadCastMap) {allMap.putAll(map);}}&#64;Overridepublic String map(String value) throws Exception {Integer age &#61; allMap.get(value);return value &#43; "," &#43; age;}}).withBroadcastSet(toBroadcast, "broadCastMapName");//2&#xff1a;执行广播数据的操作result.print();}
}

3.4 结果展示

zs,18ls,20ww,17

总结

简单成文&#xff0c;方便Flink整体体系构成&#xff0c;感谢Github FLink 源码作者&#xff0c;让我学到很多东西。辛苦成文&#xff0c;各自珍惜&#xff0c;谢谢&#xff01;

版权声明&#xff1a;本套技术专栏是作者&#xff08;秦凯新&#xff09;平时工作的总结和升华&#xff0c;通过从真实商业环境抽取案例进行总结和分享&#xff0c;并给出商业应用的调优建议和集群环境容量规划等内容&#xff0c;请持续关注本套博客。版权声明&#xff1a;禁止转载&#xff0c;欢迎学习。QQ邮箱地址&#xff1a;1120746959&#64;qq.com&#xff0c;如有任何问题&#xff0c;可随时联系。

秦凯新 于深圳 20181608



推荐阅读
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文介绍了在MFC下利用C++和MFC的特性动态创建窗口的方法,包括继承现有的MFC类并加以改造、插入工具栏和状态栏对象的声明等。同时还提到了窗口销毁的处理方法。本文详细介绍了实现方法并给出了相关注意事项。 ... [详细]
  • 本文介绍了pack布局管理器在Perl/Tk中的使用方法及注意事项。通过调用pack()方法,可以控制部件在显示窗口中的位置和大小。同时,本文还提到了在使用pack布局管理器时,应注意将部件分组以便在水平和垂直方向上进行堆放。此外,还介绍了使用Frame部件或Toplevel部件来组织部件在窗口内的方法。最后,本文强调了在使用pack布局管理器时,应避免在中间切换到grid布局管理器,以免造成混乱。 ... [详细]
  • 微软头条实习生分享深度学习自学指南
    本文介绍了一位微软头条实习生自学深度学习的经验分享,包括学习资源推荐、重要基础知识的学习要点等。作者强调了学好Python和数学基础的重要性,并提供了一些建议。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • FeatureRequestIsyourfeaturerequestrelatedtoaproblem?Please ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • This article discusses the efficiency of using char str[] and char *str and whether there is any reason to prefer one over the other. It explains the difference between the two and provides an example to illustrate their usage. ... [详细]
author-avatar
埼埼popo_514
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有