Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
599 views
in Technique[技术] by (71.8m points)

java - get the unprocessed message count in spring kafka

we are migrating to Kafka, I need to create a monitoring POC service that will periodically check the unprocessed message count in the Kafka queue and based on the count take some action. but this service must not read or process the message, designated consumers will do that, with every cron this service just needs the count of unprocessed messages present in the queue. so far I have done this, from multiple examples

 public void stats() throws ExecutionException, InterruptedException {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topicName));
        while (true) {
            Thread.sleep(1000);
            ConsumerRecords<String, String> records = consumer.poll(1000); 
            if (!records.isEmpty()) {
                System.out.println("records is not empty = " + records.count() + " " + records);
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                Set<TopicPartition> partitions = consumer.assignment();
                //consumer.seekToBeginning(partitions);
                Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
                for (TopicPartition partition : offsets.keySet()) {
                    OffsetAndMetadata commitOffset = consumer.committed(new TopicPartition(partition.topic(), partition.partition()));
                    Long lag = commitOffset == null ? offsets.get(partition) : offsets.get(partition) - commitOffset.offset();
                    System.out.println("lag = " + lag);
                    System.out.printf("partition %s is at %d
", partition.topic(), offsets.get(partition));
                }
            }
        }
    }
}

the code is working fine some times and some times gives wrong output, please let me know

question from:https://stackoverflow.com/questions/65890606/get-the-unprocessed-message-count-in-spring-kafka

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Don't subscribe to the topic; just create a consumer with the same group to get the endOffsets.

See this answer for an example.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...