作者:玩上加瘾_926 | 来源:互联网 | 2023-02-03 18:09
我们试图在Apache Beam管道上使用固定窗口(使用DirectRunner
).我们的流程如下:
从pub/sub中提取数据
将JSON反序列化为Java对象
窗口事件w /固定窗口5秒
使用自定义CombineFn
,将每个Event
s 窗口组合成一个List
为了测试,只需输出结果 List
管道代码:
pipeline
// Read from pubsub topic to create unbounded PCollection
.apply(PubsubIO
.read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)
// Deserialize JSON into Event object
.apply("ParseEvent", ParDo
.of(new ParseEventFn())
)
// Window events with a fixed window size of 5 seconds
.apply("Window", Window
.into(FixedWindows
.of(Duration.standardSeconds(5))
)
)
// Group events by window
.apply("CombineEvents", Combine
.globally(new CombineEventsFn())
.withoutDefaults()
)
// Log grouped events
.apply("LogEvent", ParDo
.of(new LogEventFn())
);
我们看到的结果是最后一步永远不会运行,因为我们没有得到任何记录.
此外,我们已经System.out.println("***")
在我们的自定义CombineFn
类的每个方法中添加,以便跟踪它们何时运行,并且它们似乎也不运行.
窗口设置不正确吗?我们按照https://beam.apache.org/documentation/programming-guide/#windowing中的一个示例进行了操作,看起来相当简单,但显然有一些基本缺失.
感谢任何见解 - 提前感谢!
1> Chris Staiko..:
看起来主要问题确实是一个缺失的触发器 - 窗口打开了,没有什么可以告诉它何时发出结果.我们想根据处理时间(而不是事件时间)简单地窗口,所以做了以下事情:
.apply("Window", Window
.into(new GlobalWindows())
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5))
)
)
.withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)
本质上,这会创建一个全局窗口,触发在处理第一个元素5秒后发出事件.每次关闭窗口时,一旦窗口收到元素,另一个窗口就会打开.当我们没有这withAllowedLateness
件作品时梁抱怨- 据我所知,这只是告诉它忽略任何后期数据.
我的理解可能有点偏僻,但上面的片段解决了我们的问题!
您的窗口设置仍然是正确的,完全是一个不同的计算.你关心5秒钟内发生的事件吗?这是开窗.您是否关心在处理全局聚合期间每5秒获得一次增量结果?这是一个全局窗口+触发.您之前行为的常见原因是水印没有前进,这可能是由元素上的时间戳引起的.