Class TracingConsumerInterceptor<K,V>

java.lang.Object
io.micronaut.tracing.opentelemetry.instrument.kafka.TracingConsumerInterceptor<K,V>
Type Parameters:
K - key class
V - value class
All Implemented Interfaces:
AutoCloseable, org.apache.kafka.clients.consumer.ConsumerInterceptor<K,V>, org.apache.kafka.common.Configurable

public class TracingConsumerInterceptor<K,V> extends Object implements org.apache.kafka.clients.consumer.ConsumerInterceptor<K,V>
Default Kafka consumer interceptor for tracing. It is an alternative way to attach tracing to your Kafka records. To trace records, register this default implementation as a Kafka consumer interceptor. To trace only selected records, override the filterRecord method in a subclass and register that subclass instead.
Since:
5.0.0
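As a minimal sketch of what "registering the interceptor" can look like with a plain Kafka consumer, the snippet below builds consumer properties using Kafka's standard `interceptor.classes` setting. The helper class name, the broker address, and the group ID are hypothetical placeholders. How a Micronaut application passes this property to its consumers depends on its own configuration and is not shown here.

```java
import java.util.Properties;

// Sketch only: builds Kafka consumer properties that register the tracing interceptor.
// TracingInterceptorConfig is a hypothetical helper, not part of the library.
final class TracingInterceptorConfig {

    // Fully qualified name of the class documented on this page.
    static final String INTERCEPTOR =
        "io.micronaut.tracing.opentelemetry.instrument.kafka.TracingConsumerInterceptor";

    // bootstrapServers and groupId are placeholder values supplied by the caller.
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // "interceptor.classes" is the Kafka consumer setting that lists interceptor classes.
        props.setProperty("interceptor.classes", INTERCEPTOR);
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("localhost:9092", "demo-group");
        System.out.println(props.getProperty("interceptor.classes"));
    }
}
```

The resulting Properties object would then be passed to the Kafka consumer constructor. To use a subclass with a custom filterRecord, its fully qualified class name would go in the same property instead.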
  • Constructor Details

    • TracingConsumerInterceptor

      public TracingConsumerInterceptor()
  • Method Details

    • onConsume

      public org.apache.kafka.clients.consumer.ConsumerRecords<K,V> onConsume(org.apache.kafka.clients.consumer.ConsumerRecords<K,V> records)
      Specified by:
      onConsume in interface org.apache.kafka.clients.consumer.ConsumerInterceptor<K,V>
    • filterRecord

      public boolean filterRecord(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, String consumerGroup, String clientId)
      Override this method to apply a custom condition or logic that decides which records are traced.
      Parameters:
      record - consumer record
      consumerGroup - consumer group
      clientId - client ID
      Returns:
      true if this record should be traced, false otherwise
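One possible shape for a custom filter is sketched below. So that it compiles without the Kafka and Micronaut jars, the decision logic lives in a plain helper class, and the actual override is shown only as a comment. The helper TraceFilterRule, its fields, and the topic and group names are all hypothetical. The rule itself (trace only listed topics, skip one consumer group) is just an illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical decision logic that a filterRecord override could delegate to.
final class TraceFilterRule {
    private final Set<String> tracedTopics;
    private final String excludedGroup;

    TraceFilterRule(Set<String> tracedTopics, String excludedGroup) {
        this.tracedTopics = new HashSet<>(tracedTopics);
        this.excludedGroup = excludedGroup;
    }

    // Returns true when a record from this topic and consumer group should be traced.
    boolean shouldTrace(String topic, String consumerGroup) {
        if (excludedGroup.equals(consumerGroup)) {
            return false; // never trace the excluded group
        }
        return tracedTopics.contains(topic);
    }
}

// In a subclass of TracingConsumerInterceptor<K, V> (needs Kafka and Micronaut on the classpath):
//
//   @Override
//   public boolean filterRecord(ConsumerRecord<K, V> record, String consumerGroup, String clientId) {
//       return rule.shouldTrace(record.topic(), consumerGroup);
//   }
```

Keeping the rule separate from the interceptor subclass makes it easy to unit test without a running consumer.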
    • onCommit

      public void onCommit(Map<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets)
      Specified by:
      onCommit in interface org.apache.kafka.clients.consumer.ConsumerInterceptor<K,V>
    • close

      public void close()
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface org.apache.kafka.clients.consumer.ConsumerInterceptor<K,V>
    • configure

      public void configure(Map<String,?> configs)
      Specified by:
      configure in interface org.apache.kafka.common.Configurable