Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
338 views
in Technique[技术] by (71.8m points)

How to send final kafka-streams aggregation result of a time windowed KTable?

What I'd like to do is this:

  1. Consume records from a numbers topic (Long's)
  2. Aggregate (count) the values for each 5 sec window
  3. Send the FINAL aggregation result to another topic

My code looks like this:

KStream<String, Long> longs = builder.stream(
            Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
            longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
          .to("long-counts");

It looks like everything works as expected, but the aggregations are sent to the destination topic for each incoming record. My question is how can I send only the final aggregation result of each window?

question from:https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

In Kafka Streams there is no such thing as a "final aggregation". Windows are kept open all the time to handle out-of-order records that arrive after the window end-time passed. However, windows are not kept forever. They get discarded once their retention time expires. There is no special action as to when a window gets discarded.

See Confluent documentation for more details: http://docs.confluent.io/current/streams/

Thus, for each update to an aggregation, a result record is produced (because Kafka Streams also update the aggregation result on out-of-order records). Your "final result" would be the latest result record (before a window gets discarded). Depending on your use case, manual de-duplication would be a way to resolve the issue (using lower lever API, transform() or process())

This blog post might help, too: https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html

Another blog post addressing this issue without using punctuations: http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html

Update

With KIP-328, a KTable#suppress() operator is added, that will allow to suppress consecutive updates in a strict manner and to emit a single result record per window; the tradeoff is an increase latency.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...