AggregateFunction(主要用于增量计算)
// 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值
// {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}// API
// IN: 输入元素类型
// ACC: 累加器类型
// OUT: 输出元素类型
public interface AggregateFunction extends Function, Serializable {// 初始化累加器 创建一个新的累加器,启动一个新的聚合,负责迭代状态的初始化ACC createAccumulator();// 累加 对于数据的每条数据,和迭代数据的聚合的具体实现ACC add(IN value, ACC accumulator);// 累加器合并 合并两个累加器,返回一个具有合并状态的累加器ACC merge(ACC a, ACC b);// 输出 从累加器获取聚合的结果OUT getResult(ACC accumulator);
}
实例一
// 示例: 获取一段时间内(Window Size)每个用户(KeyBy)浏览的平均价值(AggregateFunction)
kafkaStream// 将从Kafka获取的JSON数据解析成Java Bean.process(new KafkaProcessFunction())// 提取时间戳生成水印.assignTimestampsAn