Class KafkaTelemetry
java.lang.Object
io.micronaut.tracing.opentelemetry.instrument.kafka.KafkaTelemetry
The main class with opentelemetry-kafka logic.
- Since:
- 5.0.0
-
Constructor Summary
ConstructorDescriptionKafkaTelemetry
(io.opentelemetry.api.OpenTelemetry openTelemetry, io.opentelemetry.instrumentation.api.instrumenter.Instrumenter<io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest, org.apache.kafka.clients.producer.RecordMetadata> producerInstrumenter, io.opentelemetry.instrumentation.api.instrumenter.Instrumenter<io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest, Void> consumerProcessInstrumenter, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, boolean producerPropagationEnabled) -
Method Summary
Modifier and TypeMethodDescription<K,
V> void buildAndFinishSpan
(List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> records, String consumerGroup, String clientId) <K,
V> void buildAndFinishSpan
(List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> records, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) <K,
V> void buildAndFinishSpan
(org.apache.kafka.clients.consumer.ConsumerRecords<K, V> records, String consumerGroup, String clientId) <K,
V> void buildAndInjectSpan
(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, String clientId) Build and inject span into record.<K,
V> Future<org.apache.kafka.clients.producer.RecordMetadata> buildAndInjectSpan
(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, org.apache.kafka.clients.producer.Producer<K, V> producer, org.apache.kafka.clients.producer.Callback callback, BiFunction<org.apache.kafka.clients.producer.ProducerRecord<K, V>, org.apache.kafka.clients.producer.Callback, Future<org.apache.kafka.clients.producer.RecordMetadata>> sendFn) Build and inject span into record.static KafkaTelemetryBuilder
builder
(io.opentelemetry.api.OpenTelemetry openTelemetry, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters) Returns a newKafkaTelemetryBuilder
configured with the givenOpenTelemetry
.static KafkaTelemetry
create
(io.opentelemetry.api.OpenTelemetry openTelemetry, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters) Returns a new KafkaTelemetry configured with the givenOpenTelemetry
.boolean
excludeTopic
(String topic) Returns `true` if current topic need to exclude for tracing.<K,
V> boolean filterConsumerRecord
(org.apache.kafka.clients.consumer.ConsumerRecord<K, V> record, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) <K,
V> boolean filterProducerRecord
(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, org.apache.kafka.clients.producer.Producer<K, V> producer) Produces a set of kafka client config properties (consumer or producer) to register aMetricsReporter
that records metrics to anopenTelemetry
instance.<K,
V> org.apache.kafka.clients.consumer.Consumer<K, V> wrap
(org.apache.kafka.clients.consumer.Consumer<K, V> consumer) Returns a decoratedConsumer
that consumes spans for each received message.<K,
V> org.apache.kafka.clients.producer.Producer<K, V> wrap
(org.apache.kafka.clients.producer.Producer<K, V> producer) Returns a decoratedProducer
that consumes spans for each produced message.
-
Constructor Details
-
KafkaTelemetry
public KafkaTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry, io.opentelemetry.instrumentation.api.instrumenter.Instrumenter<io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest, org.apache.kafka.clients.producer.RecordMetadata> producerInstrumenter, io.opentelemetry.instrumentation.api.instrumenter.Instrumenter<io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest, Void> consumerProcessInstrumenter, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, boolean producerPropagationEnabled)
-
-
Method Details
-
create
public static KafkaTelemetry create(io.opentelemetry.api.OpenTelemetry openTelemetry, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters) Returns a new KafkaTelemetry configured with the givenOpenTelemetry
.- Parameters:
openTelemetry
- openTelemetry instancekafkaTelemetryConfiguration
- kafkaTelemetryProperties instanceconsumerTracingFilters
- list of consumerTracingFiltersproducerTracingFilters
- list of producerTracingFilters- Returns:
- kafkaTelemetry instance
-
builder
public static KafkaTelemetryBuilder builder(io.opentelemetry.api.OpenTelemetry openTelemetry, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters) Returns a newKafkaTelemetryBuilder
configured with the givenOpenTelemetry
.- Parameters:
openTelemetry
- openTelemetry instancekafkaTelemetryConfiguration
- kafkaTelemetryProperties instanceconsumerTracingFilters
- list of consumerTracingFiltersproducerTracingFilters
- list of producerTracingFilters- Returns:
- KafkaTelemetryBuilder object
-
metricConfigProperties
Produces a set of kafka client config properties (consumer or producer) to register aMetricsReporter
that records metrics to anopenTelemetry
instance. Add these resulting properties to the configuration map used to initialize aKafkaConsumer
orKafkaProducer
.For producers:
// Map<String, Object> config = new HashMap<>(); // config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...); // config.putAll(kafkaTelemetry.metricConfigProperties()); // try (KafkaProducer<?, ?> producer = new KafkaProducer<>(config)) { ... }
For consumers:
// Map<String, Object> config = new HashMap<>(); // config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...); // config.putAll(kafkaTelemetry.metricConfigProperties()); // try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(config)) { ... }
- Returns:
- the kafka client properties
-
buildAndInjectSpan
public <K,V> void buildAndInjectSpan(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, String clientId) Build and inject span into record.- Type Parameters:
K
- key classV
- value class- Parameters:
record
- the producer record to inject span info.clientId
- producer client ID
-
buildAndInjectSpan
public <K,V> Future<org.apache.kafka.clients.producer.RecordMetadata> buildAndInjectSpan(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, org.apache.kafka.clients.producer.Producer<K, V> producer, org.apache.kafka.clients.producer.Callback callback, BiFunction<org.apache.kafka.clients.producer.ProducerRecord<K, V>, org.apache.kafka.clients.producer.Callback, Future<org.apache.kafka.clients.producer.RecordMetadata>> sendFn) Build and inject span into record.- Type Parameters:
K
- key classV
- value class- Parameters:
record
- the producer record to inject span info.producer
- the producercallback
- the producer send callbacksendFn
- send function- Returns:
- send function's result
-
buildAndFinishSpan
-
buildAndFinishSpan
public <K,V> void buildAndFinishSpan(List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> records, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) -
buildAndFinishSpan
-
excludeTopic
Returns `true` if current topic need to exclude for tracing.- Parameters:
topic
- the topic- Returns:
- nedd or not exclude topic for tracing.
-
filterConsumerRecord
public <K,V> boolean filterConsumerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<K, V> record, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) -
filterProducerRecord
public <K,V> boolean filterProducerRecord(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, org.apache.kafka.clients.producer.Producer<K, V> producer) -
wrap
public <K,V> org.apache.kafka.clients.producer.Producer<K,V> wrap(org.apache.kafka.clients.producer.Producer<K, V> producer) Returns a decoratedProducer
that consumes spans for each produced message.- Type Parameters:
K
- key classV
- value class- Parameters:
producer
- kafka consumer- Returns:
- proxy object with tracing logic for producer
-
wrap
public <K,V> org.apache.kafka.clients.consumer.Consumer<K,V> wrap(org.apache.kafka.clients.consumer.Consumer<K, V> consumer) Returns a decoratedConsumer
that consumes spans for each received message.- Type Parameters:
K
- key classV
- value class- Parameters:
consumer
- kafka consumer- Returns:
- proxy object with tracing logic for consumer
-
getKafkaTelemetryProperties
-