作者:hedongsheng | 来源:互联网 | 2023-01-31 14:36
我正在使用kafka流,我正在尝试将KTable实现为主题.
它工作但似乎每30秒左右完成一次.
Kafka Stream如何/何时决定将KTable的当前状态实现为主题?
有没有办法缩短这个时间并使其更"实时"?
这是我正在使用的实际代码
// Stream of random ints: (1,1) -> (6,6) -> (3,3)
// one record every 500ms
KStream kStream = builder.stream(Serdes.Integer(), Serdes.Integer(), RandomNumberProducer.TOPIC);
// grouping by key
KGroupedStream byKey = kStream.groupByKey(Serdes.Integer(), Serdes.Integer());
// same behaviour with or without the TimeWindow
KTable, Long> count = byKey.count(TimeWindows.of(1000L),"total");
// same behaviour with only count.to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
count.toStream().map((k,v) -> new KeyValue<>(k.key(), v)).to(Serdes.Integer(), Serdes.Long(), RandomCountConsumer.TOPIC);
Michal Borow..
13
这由commit.interval.ms控制,默认为30秒.更多细节:http:
//docs.confluent.io/current/streams/developer-guide.html
缓存的语义是,只要最早的commit.interval.ms或cache.max.bytes.buffering(缓存压力)命中,数据就会刷新到状态存储并转发到下一个下游处理器节点.
和这里:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
1> Michal Borow..:
这由commit.interval.ms控制,默认为30秒.更多细节:http:
//docs.confluent.io/current/streams/developer-guide.html
缓存的语义是,只要最早的commit.interval.ms或cache.max.bytes.buffering(缓存压力)命中,数据就会刷新到状态存储并转发到下一个下游处理器节点.
和这里:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams