We have a Kafka topic, product-update-events, which contains data about product updates and their variations. We aggregate these events into a products KTable using the Kafka Streams 'aggregate' function.
For every product in this products KTable, we then want to calculate the 'best' variation (i.e. one of the product's variations, selected by some criterion).
These 'best' variations are then written to another KTable and to a Kafka topic.
We only want to emit a best-variation update when the best variation has actually changed because of a product update. To do this, we use a custom transformer that checks the current best variation in its state store.
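To make the "emit only on change" check concrete, here is a minimal sketch of the comparison logic. It has no Kafka dependency: a plain MutableMap stands in for the transformer's state store, and the Variation type, the BestVariationTracker class and the "lowest price wins" criterion are assumptions for illustration only, not our real schema or selection rule.

```kotlin
// Hypothetical value type; the real record schema is not shown here.
data class Variation(val variationId: String, val productId: String, val price: Double)

// Stand-in for the transformer's state store: productId -> current best variation.
class BestVariationTracker(
    private val store: MutableMap<String, Variation> = mutableMapOf()
) {
    // Returns the new best variation if it differs from the stored one,
    // or null when nothing changed and no update should be emitted.
    fun update(productId: String, variations: List<Variation>): Variation? {
        // Assumed criterion: the cheapest variation is the 'best' one.
        val best = variations.minByOrNull { it.price } ?: return null
        if (store[productId] == best) return null
        store[productId] = best
        return best
    }
}
```

In the real topology the same comparison would run inside the transformer against the best-variation-per-article store instead of an in-memory map.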
The product events and the products table have the 'productId' as key and are partitioned by it. The best-variation records have the 'variationId' as key. We use a custom StreamPartitioner to partition these records by productId as well, so that each Kafka Streams application instance has the matching product and best-variation data:
{ _, _, variation, numPartitions -> Utils.toPositive(Utils.murmur2(StringSerializer().serialize("", variation.productId))) % numPartitions }
Now we come to the actual question :)
We want to delete the best variation when we receive a 'delete' product-update event. To do that, we need to set the payload of the best-variation record to 'null'. But then our custom partitioner no longer has any information about the productId the record belongs to.
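The problem can be reproduced in isolation with the sketch below. It mirrors the parameter list of StreamPartitioner.partition(topic, key, value, numPartitions), but it is a simplified stand-in: the BestVariation type and the partitionByProduct function are hypothetical, and String.hashCode() replaces murmur2 only to keep the example free of dependencies.

```kotlin
// Hypothetical value type; only productId matters for partitioning.
data class BestVariation(val variationId: String, val productId: String)

// Same parameters as StreamPartitioner.partition(topic, key, value, numPartitions).
// String.hashCode() stands in for murmur2 so the sketch needs no Kafka classes.
fun partitionByProduct(
    topic: String,
    key: String,
    value: BestVariation?,
    numPartitions: Int
): Int? {
    // A tombstone has value == null, so there is no productId left to hash.
    val productId = value?.productId ?: return null
    return Math.floorMod(productId.hashCode(), numPartitions)
}
```

For a normal record this returns a partition derived from the productId. For a tombstone such as partitionByProduct("best-variation-per-article", "v1", null, 6) it can only return null. As far as we understand, a null result from a StreamPartitioner makes Kafka Streams fall back to default partitioning by key (here the variationId), which is not guaranteed to be the partition that holds the original record.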
Do you have any suggestions on how to solve this?
Our topology is as follows:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [product-update-events])
      --> KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
      --> KTABLE-TOSTREAM-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-AGGREGATE-0000000002
    Sink: KSTREAM-SINK-0000000004 (topic: products)
      <-- KTABLE-TOSTREAM-0000000003

   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000007 (topics: [products])
      --> KSTREAM-TRANSFORM-0000000008
    Source: KSTREAM-SOURCE-0000000005 (topics: [best-variation-per-article])
      --> KTABLE-SOURCE-0000000006
    Processor: KSTREAM-TRANSFORM-0000000008 (stores: [best-variation-per-article])
      --> KSTREAM-SINK-0000000009
      <-- KSTREAM-SOURCE-0000000007
    Sink: KSTREAM-SINK-0000000009 (topic: best-variation-per-article)
      <-- KSTREAM-TRANSFORM-0000000008
    Processor: KTABLE-SOURCE-0000000006 (stores: [best-variation-per-article])
      --> none
      <-- KSTREAM-SOURCE-0000000005