Annotation Interface KafkaListener
Annotation applied at the class level to indicate that a bean is a Kafka Consumer.
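As a rough illustration of how the annotation is applied, the sketch below declares a listener bean. It assumes the Micronaut Kafka module is on the classpath; the class name, group id, topic name and payload types are hypothetical and chosen only for the example.

```java
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;

// Hypothetical listener bean: "product-listener" and "products" are illustrative names.
@KafkaListener(groupId = "product-listener", offsetReset = OffsetReset.EARLIEST)
public class ProductListener {

    // Invoked once per record received from the (assumed) "products" topic.
    @Topic("products")
    public void receive(@KafkaKey String brand, String name) {
        System.out.println("Received product " + name + " of brand " + brand);
    }
}
```

Elements that are not set, such as clientId or threads, keep the defaults listed under Element Details.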
- Since:
- 1.0
- Author:
- graemerocher
-
Optional Element Summary
| Modifier and Type | Optional Element | Description |
| --- | --- | --- |
| boolean | autoStartup | Controls whether the consumer starts consuming on startup; set to false to start it in paused mode. |
| boolean | batch | By default each listener will consume a single ConsumerRecord. |
| String | clientId | Sets the client id of the Kafka consumer. |
| ErrorStrategy | errorStrategy | Setting the error strategy allows you to resume at the next offset or to seek the consumer (stop on error) to the failed offset so that it can retry if an error occurs. The consumer bean is still able to implement a custom exception handler to replace DefaultKafkaListenerExceptionHandler and set the error strategy. |
| String | groupId | The same as value(). |
| String | heartbeatInterval | The heart beat interval for the consumer. |
| IsolationLevel | isolation | Kafka consumer isolation level to control how to read messages written transactionally. |
| OffsetReset | offsetReset | The strategy to use to start consuming records. |
| OffsetStrategy | offsetStrategy | The OffsetStrategy to use for the consumer. |
| String | pollTimeout | The timeout to use for calls to Consumer.poll(java.time.Duration). |
| String | producerClientId | The client id of the producer that is used for SendTo. |
| String | producerTransactionalId | This setting applies only for the producer that is used for SendTo. |
| io.micronaut.context.annotation.Property[] | properties | Additional properties to configure the consumer with. |
| boolean | redelivery | For listeners that return reactive types, message offsets are committed without blocking the consumer. |
| String | sessionTimeout | The session timeout for a consumer. |
| int | threads | Statically configure the number of threads of a Kafka consumer. |
| String | threadsValue | Dynamically configure the number of threads of a Kafka consumer. |
| boolean | uniqueGroupId | A unique string (UUID) can be appended to the group ID. |
| String | value | Sets the consumer group id of the Kafka consumer. |
-
Element Details
-
value
Sets the consumer group id of the Kafka consumer. If not specified, the group id defaults to the value of ApplicationConfiguration.getName(); if no application name is available, the name of the class is used.
- Returns:
- The group id
- Default:
- ""
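The fallback order described above can be modelled in plain Java. This is a minimal stand-alone sketch, not the framework's own code: the class and method names are hypothetical, the application name is passed in as an Optional, and using the fully qualified class name as the last resort is an assumption.

```java
import java.util.Optional;

// Stand-alone model of the documented fallback order; all names here are hypothetical.
final class GroupIdResolution {

    // Returns the explicit annotation value if set, else the application name,
    // else the bean class name (assumed here to be the fully qualified name).
    static String resolveGroupId(String annotationValue,
                                 Optional<String> applicationName,
                                 Class<?> beanType) {
        if (annotationValue != null && !annotationValue.isEmpty()) {
            return annotationValue;
        }
        return applicationName.orElse(beanType.getName());
    }

    public static void main(String[] args) {
        System.out.println(resolveGroupId("orders", Optional.of("shop"), String.class));
        System.out.println(resolveGroupId("", Optional.of("shop"), String.class));
        System.out.println(resolveGroupId("", Optional.empty(), String.class));
    }
}
```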
-
groupId
The same as value().
- Returns:
- The group id
- Default:
- ""
-
uniqueGroupId
boolean uniqueGroupId
A unique string (UUID) can be appended to the group ID. In that case, each consumer will be the only member of a unique consumer group.
- Returns:
- True to make each group ID unique. Defaults to false.
- Default:
- false
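To make the effect concrete, the following sketch shows one way a UUID could be appended to a base group id. It is only an illustration: the helper name and the "_" separator are assumptions, and the actual format produced by the framework may differ.

```java
import java.util.UUID;

// Hypothetical helper illustrating the idea of a per-consumer unique group id.
final class UniqueGroupIds {

    // Appends a random UUID to the base group id; the "_" separator is an assumption.
    static String uniqueGroupId(String baseGroupId) {
        return baseGroupId + "_" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // Each call yields a different id, so each consumer ends up alone in its group.
        System.out.println(uniqueGroupId("orders"));
        System.out.println(uniqueGroupId("orders"));
    }
}
```

Because every consumer joins its own group, each one receives all records of the subscribed topics rather than sharing partitions with the others.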
-
clientId
String clientId
Sets the client id of the Kafka consumer. If not specified, the client id is configured to be the value of ApplicationConfiguration.getName().
- Returns:
- The client id
- Default:
- ""
-
offsetStrategy
OffsetStrategy offsetStrategy
The OffsetStrategy to use for the consumer.
- Returns:
- The OffsetStrategy
- Default:
- AUTO
-
offsetReset
OffsetReset offsetReset
- Returns:
- The strategy to use to start consuming records
- Default:
- LATEST
-
producerClientId
String producerClientId
The client id of the producer that is used for SendTo.
- Returns:
- the producer client id
- Default:
- ""
-
producerTransactionalId
String producerTransactionalId
This setting applies only for the producer that is used for SendTo. The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. If a TransactionalId is configured, enable.idempotence is implied. By default, the TransactionalId is not configured, which means transactions cannot be used.
- Returns:
- the producer transaction id
- Default:
- ""
-
isolation
IsolationLevel isolation
Kafka consumer isolation level to control how to read messages written transactionally. See ConsumerConfig.ISOLATION_LEVEL_CONFIG.
- Returns:
- The isolation level
- Default:
- READ_UNCOMMITTED
-
errorStrategy
ErrorStrategy errorStrategy
Setting the error strategy allows you to resume at the next offset or to seek the consumer (stop on error) to the failed offset so that it can retry if an error occurs. The consumer bean is still able to implement a custom exception handler to replace DefaultKafkaListenerExceptionHandler and set the error strategy.
- Returns:
- The strategy to use when an error occurs
- Default:
- @io.micronaut.configuration.kafka.annotation.ErrorStrategy
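The difference between resuming at the next offset and seeking back to the failed offset can be sketched with a small stand-alone simulation. Everything below is hypothetical: the local Strategy enum, the consume method and the maxAttempts bound are illustrative stand-ins and do not reproduce the library's types or retry configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Simplified simulation of the two behaviours; not the library's implementation.
final class ErrorStrategySketch {

    // Local stand-in enum, defined only for this sketch.
    enum Strategy { RESUME_AT_NEXT_RECORD, RETRY_ON_ERROR }

    // Walks offsets [0, endOffset) and returns the offsets handled successfully.
    // The handler returns false to signal a processing error.
    static List<Long> consume(long endOffset, Strategy strategy, int maxAttempts,
                              LongPredicate handler) {
        List<Long> processed = new ArrayList<>();
        long position = 0;
        int attempts = 0;
        while (position < endOffset) {
            attempts++;
            if (handler.test(position)) {
                processed.add(position);
                position++;
                attempts = 0;
                continue;
            }
            // On failure, either stay on the failed offset (retry) or move past it (resume).
            boolean retry = strategy == Strategy.RETRY_ON_ERROR && attempts < maxAttempts;
            if (!retry) {
                position++;
                attempts = 0;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        int[] failures = {0};
        // Offset 1 fails on its first attempt only.
        LongPredicate flaky = offset -> !(offset == 1 && failures[0]++ == 0);
        System.out.println(consume(3, Strategy.RETRY_ON_ERROR, 3, flaky));
    }
}
```

With the retry behaviour a record that fails transiently is still processed, while the resume behaviour trades that record for uninterrupted progress.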
-
threadsValue
Dynamically configure the number of threads of a Kafka consumer.
Kafka consumers are by default single threaded. If you wish to increase the number of threads for a consumer you can alter this setting. Note that this means that multiple partitions will be allocated to a single application.
NOTE: When using this setting, if your bean is Singleton then local state will be shared between invocations from different consumer threads.
threadsValue takes precedence over threads if they are both set.
- Returns:
- The number of threads
- Default:
- ""
-
threads
int threads
Statically configure the number of threads of a Kafka consumer.
Kafka consumers are by default single threaded. If you wish to increase the number of threads for a consumer you can alter this setting. Note that this means that multiple partitions will be allocated to a single application.
NOTE: When using this setting, if your bean is Singleton then local state will be shared between invocations from different consumer threads.
threads will be overridden by threadsValue if they are both set.
- Returns:
- The number of threads
- Default:
- 1
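The precedence between threadsValue and threads can be summarised in a few lines. This is a hedged sketch rather than the framework's logic: the helper name is hypothetical, and it assumes threadsValue has already been resolved to a plain numeric string (or left empty) before it is inspected.

```java
// Hypothetical helper modelling the documented precedence of threadsValue over threads.
final class ThreadCountResolution {

    // Uses threadsValue when it is set (non-empty), otherwise falls back to threads.
    // Assumes threadsValue, if present, is already a plain integer string.
    static int resolveThreads(String threadsValue, int threads) {
        if (threadsValue != null && !threadsValue.trim().isEmpty()) {
            return Integer.parseInt(threadsValue.trim());
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(resolveThreads("", 1));   // neither overridden: default of 1
        System.out.println(resolveThreads("4", 2));  // both set: threadsValue wins
    }
}
```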
-
pollTimeout
String pollTimeout
The timeout to use for calls to Consumer.poll(java.time.Duration).
- Returns:
- The timeout. Defaults to 100ms
- Default:
- "100ms"
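Since the timeout is given as a string such as "100ms" but is ultimately passed to Consumer.poll(java.time.Duration), some conversion has to take place. The sketch below is a deliberately simplified parser written for illustration; the class name and the supported suffixes (ms, s, m, h, d) are assumptions, and the framework's own conversion may accept other formats.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified, hypothetical parser for duration strings such as "100ms" or "30s".
final class TimeoutParsing {

    private static final Pattern SIMPLE = Pattern.compile("^(\\d+)(ms|s|m|h|d)$");

    // Converts "<number><unit>" into a Duration; rejects anything else.
    static Duration parse(String value) {
        Matcher m = SIMPLE.matcher(value.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported duration: " + value);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms": return Duration.ofMillis(amount);
            case "s":  return Duration.ofSeconds(amount);
            case "m":  return Duration.ofMinutes(amount);
            case "h":  return Duration.ofHours(amount);
            default:   return Duration.ofDays(amount);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("100ms").toMillis());
    }
}
```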
-
sessionTimeout
String sessionTimeout
The session timeout for a consumer. See session.timeout.ms.
- Returns:
- The session timeout as a duration.
- Default:
- ""
-
heartbeatInterval
String heartbeatInterval
The heart beat interval for the consumer. See heartbeat.interval.ms.
- Returns:
- The heartbeat interval as a duration.
- Default:
- ""
-
redelivery
boolean redelivery
For listeners that return reactive types, message offsets are committed without blocking the consumer. If the reactive type produces an error, then the message can be lost. Enabling this setting allows the consumer to redeliver the message so that it can be processed again by another consumer that may have better luck.
- Returns:
- True if redelivery should be enabled. Defaults to false.
- Default:
- false
-
batch
boolean batch
By default each listener will consume a single ConsumerRecord. By setting this value to true and specifying a container type in the method signatures you can indicate that the method should instead receive all the records at once in a batch.
- Returns:
- Whether to receive a batch of records or not
- Default:
- false
-
properties
io.micronaut.context.annotation.Property[] properties
Additional properties to configure the consumer with.
- Returns:
- The properties
- Default:
- {}
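Conceptually, the additional properties end up as extra entries in the consumer configuration. The following sketch models that with java.util.Properties; the helper is hypothetical, and letting the additional entries overwrite earlier keys is an assumption about ordering, not documented behaviour. The keys group.id, client.id and max.poll.records are standard Kafka consumer configuration names.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

// Hypothetical model of how extra name/value pairs might be layered onto consumer config.
final class ConsumerPropertiesSketch {

    // Starts from the group id and client id, then applies the additional entries.
    // Overwriting existing keys with additional entries is an assumption of this sketch.
    static Properties build(String groupId, String clientId, Map<String, String> additional) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        if (clientId != null && !clientId.isEmpty()) {
            props.setProperty("client.id", clientId);
        }
        additional.forEach(props::setProperty);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("orders", "orders-client",
                Collections.singletonMap("max.poll.records", "50"));
        System.out.println(props.getProperty("max.poll.records"));
    }
}
```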
-
autoStartup
boolean autoStartup
Controls whether the consumer starts consuming on startup. Set to false to start the consumer in paused mode.
- Returns:
- the auto startup setting
- Default:
- true
-