Class KafkaTelemetry
java.lang.Object
io.micronaut.tracing.opentelemetry.instrument.kafka.KafkaTelemetry
The main class with opentelemetry-kafka logic.
- Since:
- 5.0.0
-
Constructor Summary
ConstructorsConstructorDescriptionKafkaTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry, io.opentelemetry.instrumentation.api.instrumenter.Instrumenter<io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest, org.apache.kafka.clients.producer.RecordMetadata> producerInstrumenter, io.opentelemetry.instrumentation.api.instrumenter.Instrumenter<io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest, Void> consumerProcessInstrumenter, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, boolean producerPropagationEnabled) -
Method Summary
Modifier and TypeMethodDescription<K,V> void buildAndFinishSpan(List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> records, String consumerGroup, String clientId) <K,V> void buildAndFinishSpan(List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> records, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) <K,V> void buildAndFinishSpan(org.apache.kafka.clients.consumer.ConsumerRecords<K, V> records, String consumerGroup, String clientId) <K,V> void buildAndInjectSpan(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, String clientId) Build and inject span into record.<K,V> Future<org.apache.kafka.clients.producer.RecordMetadata> buildAndInjectSpan(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, org.apache.kafka.clients.producer.Producer<K, V> producer, org.apache.kafka.clients.producer.Callback callback, BiFunction<org.apache.kafka.clients.producer.ProducerRecord<K, V>, org.apache.kafka.clients.producer.Callback, Future<org.apache.kafka.clients.producer.RecordMetadata>> sendFn) Build and inject span into record.static KafkaTelemetryBuilderbuilder(io.opentelemetry.api.OpenTelemetry openTelemetry, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters) Returns a newKafkaTelemetryBuilderconfigured with the givenOpenTelemetry.static KafkaTelemetrycreate(io.opentelemetry.api.OpenTelemetry openTelemetry, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters) Returns a new KafkaTelemetry configured with the givenOpenTelemetry.booleanexcludeTopic(String topic) Returns `true` if current topic need to exclude for tracing.<K,V> boolean filterConsumerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<K, V> record, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) <K,V> boolean filterProducerRecord(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, org.apache.kafka.clients.producer.Producer<K, V> producer) Produces a set of kafka client config properties (consumer or producer) to register aMetricsReporterthat records metrics to anopenTelemetryinstance.<K,V> org.apache.kafka.clients.consumer.Consumer<K, V> wrap(org.apache.kafka.clients.consumer.Consumer<K, V> consumer) Returns a decoratedConsumerthat consumes spans for each received message.<K,V> org.apache.kafka.clients.producer.Producer<K, V> wrap(org.apache.kafka.clients.producer.Producer<K, V> producer) Returns a decoratedProducerthat consumes spans for each produced message.
-
Constructor Details
-
KafkaTelemetry
public KafkaTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry, io.opentelemetry.instrumentation.api.instrumenter.Instrumenter<io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest, org.apache.kafka.clients.producer.RecordMetadata> producerInstrumenter, io.opentelemetry.instrumentation.api.instrumenter.Instrumenter<io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest, Void> consumerProcessInstrumenter, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, boolean producerPropagationEnabled)
-
-
Method Details
-
create
public static KafkaTelemetry create(io.opentelemetry.api.OpenTelemetry openTelemetry, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters) Returns a new KafkaTelemetry configured with the givenOpenTelemetry.- Parameters:
openTelemetry- openTelemetry instancekafkaTelemetryConfiguration- kafkaTelemetryProperties instanceconsumerTracingFilters- list of consumerTracingFiltersproducerTracingFilters- list of producerTracingFilters- Returns:
- kafkaTelemetry instance
-
builder
public static KafkaTelemetryBuilder builder(io.opentelemetry.api.OpenTelemetry openTelemetry, KafkaTelemetryConfiguration kafkaTelemetryConfiguration, Collection<KafkaTelemetryConsumerTracingFilter> consumerTracingFilters, Collection<KafkaTelemetryProducerTracingFilter> producerTracingFilters) Returns a newKafkaTelemetryBuilderconfigured with the givenOpenTelemetry.- Parameters:
openTelemetry- openTelemetry instancekafkaTelemetryConfiguration- kafkaTelemetryProperties instanceconsumerTracingFilters- list of consumerTracingFiltersproducerTracingFilters- list of producerTracingFilters- Returns:
- KafkaTelemetryBuilder object
-
metricConfigProperties
Produces a set of kafka client config properties (consumer or producer) to register aMetricsReporterthat records metrics to anopenTelemetryinstance. Add these resulting properties to the configuration map used to initialize aKafkaConsumerorKafkaProducer.For producers:
// Map<String, Object> config = new HashMap<>(); // config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...); // config.putAll(kafkaTelemetry.metricConfigProperties()); // try (KafkaProducer<?, ?> producer = new KafkaProducer<>(config)) { ... }For consumers:
// Map<String, Object> config = new HashMap<>(); // config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...); // config.putAll(kafkaTelemetry.metricConfigProperties()); // try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(config)) { ... }- Returns:
- the kafka client properties
-
buildAndInjectSpan
public <K,V> void buildAndInjectSpan(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, String clientId) Build and inject span into record.- Type Parameters:
K- key classV- value class- Parameters:
record- the producer record to inject span info.clientId- producer client ID
-
buildAndInjectSpan
public <K,V> Future<org.apache.kafka.clients.producer.RecordMetadata> buildAndInjectSpan(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, org.apache.kafka.clients.producer.Producer<K, V> producer, org.apache.kafka.clients.producer.Callback callback, BiFunction<org.apache.kafka.clients.producer.ProducerRecord<K, V>, org.apache.kafka.clients.producer.Callback, Future<org.apache.kafka.clients.producer.RecordMetadata>> sendFn) Build and inject span into record.- Type Parameters:
K- key classV- value class- Parameters:
record- the producer record to inject span info.producer- the producercallback- the producer send callbacksendFn- send function- Returns:
- send function's result
-
buildAndFinishSpan
-
buildAndFinishSpan
public <K,V> void buildAndFinishSpan(List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>> records, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) -
buildAndFinishSpan
-
excludeTopic
Returns `true` if current topic need to exclude for tracing.- Parameters:
topic- the topic- Returns:
- nedd or not exclude topic for tracing.
-
filterConsumerRecord
public <K,V> boolean filterConsumerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<K, V> record, org.apache.kafka.clients.consumer.Consumer<K, V> consumer) -
filterProducerRecord
public <K,V> boolean filterProducerRecord(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, org.apache.kafka.clients.producer.Producer<K, V> producer) -
wrap
public <K,V> org.apache.kafka.clients.producer.Producer<K,V> wrap(org.apache.kafka.clients.producer.Producer<K, V> producer) Returns a decoratedProducerthat consumes spans for each produced message.- Type Parameters:
K- key classV- value class- Parameters:
producer- kafka consumer- Returns:
- proxy object with tracing logic for producer
-
wrap
public <K,V> org.apache.kafka.clients.consumer.Consumer<K,V> wrap(org.apache.kafka.clients.consumer.Consumer<K, V> consumer) Returns a decoratedConsumerthat consumes spans for each received message.- Type Parameters:
K- key classV- value class- Parameters:
consumer- kafka consumer- Returns:
- proxy object with tracing logic for consumer
-
getKafkaTelemetryProperties
-