Monday 1 May 2023

Calculate Kafka Consumer lag programmatically

In this post we discuss how to programmatically calculate the offset lag of a Kafka consumer group for each topic partition. Below is an example of the output of kafka-consumer-groups.sh, a command-line tool that reports offset lag for a consumer group.

Kafka consumer group Command output
Sometimes we don’t have access to the production Kafka environment to run this tool. In such scenarios we can calculate the offset lag for a consumer group programmatically.

Below is the algorithm to programmatically calculate the offset lag of each Kafka TopicPartition for a given consumer group:
1. Fetch all TopicPartitions for the consumer group
2. Fetch the committed offset for each TopicPartition
3. Fetch the current offset (the log end offset) for each TopicPartition
4. For each TopicPartition, calculate lag = currentOffset - committedOffset
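The lag calculation in the last step can be sketched in plain Java, independent of the Kafka client. This is a minimal illustration, not part of the implementations below: the class name LagCalculator, the method computeLag and the "topic-partition" string keys are hypothetical stand-ins for TopicPartition, and the offsets are made-up sample values. It also assumes that a partition without a committed offset should simply be skipped.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, used here only to illustrate the lag formula.
final class LagCalculator {

    /**
     * Computes lag per partition as endOffset - committedOffset.
     * Partitions without a committed offset are left out of the result.
     */
    static Map<String, Long> computeLag(Map<String, Long> endOffsets,
                                        Map<String, Long> committedOffsets) {
        Map<String, Long> lag = new TreeMap<>();
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            Long committed = committedOffsets.get(entry.getKey());
            if (committed == null) {
                continue; // nothing committed yet, so lag is undefined here
            }
            // Clamp at zero: the two offsets are read at slightly different times
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    public static void main(String[] args) {
        // Sample values only
        Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("orders-0", 120L);
        endOffsets.put("orders-1", 75L);
        endOffsets.put("orders-2", 10L);

        Map<String, Long> committed = new HashMap<>();
        committed.put("orders-0", 100L);
        committed.put("orders-1", 75L);
        // orders-2 has no committed offset and is skipped

        System.out.println(computeLag(endOffsets, committed)); // prints {orders-0=20, orders-1=0}
    }
}
```

A lag of 0 means the group has consumed everything currently in that partition. Whether to skip partitions without a committed offset or report their full end offset as lag is a design choice that depends on how the result is used.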

Below are two different Java implementations of the above algorithm.


Approach 1 uses KafkaConsumer for fetching the committed offset
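// Imports used by this method and by the getKafkaConsumer helper further down.
// `log` is assumed to be an SLF4J logger field on the enclosing class.
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public void calculateLag(final String consumerGroupName) throws ExecutionException, InterruptedException {

    // Create AdminClient
    final Map<String, Object> properties = new HashMap<>();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    final AdminClient kafkaAdminClient = AdminClient.create(properties);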

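    // Collect all topic partitions via describeConsumerGroups.
    // Note: only partitions assigned to currently active members are returned, so this set is empty when no consumer is running.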
    final Set<TopicPartition> allTopicPartitions = new HashSet<>();
    final Map<String, ConsumerGroupDescription> describeConsumerGroups = kafkaAdminClient.describeConsumerGroups(Set.of(consumerGroupName)).all().get();
    final ConsumerGroupDescription consumerGroupDescription = describeConsumerGroups.get(consumerGroupName);
    for (MemberDescription memberDescription : consumerGroupDescription.members()) {
        MemberAssignment assignment = memberDescription.assignment();
        if (assignment != null) {
            allTopicPartitions.addAll(assignment.topicPartitions());
        }
    }

    final Consumer<String, String> kafkaConsumer = getKafkaConsumer(consumerGroupName);

    // fetch committed offset via kafkaConsumer
    final Map<TopicPartition, OffsetAndMetadata> commitOffsetData = kafkaConsumer.committed(allTopicPartitions, Duration.of(2, ChronoUnit.SECONDS));
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : commitOffsetData.entrySet()) {
        // committed() maps a partition to null when the group has no committed offset for it
        final OffsetAndMetadata committed = entry.getValue();
        log.info("topicPartition : {}, commitOffset : {}", entry.getKey(), committed == null ? "none" : committed.offset());
    }

    // fetch current offset (the log end offset)
    final Map<TopicPartition, Long> topicPartitionMap = kafkaConsumer.endOffsets(allTopicPartitions);
    for (Map.Entry<TopicPartition, Long> entry : topicPartitionMap.entrySet()) {
        log.info("topicPartition : {}, current Offset : {}", entry.getKey(), entry.getValue());
    }

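    // for each topic partition: lag = current (log end) offset - committed offset
    for (Map.Entry<TopicPartition, Long> entry : topicPartitionMap.entrySet()) {
        final OffsetAndMetadata committed = commitOffsetData.get(entry.getKey());
        if (committed == null) {
            continue; // no committed offset for this partition, so lag is undefined
        }
        final long offsetLag = entry.getValue() - committed.offset();
        log.info("topicPartition : {}, offsetLag : {}", entry.getKey(), offsetLag);
    }
    kafkaConsumer.close();
    kafkaAdminClient.close();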
}

Approach 2 uses Kafka AdminClient for fetching the committed offset
public void calculateLag(final String consumerGroupName) throws ExecutionException, InterruptedException {
    
    // Create AdminClient
    final Map<String, Object> properties = new HashMap<>();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    final AdminClient kafkaAdminClient = AdminClient.create(properties);

    // Collect all topic partitions via listConsumerGroupOffsets
    final Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = kafkaAdminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
    final Set<TopicPartition> allTopicPartitions = new HashSet<>(topicPartitionOffsetAndMetadataMap.keySet());

    final Consumer<String, String> kafkaConsumer = getKafkaConsumer(consumerGroupName);

    // read committed offset from the kafkaAdminClient result
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
        final TopicPartition topicPartition = entry.getKey();
        final long commitOffset = entry.getValue().offset();
        log.info("topicPartition : {}, commitOffset : {}", topicPartition, commitOffset);
    }


    // fetch current offset (the log end offset)
    final Map<TopicPartition, Long> topicPartitionMap = kafkaConsumer.endOffsets(allTopicPartitions);
    for (Map.Entry<TopicPartition, Long> entry : topicPartitionMap.entrySet()) {
        log.info("topicPartition : {}, current Offset : {}", entry.getKey(), entry.getValue());
    }

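    // for each topic partition: lag = current (log end) offset - committed offset
    for (Map.Entry<TopicPartition, Long> entry : topicPartitionMap.entrySet()) {
        final OffsetAndMetadata committed = topicPartitionOffsetAndMetadataMap.get(entry.getKey());
        if (committed == null) {
            continue; // no committed offset for this partition, so lag is undefined
        }
        final long offsetLag = entry.getValue() - committed.offset();
        log.info("topicPartition : {}, offsetLag : {}", entry.getKey(), offsetLag);
    }
    kafkaConsumer.close();
    kafkaAdminClient.close();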
}

Logic for creating the KafkaConsumer object
private Consumer<String, String> getKafkaConsumer(final String consumerGroupName) {
    final Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupName);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return new KafkaConsumer<>(consumerProps);
}
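
Once the per-partition lag is known, it is often handy to roll it up into a single number per topic. The following is a minimal sketch under stated assumptions: the class LagSummary and the method totalLagByTopic are hypothetical names, the lag values are sample data, and the keys are assumed to follow the "topic-partition" form that TopicPartition.toString() produces.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that rolls per-partition lag up to a total per topic.
final class LagSummary {

    /** Sums per-partition lag into a total per topic. Keys must look like "topic-partition". */
    static Map<String, Long> totalLagByTopic(Map<String, Long> lagByPartition) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, Long> entry : lagByPartition.entrySet()) {
            String key = entry.getKey();
            // The partition number follows the last '-'; topic names may contain '-' themselves
            String topic = key.substring(0, key.lastIndexOf('-'));
            totals.merge(topic, entry.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Sample values only
        Map<String, Long> lag = new TreeMap<>();
        lag.put("orders-0", 20L);
        lag.put("orders-1", 0L);
        lag.put("user-events-0", 7L);
        lag.put("user-events-1", 3L);

        System.out.println(totalLagByTopic(lag)); // prints {orders=20, user-events=10}
    }
}
```

In the implementations above the same roll-up could be keyed on TopicPartition.topic() directly, which avoids parsing strings.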

Note : Image taken from here
Please leave a comment with any suggestions or improvements.
Thanks
