we are migrating to Kafka, I need to create a monitoring POC service that will periodically check the unprocessed message count in the Kafka queue and based on the count take some action. but this service must not read or process the message, designated consumers will do that, with every cron this service just needs the count of unprocessed messages present in the queue.
so far I have done this, from multiple examples
public void stats() throws ExecutionException, InterruptedException {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Arrays.asList(topicName));
while (true) {
Thread.sleep(1000);
ConsumerRecords<String, String> records = consumer.poll(1000);
if (!records.isEmpty()) {
System.out.println("records is not empty = " + records.count() + " " + records);
}
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
Set<TopicPartition> partitions = consumer.assignment();
//consumer.seekToBeginning(partitions);
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
for (TopicPartition partition : offsets.keySet()) {
OffsetAndMetadata commitOffset = consumer.committed(new TopicPartition(partition.topic(), partition.partition()));
Long lag = commitOffset == null ? offsets.get(partition) : offsets.get(partition) - commitOffset.offset();
System.out.println("lag = " + lag);
System.out.printf("partition %s is at %d
", partition.topic(), offsets.get(partition));
}
}
}
}
}
the code is working fine some times and some times gives wrong output, please let me know
question from:
https://stackoverflow.com/questions/65890606/get-the-unprocessed-message-count-in-spring-kafka 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…