作者:皇家突然回家_390 | 来源:互联网 | 2022-12-09 21:19
我有一个像下面这样的用例.对于每个传入的事件,我想查看某个字段以查看它的状态是否从A更改为B,如果是,则将其发送到输出主题.流程是这样的:具有键"xyz"的事件带有状态A,并且一段时间之后另一个事件带有带状态B的键"xyz".我使用高级DSL获得此代码.
final KStream inputStream....
final KStream outputStream = inputStream
.map((k, v) -> new KeyValue<>(v.getId(), v))
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.aggregate(DomainStatusMonitor::new,
(k, v, aggregate) -> {
aggregate.updateStatusMonitor(v);
return aggregate;
}, Materialized.with(Serdes.String(), jsonSerde))
.toStream()
.filter((k, v) -> v.isStatusChangedFromAtoB())
.map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));
有没有更好的方法来使用DSL编写此逻辑?
关于状态存储的几个问题由上面的代码中的聚合创建.
它默认是创建内存状态存储吗?
如果我有无限数量的唯一传入密钥,会发生什么?如果它默认使用内存存储,那么我不需要切换到持久存储吗?我们如何处理DSL中的情况?
如果状态存储非常大(内存或持久性),它如何影响启动时间?如何使流处理等待以便存储完全初始化?或者Kafka Streams会确保在处理任何传入事件之前完全初始化状态存储吗?
提前致谢!
1> Matthias J. ..:
默认情况下,将使用持久的RocksDB存储.如果你想使用内存商店,你会传入Materialized.as(Stores.inMemoryKeyValueStore(...))
如果你有无限数量的唯一键,你最终将耗尽主内存或磁盘,你的应用程序将会死亡.根据您的语义,您可以通过使用窗口聚合来获得"TTL",而不是使旧密钥到期.
在处理新数据之前,将始终恢复状态.如果使用内存存储,则会通过使用基础更改日志主题来实现.根据您所在州的规模,这可能需要一段时间.如果使用持久性RocksDB存储,则将从磁盘加载状态,因此不需要还原,并且应立即进行处理.只有当您在本地磁盘上丢失状态时,才会在此情况下从changelog主题进行恢复.