Micronaut Kafka
Integration between Micronaut and Kafka Messaging
Version: 5.8.0
1 Introduction
Apache Kafka is a distributed stream processing platform that can be used for a range of messaging requirements in addition to stream processing and real-time data handling.
Micronaut features dedicated support for defining both Kafka Producer and Consumer instances. Micronaut applications built with Kafka can be deployed with or without the presence of an HTTP server.
With Micronaut's efficient compile-time AOP and cloud-native features, writing efficient Kafka consumer applications that use very few resources is a breeze.
2 Release History
For this project, you can find a list of releases (with release notes) here:
Upgrading to Micronaut Kafka 5.0
Micronaut Kafka 5.0 is a significant major version which includes a number of changes you will need to consider when upgrading.
Micronaut 4, Kafka 3 & Java 17 baseline
Micronaut Kafka 5.0 requires the following minimum set of dependencies:
- Java 17 or above
- Kafka 3
- Micronaut 4 or above
@KafkaClient no longer recoverable by default
Previous versions of Micronaut Kafka used the meta-annotation @Recoverable on the @KafkaClient annotation, allowing you to define fallbacks in the case of failure. Micronaut Kafka 5 no longer includes this meta-annotation; if you use fallbacks you should explicitly declare a dependency on io.micronaut:micronaut-retry and declare @Recoverable explicitly.
Open Tracing No Longer Supported
Micronaut Kafka 5 no longer supports OpenTracing (which is deprecated and no longer maintained); if you need distributed tracing you should use OpenTelemetry instead.
3 Using the Micronaut CLI
To create a project with Kafka support using the Micronaut CLI, supply the kafka feature to the features flag:
$ mn create-app my-kafka-app --features kafka
This will create a project with the minimum necessary configuration for Kafka.
Kafka Messaging Application
The Micronaut CLI includes the ability to create Kafka-based messaging applications designed to implement message-driven microservices.
To create a Message-Driven Microservice with Micronaut + Kafka use the create-messaging-app command:
$ mn create-messaging-app my-kafka-app --features kafka
As you'd expect, you can start the application with ./gradlew run (for Gradle) or ./mvnw compile exec:exec (for Maven). The application will (with the default config) attempt to connect to Kafka at localhost:9092, and will continue to run without starting up an HTTP server. All communication to/from the service will take place via Kafka producers and/or listeners.
Within the new project, you can now run the Kafka-specific code generation commands:
$ mn create-kafka-producer MessageProducer
| Rendered template Producer.java to destination src/main/java/my/kafka/app/MessageProducer.java
$ mn create-kafka-listener MessageListener
| Rendered template Listener.java to destination src/main/java/my/kafka/app/MessageListener.java
See the guide for Kafka and the Micronaut Framework - Event-driven Applications to learn more.
4 Kafka Quick Start
To add support for Kafka to an existing project, you should first add the Micronaut Kafka configuration to your build configuration. For example in Gradle:
Gradle
Maven
implementation("io.micronaut.kafka:micronaut-kafka")
Configuring Kafka
The minimum requirement to configure Kafka is to set the value of the kafka.bootstrap.servers property in application.yml or bootstrap.yml:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
bootstrap:
servers: localhost:9092
The value can also be a list of available servers:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
bootstrap:
servers:
- foo:9092
- bar:9092
You can also set the environment variable KAFKA_BOOTSTRAP_SERVERS to a comma separated list of values to externalize configuration.
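For example, in a shell:
$ export KAFKA_BOOTSTRAP_SERVERS=foo:9092,bar:9092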
You may also add any Apache Kafka configuration options directly under the kafka node. These configurations will apply to consumers, producers and streams:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
bootstrap:
servers: localhost:9092
reconnect.backoff.ms: 30000
retry.backoff.ms: 32000
If your broker requires SSL, it can be configured this way:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
bootstrap:
servers: localhost:9093
ssl:
keystore:
location: /path/to/client.keystore.p12
password: secret
type: PKCS12
truststore:
location: /path/to/client.truststore.p12
password: secret
type: PKCS12
security:
protocol: ssl
Creating a Kafka Producer with @KafkaClient
To create a Kafka Producer that sends messages you can simply define an interface that is annotated with @KafkaClient.
For example the following is a trivial @KafkaClient interface:
Java
Groovy
Kotlin
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
@KafkaClient // (1)
public interface ProductClient {
@Topic("my-products") // (2)
void sendProduct(@KafkaKey String brand, String name); // (3)
void sendProduct(@Topic String topic, @KafkaKey String brand, String name); // (4)
}
1 | The @KafkaClient annotation is used to designate this interface as a client |
2 | The @Topic annotation indicates which topics the ProducerRecord should be published to |
3 | The method defines two parameters: The parameter that is the Kafka key and the value. |
4 | It is also possible for the topic to be dynamic by making it a method argument |
You can omit the key; however, this will result in a null key, which means Kafka will not know how to partition the record.
At compile time Micronaut will produce an implementation of the above interface. You can retrieve an instance of ProductClient either by looking up the bean from the ApplicationContext or by injecting the bean with @Inject:
Java
Kotlin
ProductClient client = applicationContext.getBean(ProductClient.class);
client.sendProduct("Nike", "Blue Trainers");
Note that since the sendProduct method returns void, the method will send the ProducerRecord and block until the response is received. You can specify an executor and return either a CompletableFuture or Publisher to support non-blocking message delivery.
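For example, a non-blocking variant of the client method could be declared as follows. This is a sketch only: it assumes a CompletableFuture of the Kafka RecordMetadata as the return type, and the Reactive and Non-Blocking Method Definitions section covers the executor configuration such a method requires:
@Topic("my-products")
CompletableFuture<RecordMetadata> sendProduct(@KafkaKey String brand, String name);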
Creating a Kafka Consumer with @KafkaListener
To listen to Kafka messages you can use the @KafkaListener annotation to define a message listener.
The following example will listen for messages published by the ProductClient in the previous section:
Java
Groovy
Kotlin
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
@KafkaListener(offsetReset = OffsetReset.EARLIEST) // (1)
public class ProductListener {
private static final Logger LOG = getLogger(ProductListener.class);
@Topic("my-products") // (2)
public void receive(@KafkaKey String brand, String name) { // (3)
LOG.info("Got Product - {} by {}", name, brand);
}
}
1 | The @KafkaListener is used with offsetReset set to EARLIEST which makes the listener start listening to messages from the beginning of the partition. |
2 | The @Topic annotation is again used to indicate which topic(s) to subscribe to. |
3 | The receive method defines 2 arguments: The argument that will receive the key and the argument that will receive the value. |
Disabling Kafka
If you need to disable the creation of Kafka consumers or producers, you can do so through configuration:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
enabled: false
5 Kafka Producers Using @KafkaClient
5.1 Defining @KafkaClient Methods
Specifying the Key and the Value
The Kafka key can be specified by providing a parameter annotated with @KafkaKey. If no such parameter is specified the record is sent with a null key.
The value to send is resolved by selecting the argument annotated with @MessageBody, otherwise the first argument with no specific binding annotation is used. For example:
Java
Groovy
Kotlin
@Topic("my-products")
void sendProduct(@KafkaKey String brand, String name);
The method above will use the parameter brand as the key and the parameter name as the value.
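The same binding can also be made explicit with @MessageBody; the following sketch is equivalent to the method above:
@Topic("my-products")
void sendProduct(@KafkaKey String brand, @MessageBody String name);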
Including Message Headers
There are a number of ways you can include message headers. One way is to annotate an argument with the @MessageHeader annotation and include a value when calling the method:
Java
Groovy
Kotlin
@Topic("my-products")
void sendProduct(@KafkaKey String brand, String name, @MessageHeader("My-Header") String myHeader);
The example above will include the value of the myHeader argument as a header called My-Header.
Another way to include headers is at the type level with the values driven from configuration:
Java
Groovy
Kotlin
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.messaging.annotation.MessageHeader;
@KafkaClient(id="product-client")
@MessageHeader(name = "X-Token", value = "${my.application.token}")
public interface ProductClient {
// define client API
}
The above example will send a header called X-Token with the value read from the setting my.application.token in application.yml (or the environment variable MY_APPLICATION_TOKEN).
If my.application.token is not set then an error will occur when creating the client.
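For example, the referenced setting could be supplied in application.yml as follows (the token value shown is a placeholder):
my:
  application:
    token: SOME_TOKEN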
It is also possible to pass a Collection<Header> or Headers object as a method argument, as seen below.
Java
Groovy
Kotlin
@Topic("my-bicycles")
void sendBicycle(@KafkaKey String brand, String model, Collection<Header> headers);
Java
Groovy
Kotlin
@Topic("my-bicycles")
void sendBicycle(@KafkaKey String brand, String model, Headers headers);
In the above examples, all of the key/value pairs in headers will be added to the list of headers produced to the topic. Header and Headers are part of the kafka-clients library.
Reactive and Non-Blocking Method Definitions
The @KafkaClient annotation supports the definition of reactive return types (such as Flowable or Reactor Flux) as well as Futures.
The KafkaProducer used internally to implement @KafkaClient support is inherently blocking, even though some of its methods describe themselves as "asynchronous". Configuring an executor (as shown in the following examples) is required in order to guarantee that a returned reactive type or Future will not block the calling thread.
Add the library Micronaut Reactor or Micronaut RxJava 3 to your application’s dependencies.
The following sections, which use Micronaut Reactor, cover advised configuration and possible method signatures and behaviour:
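For example, with Gradle the Micronaut Reactor dependency can be added like this:
implementation("io.micronaut.reactor:micronaut-reactor")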
Configuring An Executor
As the send method of KafkaProducer can block the calling thread, it is recommended that you specify an executor to be used when returning either reactive types or CompletableFuture. This will ensure that the send logic is executed on a separate thread from that of the caller, and avoid undesirable conditions such as blocking of the Micronaut server's event loop.
The executor to be used may be specified via configuration properties as in the following example:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
producers:
default:
executor: blocking
my-named-producer:
executor: io
Alternatively, the executor may be specified via the executor property of the @KafkaClient annotation:
Java
Groovy
Kotlin
@KafkaClient(value = "product-client", executor = TaskExecutors.BLOCKING)
public interface BookClient {
    // define client API
}
Note that an executor specified in the annotation will take precedence over one specified in the application configuration properties.
Mono Value and Return Type
Java
Groovy
Kotlin
@Topic("my-books")
Mono<Book> sendBook(@KafkaKey String author, Mono<Book> book);
The implementation will return a Mono that, when subscribed to, will subscribe to the passed Mono and send the emitted item as a ProducerRecord, emitting the item again if successful or an error otherwise.
Flux Value and Return Type
Java
Groovy
Kotlin
@Topic("my-books")
Flux<RecordMetadata> sendBooks(@KafkaKey String author, Flux<Book> book);
The implementation will return a Reactor Flux that, when subscribed to, will subscribe to the passed Flux and for each emitted item will send a ProducerRecord, emitting the resulting Kafka RecordMetadata if successful or an error otherwise.
Available Annotations
There are a number of annotations available that allow you to specify how a method argument is treated.
The following table summarizes the annotations and their purpose, with an example:
Annotation | Description |
---|---|
@MessageBody | Allows explicitly indicating the body of the message to be sent |
@MessageHeader | Allows specifying a parameter that should be sent as a header |
@KafkaKey | Allows specifying the parameter that is the Kafka key |
@KafkaPartition | Allows specifying the parameter that is the partition number |
@KafkaPartitionKey | Allows specifying the parameter that is used to compute a partition number independently from the Message Key. |
For example, you can use the @MessageHeader annotation to bind a parameter value to a header in the ProducerRecord.
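For example, a client method could combine several of these annotations as in the following sketch (the orders topic, the Order type and the header name are illustrative only):
@Topic("orders")
void sendOrder(@KafkaKey String customerId, // the Kafka key
               @MessageBody Order order, // the record value
               @MessageHeader("X-Request-Id") String requestId, // a record header
               @KafkaPartition Integer partition); // an explicit partition number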
5.2 Configuring @KafkaClient beans
@KafkaClient and Producer Properties
There are a number of ways to pass configuration properties to the KafkaProducer. You can set default producer properties using kafka.producers.default in application.yml:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
producers:
default:
retries: 5
bootstrap:
servers: localhost:9096
Any property in the ProducerConfig class can be set, including any overrides over the global Micronaut Kafka configs. The above example will set the default number of times to retry sending a record as well as override kafka.bootstrap.servers.
Per @KafkaClient Producer Properties
To configure different properties for each client, you should set a @KafkaClient id using the annotation:
Java
Groovy
Kotlin
@KafkaClient("product-client")
This serves two purposes. Firstly, it sets the value of the client.id setting used to build the Producer. Secondly, it allows you to apply per-producer configuration in application.yml:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
producers:
product-client:
retries: 5
bootstrap:
servers: localhost:9097
product-client-2:
bootstrap:
servers: localhost:9098
Finally, the @KafkaClient annotation itself provides a properties member that you can use to set producer-specific properties:
Java
Groovy
Kotlin
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import org.apache.kafka.clients.producer.ProducerConfig;
@KafkaClient(
id = "product-client",
acks = KafkaClient.Acknowledge.ALL,
properties = @Property(name = ProducerConfig.RETRIES_CONFIG, value = "5")
)
public interface ProductClient {
// define client API
}
@KafkaClient and Serializers
When serializing keys and values Micronaut will by default attempt to automatically pick a Serializer to use. This is done via the CompositeSerdeRegistry bean.
You can replace the default SerdeRegistry bean with your own implementation by defining a bean that uses @Replaces(CompositeSerdeRegistry.class). See the section on Bean Replacement.
All common java.lang types (String, Integer, primitives etc.) are supported, and for POJOs a Jackson-based JSON serializer is used by default.
You can, however, explicitly override the Serializer used by providing the appropriate configuration in application.yml:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
producers:
product-client:
value:
serializer: org.apache.kafka.common.serialization.ByteArraySerializer
You may want to do this if for example you choose an alternative serialization format such as Avro or Protobuf.
5.3 Sending Records in Batch
By default, if you define a method that takes a container type such as a List, the list will be serialized using the specified value.serializer (the default will result in a JSON array).
For example the following two methods will both send serialized arrays:
Java
Groovy
Kotlin
@Topic("books")
void sendList(List<Book> books);
@Topic("books")
void sendBooks(Book...books);
Instead of sending a serialized array you may wish to send batches of ProducerRecord either synchronously or asynchronously. To do this you can specify a value of true for the batch member of the @KafkaClient annotation:
ProducerRecord batches
Java
Groovy
Kotlin
@KafkaClient(batch = true)
public interface BookClient {
    @Topic("books")
    void sendList(List<Book> books);
}
In the above case, instead of sending a serialized array, the client implementation will iterate over each item in the list and send a ProducerRecord for each. The previous example is blocking, however you can return a reactive type if desired:
ProducerRecord batches Reactively
Java
Groovy
Kotlin
@KafkaClient(batch = true)
public interface BookClient {
    @Topic("books")
    Flux<RecordMetadata> send(List<Book> books);
}
You can also use an unbound reactive type such as Flux as the source of your batch data:
ProducerRecord batches from a Flux
Java
Groovy
Kotlin
@KafkaClient(batch = true)
public interface BookClient {
    @Topic("books")
    Flux<RecordMetadata> send(Flux<Book> books);
}
5.4 Injecting Kafka Producer Beans
If you need maximum flexibility and don't want to use the @KafkaClient support, you can use the @KafkaClient annotation as a qualifier for dependency injection of KafkaProducer instances.
Consider the following example:
Java
Groovy
Kotlin
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.concurrent.Future;
@Singleton
public class BookSender {
private final Producer<String, Book> kafkaProducer;
public BookSender(@KafkaClient("book-producer") Producer<String, Book> kafkaProducer) { // (1)
this.kafkaProducer = kafkaProducer;
}
public Future<RecordMetadata> send(String author, Book book) {
return kafkaProducer.send(new ProducerRecord<>("books", author, book)); // (2)
}
}
1 | The Producer is dependency injected into the constructor. If not specified in configuration, the key and value serializer are inferred from the generic type arguments. |
2 | The Producer is used to send records |
Note that there is no need to call the close() method to shut down the KafkaProducer; it is fully managed by Micronaut and will be shut down when the application shuts down.
The previous example can be tested in JUnit with the following test:
Java
Groovy
Kotlin
@Test
void testBookSender() {
try (ApplicationContext ctx = ApplicationContext.run( // (1)
Map.of("kafka.enabled", "true", "spec.name", "BookSenderTest")
)) {
BookSender bookSender = ctx.getBean(BookSender.class); // (2)
Book book = new Book("The Stand");
Future<RecordMetadata> stephenKing = bookSender.send("Stephen King", book);
assertDoesNotThrow(() -> {
RecordMetadata recordMetadata = stephenKing.get();
assertEquals("books", recordMetadata.topic());
});
}
}
1 | A Kafka docker container is used |
2 | The BookSender is retrieved from the ApplicationContext and a ProducerRecord sent |
By using the KafkaProducer API directly you open up even more options if you require transactions (exactly-once delivery) or want control over when records are flushed etc.
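For example, the injected producer could be used to queue several records and then flush them explicitly; the following is a sketch building on the BookSender above:
public void sendAll(String author, List<Book> books) {
    for (Book book : books) {
        kafkaProducer.send(new ProducerRecord<>("books", author, book)); // queued asynchronously
    }
    kafkaProducer.flush(); // blocks until all queued records have been sent
}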
5.5 Transactions
Transaction processing can be enabled by defining transactionalId on @KafkaClient, which will initialize the producer for transactional usage and wrap any send operation with a transaction demarcation.
Java
Groovy
Kotlin
@KafkaClient(id = "my-client", transactionalId = "my-tx-id")
public interface TransactionalClient {
// define client API
}
Each send operation will then be wrapped in transaction demarcation, equivalent to:
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(...);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}
5.6 Running Kafka while testing and developing
Micronaut Test Resources simplifies running Kafka for local development and testing.
Micronaut Test Resources Kafka support will automatically start a Kafka container and provide the value of the kafka.bootstrap.servers property.
Micronaut Launch and CLI already apply Test Resources to your build when you select the kafka feature.
Micronaut Test Resources uses Testcontainers under the hood. If you prefer to use Testcontainers directly, you can create a singleton container and combine it with Micronaut Test's TestPropertyProvider:
Java
Groovy
Kotlin
package io.micronaut.kafka.docs;
import io.micronaut.test.support.TestPropertyProvider;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import java.util.Collections;
import java.util.Map;
/**
* @see <a href="https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers">Singleton containers</a>
*/
public abstract class AbstractKafkaTest implements TestPropertyProvider {
static protected final KafkaContainer MY_KAFKA = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:latest")
);
@Override
public Map<String, String> getProperties() {
if (!MY_KAFKA.isRunning()) {
MY_KAFKA.start();
}
return Collections.singletonMap(
"kafka.bootstrap.servers", MY_KAFKA.getBootstrapServers()
);
}
}
And then test:
Java
Groovy
Kotlin
package io.micronaut.kafka.docs;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
@Property(name = "spec.name", value = "MyTest")
@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyTest extends AbstractKafkaTest {
@Test
void testKafkaRunning(MyProducer producer, MyConsumer consumer) {
final String message = "hello";
producer.produce(message);
await().atMost(5, SECONDS).until(() -> message.equals(consumer.consumed));
}
@Requires(property = "spec.name", value = "MyTest")
@KafkaClient
interface MyProducer {
@Topic("my-topic")
void produce(String message);
}
@Requires(property = "spec.name", value = "MyTest")
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
static class MyConsumer {
String consumed;
@Topic("my-topic")
public void consume(String message) {
consumed = message;
}
}
}
6 Kafka Consumers Using @KafkaListener
The quick start section presented a trivial example of what is possible with the @KafkaListener annotation.
Using the @KafkaListener annotation, Micronaut will build a KafkaConsumer and start the poll loop by running the KafkaConsumer in a special consumer thread pool. You can configure the size of the thread pool based on the number of consumers in your application in application.yml as desired:
consumer thread pool
Properties
Yaml
Toml
Groovy
Hocon
JSON
micronaut:
executors:
consumer:
type: fixed
nThreads: 25
KafkaConsumer instances are single threaded, hence for each @KafkaListener method you define, a new thread is created to execute the poll loop.
You may wish to scale the number of consumers you have listening on a particular topic. There are several ways you may achieve this. You could, for example, run multiple instances of your application, each containing a single consumer in each JVM.
Alternatively, you can also scale via threads by setting the number of threads a particular consumer bean will create:
Java
Groovy
Kotlin
@KafkaListener(groupId = "myGroup", threads = 10)
The above example will create 10 KafkaConsumer instances, each running in a unique thread and participating in the myGroup consumer group.
@KafkaListener beans are by default singleton. When using multiple threads you must either synchronize access to local state or declare the bean as @Prototype.
You can also make your number of threads configurable by using threadsValue:
Java
Groovy
Kotlin
@KafkaListener(groupId = "myGroup", threadsValue = "${my.thread.count}")
threads will be overridden by threadsValue if they are both set.
By default Micronaut will inspect the signature of the method annotated with @Topic that will listen for ConsumerRecord instances and, from the types, infer an appropriate key and value Deserializer.
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
consumers:
default:
allow.auto.create.topics: true
product:
bootstrap:
servers: localhost:9098
Any property in the ConsumerConfig class can be set, either for all @KafkaListener beans (under default) or on a per-listener basis. The above example will enable the consumer to create a topic if it doesn't exist for the default (@KafkaListener) client and set a custom bootstrap server for the product client (@KafkaListener(value = "product")).
See the guide for Testing Kafka Listener using Testcontainers with the Micronaut Framework to learn more.
6.1 Defining @KafkaListener Methods
The @KafkaListener annotation examples up until now have been relatively trivial, but Micronaut offers a lot of flexibility when it comes to the types of method signatures you can define.
The following sections detail examples of supported use cases.
Specifying Topics
The @Topic annotation can be used at the method or the class level to specify which topics to listen for.
Care needs to be taken when using @Topic at the class level because every public method of the class annotated with @KafkaListener will become a Kafka consumer, which may be undesirable.
You can make the topic name configurable using a placeholder: @Topic("${my.topic.name:myTopic}")
You can specify multiple topics to listen for:
Java
Groovy
Kotlin
@Topic({"fun-products", "awesome-products"})
public void receiveMultiTopics(@KafkaKey String brand, String name) {
LOG.info("Got Product - {} by {}", name, brand);
}
You can also specify one or many regular expressions to listen for:
Java
Groovy
Kotlin
@Topic(patterns="products-\\w+")
public void receivePatternTopics(@KafkaKey String brand, String name) {
LOG.info("Got Product - {} by {}", name, brand);
}
Available Annotations
There are a number of annotations available that allow you to specify how a method argument is bound.
The following table summarizes the annotations and their purpose, with an example:
Annotation | Description |
---|---|
@MessageBody | Allows explicitly indicating the body of the message |
@MessageHeader | Allows binding a parameter to a message header |
@KafkaKey | Allows binding a parameter to the message key |
@KafkaPartition | Allows binding a parameter to the partition the message was received from |
For example, you can use the @MessageHeader annotation to bind a parameter value from a header contained within a ConsumerRecord.
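For example, a listener method could bind a header alongside the key and the body, as in this sketch (the header name is illustrative):
@Topic("my-products")
public void receive(@KafkaKey String brand,
                    @MessageBody String name,
                    @MessageHeader("X-Request-Id") String requestId) {
    LOG.info("Got Product - {} by {} (request {})", name, brand, requestId);
}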
Topics, Partitions and Offsets
If you want a reference to the topic, partition or offset it is a simple matter of defining a parameter for each.
The following table summarizes example parameters and how they relate to the ConsumerRecord being processed:
Parameter | Description |
---|---|
String topic | The name of the topic |
long offset | The offset of the ConsumerRecord |
int partition | The partition of the ConsumerRecord |
long timestamp | The timestamp of the ConsumerRecord |
As an example, the following listener method will receive all of the above-mentioned parameters:
Java
Groovy
Kotlin
@Topic("awesome-products")
public void receive(@KafkaKey String brand, // (1)
Product product, // (2)
long offset, // (3)
int partition, // (4)
String topic, // (5)
long timestamp) { // (6)
LOG.info("Got Product - {} by {}",product.name(), brand);
}
1 | The Kafka key |
2 | The message body |
3 | The offset of the ConsumerRecord |
4 | The partition of the ConsumerRecord |
5 | The topic. Note that the @Topic annotation supports multiple topics. |
6 | The timestamp of the ConsumerRecord |
Receiving a ConsumerRecord
If you prefer, you can also receive the entire ConsumerRecord object being listened for. In this case you should specify appropriate generic types for the key and value of the ConsumerRecord so that Micronaut can pick the correct deserializer for each.
Consider the following example:
Java
Groovy
Kotlin
@Topic("awesome-products")
public void receive(ConsumerRecord<String, Product> record) { // (1)
Product product = record.value(); // (2)
String brand = record.key(); // (3)
LOG.info("Got Product - {} by {}",product.name(), brand);
}
1 | The method signature accepts a ConsumerRecord that specifies a String for the key type and a POJO (Product ) for the value type. |
2 | The value() method is used to retrieve the value |
3 | The key() method is used to retrieve the key |
Receiving and returning Reactive Types
In addition to common Java types and POJOs, you can also define listener methods that receive a Reactive type such as a Single or a Reactor Mono.
Add the library Micronaut Reactor or Micronaut RxJava 3 to your application’s dependencies.
For example, using Reactor:
Java
Groovy
Kotlin
@Topic("reactive-products")
public Mono<Product> receive(@KafkaKey String brand, // (1)
Mono<Product> productPublisher) { // (2)
return productPublisher.doOnSuccess((product) ->
LOG.info("Got Product - {} by {}", product.name(), brand) // (3)
);
}
1 | The @KafkaKey annotation is used to indicate the key |
2 | A Mono is used to receive the message body |
3 | The doOnSuccess method is used to process the result |
Note that in this case the method returns a Mono that indicates to Micronaut that the poll loop should continue, and if enable.auto.commit is set to true (the default) the offsets will be committed, potentially before the doOnSuccess is called.
The idea here is that you are able to write consumers that don't block; however, care must be taken in the case where an error occurs in the doOnSuccess method, otherwise the message could be lost. You could, for example, re-deliver the message in case of an error.
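As a sketch of one such approach, the error could be handled inside the reactive chain before the record is dropped (the actual re-delivery, for example via an injected @KafkaClient that writes to an error topic, is not shown):
@Topic("reactive-products")
public Mono<Product> receive(@KafkaKey String brand, Mono<Product> productPublisher) {
    return productPublisher
        .doOnSuccess(product -> LOG.info("Got Product - {} by {}", product.name(), brand))
        .onErrorResume(error -> {
            // forward the failed record to an error topic for later re-delivery, then continue
            return Mono.empty();
        });
}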
Alternatively, you can use the @Blocking annotation to tell Micronaut to subscribe to the returned reactive type in a blocking manner, which will result in blocking the poll loop, preventing offsets from being committed automatically:
Java
Groovy
Kotlin
@Blocking
@Topic("reactive-products")
public Mono<Product> receiveBlocking(@KafkaKey String brand, Mono<Product> productPublisher) {
return productPublisher.doOnSuccess((product) ->
LOG.info("Got Product - {} by {}", product.name(), brand)
);
}
6.2 Configuring @KafkaListener beans
@KafkaListener and Consumer Groups
Kafka consumers created with @KafkaListener will by default run within a consumer group that is the value of micronaut.application.name unless you explicitly specify a value to the @KafkaListener annotation. For example:
Java
Groovy
Kotlin
@KafkaListener("myGroup")
or
Java
Groovy
Kotlin
@KafkaListener(groupId = "myGroup")
The above examples will run the consumer within a consumer group called myGroup.
In this case, each record will be consumed by one consumer instance of the consumer group.
You can make the consumer group configurable using a placeholder: @KafkaListener("${my.consumer.group:myGroup}")
To allow the records to be consumed by all the consumer instances (each instance will be part of a unique consumer group), uniqueGroupId can be set to true:
Java
Groovy
Kotlin
@KafkaListener(groupId = "myGroup", uniqueGroupId = true)
For more information, see for example https://kafka.apache.org/intro#intro_consumers
@KafkaListener and Consumer Properties
There are a number of ways to pass configuration properties to the KafkaConsumer. You can set default consumer properties using kafka.consumers.default in application.yml:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
consumers:
default:
session:
timeout:
ms: 30000
The above example will set the default session.timeout.ms that Kafka uses to decide whether a consumer is alive or not, and applies it to all created KafkaConsumer instances.
You can also provide configuration specific to a consumer group. For example consider the following configuration:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
consumers:
myGroup:
session:
timeout:
ms: 30000
The above configuration will pass properties only to the @KafkaListener beans that apply to the consumer group myGroup.
Finally, the @KafkaListener annotation itself provides a properties member that you can use to set consumer-specific properties:
Java
Groovy
Kotlin
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.Product;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
@KafkaListener(
groupId = "products",
pollTimeout = "500ms",
properties = @Property(name = ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, value = "10000")
)
public class ProductListener {
    // consumer methods
}
@KafkaListener and Deserializers
As mentioned previously, when defining @KafkaListener methods Micronaut will attempt to pick an appropriate deserializer for the method signature. This is done via the CompositeSerdeRegistry bean.
You can replace the default SerdeRegistry bean with your own implementation by defining a bean that uses @Replaces(CompositeSerdeRegistry.class). See the section on Bean Replacement.
All common java.lang types (String, Integer, primitives etc.) are supported, and for POJOs a Jackson-based JSON deserializer is used by default.
You can, however, explicitly override the Deserializer used by providing the appropriate configuration in application.yml:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
consumers:
myGroup:
value:
deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
You may want to do this if for example you choose an alternative deserialization format such as Avro or Protobuf.
Transactional properties
There are a few options that apply only to transactional processing:
Isolation
Use the isolation member to define whether you want to receive messages that haven't been committed yet.
Custom offset strategy
There is a special offset strategy, OffsetStrategy.SEND_TO_TRANSACTION, that can only be used with an associated producer and is only applicable when SendTo is used.
6.3 Committing Kafka Offsets
Automatically Committing Offsets
The way offsets are handled by a @KafkaListener bean is defined by the OffsetStrategy enum.
The following table summarizes the enum values and behaviour:
Value | Description |
---|---|
AUTO | Automatically commit offsets. Sets enable.auto.commit to true |
DISABLED | Disables automatically committing offsets. Sets enable.auto.commit to false |
SYNC | Commits offsets manually at the end of each poll() loop if no exceptions occur |
ASYNC | Asynchronously commits offsets manually at the end of each poll() loop if no exceptions occur |
SYNC_PER_RECORD | Commits offsets manually after each ConsumerRecord is processed |
ASYNC_PER_RECORD | Commits offsets asynchronously after each ConsumerRecord is processed |
SEND_TO_TRANSACTION | Only available when the transactional producer is enabled for @SendTo |
Depending on your level of paranoia or durability requirements, you can choose to tune how and when offsets are committed.
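For example, the strategy is set via the offsetStrategy member of the annotation; this sketch uses synchronous commits at the end of each poll:
@KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.SYNC)
public class ProductListener {
    // consumer methods
}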
Manually Committing Offsets
If you set the OffsetStrategy to DISABLED it becomes your responsibility to commit offsets.
There are a couple of ways that can be achieved.
The simplest way is to define an argument of type Acknowledgement and call the ack() method to commit offsets synchronously:
ack()
Java
Groovy
Kotlin
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
public class ProductListener {
@Topic("awesome-products")
public void receive(Product product, Acknowledgement acknowledgement) { // (2)
// process product record
acknowledgement.ack(); // (3)
}
}
1 | Committing offsets automatically is disabled |
2 | The listener method specifies a parameter of type Acknowledgement |
3 | The ack() method is called once the record has been processed |
Alternatively, you can supply a KafkaConsumer method argument and then call commitSync (or commitAsync) yourself when you are ready to commit offsets:
KafkaConsumer API
Java
Groovy
Kotlin
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.Product;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED;
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
public class ProductListener {
@Topic("awesome-products")
public void receive(Product product, long offset, int partition, String topic, Consumer kafkaConsumer) { // (2)
// process product record
// commit offsets
kafkaConsumer.commitSync(Collections.singletonMap( // (3)
new TopicPartition(topic, partition),
new OffsetAndMetadata(offset + 1, "my metadata")
));
}
}
1 | Committing offsets automatically is disabled |
2 | The listener method specifies that it receives the offset data and a KafkaConsumer |
3 | The commitSync() method is called once the record has been processed |
6.4 Assigning Kafka Offsets
6.4.1 Manually Assigning Offsets to a Consumer Bean
Sometimes you may wish to control exactly the position you wish to resume consuming messages from.
For example if you store offsets in a database you may wish to read the offsets from the database when the consumer starts and start reading from the position stored in the database.
To support this use case your consumer bean can implement the ConsumerSeekAware interface:
ConsumerSeekAware API
Java
Groovy
Kotlin
package io.micronaut.kafka.docs.seek.aware;
import io.micronaut.configuration.kafka.ConsumerSeekAware;
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.configuration.kafka.seek.*;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.Product;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
@KafkaListener
@Requires(property = "spec.name", value = "ConsumerSeekAwareTest")
public class ProductListener implements ConsumerSeekAware { // (1)
List<Product> processed = new ArrayList<>();
public ProductListener(ProductListenerConfiguration config) {
// ...
}
@Topic("wonderful-products")
void receive(Product product) {
processed.add(product);
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // (2)
// save offsets here
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) { // (3)
// seek to offset here
partitions.stream().map(tp -> KafkaSeekOperation.seek(tp, 1)).forEach(seeker::perform);
}
}
1 | Implement the interface ConsumerSeekAware |
2 | The onPartitionsRevoked can be used to save offsets |
3 | The onPartitionsAssigned can be used to read offsets and seek to a specific position. In this trivial example we just seek to the offset 1 (skipping the first record). |
ConsumerSeekAware provides a convenient KafkaSeeker object that can be used to perform KafkaSeekOperations immediately on the underlying consumer. |
Alternatively, when more fine-grained access to the Kafka consumer is required, your consumer bean can instead implement the ConsumerRebalanceListener and ConsumerAware interfaces:
KafkaConsumer API
Java
Groovy
Kotlin
package io.micronaut.kafka.docs.seek.rebalance;
import io.micronaut.configuration.kafka.ConsumerAware;
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.Product;
import io.micronaut.core.annotation.NonNull;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
@Requires(property = "spec.name", value = "ConsumerRebalanceListenerTest")
public class ProductListener implements ConsumerRebalanceListener, ConsumerAware {
List<Product> processed = new ArrayList<>();
private Consumer consumer;
public ProductListener(ProductListenerConfiguration config) {
// ...
}
@Override
public void setKafkaConsumer(@NonNull Consumer consumer) { // (1)
this.consumer = consumer;
}
@Topic("fantastic-products")
void receive(Product product) {
processed.add(product);
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // (2)
// save offsets here
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // (3)
// seek to offset here
for (TopicPartition partition : partitions) {
consumer.seek(partition, 1);
}
}
}
1 | The setKafkaConsumer of the ConsumerAware allows access to the underlying consumer |
2 | The onPartitionsRevoked can be used to save offsets |
3 | The onPartitionsAssigned can be used to read offsets and seek to a specific position. In this trivial example we just seek to the offset 1 (skipping the first record). |
6.4.2 Manual Offsets with Multiple Topics
It is possible for a single @KafkaListener bean to represent multiple consumers. If you have more than one method annotated with @Topic, then setKafkaConsumer will be called multiple times for each backing consumer.
It is recommended in the case of manually seeking offsets that you use a single listener bean per consumer; the alternative is to store an internal Set of all consumers associated with a particular listener and manually search for the correct one in onPartitionsAssigned using the partition assignment data.
Not doing so will lead to a ConcurrentModificationException error.
6.4.3 Manually Assigning Offsets from a Consumer Method
There may be some scenarios where you realize you need to seek to a different offset while consuming another one.
To support this use case, your consumer method can receive a KafkaSeekOperations instance as a parameter:
Java
Groovy
Kotlin
package io.micronaut.kafka.docs.seek.ops;
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.configuration.kafka.seek.*;
import io.micronaut.context.annotation.*;
import io.micronaut.kafka.docs.Product;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
@KafkaListener(offsetReset = OffsetReset.EARLIEST, properties = @Property(name = "max.poll.records", value = "1"))
@Requires(property = "spec.name", value = "KafkaSeekOperationsTest")
public class ProductListener {
List<Product> processed = new ArrayList<>();
public ProductListener(ProductListenerConfiguration config) {
// ...
}
@Topic("amazing-products")
void receive(Product product, KafkaSeekOperations ops) { // (1)
processed.add(product);
ops.defer(KafkaSeekOperation.seekToEnd(new TopicPartition("amazing-products", 0))); // (2)
}
}
1 | An instance of KafkaSeekOperations will be injected to the method |
2 | Any number of seek operations can be deferred. In this trivial example we just seek to the end of the partition. |
The seek operations will be performed by Micronaut automatically when the consumer method completes successfully, possibly after committing offsets via OffsetStrategy.AUTO.
These operations determine the next offset retrieved by poll. Take into account that, even if the seek operation performs successfully, your consumer method may keep receiving records that were cached by the previous call. You can configure max.poll.records to control the maximum number of records returned by a single call to poll.
6.4.4 Creating Kafka Seek Operations
The KafkaSeekOperation interface provides several static methods to create seek operations:
- seek: Creates an absolute seek operation.
- seekRelativeToBeginning: Creates a seek operation relative to the beginning.
- seekToBeginning: Creates a seek to the beginning operation.
- seekRelativeToEnd: Creates a seek operation relative to the end.
- seekToEnd: Creates a seek to the end operation.
- seekForward: Creates a forward seek operation.
- seekBackward: Creates a backward seek operation.
- seekToTimestamp: Creates a seek to the timestamp operation.
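As a sketch (only a few of the factory methods listed above are shown, and the exact signatures should be checked against the API), seek operations can be created and then performed with a KafkaSeeker or deferred with KafkaSeekOperations:
TopicPartition partition = new TopicPartition("amazing-products", 0);
KafkaSeekOperation absolute = KafkaSeekOperation.seek(partition, 10); // absolute offset 10
KafkaSeekOperation fromStart = KafkaSeekOperation.seekToBeginning(partition); // beginning of the partition
KafkaSeekOperation toEnd = KafkaSeekOperation.seekToEnd(partition); // end of the partition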
6.5 Kafka Batch Processing
By default @KafkaListener listener methods will receive each ConsumerRecord one by one.
There may be cases where you prefer to receive all of the ConsumerRecord data from the ConsumerRecords holder object in one go.
To achieve this you can set the batch member of the @KafkaListener to true and specify a container type (typically List) to receive all of the data:
Java
Groovy
Kotlin
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import java.util.List;
import static org.slf4j.LoggerFactory.getLogger;
@KafkaListener(batch = true) // (1)
public class BookListener {
    private static final Logger LOG = getLogger(BookListener.class);
@Topic("all-the-books")
public void receiveList(List<Book> books) { // (2)
for (Book book : books) {
LOG.info("Got Book = {}", book.title()); // (3)
}
}
}
1 | The @KafkaListener annotation’s batch member is set to true |
2 | The method defines that it receives a list of Book instances |
3 | The method processes the entire batch |
Note in the previous case offsets will automatically be committed for the whole batch by default when the method returns without error.
Manually Committing Offsets with Batch
As with one-by-one message processing, if you set the OffsetStrategy to DISABLED it becomes your responsibility to commit offsets.
If you want to commit the entire batch of offsets at once during the course of processing, then the simplest approach is to add an argument of type Acknowledgement and call the ack() method to commit the batch of offsets synchronously:
Java
Groovy
Kotlin
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
public void receive(List<Book> books, Acknowledgement acknowledgement) { // (2)
//process the books
acknowledgement.ack(); // (3)
}
1 | Committing offsets automatically is disabled |
2 | The listener method specifies a parameter of type Acknowledgement |
3 | The ack() method is called once the records have been processed |
You can also take more control of committing offsets when doing batch processing by specifying a method that receives the offsets in addition to the batch:
Java
Groovy
Kotlin
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
public void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) { // (2)
for (int i = 0; i < records.size(); i++) {
ConsumerRecord<String, Book> record = records.get(i); // (3)
// process the book
Book book = record.value();
// commit offsets
String topic = record.topic();
int partition = record.partition();
long offset = record.offset(); // (4)
kafkaConsumer.commitSync(Collections.singletonMap( // (5)
new TopicPartition(topic, partition),
new OffsetAndMetadata(offset + 1, "my metadata")
));
}
}
1 | Committing offsets automatically is disabled |
2 | The method receives the batch of books as a list of consumer records |
3 | Each record is processed |
4 | The offset, partition and topic is read for the record |
5 | Offsets are committed |
This example is fairly trivial in that it commits offsets after processing each record in a batch, but you can for example commit after processing every 10, or every 100 or whatever makes sense for your application.
Receiving ConsumerRecords
When batching you can receive the entire ConsumerRecords object being listened for. In this case you should specify appropriate generic types for the key and value of the ConsumerRecords so that Micronaut can pick the correct deserializer for each.
This is useful when the need is to process or commit the records by partition, as the ConsumerRecords object already groups records by partition:
Java
Groovy
Kotlin
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
public void receiveConsumerRecords(ConsumerRecords<String, Book> consumerRecords, Consumer kafkaConsumer) { // (2)
for (TopicPartition partition : consumerRecords.partitions()) { // (3)
long offset = Long.MIN_VALUE;
// process partition records
for (ConsumerRecord<String, Book> record : consumerRecords.records(partition)) { // (4)
// process the book
Book book = record.value();
// keep last offset
offset = record.offset(); // (5)
}
// commit partition offset
kafkaConsumer.commitSync(Collections.singletonMap( // (6)
partition,
new OffsetAndMetadata(offset + 1, "my metadata")
));
}
}
1 | Committing offsets automatically is disabled |
2 | The method receives the batch of books as a ConsumerRecords holder object |
3 | Each partition is iterated over |
4 | Each record for the partition is processed |
5 | The last read offset for the partition is stored |
6 | The offset is committed once for each partition |
Reactive Batch Processing
Batch listeners also support defining reactive types (Reactor Flux or RxJava Flowable) as the method argument.
Add the library Micronaut Reactor or Micronaut RxJava 3 to your application’s dependencies.
In this case the method will be passed a reactive type that can be returned from the method allowing non-blocking processing of the batch:
Java
Groovy
Kotlin
@Topic("all-the-books")
public Flux<Book> receiveFlux(Flux<Book> books) {
return books.doOnNext(book ->
LOG.info("Got Book = {}", book.title())
);
}
Remember that, as with non-batch processing, the reactive type will be subscribed to on a different thread and offsets will be committed automatically, likely prior to the point when the reactive type is subscribed to.
This means that you should only use reactive processing if message durability is not a requirement, and you may wish to implement message re-delivery upon failure.
6.6 Forwarding Messages with @SendTo
On any @KafkaListener method that returns a value, you can use the @SendTo annotation to forward the return value to the topic or topics specified by the @SendTo annotation.
The key of the original ConsumerRecord will be used as the key when forwarding the message.
Java
Groovy
Kotlin
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
public int receive(@KafkaKey String brand, Product product) {
LOG.info("Got Product - {} by {}", product.name(), brand);
return product.quantity(); // (3)
}
1 | The topic subscribed to is sendto-products |
2 | The topic to send the result to is product-quantities |
3 | The return value is used to indicate the value to forward |
You can also do the same using Reactive programming:
Java
Groovy
Kotlin
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
public Mono<Integer> receiveProduct(@KafkaKey String brand,
Mono<Product> productSingle) {
return productSingle.map(product -> {
LOG.info("Got Product - {} by {}", product.name(), brand);
return product.quantity(); // (3)
});
}
1 | The topic subscribed to is sendto-products |
2 | The topic to send the result to is product-quantities |
3 | The return is mapped from the single to the value of the quantity |
In the reactive case the poll loop will continue and will not wait for the record to be sent unless you specifically annotate the method with @Blocking.
To enable transactional sending of the messages you need to define producerTransactionalId in @KafkaListener.
Java
Groovy
Kotlin
@KafkaListener(
offsetReset = OffsetReset.EARLIEST,
producerClientId = "word-counter-producer", // (1)
producerTransactionalId = "tx-word-counter-id", // (2)
offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // (3)
isolation = IsolationLevel.READ_COMMITTED // (4)
)
public class WordCounter {
@Topic("tx-incoming-strings")
@SendTo("my-words-count")
List<KafkaMessage<byte[], Integer>> wordsCounter(String string) {
return Stream.of(string.split("\\s+"))
.collect(Collectors.groupingBy(Function.identity(), Collectors.summingInt(i -> 1)))
.entrySet()
.stream()
.map(e -> KafkaMessage.Builder.<byte[], Integer>withBody(e.getValue()).key(e.getKey().getBytes()).build())
.toList();
}
}
1 | The id of the producer to load additional config properties |
2 | The transactional id that is required to enable transactional processing |
3 | Enable offset strategy to commit the offsets to the transaction |
4 | Consumer read messages isolation |
6.7 Handling Consumer Exceptions
Consumer error strategies
It's possible to define a different error strategy for @KafkaListener using the errorStrategy attribute:
Java
Groovy
Kotlin
@KafkaListener(
value = "myGroup",
errorStrategy = @ErrorStrategy(
value = ErrorStrategyValue.RETRY_ON_ERROR,
retryDelay = "50ms",
retryCount = 3
)
)
Setting the error strategy allows you to resume at the next offset or to seek the consumer (stop on error) to the failed offset so that it can retry if an error occurs.
You can choose one of the error strategies:
- RETRY_ON_ERROR - This strategy will stop consuming subsequent records in the case of an error and by default will attempt to re-consume the current record once. The retry delay can be defined by retryDelay and the retry count by retryCount.
- RETRY_EXPONENTIALLY_ON_ERROR - This strategy will stop consuming subsequent records in the case of an error and by default will attempt to re-consume the current record once. The exponentially growing time breaks between consumption attempts are computed using the n * 2^(k - 1) formula, where the initial delay n is retryDelay and the number of retries is retryCount.
- RETRY_CONDITIONALLY_ON_ERROR - This strategy will stop consuming subsequent records in the case of an error and by default will attempt to re-consume the current record once. The retry behaviour can be overridden. The retry delay can be defined by retryDelay and the retry count by retryCount.
- RETRY_CONDITIONALLY_EXPONENTIALLY_ON_ERROR - This strategy will stop consuming subsequent records in the case of an error and by default will attempt to re-consume the current record once. The retry behaviour can be overridden. The exponentially growing time breaks between consumption attempts are computed using the n * 2^(k - 1) formula, where the initial delay n is retryDelay and the number of retries is retryCount.
- RESUME_AT_NEXT_RECORD - This strategy will ignore the current error and will resume at the next offset; in this case it's recommended to have a custom exception handler that moves the failed message into an error queue.
- NONE - This error strategy will skip over all records from the current offset in the current poll when the consumer encounters an error. This option is deprecated and kept for consistent behaviour with previous versions of Micronaut Kafka that do not support error strategy.
The error strategies apply only to non-batch message processing.
When using retry error strategies in combination with reactive consumer methods, it is necessary to add the @Blocking annotation to the reactive consumer method.
You can also make the number of retries configurable by using retryCountValue:
Java
Groovy
Kotlin
@KafkaListener(
value = "myGroup",
errorStrategy = @ErrorStrategy(
value = ErrorStrategyValue.RETRY_ON_ERROR,
retryCountValue = "${my.retry.count}"
)
)
retryCountValue will be overridden by retryCount if they are both set.
Specify exceptions to retry
It's possible to define only the exceptions for which a retry will occur.
Java
Groovy
Kotlin
@KafkaListener(
value = "myGroup",
errorStrategy = @ErrorStrategy(
value = ErrorStrategyValue.RETRY_ON_ERROR,
retryDelay = "50ms",
retryCount = 3,
exceptionTypes = { MyException.class, MySecondException.class }
)
)
Specifying exceptions to retry applies only to the RETRY_ON_ERROR and RETRY_EXPONENTIALLY_ON_ERROR error strategies.
Conditional retries
It is possible to conditionally retry a message based on the exception thrown when the error strategy is RETRY_CONDITIONALLY_ON_ERROR or RETRY_CONDITIONALLY_EXPONENTIALLY_ON_ERROR.
Java
Groovy
Kotlin
@KafkaListener(
value = "myGroup",
errorStrategy = @ErrorStrategy(
value = ErrorStrategyValue.RETRY_CONDITIONALLY_ON_ERROR
)
)
public class ConditionalRetryListener implements ConditionalRetryBehaviourHandler {
@Override
public ConditionalRetryBehaviour conditionalRetryBehaviour(KafkaListenerException exception) {
return shouldRetry(exception) ? ConditionalRetryBehaviour.RETRY : ConditionalRetryBehaviour.SKIP;
}
    // ...
}
When a @KafkaListener does not implement ConditionalRetryBehaviourHandler, the DefaultConditionalRetryBehaviourHandler will be used and all messages that fail processing will be retried.
If you wish to apply the same conditional retry strategy to all of your @KafkaListener beans, you can define a bean that implements ConditionalRetryBehaviourHandler and use Micronaut’s Bean Replacement feature to replace the default bean: @Replaces(DefaultConditionalRetryBehaviourHandler.class)
.
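For example, a minimal sketch of such a replacement bean (imports elided; TransientBackendException is a hypothetical application exception):
@Singleton
@Replaces(DefaultConditionalRetryBehaviourHandler.class)
public class GlobalConditionalRetryBehaviourHandler implements ConditionalRetryBehaviourHandler {
    @Override
    public ConditionalRetryBehaviour conditionalRetryBehaviour(KafkaListenerException exception) {
        // Retry only the hypothetical transient failure; skip everything else
        return exception.getCause() instanceof TransientBackendException
                ? ConditionalRetryBehaviour.RETRY
                : ConditionalRetryBehaviour.SKIP;
    }
}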
Conditional retry behaviour only applies to RETRY_CONDITIONALLY_ON_ERROR and RETRY_CONDITIONALLY_EXPONENTIALLY_ON_ERROR error strategies.
|
Exception handlers
When an exception occurs in a @KafkaListener method, the exception is simply logged by default. This is handled by DefaultKafkaListenerExceptionHandler.
The following options are available to configure the default Kafka listener exception handler:
Property | Type | Description |
---|---|---|
| boolean | Whether to skip the record on deserialization failure. Default value true |
| boolean | Whether to commit the record on deserialization failure. Default value false |
If you wish to replace this default exception handling with another implementation, you can use Micronaut’s Bean Replacement feature to define a bean that replaces it: @Replaces(DefaultKafkaListenerExceptionHandler.class)
.
You can also define per-bean exception handling logic by implementing the KafkaListenerExceptionHandler interface in your @KafkaListener class.
The KafkaListenerExceptionHandler receives an exception of type KafkaListenerException which allows access to the original ConsumerRecord
, if available.
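For example, a minimal sketch of a listener that handles its own exceptions (the topic name is illustrative, and the consumer record is assumed to be exposed as an Optional, as described above):
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
@KafkaListener("myGroup")
public class ProductListener implements KafkaListenerExceptionHandler {
    @Topic("awesome-products")
    void receive(String product) {
        // process the record...
    }
    @Override
    public void handle(KafkaListenerException exception) {
        // Log the failing record key when the original ConsumerRecord is available
        exception.getConsumerRecord().ifPresent(record ->
            System.err.println("Failed to process record with key: " + record.key()));
    }
}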
7 Running Kafka Applications
You can run a Micronaut Kafka application with or without the presence of an HTTP server.
If you run your application without the http-server-netty
dependency you will see output like the following on startup:
11:06:22.638 [main] INFO io.micronaut.runtime.Micronaut - Startup completed in 402ms. Server Running: 4 active message listeners.
No port is exposed, but the Kafka consumers are active and running. The process registers a shutdown hook such that the KafkaConsumer
instances are closed correctly when the server is shut down.
7.1 Kafka Health Checks
In addition to http-server-netty
, if the management
dependency is added, then Micronaut’s Health Endpoint can be used to expose the health status of the Kafka consumer application.
For example if Kafka is not available the /health
endpoint will return:
{
"status": "DOWN",
"details": {
...
"kafka": {
"status": "DOWN",
"details": {
"error": "java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment."
}
}
}
}
By default, the details visible above are only shown to authenticated users. See the Health Endpoint documentation for how to configure that setting. |
The following options are available to configure the Kafka Health indicator:
Property | Type | Description |
---|---|---|
| java.time.Duration | The health check timeout. |
| boolean | Whether the Kafka health check is enabled. Default value true. |
| boolean | By default, the health check requires cluster-wide permissions in order to get information about the nodes in the Kafka cluster. If your application doesn’t have admin privileges (for example, this might happen in multi-tenant scenarios), you can switch to a "restricted" version of the health check which only validates basic connectivity but doesn’t require any additional permissions. Default value false |
7.2 Kafka Metrics
You can enable Kafka Metrics collection by enabling Micrometer Metrics.
If you do not wish to collect Kafka metrics, you can set micronaut.metrics.binders.kafka.enabled
to false
in application.yml
.
In the case of Kafka Streams metrics, you can use micronaut.metrics.binders.kafka.streams.enabled
instead.
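If you also need programmatic access to the collected values, a minimal sketch (the bean and method names are illustrative) could inject Micrometer’s MeterRegistry and look a Kafka metric up by name:
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.inject.Singleton;
@Singleton
public class KafkaMetricsReporter {
    private final MeterRegistry meterRegistry;
    public KafkaMetricsReporter(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    // Returns the current value of a Kafka gauge, or 0 if it has not been registered yet
    public double kafkaMetric(String name) {
        Gauge gauge = meterRegistry.find(name).gauge();
        return gauge != null ? gauge.value() : 0.0;
    }
}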
7.3 Kafka Distributed Tracing
Distributed tracing is supported via the Micronaut Tracing module using Open Telemetry.
7.4 Creating New Topics
You can automatically add topics to the broker when your application starts. To do so, add a bean of type NewTopic
for each topic you want to create. NewTopic
instances let you specify the name, the number of partitions, the replication factor, the replica assignments and the configuration properties you want to associate with the new topic. Additionally, you can add a bean of type CreateTopicsOptions
that will be used when the new topics are created.
Java
Groovy
Kotlin
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
@Requires(bean = AdminClient.class)
@Factory
public class MyTopicFactory {
@Bean
CreateTopicsOptions options() {
return new CreateTopicsOptions().timeoutMs(5000).validateOnly(true).retryOnQuotaViolation(false);
}
@Bean
NewTopic topic1() {
return new NewTopic("my-new-topic-1", 1, (short) 1);
}
@Bean
NewTopic topic2() {
return new NewTopic("my-new-topic-2", 2, (short) 1);
}
}
Creating topics is not a transactional operation, so it may succeed for some topics while failing for others. This operation also executes asynchronously, so it may take several seconds for all the brokers to become aware that the topics have been created. |
If you ever need to check if the operation has completed, you can @Inject
or retrieve the KafkaNewTopics bean from the application context and then retrieve the operation result that Kafka returned when the topics were created.
Java
Groovy
Kotlin
boolean areNewTopicsDone(KafkaNewTopics newTopics) {
return newTopics.getResult().all().isDone();
}
7.5 Disabling Micronaut-Kafka
If you want to disable micronaut-kafka entirely, you can set kafka.enabled
to false
in application.yml
.
This will prevent the instantiation of all kafka-related beans.
You must, however, provide your own replacement implementations of any @KafkaClient
interfaces:
Java
Groovy
Kotlin
import io.micronaut.context.annotation.Replaces;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.StringUtils;
import jakarta.inject.Singleton;
@Requires(property = "kafka.enabled", notEquals = StringUtils.TRUE, defaultValue = StringUtils.TRUE) // (1)
@Replaces(MessageClient.class) // (2)
@Singleton
public class MessageClientFallback implements MessageClient { // (3)
@Override
public void send(String message) {
throw new UnsupportedOperationException(); // (4)
}
}
1 | Only instantiate when kafka.enabled is set to false |
2 | Replace the @KafkaClient interface |
3 | Implement the interface |
4 | Provide an alternative implementation for all client methods |
8 Kafka Streams
Using the CLI
If you are creating your project using the Micronaut CLI, supply the kafka-streams feature to include Kafka Streams support in your project: $ mn create-app my-app --features kafka-streams |
Kafka Streams is a platform for building real-time streaming applications.
When using Micronaut with Kafka Streams, your application gains all of the features from Micronaut (configuration management, AOP, DI, health checks, etc.), simplifying the construction of Kafka Streams applications.
Since Micronaut’s DI and AOP happen at compile time, you can build low-overhead stream applications with ease.
Defining Kafka Streams
To define Kafka Streams you should first add the kafka-streams
configuration to your build.
Gradle
Maven
implementation("io.micronaut.kafka:micronaut-kafka-streams")
The minimum configuration required is to set the Kafka bootstrap servers:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
bootstrap:
servers: localhost:9092
You should then define a @Factory for your streams that defines beans that return a KStream
. For example, to implement the Word Count example from the Kafka Streams documentation:
Java
Groovy
Kotlin
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
@Factory
public class WordCountStream {
@Singleton
@Named("word-count")
KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { // (1)
// set default serdes
Properties props = builder.getConfiguration();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500");
KStream<String, String> source = builder.stream("streams-plaintext-input"); // (2)
KTable<String, Long> groupedByWord = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
//Store the result in a store for lookup later
.count(Materialized.as("word-count-store-java")); // (3)
groupedByWord
//convert to stream
.toStream()
//send to output using specific serdes
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); // (4)
return source;
}
}
1 | An instance of ConfiguredStreamBuilder is injected that allows mutating the configuration |
2 | The input topic |
3 | Materialize the count stream and save to a state store |
4 | The output topic |
With Kafka Streams, the key and value Serdes (serializer/deserializer) must be classes with a zero-argument constructor. If you wish to use JSON (de)serialization you can subclass JsonObjectSerde to define your Serdes.
|
You can use the @KafkaClient annotation to send a sentence to be processed by the above stream:
Java
Groovy
Kotlin
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
@KafkaClient
public interface WordCountClient {
@Topic("streams-plaintext-input")
void publishSentence(String sentence);
}
You can also define a @KafkaListener to listen for the result of the word count stream:
Java
Groovy
Kotlin
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
@KafkaListener(offsetReset = EARLIEST, groupId = "WordCountListener")
public class WordCountListener {
private final Map<String, Long> wordCounts = new ConcurrentHashMap<>();
@Topic("streams-wordcount-output")
void count(@KafkaKey String word, long count) {
wordCounts.put(word, count);
}
public long getCount(String word) {
Long num = wordCounts.get(word);
return num != null ? num : 0;
}
public Map<String, Long> getWordCounts() {
return Collections.unmodifiableMap(wordCounts);
}
}
Configuring Kafka Streams
If you have a single Kafka Stream configured on your Micronaut service, then you should use the default
key on the configuration.
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
streams:
default:
processing.guarantee: "exactly_once"
auto.offset.reset: "earliest"
The above configuration example sets the processing.guarantee
and auto.offset.reset
setting of the default
Stream. Most of the configuration properties pass directly through to the KafkaStreams
instance being initialized.
In addition to those standard properties, you may want to customize how long you wait for Kafka Streams to shut down (this is mostly useful during testing), with close-timeout
.
For example, this will make Micronaut Kafka wait for up to 10 seconds to shut down the default stream:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
streams:
default:
close-timeout: 10s
You can define multiple Kafka Streams in the same Micronaut application, each with its own configuration.
To do this you should define the configuration with kafka.streams.[STREAM-NAME]
.
Assuming you have 2 or more Kafka Streams definitions on a single service, you will need to use the default
key for at least one of them and then define kafka.streams.[STREAM-NAME]
for the rest.
For example in application.yml
:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
streams:
default:
num:
stream:
threads: 1
my-other-stream:
num:
stream:
threads: 10
The above configuration sets the num.stream.threads
setting of the Kafka StreamsConfig
to 1
for the default
stream, and the same setting to 10
for a stream named my-other-stream
.
You can then inject a ConfiguredStreamBuilder
specifically for the above configuration using jakarta.inject.Named
:
Java
Groovy
Kotlin
@Singleton
@Named("my-other-stream")
KStream<String, String> myOtherKStream(ConfiguredStreamBuilder builder) {
return builder.stream("my-other-stream");
}
If you do not provide a @Named on the ConfiguredStreamBuilder and you have multiple KStreams defined, they will share the default configuration (client id, application id, etc.). When using multiple streams in a single app, it is advisable to provide a @Named instance of ConfiguredStreamBuilder for each stream.
|
Java
Groovy
Kotlin
@Singleton
@Named("my-stream")
KStream<String, String> myStream(ConfiguredStreamBuilder builder) {
    // the topic name here is illustrative
    return builder.stream("my-stream");
}
When writing a test without starting the actual Kafka server, you can instruct Micronaut not to start Kafka Streams. To do this, create a config file suffixed with an environment name, such as application-test.yml
, and set kafka.streams.[STREAM-NAME].start-kafka-streams
to false
.
For example in application-test.yml
:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
streams:
default:
start-kafka-streams: false
8.1 Interactive Query Service
When using streams you can set a state store for your stream by using a store builder and telling the stream to store its data there. In the Kafka Streams Word Count example above, the output is materialized to a named store that can later be retrieved via the Interactive Query Service. See the Apache Kafka documentation for more details.
You can inject the InteractiveQueryService and use the method getQueryableStore(String storeName, QueryableStoreType<T> storeType)
to get values from a state store.
An example service that wraps the InteractiveQueryService
is included below. This is here to illustrate that when calling the getQueryableStore
method you must provide the store name and preferably the type of key and value you are trying to retrieve.
Java
Groovy
Kotlin
import io.micronaut.configuration.kafka.streams.InteractiveQueryService;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Optional;
/**
* Example service that uses the InteractiveQueryService in a reusable way. This is only intended as an example.
*/
@Singleton
public class InteractiveQueryServiceExample {
private final InteractiveQueryService interactiveQueryService;
public InteractiveQueryServiceExample(InteractiveQueryService interactiveQueryService) {
this.interactiveQueryService = interactiveQueryService;
}
/**
* Method to get the word state store and word count from the store using the interactive query service.
*
* @param stateStore the name of the state store ie "foo-store"
* @param word the key to get, in this case the word as the stream and ktable have been grouped by word
* @return the Long count of the word in the store
*/
public Long getWordCount(String stateStore, String word) {
Optional<ReadOnlyKeyValueStore<String, Long>> queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.keyValueStore());
return queryableStore.map(kvReadOnlyKeyValueStore ->
kvReadOnlyKeyValueStore.get(word)).orElse(0L);
}
/**
* Method to get byte array from a state store using the interactive query service.
*
* @param stateStore the name of the state store ie "bar-store"
* @param blobName the key to get, in this case the name of the blob
* @return the byte[] stored in the state store
*/
public byte[] getBytes(String stateStore, String blobName) {
Optional<ReadOnlyKeyValueStore<String, byte[]>> queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.keyValueStore());
return queryableStore.map(stringReadOnlyKeyValueStore ->
stringReadOnlyKeyValueStore.get(blobName)).orElse(null);
}
/**
* Method to get value V by key K.
*
* @param stateStore the name of the state store ie "baz-store"
* @param name the key to get
* @return the value of type V stored in the state store
*/
public <K, V> V getGenericKeyValue(String stateStore, K name) {
Optional<ReadOnlyKeyValueStore<K, V>> queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.<K, V>keyValueStore());
return queryableStore.map(kvReadOnlyKeyValueStore ->
kvReadOnlyKeyValueStore.get(name)).orElse(null);
}
}
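As a usage sketch, the word-count store materialized earlier (Materialized.as("word-count-store-java")) could be queried through this service; the surrounding bean is illustrative:
import jakarta.inject.Singleton;
@Singleton
public class WordCountReporter {
    private final InteractiveQueryServiceExample example;
    WordCountReporter(InteractiveQueryServiceExample example) {
        this.example = example;
    }
    // Looks up the count for a single word from the store materialized by the word-count stream
    public long countFor(String word) {
        return example.getWordCount("word-count-store-java", word);
    }
}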
8.2 Kafka Stream Health Checks
In addition to http-server-netty
, if the management
dependency is added, then Micronaut’s Health Endpoint can be used to expose the health status of the Kafka streams application.
For example, stream health at the /health
endpoint will return:
{
"status": "UP",
"details": {
"kafkaStreams": {
"name": "my-application",
"status": "UP",
"details": {
"named-stream": {
"name": "my-application",
"status": "UP",
"details": {
"adminClientId": "my-consumer-id-admin",
"restoreConsumerClientId": "my-consumer-id-StreamThread-1-restore-consumer",
"threadState": "RUNNING",
"producerClientIds": [
"my-consumer-id-StreamThread-1-producer"
],
"consumerClientId": "my-consumer-id-StreamThread-1-consumer",
"threadName": "my-consumer-id-StreamThread-1"
}
}
}
},
...
}
}
By default, the details visible above are only shown to authenticated users. See the Health Endpoint documentation for how to configure that setting. |
If you wish to disable the Kafka streams health check while still using the management
dependency you can set the property kafka.health.streams.enabled
to false
in your application configuration.
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
health:
streams:
enabled: false
8.3 Handling Uncaught Exceptions
Since version 2.8.0, Kafka allows you to handle uncaught exceptions that may be thrown from your streams. The handler must return the action to take, depending on the thrown exception.
There are three possible responses: REPLACE_THREAD
, SHUTDOWN_CLIENT
, or SHUTDOWN_APPLICATION
.
You can find more details about this mechanism in the Kafka documentation. |
If you just want to take the same action every time, you can set the application property kafka.streams.[STREAM-NAME].uncaught-exception-handler
to a valid action, such as REPLACE_THREAD
.
For example in application-test.yml
:
Properties
Yaml
Toml
Groovy
Hocon
JSON
kafka:
streams:
my-stream:
uncaught-exception-handler: REPLACE_THREAD
To implement your own handler, you can listen to the application event BeforeKafkaStreamStart and configure the streams with your own business logic:
import io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventListener;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
@Singleton
public class MyStreamsUncaughtExceptionHandler
implements ApplicationEventListener<BeforeKafkaStreamStart>, StreamsUncaughtExceptionHandler {
boolean dangerAvoided = false;
@Override
public void onApplicationEvent(BeforeKafkaStreamStart event) {
event.getKafkaStreams().setUncaughtExceptionHandler(this);
}
@Override
public StreamThreadExceptionResponse handle(Throwable exception) {
if (exception.getCause() instanceof MyException) {
this.dangerAvoided = true;
return StreamThreadExceptionResponse.REPLACE_THREAD;
}
return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
}
}
9 Guides
See the following list of guides to learn more about working with Kafka in the Micronaut Framework:
10 Repository
You can find the source code of this project in this repository: