热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

《Flink应用实战》(一)广播状态

目录一、基本概念1、什么是状态2、状态的分类3、什么情况下需要保存状态3、为什么要广播状态3、广播状态的应用场景二、广播状态的设计实践1、数据分流2、补全用户信息

目录

一、基本概念

1、什么是状态

2、状态的分类

3、什么情况下需要保存状态

3、为什么要广播状态

3、广播状态的应用场景

二、广播状态的设计实践

1、数据分流

2、补全用户信息

3、评论过滤

4、用户消费优惠券奖励机制

三、广播状态的开发应用

(1)数据分流开发实践参考Flink动态分流到kafka,hbase_阿飞不会飞丶的博客-CSDN博客_flink hbase phoenix

(2)补全用户信息开发实践(已测试通过)

(3)评论过滤开发实践(已测试通过)

(4)用户消费优惠券奖励机制开发实践(已测试通过)



一、基本概念


1、什么是状态

在Flink中,状态State是指一个具体的task(任务)/operator(算子)的状态。state数据默认是保存在java的堆内存中。

与checkpoint的区别在于checkpoint表示一个FlinkJob在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态。可以理解为checkpoint是把state数据持久化存储了。

2、状态的分类

在Flink中,状态始终与特定算子相关联。有两种类型的状态:算子状态(operator state)和键控状态(keyed state)

(1)算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。

(2)键控状态是根据输入数据流中定义的键(Key)来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个键对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的键。因此,具有相同键的所有数据都会访问相同的状态。

(3)键控状态类似于一个分布式的键值对映射数据结构,只能用于KeyedStream(keyBy算子处理之后)

3、什么情况下需要保存状态

流计算分为无状态有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果,例如,流处理应用程序从传感器接收水库的水位数据,并在水位超过指定高度时发出警告。

有状态的计算则会基于多个事件输出结果,比如:

(1)所有类型的窗口计算。例如,计算过去一小时的平均水位,就是有状态的计算。

(2)所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20cm以上的水位差读数,则发出警告,就是有状态的计算。

(3)流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是需要有状态的计算。

3、为什么要广播状态

状态只限定在同一任务的算子中,状态不能由相同或不同算子的另一个任务访问。如何其它的任务数据流需要使用另一任务数据流中的状态,则需要将数据流中的状态广播出去,其它任务数据流才能访问。

3、广播状态的应用场景

(1)将低吞吐量流中的数据以状态的方式广播给高吞吐量流使用。

(2)广播流通常作为主数据流的动态计算规则,主数据流需要借助这些规则 进行计算和分流。

(3)常见业务场景:

  1. 关键字、敏感信息过滤。设置过滤规则和关键字,对主数据流进行筛选。

  2. 数据分流。设置不同数据的分流条件和相应的配置信息,对主数据流进行自动分流存储。


二、广播状态的设计实践


1、数据分流

(1)需求分析

主数据流根据配置规则确定数据的存储方式,比如在数据仓库,维表数据量小,不常更新,多用于数据关联筛选查询,而考虑存储到HBase,Redis,MySQL等。而事实表,数据量大,高并发读写,考虑分类存到kafka中,进行进一步的处理,最终形成宽表。


(2)Flink数据流处理流程


  1. mysqlSource为mysql数据表,动态的配置来源表、操作类型、输出表、输出字段等。

  2. 配置数据流通过broadcast声明为广播数据流,需指定广播状态管理器,结构类似于这样:

  3. 通过connect 连接主数据流和广播流

  4. process算子对流中的每个元素调用BroadcastProcessFunction进行处理

  5. 自定义BroadcastProcessFunction方法中,需实现processBroadcastElement 处理广播过来的数据,并将数据写入状态中,processElement 从上下文中获取状态数据,并对主数据流进行处理,最后返回处理后的DataStream。


2、补全用户信息

(1)需求分析

  1. 用户点击事件数据流中,包含用户ID,事件时间,点击事件类型等,但缺少用户的姓名,年龄等基础信息。

  2. 用户注册后基础信息存储到MYSQL。

  3. 在数据流中需要实时补全点击事件中用户的基础信息。

(2)数据结构

  1. 用于广播流的mapState状态管理器

    MapStateDescriptor>>

    点击事件流无key值分组,故mapState状态也常用无key或自定义默认key值设计,这里使用void空值类占位符作为key。

    value值自定义map结构:map("userid" => ("姓名","年龄") ),通过useridkey值匹配对应的用户基础信息。

    1)processBroadcastElement方法中向mapState中写入数据

    broadcastState.put(null,value); //value 规则输入流

    2)processElement方法中获取mapState数据

    String userId = value.f0; // value 点击事件输入流
    //找到对应userid的数据
    Map> map = broadcastState.get(null);
    Tuple2 userInfo = map.get(userId);

  2. 最终输出流

    SingleOutputStreamOperator>

    返回一个包含事件数据和用户数据的Tuple结构


3、评论过滤

(1)需求分析

  1. 实时过滤评论中的敏感信息。

  2. 过滤评论规则支持动态的添加和删除。

(2)数据结构

  1. 用于广播流的mapState状态管理器

    MapStateDescriptor[String, List[String]]

    评论数据流无key分组,mapState的key值可选择自定义一固定值。

    因规则可以多个,则支持动态添加和删除,采用List结构存储多个关键字。

    1)processBroadcastElement方法中向mapState中写入数据

    val ruleList: List[String] = broadcastRule.get("keyword")
    val ruleList2 = ruleList :+ in2._1; //in2 规则输入流
    broadcastRule.put("keyword", ruleList2)

    2)processElement方法中获取mapState数据

    val rule: ReadOnlyBroadcastState[String, List[String]] = readOnlyContext.getBroadcastState(ruleMS)
    val ruleList: List[String] = rule.get("keyword")

  2. 最终输出过滤后评论流

    val result: String = ruleList.fold(in1)((result, elem) => result.replace(elem, "**")) //in1是评论输入流
    out.collect(result)


4、用户消费优惠券奖励机制

(1)需求分析

  1. 用户消费的不同类别会有对应的奖励机制,比如消费满200减20。

  2. 用户消费在没达到奖励条件之前,消费金额累加,达到条件后,发放优惠券并清除消费累计,重新累计。

  3. 同一个类别,可能存在多种奖励机制。

  4. 如果没有达到最低奖励条件,给出鼓励提示。

  5. 支持动态添加和删除奖励条件。

(2)数据结构

  1. 数据流分组

    由于涉及用户不同类别下消费金额的累计,对消费主数据流进行分组

    .keyBy("userID", "categoryName")

    数据流被分组后,类似于这样:(每一个key数据流对应一个mapState状态管理器)

  2. 用于广播流的mapState状态管理器

    MapStateDescriptor[String, List[CouponInfo]]

    规则流每个类别,可能存在多个奖励条件,存储格式:map("categoryName" => List[CouponInfo])

    广播流处理采用带keyed的广播处理函数:KeyedBroadcastProcessFunction

    1)processBroadcastElement方法中向mapState中写入数据

    val couponInfoList: List[CouponInfo] = broadcastState.get(value._2.categoryName)
    val couponInfoList2: List[CouponInfo] = couponInfoList :+ value._2broadcastState.put(value._2.categoryName, couponInfoList2) //value 规则输入流

    2)processElement方法中获取mapState数据

    val couponInfoList: List[CouponInfo] = broadcastState.get(value.categoryName)
    //先降序排列优惠券的金额
    val couponInfoList2: List[CouponInfo] = couponInfoList.sortBy(v => v.orderMoney).reverse
    //获取状态中的金额
    var count = 0
    val totalMoney: Double = allmoney.get()
    Breaks.breakable(for (couponInfo <- couponInfoList2) {if (totalMoney >&#61; couponInfo.orderMoney) {out.collect("在" &#43; value.categoryName &#43; "分类下&#xff0c;订单总金额已经达到了" &#43; couponInfo.orderMoney &#43; "&#xff0c;发放了一个价值" &#43; couponInfo.couponMoney &#43; "的优惠券。请到优惠券频道查看并使用")allmoney.clear()Breaks.break()}count &#43;&#61; 1}
    )

  3. 用于累计用户的金额状态管理器ReducingState

    方法说明
    ReducingState这种状态通过用户传入的reduceFunction&#xff0c;每次调用add方法添加值的时候&#xff0c;会调用reduceFunction&#xff0c;最后合并到一个单一的状态值

    var allmoney: ReducingState[Double] &#61; _override def open(parameters: Configuration): Unit &#61; {var reduceFunction: ReduceFunction[Double] &#61; new ReduceFunction[Double] {override def reduce(value1: Double, value2: Double): Double &#61; value1 &#43; value2}val reducingStateDescriptor: ReducingStateDescriptor[Double] &#61; new ReducingStateDescriptor[Double]("rsd", reduceFunction, createTypeInformation[Double])allmoney &#61; getRuntimeContext.getReducingState(reducingStateDescriptor)
    }//2.累计金额
    allmoney.add(value.money)


三、广播状态的开发应用


&#xff08;1&#xff09;数据分流开发实践参考Flink动态分流到kafka&#xff0c;hbase_阿飞不会飞丶的博客-CSDN博客_flink hbase phoenix


&#xff08;2&#xff09;补全用户信息开发实践&#xff08;已测试通过&#xff09;


import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;/*** Desc* 需求:* 使用Flink的BroadcastState来完成* 事件流和配置流(需要广播为State)的关联,并实现配置的动态更新!*/
public class UserInfo {public static void main(String[] args) throws Exception {//1,获取环境StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();//2&#xff0c;读取数据源//数据源1的格式&#xff1a;//{"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}//{"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1}DataStreamSource> eventDS &#61; env.addSource(new MySource());//数据源2的格式&#xff1a;在mysql中/**DROP TABLE IF EXISTS &#96;user_info&#96;;CREATE TABLE &#96;user_info&#96; (&#96;userID&#96; varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,&#96;userName&#96; varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,&#96;userAge&#96; int(11) NULL DEFAULT NULL,PRIMARY KEY (&#96;userID&#96;) USING BTREE) ENGINE &#61; MyISAM CHARACTER SET &#61; utf8 COLLATE &#61; utf8_general_ci ROW_FORMAT &#61; Dynamic;-- ------------------------------ Records of user_info-- ----------------------------INSERT INTO &#96;user_info&#96; VALUES (&#39;user_1&#39;, &#39;张三&#39;, 10);INSERT INTO &#96;user_info&#96; VALUES (&#39;user_2&#39;, &#39;李四&#39;, 20);INSERT INTO &#96;user_info&#96; VALUES (&#39;user_3&#39;, &#39;王五&#39;, 30);INSERT INTO &#96;user_info&#96; VALUES (&#39;user_4&#39;, &#39;赵六&#39;, 40);SET FOREIGN_KEY_CHECKS &#61; 1;*/DataStreamSource>> configDS &#61; env.addSource(new MySQLSource());//3&#xff0c;数据转换//3.1 广播配置表流MapStateDescriptor>> mapStateDescriptor &#61; new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));BroadcastStream>> broadcastDS &#61; configDS.broadcast(mapStateDescriptor);//3.2 连接广播流BroadcastConnectedStream, Map>> connectDS &#61; eventDS.connect(broadcastDS);//3.3 补全主流用户信息SingleOutputStreamOperator> result &#61; connectDS.process(new MyProcessFunction(mapStateDescriptor));//4,执行result.print();env.execute();}private static class MySource implements SourceFunction> {private boolean isRunning &#61; true;&#64;Overridepublic void run(SourceContext> ctx) throws Exception {Random random &#61; new Random();SimpleDateFormat df &#61; new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");while(isRunning){int id &#61; random.nextInt(4) &#43; 1;String userId &#61; "user_" &#43; id;String eventTime &#61; df.format(new Date());String eventType &#61; "type_" &#43; random.nextInt(3);int productId &#61; random.nextInt(4);ctx.collect(Tuple4.of(userId,eventTime,eventType,productId));Thread.sleep(500);}}&#64;Overridepublic void cancel() {isRunning &#61; false;}}private static class MySQLSource extends RichSourceFunction>> {private Connection conn &#61; null;private PreparedStatement ps &#61; null;private boolean flag &#61; true;private ResultSet rs &#61; null;&#64;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//连接数据库Class.forName("com.mysql.cj.jdbc.Driver");conn &#61; DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test?useUnicode&#61;true&characterEncoding&#61;utf8&serverTimezone&#61;UTC", "root", "123456");String sql &#61; "select &#96;userID&#96;, &#96;userName&#96;, &#96;userAge&#96; from &#96;user_info&#96;";ps &#61; conn.prepareStatement(sql);}&#64;Overridepublic void run(SourceContext>> ctx) throws Exception {while (flag) {Map> map &#61; new HashMap<>();rs &#61; ps.executeQuery();while (rs.next()) {String userID &#61; rs.getString("userID");String userName &#61; rs.getString("userName");int userAge &#61; rs.getInt("userAge");map.put(userID, Tuple2.of(userName, userAge));}ctx.collect(map);Thread.sleep(5000);//每隔5s更新一下用户的配置信息!}}&#64;Overridepublic void cancel() {flag &#61; false;}&#64;Overridepublic void close() throws Exception {if(conn !&#61; null) conn.close();if(ps !&#61; null) ps.close();if(rs !&#61; null) rs.close();}}private static class MyProcessFunction extends BroadcastProcessFunction,Map>, Tuple6> {MapStateDescriptor>> mapStateDescriptor;public MyProcessFunction() {}public MyProcessFunction( MapStateDescriptor>> mapStateDescriptor) {this.mapStateDescriptor &#61; mapStateDescriptor;}&#64;Overridepublic void processElement(Tuple4 value, ReadOnlyContext ctx, Collector> out) throws Exception {String userId &#61; value.f0;ReadOnlyBroadcastState>> broadcastState &#61; ctx.getBroadcastState(mapStateDescriptor);if(broadcastState !&#61; null){Map> map &#61; broadcastState.get(null);if(map !&#61; null) {Tuple2 userInfo &#61; map.get(userId);String userName &#61; userInfo.f0;Integer userAge &#61; userInfo.f1;out.collect(Tuple6.of(userId, value.f1, value.f2, value.f3, userName, userAge));}}}&#64;Overridepublic void processBroadcastElement(Map> value, Context ctx, Collector> out) throws Exception {BroadcastState>> broadcastState &#61; ctx.getBroadcastState(mapStateDescriptor);broadcastState.clear();broadcastState.put(null,value);}}
}

&#xff08;3&#xff09;评论过滤开发实践&#xff08;已测试通过&#xff09;


import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala.{BroadcastConnectedStream, DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector/*** 评论数据过滤*/
object PingLun {def main(args: Array[String]): Unit &#61; {//1、获取环境val env &#61; StreamExecutionEnvironment.getExecutionEnvironment//2、读取数据//2.1 评论主流val plDS: DataStream[String] &#61; env.socketTextStream("127.0.0.1", 9999)//2.2 规则流val ruleDS &#61; env.socketTextStream("127.0.0.1", 9998)//3、数据转换//3.1.1 将规则流中输入进来的数据进行映射成一个元祖&#xff0c;第一个是word关键字&#xff0c;第二个是标记操作类型&#xff0c;是增加还是删除规则val ruleTupleDS: DataStream[(String, String)] &#61; ruleDS.map(_.split(" ")).map(v &#61;> (v(0), v(1)))//3.1 广播规则流val ruleMS: MapStateDescriptor[String, List[String]] &#61; new MapStateDescriptor[String, List[String]]("rule", createTypeInformation[String], createTypeInformation[List[String]])val broadcastDS: BroadcastStream[(String, String)] &#61; ruleTupleDS.broadcast(ruleMS)//3.2 连接数据流val connectDS: BroadcastConnectedStream[String, (String, String)] &#61; plDS.connect(broadcastDS)//3.3 数据流处理val result: DataStream[String] &#61; connectDS.process(new MyProcessFunction(ruleMS))//4、执行result.print()env.execute("PingLun")}class MyProcessFunction(ruleMS: MapStateDescriptor[String, List[String]]) extends BroadcastProcessFunction[String, (String, String), String] {override def processElement(in1: String, readOnlyContext: BroadcastProcessFunction[String, (String, String), String]#ReadOnlyContext, out: Collector[String]): Unit &#61; {val rule: ReadOnlyBroadcastState[String, List[String]] &#61; readOnlyContext.getBroadcastState(ruleMS)if (rule.contains("keyword")) {val ruleList: List[String] &#61; rule.get("keyword")val result: String &#61; ruleList.fold(in1)((result, elem) &#61;> result.replace(elem, "**"))out.collect(result)} else {out.collect(in1)}}override def processBroadcastElement(in2: (String, String), context: BroadcastProcessFunction[String, (String, String), String]#Context, out: Collector[String]): Unit &#61; {val broadcastRule: BroadcastState[String, List[String]] &#61; context.getBroadcastState(ruleMS)if (broadcastRule.contains("keyword")) {val ruleList: List[String] &#61; broadcastRule.get("keyword")if ("&#43;".equals(in2._2)) {val ruleList2 &#61; ruleList :&#43; in2._1broadcastRule.put("keyword", ruleList2)} else if ("-".equals(in2._2)) {val ruleList2: List[String] &#61; ruleList.filter(!_.equals(in2._1))broadcastRule.put("keyword", ruleList2)}} else {if ("&#43;".equals(in2._2)) {broadcastRule.put("keyword", List(in2._1))}}}}}

&#xff08;4&#xff09;用户消费优惠券奖励机制开发实践&#xff08;已测试通过&#xff09;

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState, ReducingState, ReducingStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorimport scala.util.control.Breaks/**** 1、不同类别会有对应的奖励机制&#xff0c;需要把这个奖励机制广播给用户消费对应的流* 2、用户的消费应该是一个高吞吐量流* 3、通过用户消费流连接奖励机制流&#xff0c;然后通过process处理* 4、用户消费流应该根据用户标记以以及类别分组&#61;&#61;&#61;》流是KeyedStream* 5、ProcessFunction应该选中KeyedBroadcastProcessFunction* 6、在KeyedBroadcastProcessFunction中完成奖励机制以及用户消费统计、分析、处理**//*** 订单详情类** &#64;param userID 用户编号* &#64;param categoryName 商品类别* &#64;param money 订单金额* user1 pants 500* user1 pants 600* user2 pants 500* user2 pants 100*/
case class OrderItem(userID: String, categoryName: String, money: Double)/*** 优惠券类** &#64;param categoryName 商品类别* &#64;param orderMoney 订单金额* &#64;param couponMoney 优惠券金额* pants 1000 200 &#43;* pants 900 100 &#43;*/
case class CouponInfo(categoryName: String, orderMoney: Double, couponMoney: Double)object CouponAward {def main(args: Array[String]): Unit &#61; {//1&#xff0c;获取环境val env &#61; StreamExecutionEnvironment.getExecutionEnvironment//2&#xff0c;获取数据源//2.1 用户购买数据val userDS: DataStream[String] &#61; env.socketTextStream("127.0.0.1", 9999)//2.1.1 转成实例对象&#xff0c;并分组val userKeyedStream: KeyedStream[OrderItem, Tuple] &#61; userDS.map(_.split(" ")).map(array &#61;> new OrderItem(array(0), array(1), array(2).toDouble)).keyBy("userID", "categoryName")//2.2 规则流val couponDS &#61; env.socketTextStream("127.0.0.1", 9998)//2.2.1 转成实例对象//支持动态增加或删除优惠券规则val couponInfoDS: DataStream[(String, CouponInfo)] &#61; couponDS.map(_.split(" ")).map(array &#61;> (array(3), new CouponInfo(array(0), array(1).toDouble, array(2).toDouble)))//3&#xff0c;数据转换//3.1 声明规则流广播val mapStateDescriptor: MapStateDescriptor[String, List[CouponInfo]] &#61; new MapStateDescriptor[String, List[CouponInfo]]("msz", createTypeInformation[String], createTypeInformation[List[CouponInfo]])val couponInfoBCS: BroadcastStream[(String, CouponInfo)] &#61; couponInfoDS.broadcast(mapStateDescriptor)//3.2 连接广播流val connectStream: BroadcastConnectedStream[OrderItem, (String, CouponInfo)] &#61; userKeyedStream.connect(couponInfoBCS)//3.3 执行proess数据处理val result: DataStream[String] &#61; connectStream.process(new MykeyedBroadcastFunction(mapStateDescriptor))//4&#xff0c;执行result.print()env.execute("CouponAward")}class MykeyedBroadcastFunction(mapStateDescriptor: MapStateDescriptor[String, List[CouponInfo]]) extends KeyedBroadcastProcessFunction[Tuple, OrderItem, (String, CouponInfo), String] {//需要一状态来存储对应用户的总金额&#xff0c;金额作累加&#xff0c;达到领优惠券之后输出并金额清除var allmoney: ReducingState[Double] &#61; _override def open(parameters: Configuration): Unit &#61; {var reduceFunction: ReduceFunction[Double] &#61; new ReduceFunction[Double] {override def reduce(value1: Double, value2: Double): Double &#61; value1 &#43; value2}val reducingStateDescriptor: ReducingStateDescriptor[Double] &#61; new ReducingStateDescriptor[Double]("rsd", reduceFunction, createTypeInformation[Double])allmoney &#61; getRuntimeContext.getReducingState(reducingStateDescriptor)}override def processElement(value: OrderItem, readOnlyContext: KeyedBroadcastProcessFunction[Tuple, OrderItem, (String, CouponInfo), String]#ReadOnlyContext, out: Collector[String]): Unit &#61; {//1.从上下文拿到广播状态val broadcastState: ReadOnlyBroadcastState[String, List[CouponInfo]] &#61; readOnlyContext.getBroadcastState(mapStateDescriptor)//2.累计金额allmoney.add(value.money)//3.判断该商品类别有没优惠券&#xff0c;有的话就发放优惠券并清空累计金额&#xff0c;没有就告诉他别灰心if (broadcastState.contains(value.categoryName)) {val couponInfoList: List[CouponInfo] &#61; broadcastState.get(value.categoryName)//先降序排列优惠券的金额val couponInfoList2: List[CouponInfo] &#61; couponInfoList.sortBy(v &#61;> v.orderMoney).reverse//获取状态中的金额var count &#61; 0val totalMoney: Double &#61; allmoney.get()Breaks.breakable(for (couponInfo <- couponInfoList2) {if (totalMoney >&#61; couponInfo.orderMoney) {out.collect("在" &#43; value.categoryName &#43; "分类下&#xff0c;订单总金额已经达到了" &#43; couponInfo.orderMoney &#43; "&#xff0c;发放了一个价值" &#43; couponInfo.couponMoney &#43; "的优惠券。请到优惠券频道查看并使用")allmoney.clear()Breaks.break()}count &#43;&#61; 1})//没有达到任务的优惠if(count &#61;&#61; couponInfoList2.size){//取要求最小的一个&#xff0c;提示val couponInfo: CouponInfo &#61; couponInfoList2(couponInfoList2.size - 1)out.collect("在"&#43;value.categoryName&#43;"分类下&#xff0c;还需要再消费"&#43;(couponInfo.orderMoney - totalMoney)&#43;"就可以获取到价值"&#43;couponInfo.couponMoney&#43;"的优惠券")}}else{//否则就说明这个类别还没有优惠券out.collect("在"&#43;value.categoryName&#43;"分类下&#xff0c;还没有设置优惠券信息&#xff0c;不过没有关系&#xff0c;现在的消费金额也会参与到后续的优惠券发放统计内")}}override def processBroadcastElement(value: (String, CouponInfo), ctx: KeyedBroadcastProcessFunction[Tuple, OrderItem, (String, CouponInfo), String]#Context, out: Collector[String]): Unit &#61; {val broadcastState: BroadcastState[String, List[CouponInfo]] &#61; ctx.getBroadcastState(mapStateDescriptor)//判断状态管理器中是否已存在相应的规则if (broadcastState.contains(value._2.categoryName)) {//根据操作规则是增加还是删除作处理val couponInfoList: List[CouponInfo] &#61; broadcastState.get(value._2.categoryName)if ("&#43;".equals(value._1)) {val couponInfoList2: List[CouponInfo] &#61; couponInfoList :&#43; value._2broadcastState.put(value._2.categoryName, couponInfoList2)} else if ("-".equals(value._1)) {val couponInfoList2: List[CouponInfo] &#61; couponInfoList.filter(v &#61;> v.orderMoney !&#61; value._2.orderMoney)broadcastState.put(value._2.categoryName, couponInfoList2)}} else {if ("&#43;".equals(value._1)) {broadcastState.put(value._2.categoryName, List(value._2))}}}}}

参考资料&#xff1a;

《Flink基础编程》-- 林子雨

Flink动态分流到kafka&#xff0c;hbase_阿飞不会飞丶的博客-CSDN博客_flink hbase phoenix

Flink&#xff08;51&#xff09;&#xff1a;Flink高级特性之广播状态&#xff08;BroadcastState&#xff09;_电光闪烁的博客-CSDN博客_flink 广播状态

Flink(八) 广播状态模式 Broadcast State Pattern_小雨光的博客-CSDN博客 &#xff08;原博代码有BUG&#xff0c;已本地测式通过&#xff09;


推荐阅读
  • 本文整理了Java中org.apache.hadoop.hbase.client.Increment.getDurability()方法的一些代码示例,展示了 ... [详细]
  • 本文整理了Java中java.lang.NoSuchMethodError.getMessage()方法的一些代码示例,展示了NoSuchMethodErr ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文探讨了C语言中指针的应用与价值,指针在C语言中具有灵活性和可变性,通过指针可以操作系统内存和控制外部I/O端口。文章介绍了指针变量和指针的指向变量的含义和用法,以及判断变量数据类型和指向变量或成员变量的类型的方法。还讨论了指针访问数组元素和下标法数组元素的等价关系,以及指针作为函数参数可以改变主调函数变量的值的特点。此外,文章还提到了指针在动态存储分配、链表创建和相关操作中的应用,以及类成员指针与外部变量的区分方法。通过本文的阐述,读者可以更好地理解和应用C语言中的指针。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 本文介绍了如何使用Express App提供静态文件,同时提到了一些不需要使用的文件,如package.json和/.ssh/known_hosts,并解释了为什么app.get('*')无法捕获所有请求以及为什么app.use(express.static(__dirname))可能会提供不需要的文件。 ... [详细]
  • Week04面向对象设计与继承学习总结及作业要求
    本文总结了Week04面向对象设计与继承的重要知识点,包括对象、类、封装性、静态属性、静态方法、重载、继承和多态等。同时,还介绍了私有构造函数在类外部无法被调用、static不能访问非静态属性以及该类实例可以共享类里的static属性等内容。此外,还提到了作业要求,包括讲述一个在网上商城购物或在班级博客进行学习的故事,并使用Markdown的加粗标记和语句块标记标注关键名词和动词。最后,还提到了参考资料中关于UML类图如何绘制的范例。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 生成式对抗网络模型综述摘要生成式对抗网络模型(GAN)是基于深度学习的一种强大的生成模型,可以应用于计算机视觉、自然语言处理、半监督学习等重要领域。生成式对抗网络 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • 本文讨论了在Windows 8上安装gvim中插件时出现的错误加载问题。作者将EasyMotion插件放在了正确的位置,但加载时却出现了错误。作者提供了下载链接和之前放置插件的位置,并列出了出现的错误信息。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • 本文介绍了在实现了System.Collections.Generic.IDictionary接口的泛型字典类中如何使用foreach循环来枚举字典中的键值对。同时还讨论了非泛型字典类和泛型字典类在foreach循环中使用的不同类型,以及使用KeyValuePair类型在foreach循环中枚举泛型字典类的优势。阅读本文可以帮助您更好地理解泛型字典类的使用和性能优化。 ... [详细]
author-avatar
7777-丿M
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有