In this blog we will discuss how to programmatically calculate offset lag in a kafka consumer group for a topic partition. Below is an example of kafka-consumer-groups.sh which is a command line tool to calculate offset lag for a consumer group.
Kafka consumer group Command output |
Sometimes we don’t have access to the production Kafka environment. For such scenarios we can calculate offset Lag programmatically for a consumer group.
Below is the algorithm to programmatically calculate Kafka TopicPartition offset lag for a given consumer group :
1. Fetch all TopicPartitions for a consumer group
2. Fetch committed offset data for all TopicPartitions
3. Fetch current offset data for all TopicPartitions
4. For each TopicPartition
5. Calculate Lag = currentOffset - committedOffset
Below are two different Java implementations of the above algorithm.
public void calculateLag(final String consumerGroupName) throws ExecutionException, InterruptedException {
// Create AdminClient
final Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
final AdminClient kafkaAdminClient = AdminClient.create(properties);
// Calculate all topic partitions via describeConsumerGroups
final Set<TopicPartition> allTopicPartitions = new HashSet<>();
final Map<String, ConsumerGroupDescription> describeConsumerGroups = kafkaAdminClient.describeConsumerGroups(Set.of(consumerGroupName)).all().get();
final ConsumerGroupDescription consumerGroupDescription = describeConsumerGroups.get(consumerGroupName);
for (MemberDescription memberDescription : consumerGroupDescription.members()) {
MemberAssignment assignment = memberDescription.assignment();
if (assignment != null) {
allTopicPartitions.addAll(assignment.topicPartitions());
}
}
final Consumer<String, String> kafkaConsumer = getKafkaConsumer(consumerGroupName);
// calculate committed offset via kafkaConsumer
final Map<TopicPartition, OffsetAndMetadata> commitOffsetData = kafkaConsumer.committed(allTopicPartitions, Duration.of(2, ChronoUnit.SECONDS));
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : commitOffsetData.entrySet()) {
final long commitOffset = entry.getValue().offset();
log.info("topicPartition : {}, commitOffset : {}", entry.getKey(), commitOffset);
}
// calculate current offset
final Map<TopicPartition, Long> topicPartitionMap = kafkaConsumer.endOffsets(allTopicPartitions);
for (Map.Entry<TopicPartition, Long> entry : topicPartitionMap.entrySet()) {
log.info("topicPartition : {}, current Offset : {}", entry.getKey(), entry.getValue());
}
// for each topic partition
// long offsetLag = currentOffset - committedOffset;
}
public void calculateLag(final String consumerGroupName) throws ExecutionException, InterruptedException {
// Create AdminClient
final Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
final AdminClient kafkaAdminClient = AdminClient.create(properties);
// Calculate all topic partitions via listConsumerGroupOffsets
final Set<TopicPartition> allTopicPartitions = new HashSet<>();
final Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = kafkaAdminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
TopicPartition topicPartitionToAdd = new TopicPartition(entry.getKey().topic(), entry.getKey().partition());
allTopicPartitions.add(topicPartitionToAdd);
}
final Consumer<String, String> kafkaConsumer = getKafkaConsumer(consumerGroupName);
// calculate committed offset via kafkaAdminClient
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
final TopicPartition topicPartition = entry.getKey();
final long commitOffset = entry.getValue().offset();
log.info("topicPartition : {}, commitOffset : {}", topicPartition, commitOffset);
}
// calculate current offset
final Map<TopicPartition, Long> topicPartitionMap = kafkaConsumer.endOffsets(allTopicPartitions);
for (Map.Entry<TopicPartition, Long> entry : topicPartitionMap.entrySet()) {
log.info("topicPartition : {}, current Offset : {}", entry.getKey(), entry.getValue());
}
// for each topic partition
// long offsetLag = currentOffset - committedOffset;
}
Logic for creating KafkaConsumer object
private Consumer<String, String> getKafkaConsumer(final String consumerGroupName) {
final Map consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupName);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<>(consumerProps);
}
Note : Image taken from here
Please comment for any suggestion or improvement in this context.
Thanks
No comments:
Post a Comment