java.lang.Object
io.micronaut.tracing.opentelemetry.instrument.kafka.KafkaTelemetry

@Internal public final class KafkaTelemetry extends Object
The main class containing the OpenTelemetry Kafka instrumentation logic.
Since:
5.0.0
  • Constructor Details

    • KafkaTelemetry

      public KafkaTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry, io.opentelemetry.instrumentation.api.instrumenter.Instrumenter<io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest,org.apache.kafka.clients.producer.RecordMetadata> producerInstrumenter, io.opentelemetry.instrumentation.api.instrumenter.Instrumenter<io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest,Void> consumerProcessInstrumenter, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, boolean producerPropagationEnabled)
  • Method Details

    • create

      public static KafkaTelemetry create(io.opentelemetry.api.OpenTelemetry openTelemetry, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters)
      Returns a new KafkaTelemetry configured with the given OpenTelemetry.
      Parameters:
      openTelemetry - the OpenTelemetry instance
      kafkaTelemetryConfiguration - the KafkaTelemetryConfiguration instance
      consumerTracingFilters - the collection of consumer tracing filters
      producerTracingFilters - the collection of producer tracing filters
      Returns:
      a new KafkaTelemetry instance
    • builder

      public static KafkaTelemetryBuilder builder(io.opentelemetry.api.OpenTelemetry openTelemetry, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters)
      Returns a new KafkaTelemetryBuilder configured with the given OpenTelemetry.
      Parameters:
      openTelemetry - the OpenTelemetry instance
      kafkaTelemetryConfiguration - the KafkaTelemetryConfiguration instance
      consumerTracingFilters - the collection of consumer tracing filters
      producerTracingFilters - the collection of producer tracing filters
      Returns:
      a new KafkaTelemetryBuilder instance
    • metricConfigProperties

      public Map<String,?> metricConfigProperties()
      Produces a set of Kafka client configuration properties (for a consumer or a producer) that register a MetricsReporter recording metrics to the OpenTelemetry instance. Add the resulting properties to the configuration map used to initialize a KafkaConsumer or KafkaProducer.

      For producers:

      
       Map<String, Object> config = new HashMap<>();
       config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
       config.putAll(kafkaTelemetry.metricConfigProperties());
       try (KafkaProducer<?, ?> producer = new KafkaProducer<>(config)) { ... }
       

      For consumers:

      
       Map<String, Object> config = new HashMap<>();
       config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
       config.putAll(kafkaTelemetry.metricConfigProperties());
       try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(config)) { ... }
       
      Returns:
      the kafka client properties
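
      One consequence of merging with putAll is that any key already present in the configuration map is replaced. The sketch below illustrates this with plain maps only. It assumes the returned properties set Kafka's `metric.reporters` key, which this page does not state explicitly. `telemetryProps()` is a hypothetical stand-in for `kafkaTelemetry.metricConfigProperties()`, and both reporter class names are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

final class MetricConfigMergeSketch {

    // Hypothetical stand-in for kafkaTelemetry.metricConfigProperties().
    // The key is Kafka's standard reporter setting; the value is a placeholder.
    static Map<String, Object> telemetryProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("metric.reporters", "example.TelemetryMetricsReporter");
        return props;
    }

    static Map<String, Object> buildConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092"); // placeholder address
        config.put("metric.reporters", "example.ExistingReporter");
        // putAll overwrites the reporter configured on the previous line.
        config.putAll(telemetryProps());
        return config;
    }
}
```

      If an application already configures its own reporters, it may need to combine the values itself before creating the client.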
    • buildAndInjectSpan

      public <K, V> void buildAndInjectSpan(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, String clientId)
      Builds a span and injects its context into the record.
      Type Parameters:
      K - key class
      V - value class
      Parameters:
      record - the producer record to inject span information into
      clientId - producer client ID
    • buildAndInjectSpan

      public <K, V> Future<org.apache.kafka.clients.producer.RecordMetadata> buildAndInjectSpan(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, org.apache.kafka.clients.producer.Producer<K,V> producer, org.apache.kafka.clients.producer.Callback callback, BiFunction<org.apache.kafka.clients.producer.ProducerRecord<K,V>,org.apache.kafka.clients.producer.Callback,Future<org.apache.kafka.clients.producer.RecordMetadata>> sendFn)
      Builds a span, injects its context into the record, and sends the record through the given send function.
      Type Parameters:
      K - key class
      V - value class
      Parameters:
      record - the producer record to inject span information into
      producer - the producer
      callback - the producer send callback
      sendFn - send function
      Returns:
      send function's result
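
      The sendFn parameter lets the caller supply the actual send operation, so the same method can surround different send variants. The following is a minimal, generic sketch of that delegation pattern using only the standard library. `SendFnSketch` and `aroundSend` are hypothetical names, the recorded events merely stand in for instrumentation steps, and nothing here reproduces the library's actual span handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

final class SendFnSketch {

    // Records what happened, in order, so the flow can be inspected.
    final List<String> events = new ArrayList<>();

    // Runs a step before and after delegating to the caller-supplied send function
    // and returns whatever that function returns.
    <R, C, T> Future<T> aroundSend(R record, C callback,
                                   BiFunction<R, C, Future<T>> sendFn) {
        events.add("start:" + record);
        try {
            return sendFn.apply(record, callback);
        } finally {
            events.add("sent:" + record);
        }
    }
}
```

      With a real Kafka producer, a method reference such as `producer::send` has the shape the documented sendFn parameter expects, since `Producer.send(ProducerRecord, Callback)` returns a `Future<RecordMetadata>`.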
    • buildAndFinishSpan

      public <K, V> void buildAndFinishSpan(org.apache.kafka.clients.consumer.ConsumerRecords<K,V> records, String consumerGroup, String clientId)
    • buildAndFinishSpan

      public <K, V> void buildAndFinishSpan(List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> records, org.apache.kafka.clients.consumer.Consumer<K,V> consumer)
    • buildAndFinishSpan

      public <K, V> void buildAndFinishSpan(List<org.apache.kafka.clients.consumer.ConsumerRecord<K,V>> records, String consumerGroup, String clientId)
    • excludeTopic

      public boolean excludeTopic(String topic)
      Returns `true` if the given topic must be excluded from tracing.
      Parameters:
      topic - the topic
      Returns:
      whether the topic must be excluded from tracing
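
      As an illustration of how such a check could behave, the sketch below applies one plausible rule: a topic is excluded if it is on a deny-list, or if an allow-list exists and the topic is not on it. This is an assumption for illustration, not the library's confirmed logic. `TopicFilterSketch` and its two sets are hypothetical stand-ins and are not part of KafkaTelemetry or KafkaTelemetryConfiguration.

```java
import java.util.Set;

// Hypothetical stand-in, not the Micronaut class.
final class TopicFilterSketch {

    private final Set<String> includedTopics; // empty means no allow-list
    private final Set<String> excludedTopics; // empty means no deny-list

    TopicFilterSketch(Set<String> includedTopics, Set<String> excludedTopics) {
        this.includedTopics = Set.copyOf(includedTopics);
        this.excludedTopics = Set.copyOf(excludedTopics);
    }

    // Returns true when the topic should be skipped by tracing.
    boolean excludeTopic(String topic) {
        if (excludedTopics.contains(topic)) {
            return true;
        }
        return !includedTopics.isEmpty() && !includedTopics.contains(topic);
    }
}
```

      Under this rule, a filter with no lists configured excludes nothing, so every topic is traced by default.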
    • filterConsumerRecord

      public <K, V> boolean filterConsumerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, org.apache.kafka.clients.consumer.Consumer<K,V> consumer)
    • filterProducerRecord

      public <K, V> boolean filterProducerRecord(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, org.apache.kafka.clients.producer.Producer<K,V> producer)
    • wrap

      public <K, V> org.apache.kafka.clients.producer.Producer<K,V> wrap(org.apache.kafka.clients.producer.Producer<K,V> producer)
      Returns a decorated Producer that emits spans for each produced message.
      Type Parameters:
      K - key class
      V - value class
      Parameters:
      producer - kafka producer
      Returns:
      proxy object with tracing logic for producer
    • wrap

      public <K, V> org.apache.kafka.clients.consumer.Consumer<K,V> wrap(org.apache.kafka.clients.consumer.Consumer<K,V> consumer)
      Returns a decorated Consumer that emits spans for each received message.
      Type Parameters:
      K - key class
      V - value class
      Parameters:
      consumer - kafka consumer
      Returns:
      proxy object with tracing logic for consumer
    • getKafkaTelemetryProperties

      public KafkaTelemetryConfiguration getKafkaTelemetryProperties()