@Singleton @Requires(beans=KafkaDefaultConfiguration.class) public class KafkaConsumerProcessor extends java.lang.Object implements io.micronaut.context.processor.ExecutableMethodProcessor<Topic>, java.lang.AutoCloseable, ConsumerRegistry
A ExecutableMethodProcessor
that will process all beans annotated with KafkaListener
and create and subscribe the relevant methods as consumers to Kafka topics.
Constructor and Description |
---|
KafkaConsumerProcessor(java.util.concurrent.ExecutorService executorService,
io.micronaut.runtime.ApplicationConfiguration applicationConfiguration,
io.micronaut.context.BeanContext beanContext,
AbstractKafkaConsumerConfiguration defaultConsumerConfiguration,
ConsumerRecordBinderRegistry binderRegistry,
BatchConsumerRecordsBinderRegistry batchBinderRegistry,
SerdeRegistry serdeRegistry,
ProducerRegistry producerRegistry,
KafkaListenerExceptionHandler exceptionHandler,
java.util.concurrent.ExecutorService schedulerService,
TransactionalProducerRegistry transactionalProducerRegistry)
Creates a new processor using the given
ExecutorService to schedule consumers on. |
Modifier and Type | Method and Description |
---|---|
void |
close() |
<K,V> org.apache.kafka.clients.consumer.Consumer<K,V> |
getConsumer(java.lang.String id)
Returns a managed Consumer.
|
java.util.Set<org.apache.kafka.common.TopicPartition> |
getConsumerAssignment(java.lang.String id)
Returns a managed Consumer's assignment info.
|
java.util.Set<java.lang.String> |
getConsumerIds()
The IDs of the available consumers.
|
java.util.Set<java.lang.String> |
getConsumerSubscription(java.lang.String id)
Returns a managed Consumer's subscriptions.
|
boolean |
isPaused(java.lang.String id)
Is the consumer with the given ID paused.
|
boolean |
isPaused(java.lang.String id,
java.util.Collection<org.apache.kafka.common.TopicPartition> topicPartitions)
Is the consumer with the given ID paused to consume from the given topic partitions.
|
void |
pause(java.lang.String id)
Pause the consumer for the given ID.
|
void |
pause(java.lang.String id,
java.util.Collection<org.apache.kafka.common.TopicPartition> topicPartitions)
Pause the consumer for the given ID to consume from the given topic partitions.
|
void |
process(io.micronaut.inject.BeanDefinition<?> beanDefinition,
io.micronaut.inject.ExecutableMethod<?,?> method) |
void |
resume(java.lang.String id)
Resume the consumer for the given ID.
|
void |
resume(java.lang.String id,
java.util.Collection<org.apache.kafka.common.TopicPartition> topicPartitions)
Resume the consumer for the given ID to consume from the given topic partitions.
|
public KafkaConsumerProcessor(@Named(value="consumer") java.util.concurrent.ExecutorService executorService, io.micronaut.runtime.ApplicationConfiguration applicationConfiguration, io.micronaut.context.BeanContext beanContext, AbstractKafkaConsumerConfiguration defaultConsumerConfiguration, ConsumerRecordBinderRegistry binderRegistry, BatchConsumerRecordsBinderRegistry batchBinderRegistry, SerdeRegistry serdeRegistry, ProducerRegistry producerRegistry, KafkaListenerExceptionHandler exceptionHandler, @Named(value="scheduled") java.util.concurrent.ExecutorService schedulerService, TransactionalProducerRegistry transactionalProducerRegistry)
ExecutorService
to schedule consumers on.executorService
- The executor serviceapplicationConfiguration
- The application configurationbeanContext
- The bean contextdefaultConsumerConfiguration
- The default consumer configbinderRegistry
- The ConsumerRecordBinderRegistry
batchBinderRegistry
- The BatchConsumerRecordsBinderRegistry
serdeRegistry
- The Serde
registryproducerRegistry
- The ProducerRegistry
exceptionHandler
- The exception handler to useschedulerService
- The scheduler servicetransactionalProducerRegistry
- The transactional producer registry@NonNull public <K,V> org.apache.kafka.clients.consumer.Consumer<K,V> getConsumer(@NonNull java.lang.String id)
ConsumerRegistry
getConsumer
in interface ConsumerRegistry
K
- The key generic typeV
- The value generic typeid
- The id of the producer.@NonNull public java.util.Set<java.lang.String> getConsumerSubscription(@NonNull java.lang.String id)
ConsumerRegistry
getConsumerSubscription
in interface ConsumerRegistry
id
- The id of the producer.@NonNull public java.util.Set<org.apache.kafka.common.TopicPartition> getConsumerAssignment(@NonNull java.lang.String id)
ConsumerRegistry
getConsumerAssignment
in interface ConsumerRegistry
id
- The id of the producer.@NonNull public java.util.Set<java.lang.String> getConsumerIds()
ConsumerRegistry
getConsumerIds
in interface ConsumerRegistry
public boolean isPaused(@NonNull java.lang.String id)
ConsumerRegistry
isPaused
in interface ConsumerRegistry
id
- the consumers idpublic boolean isPaused(@NonNull java.lang.String id, @NonNull java.util.Collection<org.apache.kafka.common.TopicPartition> topicPartitions)
ConsumerRegistry
isPaused
in interface ConsumerRegistry
id
- the consumers idtopicPartitions
- The topic partitions to check if pausedpublic void pause(@NonNull java.lang.String id)
ConsumerRegistry
ConsumerRegistry.isPaused(String)
method to
establish when the consumer has actually been paused.pause
in interface ConsumerRegistry
id
- The id of the consumerpublic void pause(@NonNull java.lang.String id, @NonNull java.util.Collection<org.apache.kafka.common.TopicPartition> topicPartitions)
ConsumerRegistry
ConsumerRegistry.isPaused(String, Collection)
method to
establish when the consumer has actually been paused for the topic partitions.pause
in interface ConsumerRegistry
id
- The id of the consumertopicPartitions
- The topic partitions to pause consuming frompublic void resume(@NonNull java.lang.String id)
ConsumerRegistry
ConsumerRegistry.isPaused(String)
method to
establish when the consumer has actually been resumed.resume
in interface ConsumerRegistry
id
- The id of the consumerpublic void resume(@NonNull java.lang.String id, @NonNull java.util.Collection<org.apache.kafka.common.TopicPartition> topicPartitions)
ConsumerRegistry
ConsumerRegistry.isPaused(String, Collection)
method to
establish when the consumer has actually been resumed to consume from the given topic partitions.resume
in interface ConsumerRegistry
id
- The id of the consumertopicPartitions
- The topic partitions to pause consuming frompublic void process(io.micronaut.inject.BeanDefinition<?> beanDefinition, io.micronaut.inject.ExecutableMethod<?,?> method)
@PreDestroy public void close()
close
in interface java.lang.AutoCloseable