Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Delete data from a KTable that uses a custom StreamPartitioner

We have a Kafka topic, product-update-events, which contains data about product updates and their variations.

We aggregate these events into a products KTable using the Kafka Streams 'aggregate' function. For every product in this products KTable we then want to calculate the 'best' variation (i.e. one of the variations of the product, selected by some criteria). These 'best' variations are then written to another KTable and to a Kafka topic.

We only want to emit a best-variation update when the best variation has actually changed because of a product update. Therefore we use a custom transformer which checks the current best variation in its state store. The product events and the products table have the 'productId' as key and are partitioned by it. The best-variation records have the 'variationId' as key. We use a custom StreamPartitioner to partition these records by productId as well, so that each Kafka Streams application instance has the matching product and best-variation data:

{ _, _, variation, numPartitions -> Utils.toPositive(Utils.murmur2(StringSerializer().serialize("", variation.productId))) % numPartitions }
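To make the intent of the lambda explicit, here is a minimal stand-alone sketch of the same idea. It is an illustration under assumptions, not our production code: `Variation` is a hypothetical value type, `hashOf`, `partitionForProduct` and `partitionForVariation` are made-up names, and `String.hashCode` stands in for the `Utils.murmur2` call so that the snippet runs without Kafka on the classpath. The actual partition numbers will therefore differ from what Kafka computes.

```kotlin
// Hypothetical value type; only productId matters for partitioning.
data class Variation(val variationId: String, val productId: String)

// Stand-in for Utils.toPositive(Utils.murmur2(bytes)):
// any deterministic, non-negative hash is enough for this sketch.
fun hashOf(productId: String): Int = productId.hashCode() and Int.MAX_VALUE

// A product record is partitioned by its key, the productId.
fun partitionForProduct(productId: String, numPartitions: Int): Int =
    hashOf(productId) % numPartitions

// A best-variation record is keyed by variationId, but the partition is
// derived from the productId carried in its value, so the record lands
// on the same partition as its product.
fun partitionForVariation(variation: Variation, numPartitions: Int): Int =
    hashOf(variation.productId) % numPartitions
```

For example, `partitionForProduct("p-1", 12)` and `partitionForVariation(Variation("v-17", "p-1"), 12)` return the same partition. Note that `partitionForVariation` depends entirely on the value being present.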

Now we come to the actual question :)

We want to delete the best variation when we receive a 'delete' product-update event. To do that, we need to set the payload of the best-variation record to 'null'. But with a null payload, our custom partitioner no longer has any information about the productId the record belongs to. Do you have any suggestions on how to solve this?

Our topology is as follows:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [product-update-events])
      --> KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
      --> KTABLE-TOSTREAM-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-AGGREGATE-0000000002
    Sink: KSTREAM-SINK-0000000004 (topic: products)
      <-- KTABLE-TOSTREAM-0000000003

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000007 (topics: [products])
      --> KSTREAM-TRANSFORM-0000000008
    Source: KSTREAM-SOURCE-0000000005 (topics: [best-variation-per-article])
      --> KTABLE-SOURCE-0000000006
    Processor: KSTREAM-TRANSFORM-0000000008 (stores: [best-variation-per-article])
      --> KSTREAM-SINK-0000000009
      <-- KSTREAM-SOURCE-0000000007
    Sink: KSTREAM-SINK-0000000009 (topic: best-variation-per-article)
      <-- KSTREAM-TRANSFORM-0000000008
    Processor: KTABLE-SOURCE-0000000006 (stores: [best-variation-per-article])
      --> none
      <-- KSTREAM-SOURCE-0000000005


1 Reply


You will want to use a tombstone: a record with the same key and a null value. This causes the store to drop the entry with that key.

This is a pretty decent example that includes deletion.
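Regarding the partitioner part of the question, one possible approach (a sketch of my own, not taken from the example referenced above): as far as I know, Kafka Streams hands the typed key and value to the StreamPartitioner before they are serialized, so the stream can carry a small wrapper value whose serializer writes 'null' for deletes. The snippet below models that in plain Kotlin. `BestVariationMessage`, `Update`, `Delete`, `serializeValue` and `partitionFor` are hypothetical names, the byte format is made up, and `String.hashCode` stands in for the `Utils.murmur2` call from the question. Please verify the behavior against your Kafka Streams version before relying on it.

```kotlin
// Hypothetical value type for a best variation.
data class Variation(val variationId: String, val productId: String)

// Wrapper carried through the stream: both cases expose the productId.
sealed class BestVariationMessage {
    abstract val productId: String

    data class Update(val variation: Variation) : BestVariationMessage() {
        override val productId: String get() = variation.productId
    }

    data class Delete(override val productId: String) : BestVariationMessage()
}

// Models the value serializer: a Delete becomes a null payload (tombstone),
// an Update becomes some byte representation (made-up format here).
fun serializeValue(message: BestVariationMessage): ByteArray? = when (message) {
    is BestVariationMessage.Update ->
        "${message.variation.variationId}|${message.variation.productId}".toByteArray()
    is BestVariationMessage.Delete -> null
}

// Models the custom partitioner: it only needs the productId, which is
// available for updates and deletes alike. hashCode stands in for murmur2.
fun partitionFor(message: BestVariationMessage, numPartitions: Int): Int =
    (message.productId.hashCode() and Int.MAX_VALUE) % numPartitions
```

With this shape, `Update(Variation("v-17", "p-1"))` and `Delete("p-1")` map to the same partition, while only the update produces a non-null payload. An alternative under the same constraint is to put the productId into the record key (for example a composite of productId and variationId), since the key is still present on a tombstone; that changes the key of the output topic, though.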

