Micronaut Kafka

Integration between Micronaut and Kafka Messaging

Version:

1 Introduction

Apache Kafka is a distributed stream processing platform that can be used for a range of messaging requirements in addition to stream processing and real-time data handling.

Micronaut features dedicated support for defining both Kafka Producer and Consumer instances. Micronaut applications built with Kafka can be deployed with or without the presence of an HTTP server.

With Micronaut’s efficient compile-time AOP and cloud native features, writing efficient Kafka consumer applications that use very few resources is a breeze.

2 Release History

For this project, you can find a list of releases (with release notes) here:

Upgrading to Micronaut Kafka 5.0

Micronaut Kafka 5.0 is a significant major version which includes a number of changes you will need to consider when upgrading.

Micronaut 4, Kafka 3 & Java 17 baseline

Micronaut Kafka 5.0 requires the following minimum set of dependencies:

  • Java 17 or above

  • Kafka 3

  • Micronaut 4 or above

@KafkaClient no longer recoverable by default

Previous versions of Micronaut Kafka placed the meta-annotation @Recoverable on the @KafkaClient annotation, allowing you to define fallbacks in case of failure. Micronaut Kafka 5 no longer includes this meta-annotation. If you use fallbacks, explicitly declare a dependency on io.micronaut:micronaut-retry and add the @Recoverable annotation yourself.

OpenTracing No Longer Supported

Micronaut Kafka 5 no longer supports OpenTracing (which is deprecated and no longer maintained). If you need distributed tracing, use OpenTelemetry instead.

3 Using the Micronaut CLI

To create a project with Kafka support using the Micronaut CLI, supply the kafka feature to the features flag.

$ mn create-app my-kafka-app --features kafka

This will create a project with the minimum necessary configuration for Kafka.

Kafka Messaging Application

The Micronaut CLI includes the ability to create Kafka-based messaging applications designed to implement message-driven microservices.

To create a Message-Driven Microservice with Micronaut + Kafka use the create-messaging-app command:

$ mn create-messaging-app my-kafka-app --features kafka

As you’d expect, you can start the application with ./gradlew run (for Gradle) or ./mvnw compile exec:exec (Maven). With the default configuration, the application will attempt to connect to Kafka at localhost:9092 and will continue to run without starting up an HTTP server. All communication to and from the service will take place via Kafka producers and/or listeners.

Within the new project, you can now run the Kafka-specific code generation commands:

$ mn create-kafka-producer MessageProducer
| Rendered template Producer.java to destination src/main/java/my/kafka/app/MessageProducer.java

$ mn create-kafka-listener MessageListener
| Rendered template Listener.java to destination src/main/java/my/kafka/app/MessageListener.java

4 Kafka Quick Start

To add support for Kafka to an existing project, first add the Micronaut Kafka dependency to your build configuration. For example, in Gradle or Maven:

implementation("io.micronaut.kafka:micronaut-kafka")
<dependency>
    <groupId>io.micronaut.kafka</groupId>
    <artifactId>micronaut-kafka</artifactId>
</dependency>

Configuring Kafka

The minimum requirement to configure Kafka is to set the value of the kafka.bootstrap.servers property in application.yml:

Configuring Kafka
kafka.bootstrap.servers=localhost:9092
kafka:
    bootstrap:
        servers: localhost:9092
[kafka]
  [kafka.bootstrap]
    servers="localhost:9092"
kafka {
  bootstrap {
    servers = "localhost:9092"
  }
}
{
  kafka {
    bootstrap {
      servers = "localhost:9092"
    }
  }
}
{
  "kafka": {
    "bootstrap": {
      "servers": "localhost:9092"
    }
  }
}

The value can also be a list of available servers:

Configuring Kafka
kafka.bootstrap.servers[0]=foo:9092
kafka.bootstrap.servers[1]=bar:9092
kafka:
    bootstrap:
        servers:
            - foo:9092
            - bar:9092
[kafka]
  [kafka.bootstrap]
    servers=[
      "foo:9092",
      "bar:9092"
    ]
kafka {
  bootstrap {
    servers = ["foo:9092", "bar:9092"]
  }
}
{
  kafka {
    bootstrap {
      servers = ["foo:9092", "bar:9092"]
    }
  }
}
{
  "kafka": {
    "bootstrap": {
      "servers": ["foo:9092", "bar:9092"]
    }
  }
}
You can also set the environment variable KAFKA_BOOTSTRAP_SERVERS to a comma-separated list of values to externalize configuration.
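
As a rough illustration of what a comma-separated value expresses, the sketch below splits such a string into individual host:port entries. It is not Micronaut's implementation (Micronaut performs the equivalent conversion for you); the class name BootstrapServersSketch and its parse method are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of Micronaut or Kafka.
final class BootstrapServersSketch {

    // Splits a comma-separated string into trimmed, non-empty host:port entries.
    static List<String> parse(String commaSeparated) {
        return Arrays.stream(commaSeparated.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Falls back to localhost:9092 when the variable is not set.
        String raw = System.getenv().getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");
        System.out.println(parse(raw));
    }
}
```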

If your broker requires SSL, it can be configured this way:

Configuring Kafka
kafka.bootstrap.servers=localhost:9092
kafka.ssl.keystore.location=/path/to/client.keystore.p12
kafka.ssl.keystore.password=secret
kafka.ssl.truststore.location=/path/to/client.truststore.jks
kafka.ssl.truststore.password=secret
kafka.ssl.truststore.type=PKCS12
kafka.security.protocol=ssl
kafka:
  bootstrap:
    servers: localhost:9092
  ssl:
    keystore:
      location: /path/to/client.keystore.p12
      password: secret
    truststore:
      location: /path/to/client.truststore.jks
      password: secret
      type: PKCS12
  security:
    protocol: ssl
[kafka]
  [kafka.bootstrap]
    servers="localhost:9092"
  [kafka.ssl]
    [kafka.ssl.keystore]
      location="/path/to/client.keystore.p12"
      password="secret"
    [kafka.ssl.truststore]
      location="/path/to/client.truststore.jks"
      password="secret"
      type="PKCS12"
  [kafka.security]
    protocol="ssl"
kafka {
  bootstrap {
    servers = "localhost:9092"
  }
  ssl {
    keystore {
      location = "/path/to/client.keystore.p12"
      password = "secret"
    }
    truststore {
      location = "/path/to/client.truststore.jks"
      password = "secret"
      type = "PKCS12"
    }
  }
  security {
    protocol = "ssl"
  }
}
{
  kafka {
    bootstrap {
      servers = "localhost:9092"
    }
    ssl {
      keystore {
        location = "/path/to/client.keystore.p12"
        password = "secret"
      }
      truststore {
        location = "/path/to/client.truststore.jks"
        password = "secret"
        type = "PKCS12"
      }
    }
    security {
      protocol = "ssl"
    }
  }
}
{
  "kafka": {
    "bootstrap": {
      "servers": "localhost:9092"
    },
    "ssl": {
      "keystore": {
        "location": "/path/to/client.keystore.p12",
        "password": "secret"
      },
      "truststore": {
        "location": "/path/to/client.truststore.jks",
        "password": "secret",
        "type": "PKCS12"
      }
    },
    "security": {
      "protocol": "ssl"
    }
  }
}

Creating a Kafka Producer with @KafkaClient

To create a Kafka Producer that sends messages you can simply define an interface that is annotated with @KafkaClient.

For example the following is a trivial @KafkaClient interface:

ProductClient.java
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

@KafkaClient // (1)
public interface ProductClient {

    @Topic("my-products") // (2)
    void sendProduct(@KafkaKey String brand, String name); // (3)

    void sendProduct(@Topic String topic, @KafkaKey String brand, String name); // (4)
}
ProductClient.java
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires

@KafkaClient // (1)
public interface ProductClient {

    @Topic('my-products') // (2)
    void sendProduct(@KafkaKey String brand, String name) // (3)

    void sendProduct(@Topic String topic, @KafkaKey String brand, String name) // (4)
}
ProductClient.java
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires

@KafkaClient // (1)
interface ProductClient {
    @Topic("my-products")  // (2)
    fun sendProduct(@KafkaKey brand: String, name: String) // (3)

    fun sendProduct(@Topic topic: String, @KafkaKey brand: String, name: String) // (4)
}
1 The @KafkaClient annotation is used to designate this interface as a client
2 The @Topic annotation indicates which topics the ProducerRecord should be published to
3 The method defines two parameters: the Kafka key and the value.
4 It is also possible for the topic to be dynamic by making it a method argument
You can omit the key; however, this results in a null key, so Kafka cannot use the key to choose a partition for the record.
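
To see why the key matters, the following sketch shows the general idea of key-based partitioning: records with equal keys map to the same partition. It is a simplified illustration and not Kafka's actual partitioner, which uses a murmur2 hash of the serialized key. The class name KeyPartitioningSketch and the partition count of 3 are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified illustration of key-based partitioning; not Kafka's real algorithm.
final class KeyPartitioningSketch {

    // Maps a non-null key to a partition in the range [0, numPartitions).
    // Uses Arrays.hashCode here instead of Kafka's murmur2 hash.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3; // assumed partition count for the topic
        System.out.println(partitionFor("Nike", partitions));
        System.out.println(partitionFor("Nike", partitions)); // same key, same partition
        System.out.println(partitionFor("Adidas", partitions));
    }
}
```

Because the mapping depends only on the key, all records for one brand land on one partition and keep their relative order there.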

At compile time Micronaut will produce an implementation of the above interface. You can retrieve an instance of ProductClient either by looking up the bean from the ApplicationContext or by injecting the bean with @Inject:

Using ProductClient
ProductClient client = applicationContext.getBean(ProductClient.class);
client.sendProduct("Nike", "Blue Trainers");
Using ProductClient
val client = beanContext.getBean(ProductClient::class.java)
client.sendProduct("Nike", "Blue Trainers")

Note that because the sendProduct method returns void, the method sends the ProducerRecord and blocks until the response is received. You can return a Future or Publisher to support non-blocking message delivery.
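
The difference between the two styles can be sketched with plain JDK types. In this illustration, deliver is a hypothetical stand-in for sending a record (it is not a Micronaut or Kafka API): the blocking variant waits for the result, while the Future-returning variant hands the caller a handle immediately.

```java
import java.util.concurrent.CompletableFuture;

// Plain-JDK illustration of blocking versus Future-based delivery.
final class BlockingVsFutureSketch {

    // Hypothetical stand-in for an asynchronous send; completes with an acknowledgement string.
    static CompletableFuture<String> deliver(String key, String value) {
        return CompletableFuture.supplyAsync(() -> "ack:" + key + "=" + value);
    }

    // Mirrors a void client method: returns only after delivery has completed.
    static void sendBlocking(String key, String value) {
        deliver(key, value).join();
    }

    // Mirrors a Future-returning client method: the caller decides when to wait.
    static CompletableFuture<String> sendAsync(String key, String value) {
        return deliver(key, value);
    }

    public static void main(String[] args) {
        sendBlocking("Nike", "Blue Trainers");
        sendAsync("Nike", "Red Trainers").thenAccept(System.out::println).join();
    }
}
```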

Creating a Kafka Consumer with @KafkaListener

To listen to Kafka messages you can use the @KafkaListener annotation to define a message listener.

The following example will listen for messages published by the ProductClient in the previous section:

ProductListener.java
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import org.slf4j.Logger;

import static org.slf4j.LoggerFactory.getLogger;

@KafkaListener(offsetReset = OffsetReset.EARLIEST) // (1)
public class ProductListener {
    private static final Logger LOG = getLogger(ProductListener.class);

    @Topic("my-products") // (2)
    public void receive(@KafkaKey String brand, String name) { // (3)
        LOG.info("Got Product - {} by {}", name, brand);
    }
}
ProductListener.java
import groovy.util.logging.Slf4j
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic

@Slf4j
@KafkaListener(offsetReset = OffsetReset.EARLIEST) // (1)
class ProductListener {

    @Topic('my-products') // (2)
    void receive(@KafkaKey String brand, String name) { // (3)
        log.info("Got Product - {} by {}", name, brand)
    }
}
ProductListener.java
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import org.slf4j.LoggerFactory

@KafkaListener(offsetReset = OffsetReset.EARLIEST) // (1)
class ProductListener {
    companion object {
        private val LOG = LoggerFactory.getLogger(ProductListener::class.java)
    }

    @Topic("my-products") // (2)
    fun receive(@KafkaKey brand: String, name: String) { // (3)
        LOG.info("Got Product - {} by {}", name, brand)
    }
}
1 The @KafkaListener is used with offsetReset set to EARLIEST, which makes the listener start listening to messages from the beginning of the partition.
2 The @Topic annotation is again used to indicate which topic(s) to subscribe to.
3 The receive method defines two arguments: one that receives the key and one that receives the value.
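
The effect of the offset reset policy can be pictured with a small model. The sketch below assumes that offsetReset corresponds to Kafka's auto.offset.reset consumer setting, which only applies when the consumer group has no committed offset, and that the partition log starts at offset 0. The names OffsetResetSketch, Reset, and startOffset are hypothetical.

```java
import java.util.OptionalLong;

// Simplified model of where a consumer starts reading; not Kafka's implementation.
final class OffsetResetSketch {

    enum Reset { EARLIEST, LATEST }

    // A committed offset, when present, takes precedence over the reset policy.
    // Assumes the log begins at offset 0 (no records removed by retention).
    static long startOffset(OptionalLong committed, Reset reset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return reset == Reset.EARLIEST ? 0L : logEndOffset;
    }

    public static void main(String[] args) {
        System.out.println(startOffset(OptionalLong.empty(), Reset.EARLIEST, 42L));
        System.out.println(startOffset(OptionalLong.empty(), Reset.LATEST, 42L));
        System.out.println(startOffset(OptionalLong.of(10L), Reset.EARLIEST, 42L));
    }
}
```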

Disabling Kafka

If you need to disable the creation of Kafka consumers or producers, you can do so through configuration:

Disabling Kafka
kafka.enabled=false
kafka:
  enabled: false
[kafka]
  enabled=false
kafka {
  enabled = false
}
{
  kafka {
    enabled = false
  }
}
{
  "kafka": {
    "enabled": false
  }
}

5 Kafka Producers Using @KafkaClient

5.1 Defining @KafkaClient Methods

Specifying the Key and the Value

The Kafka key can be specified by providing a parameter annotated with @KafkaKey. If no such parameter is specified the record is sent with a null key.

The value to send is resolved by selecting the argument annotated with @MessageBody; otherwise, the first argument with no specific binding annotation is used. For example:

    @Topic("my-products")
    void sendProduct(@KafkaKey String brand, String name);
    @Topic('my-products')
    void sendProduct(@KafkaKey String brand, String name)
    @Topic("my-products")
    fun sendProduct(@KafkaKey brand: String, name: String)

The method above will use the parameter brand as the key and the parameter name as the value.
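
The resolution rule can be sketched with plain reflection. This is an illustration only and not how Micronaut resolves arguments (Micronaut does this work at compile time). The annotations Key and Body are hypothetical stand-ins for @KafkaKey and @MessageBody, defined locally so the sketch compiles on its own, and the sketch simplifies matters by treating any annotation as a binding annotation.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class ValueResolutionSketch {

    // Hypothetical stand-ins for @KafkaKey and @MessageBody.
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER) @interface Key {}
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER) @interface Body {}

    interface ProductClient {
        void sendProduct(@Key String brand, String name);   // value: name (first unannotated)
        void sendExplicit(String note, @Body String name);  // value: name (explicit @Body wins)
    }

    // Returns the index of the parameter used as the record value: the one annotated
    // with @Body if present, otherwise the first parameter without any annotation.
    static int valueIndex(Method method) {
        Annotation[][] annotations = method.getParameterAnnotations();
        int firstUnannotated = -1;
        for (int i = 0; i < annotations.length; i++) {
            for (Annotation annotation : annotations[i]) {
                if (annotation instanceof Body) {
                    return i;
                }
            }
            if (annotations[i].length == 0 && firstUnannotated < 0) {
                firstUnannotated = i;
            }
        }
        return firstUnannotated;
    }

    // Convenience lookup by method name, avoiding checked exceptions.
    static int valueIndex(Class<?> type, String methodName) {
        for (Method method : type.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                return valueIndex(method);
            }
        }
        throw new IllegalArgumentException("No such method: " + methodName);
    }

    public static void main(String[] args) {
        System.out.println(valueIndex(ProductClient.class, "sendProduct"));
        System.out.println(valueIndex(ProductClient.class, "sendExplicit"));
    }
}
```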

Including Message Headers

There are a number of ways you can include message headers. One way is to annotate an argument with the @MessageHeader annotation and include a value when calling the method:

    @Topic("my-products")
    void sendProduct(@KafkaKey String brand, String name, @MessageHeader("My-Header") String myHeader);
    @Topic('my-products')
    void sendProduct(@KafkaKey String brand, String name, @MessageHeader('My-Header') String myHeader)
    @Topic("my-products")
    fun sendProduct(@KafkaKey brand: String, name: String, @MessageHeader("My-Header") myHeader: String)

The example above will include the value of the myHeader argument as a header called My-Header.

Another way to include headers is at the type level with the values driven from configuration:

Declaring @KafkaClient Headers
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.messaging.annotation.MessageHeader;

@KafkaClient(id="product-client")
@MessageHeader(name = "X-Token", value = "${my.application.token}")
public interface ProductClient {
    // define client API
}
Declaring @KafkaClient Headers
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.messaging.annotation.MessageHeader

@KafkaClient(id='product-client')
@MessageHeader(name = 'X-Token', value = '${my.application.token}')
interface ProductClient {
    // define client API
}
Declaring @KafkaClient Headers
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.messaging.annotation.MessageHeader

@KafkaClient(id = "product-client")
@MessageHeader(name = "X-Token", value = "\${my.application.token}")
interface ProductClient {
    // define client API
}

The above example will send a header called X-Token with the value read from the setting my.application.token in application.yml (or the environment variable MY_APPLICATION_TOKEN).

If my.application.token is not set, an error will occur when creating the client.
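
The relationship between the property name and the environment variable name can be sketched as follows. This only illustrates the naming convention implied by the example above (it matches kafka.bootstrap.servers and KAFKA_BOOTSTRAP_SERVERS as well); it is not Micronaut's property resolution code, and the handling of hyphens is an assumption. The class name EnvNameSketch is hypothetical.

```java
import java.util.Locale;

// Illustrates the property-to-environment-variable naming convention only.
final class EnvNameSketch {

    // Replaces dots (and, by assumption, hyphens) with underscores and upper-cases the result.
    static String toEnvName(String property) {
        return property.replace('.', '_').replace('-', '_').toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toEnvName("my.application.token"));
        System.out.println(toEnvName("kafka.bootstrap.servers"));
    }
}
```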

It is also possible to pass a Collection<Header> or a Headers object as a method argument, as seen below.

Collection<Header> Argument
    @Topic("my-bicycles")
    void sendBicycle(@KafkaKey String brand, String model, Collection<Header> headers);
Collection<Header> Argument
    @Topic('my-bicycles')
    void sendBicycle(@KafkaKey String brand, String model, Collection<Header> headers)
Collection<Header> Argument
    @Topic("my-bicycles")
    fun sendBicycle(@KafkaKey brand: String, model: String, headers: Collection<Header>)
Headers Argument
    @Topic("my-bicycles")
    void sendBicycle(@KafkaKey String brand, String model, Headers headers);
Headers Argument
    @Topic('my-bicycles')
    void sendBicycle(@KafkaKey String brand, String model, Headers headers)
Headers Argument
    @Topic("my-bicycles")
    fun sendBicycle(@KafkaKey brand: String, model: String, headers: Headers)

In the above examples, all of the key/value pairs in headers will be added to the list of headers produced to the topic. Header and Headers are part of the kafka-clients library.

Reactive and Non-Blocking Method Definitions

The @KafkaClient annotation supports the definition of reactive return types (such as Flowable or Reactor Flux) as well as Futures.

Add the library Micronaut Reactor or Micronaut RxJava 3 to your application’s dependencies.

The following sections, which use Micronaut Reactor, cover possible method signatures and behaviour:

Mono Value and Return Type

    @Topic("my-books")
    Mono<Book> sendBook(@KafkaKey String author, Mono<Book> book);
    @Topic('my-books')
    Mono<Book> sendBook(@KafkaKey String author, Mono<Book> book);
    @Topic("my-books")
    fun sendBook(@KafkaKey author: String, book: Mono<Book>): Mono<Book>

The implementation will return a Mono that, when subscribed to, will subscribe to the passed Mono and send the emitted item as a ProducerRecord, emitting the item again if successful or an error otherwise.

Flux Value and Return Type

    @Topic("my-books")
    Flux<RecordMetadata> sendBooks(@KafkaKey String author, Flux<Book> book);
    @Topic('my-books')
    Flux<RecordMetadata> sendBooks(@KafkaKey String author, Flux<Book> book);
    @Topic("my-books")
    fun sendBooks(@KafkaKey author: String, book: Flux<Book>): Flux<RecordMetadata>

The implementation will return a Reactor Flux that, when subscribed to, will subscribe to the passed Flux and, for each emitted item, send a ProducerRecord, emitting the resulting Kafka RecordMetadata if successful or an error otherwise.

Available Annotations

There are a number of annotations available that allow you to specify how a method argument is treated.

The following table summarizes the annotations and their purpose, with an example:

Table 1. Kafka Messaging Annotations

| Annotation | Description | Example |
|---|---|---|
| @MessageBody | Allows explicitly indicating the body of the message to be sent | @MessageBody Product product |
| @MessageHeader | Allows specifying a parameter that should be sent as a header | @MessageHeader("X-My-Header") String myHeader |
| @KafkaKey | Allows specifying the parameter that is the Kafka key | @KafkaKey String key |
| @KafkaPartition | Allows specifying the parameter that is the partition number | @KafkaPartition Integer partition |
| @KafkaPartitionKey | Allows specifying the parameter that is used to compute a partition number independently from the message key | @KafkaPartitionKey String partitionKey |

For example, you can use the @MessageHeader annotation to bind a parameter value to a header in the ProducerRecord.

5.2 Configuring @KafkaClient beans

@KafkaClient and Producer Properties

There are a number of ways to pass configuration properties to the KafkaProducer. You can set default producer properties using kafka.producers.default in application.yml:

Applying Default Configuration
kafka.producers.default.retries=5
kafka.producers.default.bootstrap.servers=localhost:9096
kafka:
    producers:
        default:
            retries: 5
            bootstrap:
              servers: localhost:9096
[kafka]
  [kafka.producers]
    [kafka.producers.default]
      retries=5
      [kafka.producers.default.bootstrap]
        servers="localhost:9096"
kafka {
  producers {
    'default' {
      retries = 5
      bootstrap {
        servers = "localhost:9096"
      }
    }
  }
}
{
  kafka {
    producers {
      default {
        retries = 5
        bootstrap {
          servers = "localhost:9096"
        }
      }
    }
  }
}
{
  "kafka": {
    "producers": {
      "default": {
        "retries": 5,
        "bootstrap": {
          "servers": "localhost:9096"
        }
      }
    }
  }
}

Any property in the ProducerConfig class can be set, including any overrides over the global Micronaut Kafka configs. The above example will set the default number of times to retry sending a record as well as override kafka.bootstrap.servers.

Per @KafkaClient Producer Properties

To configure different properties for each client, you should set a @KafkaClient id using the annotation:

Using a Client ID
@KafkaClient("product-client")
Using a Client ID
@KafkaClient('product-client')
Using a Client ID
@KafkaClient("product-client")

This serves two purposes. First, it sets the value of the client.id setting used to build the Producer. Second, it allows you to apply per-producer configuration in application.yml:

Applying Per-Producer Configuration
kafka.producers.product-client.retries=5
kafka.producers.product-client.bootstrap.servers=localhost:9097
kafka.producers.product-client-2.bootstrap.servers=localhost:9098
kafka:
    producers:
        product-client:
            retries: 5
            bootstrap:
              servers: localhost:9097
        product-client-2:
            bootstrap:
              servers: localhost:9098
[kafka]
  [kafka.producers]
    [kafka.producers.product-client]
      retries=5
      [kafka.producers.product-client.bootstrap]
        servers="localhost:9097"
    [kafka.producers.product-client-2]
      [kafka.producers.product-client-2.bootstrap]
        servers="localhost:9098"
kafka {
  producers {
    productClient {
      retries = 5
      bootstrap {
        servers = "localhost:9097"
      }
    }
    productClient2 {
      bootstrap {
        servers = "localhost:9098"
      }
    }
  }
}
{
  kafka {
    producers {
      product-client {
        retries = 5
        bootstrap {
          servers = "localhost:9097"
        }
      }
      product-client-2 {
        bootstrap {
          servers = "localhost:9098"
        }
      }
    }
  }
}
{
  "kafka": {
    "producers": {
      "product-client": {
        "retries": 5,
        "bootstrap": {
          "servers": "localhost:9097"
        }
      },
      "product-client-2": {
        "bootstrap": {
          "servers": "localhost:9098"
        }
      }
    }
  }
}

Finally, the @KafkaClient annotation itself provides a properties member that you can use to set producer specific properties:

Configuring Producer Properties with @KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import org.apache.kafka.clients.producer.ProducerConfig;

@KafkaClient(
    id = "product-client",
    acks = KafkaClient.Acknowledge.ALL,
    properties = @Property(name = ProducerConfig.RETRIES_CONFIG, value = "5")
)
public interface ProductClient {
    // define client API
}
Configuring Producer Properties with @KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.producer.ProducerConfig

@KafkaClient(
        id = "product-client",
        acks = KafkaClient.Acknowledge.ALL,
        properties = @Property(name = ProducerConfig.RETRIES_CONFIG, value = '5')
)
interface ProductClient {
    // define client API
}
Configuring Producer Properties with @KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.producer.ProducerConfig

@KafkaClient(
    id = "product-client",
    acks = KafkaClient.Acknowledge.ALL,
    properties = [Property(name = ProducerConfig.RETRIES_CONFIG, value = "5")]
)
interface ProductClient {
    // define client API
}

@KafkaClient and Serializers

When serializing keys and values, Micronaut will by default attempt to automatically pick a Serializer to use. This is done via the CompositeSerdeRegistry bean.

You can replace the default SerdeRegistry bean with your own implementation by defining a bean that uses @Replaces(CompositeSerdeRegistry.class). See the section on Bean Replacement.

All common java.lang types (String, Integer, primitives, etc.) are supported, and for POJOs a Jackson-based JSON serializer is used by default.
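
The idea of choosing a serializer from the value's type can be sketched in plain Java. This is not the CompositeSerdeRegistry implementation: the class SerializerLookupSketch is hypothetical, only two types are registered, and the fallback function merely stands in for the JSON serializer used for POJOs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical type-based serializer lookup; illustration only.
final class SerializerLookupSketch {

    private final Map<Class<?>, Function<Object, byte[]>> byType = new LinkedHashMap<>();
    private final Function<Object, byte[]> fallback;

    SerializerLookupSketch(Function<Object, byte[]> fallback) {
        this.fallback = fallback;
        // Same wire formats as Kafka's StringSerializer (UTF-8 by default)
        // and IntegerSerializer (4 bytes, big-endian).
        byType.put(String.class, v -> ((String) v).getBytes(StandardCharsets.UTF_8));
        byType.put(Integer.class, v -> ByteBuffer.allocate(4).putInt((Integer) v).array());
    }

    // Picks the serializer registered for the value's exact class, else the fallback.
    byte[] serialize(Object value) {
        return byType.getOrDefault(value.getClass(), fallback).apply(value);
    }

    public static void main(String[] args) {
        // The fallback here just uses toString(); a real application would use JSON.
        SerializerLookupSketch registry =
                new SerializerLookupSketch(v -> v.toString().getBytes(StandardCharsets.UTF_8));
        System.out.println(registry.serialize("Nike").length);
        System.out.println(registry.serialize(7).length);
    }
}
```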

You can, however, explicitly override the Serializer used by providing the appropriate configuration in application.yml:

Applying Default Configuration
kafka.producers.product-client.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
kafka:
    producers:
        product-client:
            value:
                serializer: org.apache.kafka.common.serialization.ByteArraySerializer
[kafka]
  [kafka.producers]
    [kafka.producers.product-client]
      [kafka.producers.product-client.value]
        serializer="org.apache.kafka.common.serialization.ByteArraySerializer"
kafka {
  producers {
    productClient {
      value {
        serializer = "org.apache.kafka.common.serialization.ByteArraySerializer"
      }
    }
  }
}
{
  kafka {
    producers {
      product-client {
        value {
          serializer = "org.apache.kafka.common.serialization.ByteArraySerializer"
        }
      }
    }
  }
}
{
  "kafka": {
    "producers": {
      "product-client": {
        "value": {
          "serializer": "org.apache.kafka.common.serialization.ByteArraySerializer"
        }
      }
    }
  }
}

You may want to do this if, for example, you choose an alternative serialization format such as Avro or Protobuf.

5.3 Sending Records in Batch

By default, if you define a method that takes a container type such as a List, the list will be serialized using the specified value.serializer (the default will result in a JSON array).

For example the following two methods will both send serialized arrays:

Sending Arrays and Lists
@Topic("books")
void sendList(List<Book> books);

@Topic("books")
void sendBooks(Book...books);
Sending Arrays and Lists
@Topic('books')
void sendList(List<Book> books)

@Topic('books')
void sendBooks(Book...books)
Sending Arrays and Lists
@Topic("books")
fun sendList(books: List<Book>)

@Topic("books")
fun sendBooks(vararg books: Book)

Instead of sending a serialized array, you may wish to send batches of ProducerRecord, either synchronously or asynchronously.

To do this you can specify a value of true to the batch member of the @KafkaClient annotation:

Sending ProducerRecord batches
@KafkaClient(batch = true)
public interface BookClient {

    @Topic("books")
    void sendList(List<Book> books);
}
Sending ProducerRecord batches
@KafkaClient(batch = true)
interface BookClient {

    @Topic('books')
    void sendList(List<Book> books)
}
Sending ProducerRecord batches
@KafkaClient(batch = true)
interface BookClient {

    @Topic("books")
    fun sendList(books: List<Book>)
}

In the above case instead of sending a serialized array the client implementation will iterate over each item in the list and send a ProducerRecord for each. The previous example is blocking, however you can return a reactive type if desired:
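
The difference in what reaches the topic can be modelled with a short sketch. It represents each record value as a string and is not the client implementation; the class BatchSketch and its method names are hypothetical, and the JSON rendering is simplified (titles are quoted without escaping).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Models the record values produced with and without batch = true; illustration only.
final class BatchSketch {

    // Without batching: the whole list becomes the value of a single record (a JSON array).
    static List<String> withoutBatch(List<String> titles) {
        String jsonArray = titles.stream()
                .map(title -> "\"" + title + "\"")
                .collect(Collectors.joining(",", "[", "]"));
        return Collections.singletonList(jsonArray);
    }

    // With batching: one record value per item in the list.
    static List<String> withBatch(List<String> titles) {
        return titles.stream()
                .map(title -> "\"" + title + "\"")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> titles = Arrays.asList("The Stand", "It");
        System.out.println(withoutBatch(titles).size() + " record without batch");
        System.out.println(withBatch(titles).size() + " records with batch");
    }
}
```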

Sending ProducerRecord batches Reactively
@KafkaClient(batch = true)
public interface BookClient {

    @Topic("books")
    Flux<RecordMetadata> send(List<Book> books);
}
Sending ProducerRecord batches Reactively
@KafkaClient(batch = true)
interface BookClient {

    @Topic('books')
    Flux<RecordMetadata> send(List<Book> books)
}
Sending ProducerRecord batches Reactively
@KafkaClient(batch = true)
interface BookClient {

    @Topic("books")
    fun send(books: List<Book>): Flux<RecordMetadata>
}

You can also use an unbounded reactive type such as Flux as the source of your batch data:

Sending ProducerRecord batches from a Flux
@KafkaClient(batch = true)
public interface BookClient {

    @Topic("books")
    Flux<RecordMetadata> send(Flux<Book> books);
}
Sending ProducerRecord batches from a Flux
@KafkaClient(batch = true)
interface BookClient {

    @Topic('books')
    Flux<RecordMetadata> send(Flux<Book> books)
}
Sending ProducerRecord batches from a Flux
@KafkaClient(batch = true)
interface BookClient {

    @Topic("books")
    fun send(books: Flux<Book>): Flux<RecordMetadata>
}

5.4 Injecting Kafka Producer Beans

If you need maximum flexibility and don’t want to use the @KafkaClient support, you can use the @KafkaClient annotation as a qualifier for dependency injection of KafkaProducer instances.

Consider the following example:

Using a KafkaProducer directly
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.Future;

@Singleton
public class BookSender {

    private final Producer<String, Book> kafkaProducer;

    public BookSender(@KafkaClient("book-producer") Producer<String, Book> kafkaProducer) { // (1)
        this.kafkaProducer = kafkaProducer;
    }

    public Future<RecordMetadata> send(String author, Book book) {
        return kafkaProducer.send(new ProducerRecord<>("books", author, book)); // (2)
    }
}
Using a KafkaProducer directly
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata

import java.util.concurrent.Future

@Singleton
class BookSender {

    private final Producer<String, Book> kafkaProducer

    BookSender(@KafkaClient('book-producer') Producer<String, Book> kafkaProducer) { // (1)
        this.kafkaProducer = kafkaProducer
    }

    Future<RecordMetadata> send(String author, Book book) {
        kafkaProducer.send(new ProducerRecord<>('books', author, book)) // (2)
    }
}
Using a KafkaProducer directly
import io.micronaut.configuration.kafka.annotation.KafkaClient
import jakarta.inject.Singleton
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import java.util.concurrent.Future

@Singleton
class BookSender(
    @param:KafkaClient("book-producer") private val kafkaProducer: Producer<String, Book>) { // (1)

    fun send(author: String, book: Book): Future<RecordMetadata> {
        return kafkaProducer.send(ProducerRecord("books", author, book)) // (2)
    }
}
1 The Producer is dependency injected into the constructor. If not specified in configuration, the key and value serializer are inferred from the generic type arguments.
2 The Producer is used to send records

Note that there is no need to call the close() method to shut down the KafkaProducer. It is fully managed by Micronaut and will be shut down when the application shuts down.

The previous example can be tested in JUnit with the following test:

Using a KafkaProducer directly
@Test
void testBookSender() {

    try (ApplicationContext ctx = ApplicationContext.run( // (1)
        Map.of("kafka.enabled", "true", "spec.name", "BookSenderTest")
    )) {
        BookSender bookSender = ctx.getBean(BookSender.class); // (2)
        Book book = new Book("The Stand");
        Future<RecordMetadata> stephenKing = bookSender.send("Stephen King", book);
        assertDoesNotThrow(() -> {
            RecordMetadata recordMetadata = stephenKing.get();
            assertEquals("books", recordMetadata.topic());
        });
    }
}
Using a KafkaProducer directly
void "test Book Sender"() {
    given:
    ApplicationContext ctx = ApplicationContext.run(  // (1)
        'kafka.enabled': true, 'spec.name': 'BookSenderTest'
    )
    BookSender bookSender = ctx.getBean(BookSender) // (2)
    Book book = new Book('The Stand')

    when:
    Future<RecordMetadata> stephenKing = bookSender.send('Stephen King', book)
    def recordMetadata = stephenKing.get()

    then:
    noExceptionThrown()
    recordMetadata.topic() == 'books'

    cleanup:
    ctx.close()
}
Using a KafkaProducer directly
@Test
fun testBookSender() {
    ApplicationContext.run(mapOf( // (1)
        "kafka.enabled" to StringUtils.TRUE, "spec.name" to "BookSenderTest")).use { ctx ->
        val bookSender = ctx.getBean(BookSender::class.java) // (2)
        val book = Book("The Stand")
        val stephenKing = bookSender.send("Stephen King", book)
        Assertions.assertDoesNotThrow {
            val recordMetadata = stephenKing.get()
            Assertions.assertEquals("books", recordMetadata.topic())
        }
    }
}
1 A Kafka docker container is used
2 The BookSender is retrieved from the ApplicationContext and a ProducerRecord sent

Using the KafkaProducer API directly opens up further options, for example if you require transactions (exactly-once delivery) or want control over when records are flushed.

5.5 Transactions

Transaction processing can be enabled by defining transactionalId on @KafkaClient, which will initialize the producer for transactional usage and wrap any send operation with a transaction demarcation.

Transactional Client
@KafkaClient(id = "my-client", transactionalId = "my-tx-id")
public interface TransactionalClient {
    // define client API
}
Transactional Client
@KafkaClient(id = "my-client", transactionalId = "my-tx-id")
interface TransactionalClient {
    // define client API
}
Transactional Client
@KafkaClient(id = "my-client", transactionalId = "my-tx-id")
interface TransactionalClient {
    // define client API
}
Alternative Kafka producer transactional code
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(...);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}
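
The begin/commit/abort pattern above can be wrapped in a small helper. The following is a minimal, self-contained sketch: TxProducer and InMemoryTxProducer are hypothetical stand-ins used only for illustration, not part of Kafka or Micronaut, and the in-memory implementation simply assumes that records become visible on commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the transactional subset of a producer API. */
interface TxProducer {
    void beginTransaction();
    void send(String record);
    void commitTransaction();
    void abortTransaction();
}

/** In-memory producer: records become visible only when the transaction commits. */
final class InMemoryTxProducer implements TxProducer {
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    @Override public void beginTransaction() { pending.clear(); }
    @Override public void send(String record) { pending.add(record); }
    @Override public void commitTransaction() { committed.addAll(pending); pending.clear(); }
    @Override public void abortTransaction() { pending.clear(); }

    List<String> committed() { return List.copyOf(committed); }
}

final class TransactionDemo {
    /** Runs the work between begin and commit, aborting if the work throws. Returns true on commit. */
    static boolean inTransaction(TxProducer producer, Consumer<TxProducer> work) {
        producer.beginTransaction();
        try {
            work.accept(producer);
            producer.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            producer.abortTransaction();
            return false;
        }
    }

    public static void main(String[] args) {
        InMemoryTxProducer producer = new InMemoryTxProducer();
        inTransaction(producer, p -> { p.send("a"); p.send("b"); });
        inTransaction(producer, p -> { p.send("c"); throw new IllegalStateException("boom"); });
        System.out.println(producer.committed()); // prints [a, b]
    }
}
```

The record sent in the failing transaction never becomes visible, which is the guarantee the demarcation is meant to provide.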

@KafkaClient beans are by default singleton. When using multiple threads, you must either synchronize access to the individual instance or declare the bean as @Prototype. Additionally, you can use random properties to your advantage so that each instance of your producer gets a different transactional ID.

Random transactional ID
@Prototype
@KafkaClient(id = "my-client", transactionalId = "my-tx-id-${random.uuid}")
public interface RandomTransactionalIdClient {
    // define client API
}
Random transactional ID
@Prototype
@KafkaClient(id = "my-client", transactionalId = 'my-tx-id-${random.uuid}')
interface RandomTransactionalIdClient {
    // define client API
}
Random transactional ID
@Prototype
@KafkaClient(id = "my-client", transactionalId = "my-tx-id-\${random.uuid}")
interface RandomTransactionalIdClient {
    // define client API
}

5.6 Running Kafka while testing and developing

Micronaut Test Resources simplifies running Kafka for local development and testing.

Micronaut Test Resources Kafka support will automatically start a Kafka container and provide the value of the kafka.bootstrap.servers property.

Micronaut Launch and CLI already apply Test Resources to your build when you select the kafka feature.

Micronaut Test Resources uses Testcontainers under the hood. If you prefer to use Testcontainers directly, you can create a singleton container and combine it with the TestPropertyProvider from Micronaut Test:

package io.micronaut.kafka.docs;

import io.micronaut.test.support.TestPropertyProvider;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Collections;
import java.util.Map;

/**
 * @see <a href="https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers">Singleton containers</a>
 */
public abstract class AbstractKafkaTest implements TestPropertyProvider {

    protected static final KafkaContainer MY_KAFKA = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:latest")
    );

    @Override
    public Map<String, String> getProperties() {
        if (!MY_KAFKA.isRunning()) {
            MY_KAFKA.start();
        }
        return Collections.singletonMap(
            "kafka.bootstrap.servers", MY_KAFKA.getBootstrapServers()
        );
    }
}
package io.micronaut.kafka.docs

import io.micronaut.test.support.TestPropertyProvider
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification

abstract class AbstractKafkaTest extends Specification implements TestPropertyProvider {

    @Shared
    @AutoCleanup
    KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))

    @Override
    Map<String, String> getProperties() {
        kafkaContainer.start()

        ["kafka.bootstrap.servers": kafkaContainer.getBootstrapServers()]
    }
}
package io.micronaut.kafka.docs

import io.micronaut.test.support.TestPropertyProvider
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

/**
 * @see <a href="https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers">Singleton containers</a>
 */
abstract class AbstractKafkaTest : TestPropertyProvider {

    companion object {
        val MY_KAFKA: KafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
    }

    override fun getProperties(): MutableMap<String, String> {
        if(!MY_KAFKA.isRunning) {
            MY_KAFKA.start()
        }
        return mutableMapOf(
            "kafka.bootstrap.servers" to MY_KAFKA.bootstrapServers
        )
    }
}

And then test:

package io.micronaut.kafka.docs;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

@Property(name = "spec.name", value = "MyTest")
@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyTest extends AbstractKafkaTest {

    @Test
    void testKafkaRunning(MyProducer producer, MyConsumer consumer) {
        final String message = "hello";
        producer.produce(message);
        await().atMost(5, SECONDS).until(() -> message.equals(consumer.consumed));
    }

    @Requires(property = "spec.name", value = "MyTest")
    @KafkaClient
    interface MyProducer {
        @Topic("my-topic")
        void produce(String message);
    }

    @Requires(property = "spec.name", value = "MyTest")
    @KafkaListener(offsetReset = OffsetReset.EARLIEST)
    static class MyConsumer {
        String consumed;
        @Topic("my-topic")
        public void consume(String message) {
            consumed = message;
        }
    }
}
package io.micronaut.kafka.docs


import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import spock.util.concurrent.PollingConditions

@MicronautTest
@Property(name = "spec.name", value = "MyTest")
class MyTest extends AbstractKafkaTest {

    @Inject
    MyProducer producer
    @Inject
    MyConsumer consumer

    PollingConditions conditions = new PollingConditions()

    void "test kafka running"() {
        given:
        String message = "hello"

        when:
        producer.produce(message)

        then:
        conditions.within(5) {
            consumer.consumed == message
        }
    }

    @Requires(property = "spec.name", value = "MyTest")
    @KafkaClient
    static interface MyProducer {
        @Topic("my-topic")
        void produce(String message)
    }

    @Requires(property = "spec.name", value = "MyTest")
    @KafkaListener(offsetReset = OffsetReset.EARLIEST)
    static class MyConsumer {
        String consumed

        @Topic("my-topic")
        void consume(String message) {
            consumed = message
        }
    }

}
package io.micronaut.kafka.docs

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import java.util.concurrent.TimeUnit

@Property(name = "spec.name", value = "MyTest")
@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class MyTest : AbstractKafkaTest() {

    @Test
    fun testKafkaRunning(producer: MyProducer, consumer: MyConsumer) {
        val message = "hello"
        producer.produce(message)
        await().atMost(5, TimeUnit.SECONDS).until { consumer.consumed == message }
    }

    @Requires(property = "spec.name", value = "MyTest")
    @KafkaClient
    interface MyProducer {
        @Topic("my-topic")
        fun produce(message: String)
    }

    @Requires(property = "spec.name", value = "MyTest")
    @KafkaListener(offsetReset = OffsetReset.EARLIEST)
    class MyConsumer {
        var consumed: String? = null

        @Topic("my-topic")
        fun consume(message: String) {
            consumed = message
        }
    }
}

6 Kafka Consumers Using @KafkaListener

The quick start section presented a trivial example of what is possible with the @KafkaListener annotation.

Using the @KafkaListener annotation Micronaut will build a KafkaConsumer and start the poll loop by running the KafkaConsumer in a special consumer thread pool. You can configure the size of the thread pool based on the number of consumers in your application in application.yml as desired:

Configuring the consumer thread pool
micronaut.executors.consumer.type=fixed
micronaut.executors.consumer.nThreads=25
micronaut:
    executors:
        consumer:
            type: fixed
            nThreads: 25
[micronaut]
  [micronaut.executors]
    [micronaut.executors.consumer]
      type="fixed"
      nThreads=25
micronaut {
  executors {
    consumer {
      type = "fixed"
      nThreads = 25
    }
  }
}
{
  micronaut {
    executors {
      consumer {
        type = "fixed"
        nThreads = 25
      }
    }
  }
}
{
  "micronaut": {
    "executors": {
      "consumer": {
        "type": "fixed",
        "nThreads": 25
      }
    }
  }
}

KafkaConsumer instances are single threaded, hence for each @KafkaListener method you define a new thread is created to execute the poll loop.

You may wish to scale the number of consumers you have listening on a particular topic. There are several ways you may achieve this. You could for example run multiple instances of your application each containing a single consumer in each JVM.

Alternatively, you can scale via threads by setting the number of threads a particular consumer bean will create:

Scaling with Threads
@KafkaListener(groupId = "myGroup", threads = 10)
Scaling with Threads
@KafkaListener(groupId='myGroup', threads = 10)
Scaling with Threads
@KafkaListener(groupId = "myGroup", threads = 10)

The above example will create 10 KafkaConsumer instances, each running in a unique thread and participating in the myGroup consumer group.

@KafkaListener beans are by default singleton. When using multiple threads you must either synchronize access to local state or declare the bean as @Prototype.
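
To illustrate why shared state needs protection, here is a minimal plain-JDK sketch in which several simulated poll loops feed one shared listener instance. CountingListener and ThreadedConsumersDemo are hypothetical names, and the loops only imitate consumers; no Kafka client is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** A singleton-style listener shared by every consumer thread. */
final class CountingListener {
    // AtomicInteger keeps the shared counter correct under concurrent updates.
    private final AtomicInteger received = new AtomicInteger();

    void receive(String record) { received.incrementAndGet(); }

    int received() { return received.get(); }
}

final class ThreadedConsumersDemo {
    /** Starts one task per simulated consumer; each task feeds its records to the shared listener. */
    static int consume(int threads, int recordsPerThread) {
        CountingListener listener = new CountingListener();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> loops = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int consumerId = t;
                loops.add(pool.submit(() -> {
                    for (int i = 0; i < recordsPerThread; i++) {
                        listener.receive("consumer-" + consumerId + "-record-" + i);
                    }
                }));
            }
            for (Future<?> loop : loops) {
                loop.get(); // wait for every simulated poll loop to finish
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return listener.received();
    }

    public static void main(String[] args) {
        System.out.println(consume(10, 1_000)); // prints 10000
    }
}
```

With a plain int field instead of AtomicInteger, the final count could come out lower because concurrent increments may be lost.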

You can also make your number of threads configurable by using threadsValue:

Dynamically Configuring Threads
@KafkaListener(groupId = "myGroup", threadsValue = "${my.thread.count}")
Dynamically Configuring Threads
@KafkaListener(groupId = 'myGroup', threadsValue = '${my.thread.count}')
Dynamically Configuring Threads
@KafkaListener(groupId = "myGroup", threadsValue = "\${my.thread.count}")
threads will be overridden by threadsValue if they are both set.

By default Micronaut will inspect the method signature of the method annotated with @Topic that will listen for ConsumerRecord instances and from the types infer an appropriate key and value Deserializer.

Applying Configuration
kafka.consumers.default.allow.auto.create.topics=true
kafka.consumers.product.bootstrap.servers=localhost:9098
kafka:
    consumers:
        default:
            allow.auto.create.topics: true
        product:
            bootstrap:
              servers: localhost:9098
[kafka]
  [kafka.consumers]
    [kafka.consumers.default]
      "allow.auto.create.topics"=true
    [kafka.consumers.product]
      [kafka.consumers.product.bootstrap]
        servers="localhost:9098"
kafka {
  consumers {
    'default' {
      allow.auto.create.topics = true
    }
    product {
      bootstrap {
        servers = "localhost:9098"
      }
    }
  }
}
{
  kafka {
    consumers {
      default {
        "allow.auto.create.topics" = true
      }
      product {
        bootstrap {
          servers = "localhost:9098"
        }
      }
    }
  }
}
{
  "kafka": {
    "consumers": {
      "default": {
        "allow.auto.create.topics": true
      },
      "product": {
        "bootstrap": {
          "servers": "localhost:9098"
        }
      }
    }
  }
}

Any property in the ConsumerConfig class can be set this way, either for all @KafkaListener beans under kafka.consumers.default or for a specific named consumer. The above example will enable the consumer to create a topic if it doesn’t exist for the default (@KafkaListener) client and set a custom bootstrap server for the product client (@KafkaListener(value = "product")).

6.1 Defining @KafkaListener Methods

The @KafkaListener annotation examples up until now have been relatively trivial, but Micronaut offers a lot of flexibility when it comes to the types of method signatures you can define.

The following sections detail examples of supported use cases.

Specifying Topics

The @Topic annotation can be used at the method or the class level to specify which topics to listen to.

Care needs to be taken when using @Topic at the class level because every public method of the class annotated with @KafkaListener will become a Kafka consumer, which may be undesirable.

You can specify multiple topics to listen for:

Specifying Multiple Topics
@Topic({"fun-products", "awesome-products"})
public void receiveMultiTopics(@KafkaKey String brand, String name) {
    LOG.info("Got Product - {} by {}", name, brand);
}
Specifying Multiple Topics
@Topic(['fun-products', 'awesome-products'])
void receiveMultiTopics(@KafkaKey String brand, String name) {
    log.info("Got Product - {} by {}", name, brand)
}
Specifying Multiple Topics
@Topic("fun-products", "awesome-products")
fun receiveMultiTopics(@KafkaKey brand: String, name: String) {
    LOG.info("Got Product - {} by {}", name, brand)
}

You can also specify one or many regular expressions to listen for:

Using regular expressions to match Topics
@Topic(patterns="products-\\w+")
public void receivePatternTopics(@KafkaKey String brand, String name) {
    LOG.info("Got Product - {} by {}", name, brand);
}
Using regular expressions to match Topics
@Topic(patterns='products-\\w+')
void receivePatternTopics(@KafkaKey String brand, String name) {
    log.info("Got Product - {} by {}", name, brand)
}
Using regular expressions to match Topics
@Topic(patterns = ["products-\\w+"])
fun receivePatternTopics(@KafkaKey brand: String, name: String) {
    LOG.info("Got Product - {} by {}", name, brand)
}
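
To check which topic names a pattern such as products-\w+ would select, the expression can be tried with java.util.regex directly. The sketch below assumes that a topic is selected only when the whole name matches; the topic names are made up for illustration.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicPatternDemo {
    /** Returns the topic names that the pattern matches in full. */
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
            .filter(topic -> pattern.matcher(topic).matches())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
            "products-fun", "products-awesome", "products-", "orders-fun", "my-products-fun");
        System.out.println(matching("products-\\w+", topics)); // prints [products-fun, products-awesome]
    }
}
```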

Available Annotations

There are a number of annotations available that allow you to specify how a method argument is bound.

The following table summarizes the annotations and their purpose, with an example:

Table 1. Kafka Messaging Annotations
Annotation | Description | Example
@MessageBody | Allows explicitly indicating the body of the message | @MessageBody Product product
@MessageHeader | Allows binding a parameter to a message header | @MessageHeader("X-My-Header") String myHeader
@KafkaKey | Allows binding a parameter to the message key | @KafkaKey String messageKey
@KafkaPartition | Allows binding a parameter to the partition the message was received from | @KafkaPartition Integer partition

For example, you can use the @MessageHeader annotation to bind a parameter value from a header contained within a ConsumerRecord.

Topics, Partitions and Offsets

If you want a reference to the topic, partition or offset it is a simple matter of defining a parameter for each.

The following table summarizes example parameters and how they relate to the ConsumerRecord being processed:

Table 2. @KafkaListener Method Parameters
Parameter | Description
String topic | The name of the topic
long offset | The offset of the ConsumerRecord
int partition | The partition of the ConsumerRecord
long timestamp | The timestamp of the ConsumerRecord

As an example, the following listener method will receive all of the above-mentioned parameters:

Specifying Parameters for offset, topic etc.
@Topic("awesome-products")
public void receive(@KafkaKey String brand, // (1)
                    Product product, // (2)
                    long offset, // (3)
                    int partition, // (4)
                    String topic, // (5)
                    long timestamp) { // (6)
    LOG.info("Got Product - {} by {}",product.name(), brand);
}
Specifying Parameters for offset, topic etc.
@Topic('awesome-products')
void receive(@KafkaKey String brand,  // (1)
             Product product, // (2)
             long offset, // (3)
             int partition, // (4)
             String topic, // (5)
             long timestamp) { // (6)
    log.info("Got Product - {} by {}", product.name, brand)
}
Specifying Parameters for offset, topic etc.
@Topic("awesome-products")
fun receive(
    @KafkaKey brand: String,  // (1)
    product: Product,  // (2)
    offset: Long,  // (3)
    partition: Int,  // (4)
    topic: String?,  // (5)
    timestamp: Long // (6)
) {
    LOG.info("Got Product - {} by {}", product.name, brand)
}
1 The Kafka key
2 The message body
3 The offset of the ConsumerRecord
4 The partition of the ConsumerRecord
5 The topic. Note that the @Topic annotation supports multiple topics.
6 The timestamp of the ConsumerRecord

Receiving a ConsumerRecord

If you prefer you can also receive the entire ConsumerRecord object being listened for. In this case you should specify appropriate generic types for the key and value of the ConsumerRecord so that Micronaut can pick the correct deserializer for each.

Consider the following example:

Receiving the Entire ConsumerRecord
@Topic("awesome-products")
public void receive(ConsumerRecord<String, Product> record) { // (1)
    Product product = record.value(); // (2)
    String brand = record.key(); // (3)
    LOG.info("Got Product - {} by {}",product.name(), brand);
}
Receiving the Entire ConsumerRecord
@Topic("awesome-products")
public void receive(ConsumerRecord<String, Product> record) { // (1)
    Product product = record.value() // (2)
    String brand = record.key() // (3)
    log.info("Got Product - {} by {}", product.name, brand)
}
Receiving the Entire ConsumerRecord
@Topic("awesome-products")
fun receive(record: ConsumerRecord<String, Product>) { // (1)
    val product = record.value() // (2)
    val brand = record.key() // (3)
    LOG.info("Got Product - {} by {}", product.name, brand)
}
1 The method signature accepts a ConsumerRecord that specifies a String for the key type and a POJO (Product) for the value type.
2 The value() method is used to retrieve the value
3 The key() method is used to retrieve the key

Receiving and returning Reactive Types

In addition to common Java types and POJOs you can also define listener methods that receive a reactive type such as an RxJava Single or a Reactor Mono.

Add the library Micronaut Reactor or Micronaut RxJava 3 to your application’s dependencies.

For example, using Reactor:

Using Reactive Types
@Topic("reactive-products")
public Mono<Product> receive(@KafkaKey String brand,  // (1)
                             Mono<Product> productPublisher) { // (2)
    return productPublisher.doOnSuccess((product) ->
        LOG.info("Got Product - {} by {}", product.name(), brand) // (3)
    );
}
Using Reactive Types
@Topic('reactive-products')
Mono<Product> receive(@KafkaKey String brand,  // (1)
                      Mono<Product> productPublisher) { // (2)
    return productPublisher.doOnSuccess(product ->
        log.info("Got Product - {} by {}", product.name, brand) // (3)
    )
}
Using Reactive Types
@Topic("reactive-products")
fun receive(@KafkaKey brand: String,  // (1)
    product: Mono<Product>): Mono<Product> { // (2)
    return product.doOnSuccess { (name): Product ->
        LOG.info("Got Product - {} by {}", name, brand) // (3)
    }
}
1 The @KafkaKey annotation is used to indicate the key
2 A Mono is used to receive the message body
3 The doOnSuccess method is used to process the result

Note that in this case the method returns a Mono, which indicates to Micronaut that the poll loop should continue. If enable.auto.commit is set to true (the default), the offsets will be committed, potentially before doOnSuccess is called.

The idea here is that you are able to write consumers that don’t block. However, care must be taken if an error occurs in the doOnSuccess method; otherwise the message could be lost. You could, for example, re-deliver the message in case of an error.

Alternatively, you can use the @Blocking annotation to tell Micronaut to subscribe to the returned reactive type in a blocking manner which will result in blocking the poll loop, preventing offsets from being committed automatically:

Blocking with Reactive Consumers
@Blocking
@Topic("reactive-products")
public Mono<Product> receiveBlocking(@KafkaKey String brand, Mono<Product> productPublisher) {
    return productPublisher.doOnSuccess((product) ->
        LOG.info("Got Product - {} by {}", product.name(), brand)
    );
}
Blocking with Reactive Consumers
@Blocking
@Topic('reactive-products')
Mono<Product> receiveBlocking(@KafkaKey String brand, Mono<Product> productPublisher) {
    return productPublisher.doOnSuccess(product ->
            log.info("Got Product - {} by {}", product.name, brand)
    )
}
Blocking with Reactive Consumers
@Blocking
@Topic("reactive-products")
fun receiveBlocking(@KafkaKey brand: String, product: Mono<Product>): Mono<Product> {
    return product.doOnSuccess { (name): Product ->
        LOG.info("Got Product - {} by {}", name, brand)
    }
}
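
The difference in ordering between the non-blocking and the blocking variant can be pictured with a plain-JDK model. This sketch uses CompletableFuture instead of Reactor so that it runs without extra dependencies, and the "offset committed" event is only a marker for the point at which the loop would move on; it is an illustration under those assumptions, not how Micronaut is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CommitOrderingDemo {
    /** Non-blocking: the loop moves on as soon as the handler returns its still-pending result. */
    static List<String> nonBlocking() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> body = new CompletableFuture<>();
        CompletableFuture<Void> result = body.thenAccept(p -> events.add("processed " + p));
        events.add("offset committed"); // the loop does not wait for the result
        body.complete("product");       // the message body is processed afterwards
        result.join();
        return events;
    }

    /** Blocking: the loop waits for the result before it moves on. */
    static List<String> blocking() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> body = CompletableFuture.supplyAsync(() -> "product");
        CompletableFuture<Void> result = body.thenAccept(p -> events.add("processed " + p));
        result.join();                  // comparable to subscribing in a blocking manner
        events.add("offset committed");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(nonBlocking()); // prints [offset committed, processed product]
        System.out.println(blocking());    // prints [processed product, offset committed]
    }
}
```

In the non-blocking case a failure during processing happens after the commit marker, which is why such a message could be lost unless it is re-delivered.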

6.2 Configuring @KafkaListener beans

@KafkaListener and Consumer Groups

Kafka consumers created with @KafkaListener will by default run within a consumer group that is the value of micronaut.application.name unless you explicitly specify a value to the @KafkaListener annotation. For example:

Specifying a Consumer Group
@KafkaListener("myGroup")
Specifying a Consumer Group
@KafkaListener('myGroup')
Specifying a Consumer Group
@KafkaListener("myGroup")

or

Specifying a Consumer Group alternative
@KafkaListener(groupId = "myGroup")
Specifying a Consumer Group alternative
@KafkaListener(groupId = 'myGroup')
Specifying a Consumer Group alternative
@KafkaListener(groupId = "myGroup")

The above examples will run the consumer within a consumer group called myGroup. In this case, each record will be consumed by one consumer instance of the consumer group.

You can make the consumer group configurable using a placeholder: @KafkaListener("${my.consumer.group:myGroup}")

To allow the records to be consumed by all the consumer instances (each instance will be part of a unique consumer group), uniqueGroupId can be set to true:

Unique group IDs
@KafkaListener(groupId = "myGroup", uniqueGroupId = true)
Unique group IDs
@KafkaListener(groupId = 'myGroup', uniqueGroupId = true)
Unique group IDs
@KafkaListener(groupId = "myGroup", uniqueGroupId = true)

For more information, see for example https://kafka.apache.org/intro#intro_consumers
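
The effect of sharing a group versus using a unique group per instance can be sketched with a small in-memory model. This is a simplification: records are spread round-robin inside a group, whereas Kafka assigns whole partitions, and the group names with suffixes are hypothetical (the source does not say how unique group IDs are formed).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ConsumerGroupDemo {
    /**
     * Delivers every record once per group. Inside a group, records are spread
     * over the members round-robin (a simplification of partition assignment).
     * Member names are assumed to be unique and every group non-empty.
     */
    static Map<String, List<String>> deliver(Map<String, List<String>> groups, List<String> records) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        groups.values().forEach(members -> members.forEach(m -> received.put(m, new ArrayList<>())));
        for (List<String> members : groups.values()) {
            for (int i = 0; i < records.size(); i++) {
                String member = members.get(i % members.size());
                received.get(member).add(records.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> records = List.of("r0", "r1", "r2", "r3");

        // Two instances sharing one group: each record goes to exactly one of them.
        Map<String, List<String>> shared = new LinkedHashMap<>();
        shared.put("myGroup", List.of("instance-1", "instance-2"));
        System.out.println(deliver(shared, records));
        // prints {instance-1=[r0, r2], instance-2=[r1, r3]}

        // Each instance in its own group: both receive every record.
        Map<String, List<String>> unique = new LinkedHashMap<>();
        unique.put("myGroup-a", List.of("instance-1"));
        unique.put("myGroup-b", List.of("instance-2"));
        System.out.println(deliver(unique, records));
        // prints {instance-1=[r0, r1, r2, r3], instance-2=[r0, r1, r2, r3]}
    }
}
```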

@KafkaListener and Consumer Properties

There are a number of ways to pass configuration properties to the KafkaConsumer. You can set default consumer properties using kafka.consumers.default in application.yml:

Applying Default Configuration
kafka.consumers.default.session.timeout.ms=30000
kafka:
    consumers:
        default:
            session:
                timeout:
                    ms: 30000
[kafka]
  [kafka.consumers]
    [kafka.consumers.default]
      [kafka.consumers.default.session]
        [kafka.consumers.default.session.timeout]
          ms=30000
kafka {
  consumers {
    'default' {
      session {
        timeout {
          ms = 30000
        }
      }
    }
  }
}
{
  kafka {
    consumers {
      default {
        session {
          timeout {
            ms = 30000
          }
        }
      }
    }
  }
}
{
  "kafka": {
    "consumers": {
      "default": {
        "session": {
          "timeout": {
            "ms": 30000
          }
        }
      }
    }
  }
}

The above example sets the default session.timeout.ms, which Kafka uses to decide whether a consumer is alive, and applies it to all created KafkaConsumer instances.

You can also provide configuration specific to a consumer group. For example consider the following configuration:

Applying Consumer Group Specific config
kafka.consumers.myGroup.session.timeout.ms=30000
kafka:
    consumers:
        myGroup:
            session:
                timeout:
                    ms: 30000
[kafka]
  [kafka.consumers]
    [kafka.consumers.myGroup]
      [kafka.consumers.myGroup.session]
        [kafka.consumers.myGroup.session.timeout]
          ms=30000
kafka {
  consumers {
    myGroup {
      session {
        timeout {
          ms = 30000
        }
      }
    }
  }
}
{
  kafka {
    consumers {
      myGroup {
        session {
          timeout {
            ms = 30000
          }
        }
      }
    }
  }
}
{
  "kafka": {
    "consumers": {
      "myGroup": {
        "session": {
          "timeout": {
            "ms": 30000
          }
        }
      }
    }
  }
}

The above configuration will pass properties only to the @KafkaListener beans that belong to the consumer group myGroup.

Finally, the @KafkaListener annotation itself provides a properties member that you can use to set consumer specific properties:

Configuring Consumer Properties with @KafkaListener
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.Product;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;

import static org.slf4j.LoggerFactory.getLogger;

@KafkaListener(
    groupId = "products",
    pollTimeout = "500ms",
    properties = @Property(name = ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, value = "10000")
)
public class ProductListener {
Configuring Consumer Properties with @KafkaListener
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord

@KafkaListener(
        groupId = 'products',
        pollTimeout = '500ms',
        properties = @Property(name = ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, value = '10000')
)
class ProductListener {
Configuring Consumer Properties with @KafkaListener
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import io.micronaut.kafka.docs.consumer.topics.ProductListener
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory


@KafkaListener(
    groupId = "products",
    pollTimeout = "500ms",
    properties = [Property(name = ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, value = "10000")]
)
class ProductListener {
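
The three configuration levels described in this section (defaults, consumer-group specific configuration and the properties member) can be thought of as layers of key/value pairs. The sketch below merges such layers under the assumption that the more specific layer wins; the text above does not state the precedence, so treat that ordering, and the sample values, as assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ConsumerPropertiesDemo {
    /** Merges layers in order; later (assumed more specific) layers override earlier ones. */
    @SafeVarargs
    static Map<String, String> merge(Map<String, String>... layers) {
        Map<String, String> effective = new LinkedHashMap<>();
        for (Map<String, String> layer : layers) {
            effective.putAll(layer);
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Map.of(
            "session.timeout.ms", "30000",
            "allow.auto.create.topics", "true");
        Map<String, String> group = Map.of("session.timeout.ms", "20000");
        Map<String, String> annotation = Map.of("session.timeout.ms", "10000");

        Map<String, String> effective = merge(defaults, group, annotation);
        System.out.println(effective.get("session.timeout.ms"));       // prints 10000
        System.out.println(effective.get("allow.auto.create.topics")); // prints true
    }
}
```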

@KafkaListener and Deserializers

As mentioned previously when defining @KafkaListener methods, Micronaut will attempt to pick an appropriate deserializer for the method signature. This is done via the CompositeSerdeRegistry bean.

You can replace the default SerdeRegistry bean with your own implementation by defining a bean that uses @Replaces(CompositeSerdeRegistry.class). See the section on Bean Replacement.

All common java.lang types (String, Integer, primitives etc.) are supported and for POJOs by default a Jackson based JSON deserializer is used.

You can, however, explicitly override the Deserializer used by providing the appropriate configuration in application.yml:

Overriding the Value Deserializer
kafka.consumers.myGroup.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka:
    consumers:
        myGroup:
            value:
                deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
[kafka]
  [kafka.consumers]
    [kafka.consumers.myGroup]
      [kafka.consumers.myGroup.value]
        deserializer="org.apache.kafka.common.serialization.ByteArrayDeserializer"
kafka {
  consumers {
    myGroup {
      value {
        deserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      }
    }
  }
}
{
  kafka {
    consumers {
      myGroup {
        value {
          deserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        }
      }
    }
  }
}
{
  "kafka": {
    "consumers": {
      "myGroup": {
        "value": {
          "deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        }
      }
    }
  }
}

You may want to do this if for example you choose an alternative deserialization format such as Avro or Protobuf.
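
The idea of picking a deserializer from the declared parameter type can be sketched as a simple lookup with a fallback. DeserializerLookupDemo is a hypothetical illustration and not the CompositeSerdeRegistry implementation; the fallback function stands in for a JSON deserializer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class DeserializerLookupDemo {
    private final Map<Class<?>, Function<byte[], ?>> byType = new LinkedHashMap<>();
    private final Function<byte[], ?> fallback;

    DeserializerLookupDemo(Function<byte[], ?> fallback) {
        this.fallback = fallback;
        byType.put(String.class, bytes -> new String(bytes, StandardCharsets.UTF_8));
        byType.put(Integer.class, bytes -> ByteBuffer.wrap(bytes).getInt()); // 4 bytes, big-endian
        byType.put(byte[].class, bytes -> bytes);
    }

    /** Picks a deserializer for the declared parameter type, or the fallback for any other type. */
    Function<byte[], ?> forType(Class<?> type) {
        return byType.getOrDefault(type, fallback);
    }

    public static void main(String[] args) {
        DeserializerLookupDemo lookup = new DeserializerLookupDemo(
            bytes -> "json:" + new String(bytes, StandardCharsets.UTF_8));
        System.out.println(lookup.forType(String.class).apply("hi".getBytes(StandardCharsets.UTF_8))); // prints hi
        System.out.println(lookup.forType(Integer.class).apply(new byte[] {0, 0, 0, 42}));              // prints 42
        System.out.println(lookup.forType(Map.class).apply("{}".getBytes(StandardCharsets.UTF_8)));    // prints json:{}
    }
}
```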

Transactional properties

There are a few options that apply only to transactional processing:

Isolation

Use the isolation member to define whether you want to receive messages that haven’t been committed yet.

Custom offset strategy

The special offset strategy OffsetStrategy.SEND_TO_TRANSACTION can only be used with an associated producer and is only applicable when @SendTo is used.

6.3 Committing Kafka Offsets

Automatically Committing Offsets

The way offsets are handled by a @KafkaListener bean is defined by the OffsetStrategy enum.

The following table summarizes the enum values and behaviour:

Table 1. OffsetStrategy Values
Value | Description
AUTO | Automatically commit offsets. Sets enable.auto.commit to true
DISABLED | Disables automatically committing offsets. Sets enable.auto.commit to false
SYNC | Commits offsets manually at the end of each poll() loop if no exceptions occur. Sets enable.auto.commit to false
ASYNC | Asynchronously commits offsets manually at the end of each poll() loop if no exceptions occur. Sets enable.auto.commit to false
SYNC_PER_RECORD | Commits offsets manually after each ConsumerRecord is processed. Sets enable.auto.commit to false
ASYNC_PER_RECORD | Commits offsets asynchronously after each ConsumerRecord is processed. Sets enable.auto.commit to false
SEND_TO_TRANSACTION | Only available when the transactional producer is enabled for @SendTo. Sends offsets to the transaction using the sendOffsetsToTransaction method of org.apache.kafka.clients.producer.Producer.

Depending on your level of paranoia or durability requirements you can choose to tune how and when offsets are committed.
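
To make the trade-off concrete, the following is a minimal, Kafka-free sketch of how the commit point affects redelivery after an unexpected stop. The class, enum and method names are hypothetical and not part of Micronaut Kafka. The model assumes synchronous commits and that the previous batch was fully committed.

```java
// Hypothetical, Kafka-free model; the enum and method are illustrative only.
class RedeliverySketch {

    // PER_POLL mirrors SYNC (commit once the whole poll() batch is done);
    // PER_RECORD mirrors SYNC_PER_RECORD (commit after every record).
    enum CommitMode { PER_POLL, PER_RECORD }

    // Number of already-processed records that are delivered again after a restart,
    // if the process stops while handling the record at failedIndex (0-based)
    // within the current batch.
    static int reprocessedAfterRestart(CommitMode mode, int failedIndex) {
        if (failedIndex < 0) {
            throw new IllegalArgumentException("failedIndex must not be negative");
        }
        // PER_POLL: nothing from this batch was committed, so every record handled
        // before the stop is seen again. PER_RECORD: those records were committed.
        return mode == CommitMode.PER_POLL ? failedIndex : 0;
    }

    public static void main(String[] args) {
        System.out.println(reprocessedAfterRestart(CommitMode.PER_POLL, 7));
        System.out.println(reprocessedAfterRestart(CommitMode.PER_RECORD, 7));
    }
}
```

Committing per record reduces duplicate processing at the cost of one commit round trip per record, which is why the per-poll strategies are usually the cheaper choice when duplicates are tolerable.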

Manually Committing Offsets

If you set the OffsetStrategy to DISABLED it becomes your responsibility to commit offsets.

There are a couple of ways that can be achieved.

The simplest way is to define an argument of type Acknowledgement and call the ack() method to commit offsets synchronously:

Committing offsets with ack()
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
public class ProductListener {

    @Topic("awesome-products")
    public void receive(Product product, Acknowledgement acknowledgement) { // (2)
        // process product record
        acknowledgement.ack(); // (3)
    }
}
Committing offsets with ack()
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
class ProductListener {

    @Topic('awesome-products')
    void receive(Product product, Acknowledgement acknowledgement) { // (2)
        // process product record
        acknowledgement.ack() // (3)
    }
}
Committing offsets with ack()
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
class ProductListener {

    @Topic("awesome-products")
    fun receive(product: Product, acknowledgement: Acknowledgement) { // (2)
        // process product record
        acknowledgement.ack() // (3)
    }
}
1 Committing offsets automatically is disabled
2 The listener method specifies a parameter of type Acknowledgement
3 The ack() method is called once the record has been processed

Alternatively, you can supply a KafkaConsumer method argument and then call commitSync (or commitAsync) yourself when you are ready to commit offsets:

Committing offsets with the KafkaConsumer API
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.Product;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED;

@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
public class ProductListener {

    @Topic("awesome-products")
    public void receive(Product product, long offset, int partition, String topic, Consumer kafkaConsumer) { // (2)
        // process product record

        // commit offsets
        kafkaConsumer.commitSync(Collections.singletonMap( // (3)
                new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset + 1, "my metadata")
        ));
    }
}
Committing offsets with the KafkaConsumer API
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED

@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
class ProductListener {

    @Topic('awesome-products')
    void receive(Product product, long offset, int partition, String topic, Consumer kafkaConsumer) { // (2)
        // process product record

        // commit offsets
        kafkaConsumer.commitSync(Collections.singletonMap( // (3)
                new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset + 1, 'my metadata')
        ))
    }
}
Committing offsets with the KafkaConsumer API
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import java.util.*

import io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED

@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
class ProductListener {

    @Topic("awesome-products")
    fun receive(product: Product, offset: Long, partition: Int, topic: String, kafkaConsumer: Consumer<*, *>) { // (2)
        // process product record

        // commit offsets
        kafkaConsumer.commitSync(Collections.singletonMap( // (3)
                TopicPartition(topic, partition),
                OffsetAndMetadata(offset + 1, "my metadata")
            )
        )
    }
}
1 Committing offsets automatically is disabled
2 The listener method specifies that it receives the offset data and a KafkaConsumer
3 The commitSync() method is called once the record has been processed
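
The examples above commit offset + 1 because, in Kafka, the committed offset is the position of the next record to consume rather than the last record processed. A minimal sketch of that arithmetic, using a plain list in place of a partition (the class and method names are hypothetical and not part of Kafka or Micronaut):

```java
import java.util.List;

// Hypothetical, Kafka-free model of a single partition; not a Micronaut or Kafka API.
class CommitPositionSketch {

    // The value to commit after processing the record at lastProcessedOffset:
    // the offset of the next record the consumer should read.
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    // Records a restarted consumer would read from the log, given the committed position.
    static List<String> resumeFrom(List<String> log, long committedOffset) {
        return log.subList((int) committedOffset, log.size());
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "c", "d"); // offsets 0..3
        long committed = offsetToCommit(1);             // "b" at offset 1 was processed
        System.out.println(resumeFrom(log, committed)); // records not yet processed
    }
}
```

Committing the raw offset instead of offset + 1 would cause the last processed record to be delivered again after a restart.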

6.4 Assigning Kafka Offsets

6.4.1 Manually Assigning Offsets to a Consumer Bean

Sometimes you may wish to control the exact position from which message consumption resumes.

For example, if you store offsets in a database, you may wish to read them when the consumer starts and resume from the stored position.

To support this use case your consumer bean can implement the ConsumerSeekAware interface:

Manually seeking offsets with the ConsumerSeekAware API
package io.micronaut.kafka.docs.seek.aware;

import io.micronaut.configuration.kafka.ConsumerSeekAware;
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.configuration.kafka.seek.*;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.Product;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

@KafkaListener
@Requires(property = "spec.name", value = "ConsumerSeekAwareTest")
public class ProductListener implements ConsumerSeekAware { // (1)

    List<Product> processed = new ArrayList<>();

    public ProductListener(ProductListenerConfiguration config) {
        // ...
    }

    @Topic("wonderful-products")
    void receive(Product product) {
        processed.add(product);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // (2)
        // save offsets here
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) { // (3)
        // seek to offset here
        partitions.stream().map(tp -> KafkaSeekOperation.seek(tp, 1)).forEach(seeker::perform);
    }
}
Manually seeking offsets with the ConsumerSeekAware API
package io.micronaut.kafka.docs.seek.aware

import io.micronaut.configuration.kafka.ConsumerSeekAware
import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.configuration.kafka.seek.*
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import org.apache.kafka.common.TopicPartition

@KafkaListener
@Requires(property = "spec.name", value = "ConsumerSeekAwareSpec")
class ProductListener implements ConsumerSeekAware { // (1)

    List<Product> processed = []

    ProductListener(ProductListenerConfiguration config) {
        // ...
    }

    @Topic("wonderful-products")
    void receive(Product product) {
        processed << product
    }

    @Override
    void onPartitionsRevoked(Collection<TopicPartition> partitions) { // (2)
        // save offsets here
    }

    @Override
    void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) { // (3)
        // seek to offset here
        partitions.collect { KafkaSeekOperation.seek(it, 1) }.each(seeker.&perform)
    }
}
Manually seeking offsets with the ConsumerSeekAware API
package io.micronaut.kafka.docs.seek.aware

import io.micronaut.configuration.kafka.ConsumerSeekAware
import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.configuration.kafka.seek.*
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import org.apache.kafka.common.TopicPartition

@KafkaListener
@Requires(property = "spec.name", value = "ConsumerSeekAwareTest")
class ProductListener constructor(config: ProductListenerConfiguration) : ConsumerSeekAware { // (1)

    var processed: MutableList<Product> = mutableListOf()

    @Topic("wonderful-products")
    fun receive(product: Product) {
        processed.add(product)
    }

    override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) { // (2)
        // save offsets here
    }

    override fun onPartitionsAssigned(partitions: Collection<TopicPartition>, seeker: KafkaSeeker) { // (3)
        // seek to offset here
        partitions.stream().map { tp -> KafkaSeekOperation.seek(tp, 1) }.forEach(seeker::perform)
    }
}
1 Implement the interface ConsumerSeekAware
2 The onPartitionsRevoked method can be used to save offsets
3 The onPartitionsAssigned method can be used to read offsets and seek to a specific position. In this trivial example we just seek to the offset 1 (skipping the first record).
ConsumerSeekAware provides a convenient KafkaSeeker object that can be used to perform KafkaSeekOperations immediately on the underlying consumer.

Alternatively, when more fine-grained access to the Kafka consumer is required, your consumer bean can instead implement the ConsumerRebalanceListener and ConsumerAware interfaces:

Manually seeking offsets with the KafkaConsumer API
package io.micronaut.kafka.docs.seek.rebalance;

import io.micronaut.configuration.kafka.ConsumerAware;
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.Product;
import io.micronaut.core.annotation.NonNull;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
@Requires(property = "spec.name", value = "ConsumerRebalanceListenerTest")
public class ProductListener implements ConsumerRebalanceListener, ConsumerAware {

    List<Product> processed = new ArrayList<>();
    private Consumer consumer;

    public ProductListener(ProductListenerConfiguration config) {
        // ...
    }

    @Override
    public void setKafkaConsumer(@NonNull Consumer consumer) { // (1)
        this.consumer = consumer;
    }

    @Topic("fantastic-products")
    void receive(Product product) {
        processed.add(product);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // (2)
        // save offsets here
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // (3)
        // seek to offset here
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, 1);
        }
    }
}
Manually seeking offsets with the KafkaConsumer API
package io.micronaut.kafka.docs.seek.rebalance

import io.micronaut.configuration.kafka.ConsumerAware
import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.context.annotation.Requires
import io.micronaut.core.annotation.NonNull
import io.micronaut.kafka.docs.Product
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.TopicPartition

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
@Requires(property = "spec.name", value = "ConsumerRebalanceListenerSpec")
class ProductListener implements ConsumerRebalanceListener, ConsumerAware {

    List<Product> processed = []
    private Consumer consumer

    ProductListener(ProductListenerConfiguration config) {
        // ...
    }

    @Override
    void setKafkaConsumer(@NonNull Consumer consumer) { // (1)
        this.consumer = consumer
    }

    @Topic("fantastic-products")
    void receive(Product product) {
        processed << product
    }

    @Override
    void onPartitionsRevoked(Collection<TopicPartition> partitions) { // (2)
        // save offsets here
    }

    @Override
    void onPartitionsAssigned(Collection<TopicPartition> partitions) { // (3)
        // seek to offset here
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, 1)
        }
    }
}
Manually seeking offsets with the KafkaConsumer API
package io.micronaut.kafka.docs.seek.rebalance

import io.micronaut.configuration.kafka.ConsumerAware
import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.TopicPartition

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
@Requires(property = "spec.name", value = "ConsumerRebalanceListenerTest")
class ProductListener constructor(config: ProductListenerConfiguration) : ConsumerRebalanceListener, ConsumerAware<Any?, Any?> {

    var processed: MutableList<Product> = mutableListOf()
    private var consumer: Consumer<*, *>? = null

    override fun setKafkaConsumer(consumer: Consumer<Any?, Any?>?) { // (1)
        this.consumer = consumer
    }

    @Topic("fantastic-products")
    fun receive(product: Product) {
        processed.add(product)
    }

    override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) { // (2)
        // save offsets here
    }

    override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) { // (3)
        // seek to offset here
        for (partition in partitions) {
            consumer!!.seek(partition, 1)
        }
    }
}
1 The setKafkaConsumer of the ConsumerAware allows access to the underlying consumer
2 The onPartitionsRevoked method can be used to save offsets
3 The onPartitionsAssigned method can be used to read offsets and seek to a specific position. In this trivial example we just seek to the offset 1 (skipping the first record).
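
The listeners above leave "save offsets here" and "seek to offset here" as placeholders. As a rough illustration of what could sit behind them, here is a minimal in-memory stand-in for an external offset store. The class and its methods are hypothetical; a real application would back them with a database, and the sketch deliberately uses plain strings and ints instead of Kafka classes.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an external offset store (for example a database table).
// Keys are "topic-partition" strings so the sketch needs no Kafka classes.
class OffsetStoreSketch {

    private final Map<String, Long> positions = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Would be called from onPartitionsRevoked: remember the next offset to read.
    void save(String topic, int partition, long nextOffset) {
        positions.put(key(topic, partition), nextOffset);
    }

    // Would be called from onPartitionsAssigned: empty means "no stored position",
    // in which case the listener can skip the seek and keep the consumer's default.
    OptionalLong load(String topic, int partition) {
        Long stored = positions.get(key(topic, partition));
        return stored == null ? OptionalLong.empty() : OptionalLong.of(stored);
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.save("wonderful-products", 0, 42L);
        System.out.println(store.load("wonderful-products", 0)); // stored position
        System.out.println(store.load("wonderful-products", 1)); // nothing stored
    }
}
```

In a listener, onPartitionsAssigned could call load with the topic() and partition() of each TopicPartition and, when a value is present, seek to it instead of the hard-coded offset 1.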

6.4.2 Manual Offsets with Multiple Topics

It is possible for a single @KafkaListener bean to represent multiple consumers. If you have more than one method annotated with @Topic, then setKafkaConsumer will be called multiple times, once for each backing consumer.

When manually seeking offsets, it is recommended that you use a single listener bean per consumer. The alternative is to store an internal Set of all consumers associated with a particular listener and manually search for the correct consumer in onPartitionsAssigned using the partition assignment data.

Not doing so will lead to a ConcurrentModificationException error.

6.4.3 Manually Assigning Offsets from a Consumer Method

There may be some scenarios where you realize you need to seek to a different offset while consuming another one.

To support this use case, your consumer method can receive a KafkaSeekOperations instance as a parameter:

package io.micronaut.kafka.docs.seek.ops;

import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.configuration.kafka.seek.*;
import io.micronaut.context.annotation.*;
import io.micronaut.kafka.docs.Product;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

@KafkaListener(offsetReset = OffsetReset.EARLIEST, properties = @Property(name = "max.poll.records", value = "1"))
@Requires(property = "spec.name", value = "KafkaSeekOperationsTest")
public class ProductListener {

    List<Product> processed = new ArrayList<>();

    public ProductListener(ProductListenerConfiguration config) {
        // ...
    }

    @Topic("amazing-products")
    void receive(Product product, KafkaSeekOperations ops) { // (1)
        processed.add(product);
        ops.defer(KafkaSeekOperation.seekToEnd(new TopicPartition("amazing-products", 0))); // (2)
    }
}
package io.micronaut.kafka.docs.seek.ops

import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.configuration.kafka.seek.*
import io.micronaut.context.annotation.*
import io.micronaut.kafka.docs.Product
import org.apache.kafka.common.TopicPartition

@KafkaListener(offsetReset = OffsetReset.EARLIEST, properties = @Property(name = "max.poll.records", value = "1"))
@Requires(property = "spec.name", value = "KafkaSeekOperationsSpec")
class ProductListener {

    List<Product> processed = []

    ProductListener(ProductListenerConfiguration config) {
        // ...
    }

    @Topic("amazing-products")
    void receive(Product product, KafkaSeekOperations ops) { // (1)
        processed << product
        ops.defer(KafkaSeekOperation.seekToEnd(new TopicPartition("amazing-products", 0))); // (2)
    }
}
package io.micronaut.kafka.docs.seek.ops

import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.configuration.kafka.seek.*
import io.micronaut.context.annotation.*
import io.micronaut.kafka.docs.Product
import org.apache.kafka.common.TopicPartition

@KafkaListener(offsetReset = OffsetReset.EARLIEST, properties = [Property(name = "max.poll.records", value = "1")])
@Requires(property = "spec.name", value = "KafkaSeekOperationsTest")
class ProductListener constructor(config: ProductListenerConfiguration) {

    var processed: MutableList<Product> = mutableListOf()

    @Topic("amazing-products")
    fun receive(product: Product, ops: KafkaSeekOperations) { // (1)
        processed.add(product)
        ops.defer(KafkaSeekOperation.seekToEnd(TopicPartition("amazing-products", 0))) // (2)
    }
}
1 An instance of KafkaSeekOperations will be injected to the method
2 Any number of seek operations can be deferred. In this trivial example we just seek to the end of the partition.

The seek operations will be performed by Micronaut automatically, when the consumer method completes successfully, possibly after committing offsets via OffsetStrategy.AUTO.

These operations determine the next offset retrieved by poll. Take into account that, even if the seek operation performs successfully, your consumer method may keep receiving records that were cached by the previous call. You can configure max.poll.records to control the maximum number of records returned by a single call to poll.

6.4.4 Creating Kafka Seek Operations

The interface KafkaSeekOperation provides several static methods to create seek operations:

  • seek: Creates an absolute seek operation.

  • seekRelativeToBeginning: Creates a seek operation relative to the beginning.

  • seekToBeginning: Creates a seek to the beginning operation.

  • seekRelativeToEnd: Creates a seek operation relative to the end.

  • seekToEnd: Creates a seek to the end operation.

  • seekForward: Creates a forward seek operation.

  • seekBackward: Creates a backward seek operation.

  • seekToTimestamp: Creates a seek to the timestamp operation.
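
To illustrate how the non-timestamp operations listed above relate to one another, the following sketch resolves each kind of seek to an absolute offset. It is a simplified model based on the natural reading of the method names, not the library's implementation: the enum and method are hypothetical, seekToBeginning and seekToEnd are modelled as the relative forms with an offset of 0, and any bounds checking or clamping the library may perform is left out.

```java
// Hypothetical model of how each kind of seek could resolve to an absolute offset.
// The enum and method are illustrative; they are not Micronaut Kafka types.
class SeekResolutionSketch {

    enum Kind { ABSOLUTE, RELATIVE_TO_BEGINNING, RELATIVE_TO_END, FORWARD, BACKWARD }

    // beginning, end and current describe the partition as the consumer sees it:
    // first available offset, offset after the last record, and current position.
    static long resolve(Kind kind, long offset, long beginning, long end, long current) {
        switch (kind) {
            case ABSOLUTE:
                return offset;
            case RELATIVE_TO_BEGINNING:
                return beginning + offset; // offset 0 models seekToBeginning
            case RELATIVE_TO_END:
                return end - offset;       // offset 0 models seekToEnd
            case FORWARD:
                return current + offset;
            case BACKWARD:
                return current - offset;
            default:
                throw new IllegalArgumentException("unknown kind: " + kind);
        }
    }

    public static void main(String[] args) {
        // A partition holding offsets 10..99, with the consumer positioned at 50.
        long beginning = 10, end = 100, current = 50;
        for (Kind kind : Kind.values()) {
            System.out.println(kind + " by 5 -> " + resolve(kind, 5, beginning, end, current));
        }
    }
}
```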

6.5 Kafka Batch Processing

By default @KafkaListener listener methods will receive each ConsumerRecord one by one.

There may be cases where you prefer to receive all of the ConsumerRecord data from the ConsumerRecords holder object in one go.

To achieve this you can set the batch member of the @KafkaListener to true and specify a container type (typically List) to receive all of the data:

Receiving a Batch of Records
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;

import java.util.List;

import static org.slf4j.LoggerFactory.getLogger;

@KafkaListener(batch = true) // (1)
public class BookListener {

    // Logger used by the listener methods
    private static final Logger LOG = getLogger(BookListener.class);

    @Topic("all-the-books")
    public void receiveList(List<Book> books) { // (2)
        for (Book book : books) {
            LOG.info("Got Book = {}", book.title()); // (3)
        }
    }

}
Receiving a Batch of Records
import groovy.util.logging.Slf4j
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import reactor.core.publisher.Flux

@KafkaListener(batch = true) // (1)
@Slf4j
class BookListener {

    @Topic("all-the-books")
    void receiveList(List<Book> books) { // (2)
        for (Book book : books) {
            log.info("Got Book = {}", book.title) // (3)
        }
    }

}
Receiving a Batch of Records
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import org.slf4j.LoggerFactory.getLogger
import reactor.core.publisher.Flux
import java.util.*

@KafkaListener(batch = true) // (1)
class BookListener {

    companion object {
        // Logger used by the listener methods
        private val LOG = getLogger(BookListener::class.java)
    }

    @Topic("all-the-books")
    fun receiveList(books: List<Book>) { // (2)
        for (book in books) {
            LOG.info("Got Book = {}", book.title) // (3)
        }
    }

}
1 The @KafkaListener annotation’s batch member is set to true
2 The method defines that it receives a list of Book instances
3 The method processes the entire batch

Note that in the previous case offsets will automatically be committed for the whole batch by default when the method returns without error.

Manually Committing Offsets with Batch

As with one by one message processing, if you set the OffsetStrategy to DISABLED it becomes your responsibility to commit offsets.

If you want to commit the entire batch of offsets at once during the course of processing, then the simplest approach is to add an argument of type Acknowledgement and call the ack() method to commit the batch of offsets synchronously:

Committing a Batch of Offsets Manually with ack()
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
public void receive(List<Book> books, Acknowledgement acknowledgement) { // (2)

    //process the books

    acknowledgement.ack(); // (3)
}
Committing a Batch of Offsets Manually with ack()
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
void receive(List<Book> books, Acknowledgement acknowledgement) { // (2)

    //process the books

    acknowledgement.ack() // (3)
}
Committing a Batch of Offsets Manually with ack()
@KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true) // (1)
@Topic("all-the-books")
fun receive(books: List<Book?>?, acknowledgement: Acknowledgement) { // (2)

    //process the books

    acknowledgement.ack() // (3)
}
1 Committing offsets automatically is disabled
2 The listener method specifies a parameter of type Acknowledgement
3 The ack() method is called once the records have been processed

You can also take more control of committing offsets when doing batch processing by specifying a method that receives the offsets in addition to the batch:

Committing Offsets Manually with Batch
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
public void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) { // (2)

    for (int i = 0; i < records.size(); i++) {
        ConsumerRecord<String, Book> record = records.get(i); // (3)

        // process the book
        Book book = record.value();

        // commit offsets
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset(); // (4)

        kafkaConsumer.commitSync(Collections.singletonMap( // (5)
            new TopicPartition(topic, partition),
            new OffsetAndMetadata(offset + 1, "my metadata")
        ));

    }
}
Committing Offsets Manually with Batch
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) { // (2)

    for (int i = 0; i < records.size(); i++) {
        ConsumerRecord<String, Book> record = records.get(i) // (3)

        // process the book
        Book book = record.value()

        // commit offsets
        String topic = record.topic()
        int partition = record.partition()
        long offset = record.offset() // (4)

        kafkaConsumer.commitSync(Collections.singletonMap( // (5)
                new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset + 1, "my metadata")
        ))
    }
}
Committing Offsets Manually with Batch
@KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true) // (1)
@Topic("all-the-books")
fun receive(records: List<ConsumerRecord<String?, Book?>>, kafkaConsumer: Consumer<*, *>) { // (2)
    for (i in records.indices) {
        val record = records[i] // (3)

        // process the book
        val book = record.value()

        // commit offsets
        val topic = record.topic()
        val partition = record.partition()
        val offset = record.offset() // (4)
        kafkaConsumer.commitSync(
            Collections.singletonMap( // (5)
                TopicPartition(topic, partition),
                OffsetAndMetadata(offset + 1, "my metadata")
            )
        )
    }
}
1 Committing offsets automatically is disabled
2 The method receives the batch of books as a list of consumer records
3 Each record is processed
4 The offset, partition and topic are read for the record
5 Offsets are committed

This example is fairly trivial in that it commits offsets after processing each record in a batch, but you can, for example, commit after every 10 or 100 records, or at whatever interval makes sense for your application.
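
As a rough illustration of committing at an interval, the sketch below computes the positions that would be committed when committing after every N records, plus once at the end for any remainder. It is Kafka-free and assumes the batch comes from a single partition with ascending offsets; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical, Kafka-free sketch of interval-based committing within one batch.
class CommitEveryNSketch {

    // Returns the offsets that would be passed to a commit call when committing
    // after every `interval` records, plus once at the end for any remainder.
    static List<Long> commitPoints(List<Long> recordOffsets, int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        List<Long> commits = new ArrayList<>();
        int sinceLastCommit = 0;
        for (long offset : recordOffsets) {
            // process the record here
            sinceLastCommit++;
            if (sinceLastCommit == interval) {
                commits.add(offset + 1); // commit the next offset to read
                sinceLastCommit = 0;
            }
        }
        if (sinceLastCommit > 0) {
            // flush the tail of the batch so no processed record is left uncommitted
            commits.add(recordOffsets.get(recordOffsets.size() - 1) + 1);
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Long> offsets = LongStream.range(0, 25).boxed().collect(Collectors.toList());
        System.out.println(commitPoints(offsets, 10));
    }
}
```

In the listener above, each computed position corresponds to one commitSync call. The final flush matters because otherwise the records after the last full interval would be redelivered after a restart.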

Receiving a ConsumerRecords

When batching you can receive the entire ConsumerRecords object being listened for. In this case you should specify appropriate generic types for the key and value of the ConsumerRecords so that Micronaut can pick the correct deserializer for each.

This is useful when the need is to process or commit the records by partition, as the ConsumerRecords object already groups records by partition:

Commit only once for each partition
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
public void receiveConsumerRecords(ConsumerRecords<String, Book> consumerRecords, Consumer kafkaConsumer) { // (2)
    for (TopicPartition partition : consumerRecords.partitions()) { // (3)
        long offset = Long.MIN_VALUE;
        // process partition records
        for (ConsumerRecord<String, Book> record : consumerRecords.records(partition)) { // (4)
            // process the book
            Book book = record.value();
            // keep last offset
            offset = record.offset(); // (5)
        }

        // commit partition offset
        kafkaConsumer.commitSync(Collections.singletonMap( // (6)
            partition,
            new OffsetAndMetadata(offset + 1, "my metadata")
        ));
    }
}
Commit only once for each partition
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
void receiveConsumerRecords(ConsumerRecords<String, Book> consumerRecords, Consumer kafkaConsumer) { // (2)
    for (TopicPartition partition : consumerRecords.partitions()) { // (3)
        long offset = Long.MIN_VALUE;
        // process partition records
        for (ConsumerRecord<String, Book> record : consumerRecords.records(partition)) { // (4)
            // process the book
            Book book = record.value();
            // keep last offset
            offset = record.offset(); // (5)
        }

        // commit partition offset
        kafkaConsumer.commitSync(Collections.singletonMap( // (6)
                partition,
                new OffsetAndMetadata(offset + 1, "my metadata")
        ));
    }
}
Commit only once for each partition
@KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true) // (1)
@Topic("all-the-books")
fun receiveConsumerRecords(consumerRecords: ConsumerRecords<String?, Book?>, kafkaConsumer: Consumer<*, *>) { // (2)
    for (partition in consumerRecords.partitions()) { // (3)
        var offset = Long.MIN_VALUE
        // process partition records
        for (record in consumerRecords.records(partition)) { // (4)
            // process the book
            val book = record.value()
            // keep last offset
            offset = record.offset() // (5)
        }

        // commit partition offset
        kafkaConsumer.commitSync(
            Collections.singletonMap( // (6)
                partition,
                OffsetAndMetadata(offset + 1, "my metadata")
            )
        )
    }
}
1 Committing offsets automatically is disabled
2 The method receives the batch of books as a ConsumerRecords holder object
3 Each partition is iterated over
4 Each record for the partition is processed
5 The last read offset for the partition is stored
6 The offset is committed once for each partition

Reactive Batch Processing

Batch listeners also support defining reactive types (Reactor Flux or RxJava Flowable) as the method argument.

Add the library Micronaut Reactor or Micronaut RxJava 3 to your application’s dependencies.

In this case the method will be passed a reactive type that can be returned from the method allowing non-blocking processing of the batch:

Reactive Processing of Batch Records
@Topic("all-the-books")
public Flux<Book> receiveFlux(Flux<Book> books) {
    return books.doOnNext(book ->
        LOG.info("Got Book = {}", book.title())
    );
}
Reactive Processing of Batch Records
@Topic("all-the-books")
Flux<Book> receiveFlux(Flux<Book> books) {
    books.doOnNext(book ->
        log.info("Got Book = {}", book.title)
    )
}
Reactive Processing of Batch Records
@Topic("all-the-books")
fun receiveFlux(books: Flux<Book>): Flux<Book> {
    return books.doOnNext { book: Book ->
        LOG.info("Got Book = {}", book.title)
    }
}

Remember that, as with non-batch processing, the reactive type will be subscribed to on a different thread, and offsets will be committed automatically, likely before the reactive type is subscribed to.

This means that you should only use reactive processing if message durability is not a requirement and you may wish to implement message re-delivery upon failure.

6.6 Forwarding Messages with @SendTo

On any @KafkaListener method that returns a value, you can use the @SendTo annotation to forward the return value to the topic or topics specified by the @SendTo annotation.

The key of the original ConsumerRecord will be used as the key when forwarding the message.

Forwarding with @SendTo
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
public int receive(@KafkaKey String brand, Product product) {
    LOG.info("Got Product - {} by {}", product.name(), brand);
    return product.quantity(); // (3)
}
Forwarding with @SendTo
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
int receive(@KafkaKey String brand, Product product) {
    log.info("Got Product - {} by {}", product.name, brand)
    product.quantity // (3)
}
Forwarding with @SendTo
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
fun receive(@KafkaKey brand: String?, product: Product): Int {
    LOG.info("Got Product - {} by {}", product.name, brand)
    return product.quantity // (3)
}
1 The topic subscribed to is sendto-products
2 The topic to send the result to is product-quantities
3 The return value is used to indicate the value to forward

You can also do the same using Reactive programming:

Forwarding Reactively with @SendTo
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
public Mono<Integer> receiveProduct(@KafkaKey String brand,
                                    Mono<Product> productSingle) {

    return productSingle.map(product -> {
        LOG.info("Got Product - {} by {}", product.name(), brand);
        return product.quantity(); // (3)
    });
}
Forwarding Reactively with @SendTo
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
Mono<Integer> receiveProduct(@KafkaKey String brand, Mono<Product> productSingle) {
    productSingle.map(product -> {
        log.info("Got Product - {} by {}", product.name, brand)
        product.quantity // (3)
    })
}
Forwarding Reactively with @SendTo
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
fun receiveProduct(@KafkaKey brand: String?, productSingle: Mono<Product>): Mono<Int> {
    return productSingle.map(Function<Product, Int> { product: Product ->
        LOG.info("Got Product - {} by {}", product.name, brand)
        product.quantity // (3)
    })
}
1 The topic subscribed to is sendto-products
2 The topic to send the result to is product-quantities
3 The Mono is mapped to the quantity of the product, which becomes the value to forward

In the reactive case, the poll loop will continue and will not wait for the record to be sent unless you explicitly annotate the method with @Blocking.

To enable transactional sending of messages, you need to define producerTransactionalId in @KafkaListener.

Transactional consumer-producer
@KafkaListener(
    offsetReset = OffsetReset.EARLIEST,
    producerClientId = "word-counter-producer", // (1)
    producerTransactionalId = "tx-word-counter-id", // (2)
    offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // (3)
    isolation = IsolationLevel.READ_COMMITTED // (4)
)
public class WordCounter {

    @Topic("tx-incoming-strings")
    @SendTo("my-words-count")
    List<KafkaMessage<byte[], Integer>> wordsCounter(String string) {
        return Stream.of(string.split("\\s+"))
            .collect(Collectors.groupingBy(Function.identity(), Collectors.summingInt(i -> 1)))
            .entrySet()
            .stream()
            .map(e -> KafkaMessage.Builder.<byte[], Integer>withBody(e.getValue()).key(e.getKey().getBytes()).build())
            .toList();
    }
}
Transactional consumer-producer
@KafkaListener(
        offsetReset = OffsetReset.EARLIEST,
        producerClientId = 'word-counter-producer', // (1)
        producerTransactionalId = 'tx-word-counter-id', // (2)
        offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // (3)
        isolation = IsolationLevel.READ_COMMITTED // (4)
)
class WordCounter {

    @Topic('tx-incoming-strings')
    @SendTo('my-words-count')
    List<KafkaMessage<byte[], Integer>> wordsCounter(String string) {
        string.split("\\s+")
            .groupBy()
            .collect { key, instanceList ->
                KafkaMessage.Builder.withBody(instanceList.size()).key(key.bytes).build()
            }
    }
}
Transactional consumer-producer
@KafkaListener(
    offsetReset = OffsetReset.EARLIEST,
    producerClientId = "word-counter-producer", // (1)
    producerTransactionalId = "tx-word-counter-id", // (2)
    offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // (3)
    isolation = IsolationLevel.READ_COMMITTED // (4)
)
class WordCounter {

    @Topic("tx-incoming-strings")
    @SendTo("my-words-count")
    fun wordsCounter(string: String) = string
        .split(Regex("\\s+"))
        .groupBy { it }
        .map { KafkaMessage.Builder.withBody<ByteArray, Int>(it.value.size).key(it.key.toByteArray()).build() }
}
1 The id of the producer to load additional config properties
2 The transactional id that is required to enable transactional processing
3 Enable offset strategy to commit the offsets to the transaction
4 The isolation level the consumer uses when reading messages
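
The grouping step inside wordsCounter can be tried in isolation. The following is a minimal sketch in plain Java, without any Kafka types, of the per-word counts the stream pipeline computes before each entry is wrapped in a KafkaMessage. The class name WordCountSketch and the countWords helper are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Micronaut Kafka: it mirrors the grouping
// step of wordsCounter() without building KafkaMessage instances.
final class WordCountSketch {

    // Splits the text on whitespace and counts how often each word occurs.
    static Map<String, Integer> countWords(String text) {
        return Stream.of(text.split("\\s+"))
            .collect(Collectors.groupingBy(Function.identity(), Collectors.summingInt(w -> 1)));
    }

    public static void main(String[] args) {
        // Each map entry corresponds to one forwarded message: key = word, value = count.
        System.out.println(countWords("to be or not to be"));
    }
}
```

In the listener, every entry of this map becomes one message on my-words-count, and all of them are sent within the same transaction as the offset commit.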

6.7 Handling Consumer Exceptions

Consumer error strategies

It’s possible to define a different error strategy for @KafkaListener using the errorStrategy attribute:

Specifying an error strategy
@KafkaListener(
    value = "myGroup",
    errorStrategy = @ErrorStrategy(
        value = ErrorStrategyValue.RETRY_ON_ERROR,
        retryDelay = "50ms",
        retryCount = 3
    )
)
Specifying an error strategy
@KafkaListener(
    value = 'myGroup',
    errorStrategy = @ErrorStrategy(
            value = ErrorStrategyValue.RETRY_ON_ERROR,
            retryDelay = '50ms',
            retryCount = 3
    )
)
Specifying an error strategy
@KafkaListener(
    value = "myGroup",
    errorStrategy = ErrorStrategy(
        value = ErrorStrategyValue.RETRY_ON_ERROR,
        retryDelay = "50ms",
        retryCount = 3
    )
)

Setting the error strategy allows you to resume at the next offset or to seek the consumer (stop on error) to the failed offset so that it can retry if an error occurs.

You can choose one of the error strategies:

  • RETRY_ON_ERROR - This strategy will stop consuming subsequent records in the case of an error and by default will attempt to re-consume the current record once. The delay between retries can be set with retryDelay and the number of retries with retryCount.

  • RETRY_EXPONENTIALLY_ON_ERROR - This strategy will stop consuming subsequent records in the case of an error and by default will attempt to re-consume the current record once. The exponentially growing delay between consumption attempts is computed using the n * 2^(k - 1) formula, where n is the initial delay (retryDelay), k is the number of the current retry attempt, and the maximum number of retries is retryCount.

  • RETRY_CONDITIONALLY_ON_ERROR - This strategy will stop consuming subsequent records in the case of an error and by default will attempt to re-consume the current record once. The retry behaviour can be overridden. The delay between retries can be set with retryDelay and the number of retries with retryCount.

  • RETRY_CONDITIONALLY_EXPONENTIALLY_ON_ERROR - This strategy will stop consuming subsequent records in the case of an error and by default will attempt to re-consume the current record once. The retry behaviour can be overridden. The exponentially growing delay between consumption attempts is computed using the n * 2^(k - 1) formula, where n is the initial delay (retryDelay), k is the number of the current retry attempt, and the maximum number of retries is retryCount.

  • RESUME_AT_NEXT_RECORD - This strategy will ignore the current error and will resume at the next offset. In this case, it’s recommended to have a custom exception handler that moves the failed message into an error queue.

  • NONE - This error strategy will skip over all records from the current offset in the current poll when the consumer encounters an error. This option is deprecated and kept for consistent behaviour with previous versions of Micronaut Kafka that do not support error strategy.

Error strategies apply only to non-batch message processing.
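
To make the n * 2^(k - 1) formula concrete, the following sketch evaluates it in plain Java. It assumes that k is the 1-based number of the retry attempt; the class ExponentialRetryDelay and the method delayFor are hypothetical names, and this is not the library's own implementation.

```java
import java.time.Duration;

// Hypothetical helper that evaluates the n * 2^(k - 1) formula.
final class ExponentialRetryDelay {

    // retryDelay is n; attempt is k (assumed to be 1 for the first retry).
    static Duration delayFor(Duration retryDelay, int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // 1L << (attempt - 1) equals 2^(attempt - 1)
        return retryDelay.multipliedBy(1L << (attempt - 1));
    }

    public static void main(String[] args) {
        Duration retryDelay = Duration.ofMillis(50);
        int retryCount = 3;
        for (int attempt = 1; attempt <= retryCount; attempt++) {
            System.out.println("retry " + attempt + " after "
                + delayFor(retryDelay, attempt).toMillis() + " ms");
        }
    }
}
```

Under that assumption, retryDelay = "50ms" and retryCount = 3 give delays of 50 ms, 100 ms and 200 ms.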

You can also make the number of retries configurable by using retryCountValue:

Dynamically Configuring Retries
@KafkaListener(
    value = "myGroup",
    errorStrategy = @ErrorStrategy(
        value = ErrorStrategyValue.RETRY_ON_ERROR,
        retryCountValue = "${my.retry.count}"
    )
)
Dynamically Configuring Retries
@KafkaListener(
    value = 'myGroup',
    errorStrategy = @ErrorStrategy(
            value = ErrorStrategyValue.RETRY_ON_ERROR,
            retryCountValue = '${my.retry.count}'
    )
)
Dynamically Configuring Retries
@KafkaListener(
    value = "myGroup",
    errorStrategy = ErrorStrategy(
        value = ErrorStrategyValue.RETRY_ON_ERROR,
        retryCountValue = "\${my.retry.count}"
    )
)
retryCountValue will be overridden by retryCount if they are both set.

Specify exceptions to retry

It’s possible to restrict retries to specific exception types, so that only those exceptions trigger a retry.

Specifying exceptions to retry
@KafkaListener(
        value = "myGroup",
        errorStrategy = @ErrorStrategy(
                value = ErrorStrategyValue.RETRY_ON_ERROR,
                retryDelay = "50ms",
                retryCount = 3,
                exceptionTypes = { MyException.class, MySecondException.class }
        )
)
Specifying exceptions to retry
@KafkaListener(
    value = 'myGroup',
    errorStrategy = @ErrorStrategy(
            value = ErrorStrategyValue.RETRY_ON_ERROR,
            retryDelay = '50ms',
            retryCount = 3,
            exceptionTypes = [ MyException, MySecondException ]
    )
)
Specifying exceptions to retry
@KafkaListener(
    value = "myGroup",
    errorStrategy = ErrorStrategy(
        value = ErrorStrategyValue.RETRY_ON_ERROR,
        retryDelay = "50ms",
        retryCount = 3,
        exceptionTypes = [MyException::class, MySecondException::class]
    )
)
Specifying exceptions to retry applies only to the RETRY_ON_ERROR and RETRY_EXPONENTIALLY_ON_ERROR error strategies.

Conditional retries

It is possible to conditionally retry a message based on the exception thrown when the error strategy is RETRY_CONDITIONALLY_ON_ERROR or RETRY_CONDITIONALLY_EXPONENTIALLY_ON_ERROR.

Specifying conditional retry behaviour on the listener
@KafkaListener(
    value = "myGroup",
    errorStrategy = @ErrorStrategy(
        value = ErrorStrategyValue.RETRY_CONDITIONALLY_ON_ERROR
    )
)
public class ConditionalRetryListener implements ConditionalRetryBehaviourHandler {

    @Override
    public ConditionalRetryBehaviour conditionalRetryBehaviour(KafkaListenerException exception) {
        return shouldRetry(exception) ? ConditionalRetryBehaviour.RETRY : ConditionalRetryBehaviour.SKIP;
    }

    // ...
Specifying conditional retry behaviour on the listener
@KafkaListener(
    value = 'myGroup',
    errorStrategy = @ErrorStrategy(
            value = ErrorStrategyValue.RETRY_CONDITIONALLY_ON_ERROR
    )
)
class ConditionalRetryListener implements ConditionalRetryBehaviourHandler {
    @Override
    ConditionalRetryBehaviour conditionalRetryBehaviour(KafkaListenerException exception) {
        return shouldRetry(exception) ? ConditionalRetryBehaviour.RETRY : ConditionalRetryBehaviour.SKIP
    }

    // ...
Specifying conditional retry behaviour on the listener
@KafkaListener(
    value = "myGroup",
    errorStrategy = ErrorStrategy(
        value = ErrorStrategyValue.RETRY_CONDITIONALLY_ON_ERROR
    )
)
class ConditionalRetryListener :
    ConditionalRetryBehaviourHandler {
    override fun conditionalRetryBehaviour(exception: KafkaListenerException): ConditionalRetryBehaviour {
        return if (shouldRetry(exception)) {
            ConditionalRetryBehaviour.RETRY
        } else {
            ConditionalRetryBehaviour.SKIP
        }
    }

    // ...

When a @KafkaListener does not implement ConditionalRetryBehaviourHandler, the DefaultConditionalRetryBehaviourHandler will be used and all messages that failed processing will be retried.

If you wish to apply the same conditional retry strategy to all of your @KafkaListener beans, you can define a bean that implements ConditionalRetryBehaviourHandler and use Micronaut’s Bean Replacement feature to replace the default bean: @Replaces(DefaultConditionalRetryBehaviourHandler.class).

Conditional retry behaviour only applies to RETRY_CONDITIONALLY_ON_ERROR and RETRY_CONDITIONALLY_EXPONENTIALLY_ON_ERROR error strategies.
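
The shouldRetry(exception) call in the listener examples is elided. One possible way to write it is sketched below in plain Java. It assumes that transient failures surface as IOException or TimeoutException and that the exception handed to the listener wraps the original failure as its cause. RetryDecisionSketch is a hypothetical name, and which exceptions count as transient depends on your application.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical classification logic for the elided shouldRetry(...) method.
final class RetryDecisionSketch {

    // Treats I/O problems and timeouts as transient; everything else is skipped.
    static boolean shouldRetry(Throwable error) {
        // Assumption: the listener exception wraps the original failure, so
        // inspect the cause when one is present.
        Throwable root = error.getCause() != null ? error.getCause() : error;
        return root instanceof IOException || root instanceof TimeoutException;
    }
}
```

Because the method accepts any Throwable, it can be called with the KafkaListenerException passed to conditionalRetryBehaviour.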

Exception handlers

When an exception occurs in a @KafkaListener method, by default the exception is simply logged. This is handled by DefaultKafkaListenerExceptionHandler.

The following options are available to configure the default Kafka listener exception handler:

Table 1. Configuration Properties for DefaultKafkaListenerExceptionHandlerConfigurationProperties

| Property | Type | Description |
| --- | --- | --- |
| kafka.default-listener-exception-handler.skip-record-on-deserialization-failure | boolean | Whether to skip record on deserialization failure. Default value true |
| kafka.default-listener-exception-handler.commit-record-on-deserialization-failure | boolean | Whether to commit record on deserialization failure. Default value false |

If you wish to replace this default exception handling with another implementation, you can use Micronaut’s Bean Replacement feature to define a bean that replaces it: @Replaces(DefaultKafkaListenerExceptionHandler.class).

You can also define per bean exception handling logic by implementing the KafkaListenerExceptionHandler interface in your @KafkaListener class.

The KafkaListenerExceptionHandler receives an exception of type KafkaListenerException which allows access to the original ConsumerRecord, if available.

7 Running Kafka Applications

You can run a Micronaut Kafka application with or without the presence of an HTTP server.

If you run your application without the http-server-netty dependency you will see output like the following on startup:

11:06:22.638 [main] INFO  io.micronaut.runtime.Micronaut - Startup completed in 402ms. Server Running: 4 active message listeners.

No port is exposed, but the Kafka consumers are active and running. The process registers a shutdown hook so that the KafkaConsumer instances are closed correctly when the server is shut down.

7.1 Kafka Health Checks

In addition to http-server-netty, if the management dependency is added, then Micronaut’s Health Endpoint can be used to expose the health status of the Kafka consumer application.

For example if Kafka is not available the /health endpoint will return:

{
    "status": "DOWN",
    "details": {
        ...
        "kafka": {
            "status": "DOWN",
            "details": {
                "error": "java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment."
            }
        }
    }
}
By default, the details visible above are only shown to authenticated users. See the Health Endpoint documentation for how to configure that setting.

The following options are available to configure the Kafka Health indicator:

Table 1. Configuration Properties for KafkaHealthConfigurationProperties

| Property | Type | Description |
| --- | --- | --- |
| kafka.health.enabled | boolean | Whether the Kafka health check is enabled. Default value true. |
| kafka.health.restricted | boolean | By default, the health check requires cluster-wide permissions in order to get information about the nodes in the Kafka cluster. If your application doesn’t have admin privileges (for example, this might happen in multi-tenant scenarios), you can switch to a "restricted" version of the health check which only validates basic connectivity but doesn’t require any additional permissions. Default value false. |
| kafka.health-timeout | java.time.Duration | The health check timeout. |

7.2 Kafka Metrics

You can enable Kafka Metrics collection by enabling Micrometer Metrics.

If you do not wish to collect Kafka metrics, you can set micronaut.metrics.binders.kafka.enabled to false in application.yml.

7.3 Kafka Distributed Tracing

Distributed tracing is supported via the Micronaut Tracing module using Open Telemetry.

7.4 Creating New Topics

You can automatically add topics to the broker when your application starts. To do so, add a bean of type NewTopic for each topic you want to create. NewTopic instances let you specify the name, the number of partitions, the replication factor, the replica assignments and the configuration properties you want to associate with the new topic. Additionally, you can add a bean of type CreateTopicsOptions that will be used when the new topics are created.

Creating New Kafka Topics with Options
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;

@Requires(bean = AdminClient.class)
@Factory
public class MyTopicFactory {

    @Bean
    CreateTopicsOptions options() {
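        // Note: validateOnly(true) only validates the request; use validateOnly(false) to actually create the topics
        return new CreateTopicsOptions().timeoutMs(5000).validateOnly(true).retryOnQuotaViolation(false);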
    }

    @Bean
    NewTopic topic1() {
        return new NewTopic("my-new-topic-1", 1, (short) 1);
    }

    @Bean
    NewTopic topic2() {
        return new NewTopic("my-new-topic-2", 2, (short) 1);
    }
}
Creating New Kafka Topics with Options
import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.NewTopic

@Requires(bean = AdminClient)
@Factory
class MyTopicFactory {

    @Bean
    CreateTopicsOptions options() {
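        // Note: validateOnly(true) only validates the request; use validateOnly(false) to actually create the topics
        new CreateTopicsOptions().timeoutMs(5000).validateOnly(true).retryOnQuotaViolation(false)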
    }

    @Bean
    NewTopic topic1() {
        new NewTopic("my-new-topic-1", 1, (short) 1)
    }

    @Bean
    NewTopic topic2() {
        new NewTopic("my-new-topic-2", 2, (short) 1)
    }
}
Creating New Kafka Topics with Options
import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.NewTopic

@Requires(bean = AdminClient::class)
@Factory
class MyTopicFactory {

    @Bean
    fun options(): CreateTopicsOptions {
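        // Note: validateOnly(true) only validates the request; use validateOnly(false) to actually create the topics
        return CreateTopicsOptions().timeoutMs(5000).validateOnly(true).retryOnQuotaViolation(false)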
    }

    @Bean
    fun topic1(): NewTopic {
        return NewTopic("my-new-topic-1", 1, 1)
    }

    @Bean
    fun topic2(): NewTopic {
        return NewTopic("my-new-topic-2", 2, 1)
    }
}
Creating topics is not a transactional operation, so it may succeed for some topics while failing for others. This operation also executes asynchronously, so it may take several seconds until all the brokers become aware that the topics have been created.

If you ever need to check if the operation has completed, you can @Inject or retrieve the KafkaNewTopics bean from the application context and then retrieve the operation result that Kafka returned when the topics were created.

Checking if Topic Creation is Done
    boolean areNewTopicsDone(KafkaNewTopics newTopics) {
        return newTopics.getResult().all().isDone();
    }
Checking if Topic Creation is Done
    boolean areNewTopicsDone(KafkaNewTopics newTopics) {
        newTopics.result.all().done
    }
Checking if Topic Creation is Done
    fun areNewTopicsDone(newTopics: KafkaNewTopics): Boolean {
        return newTopics.result.all().isDone
    }
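
Note that isDone() does not block and also returns true when the operation has failed. If you would rather wait for a bounded time and learn whether creation succeeded, a helper along the following lines may be useful. It is a sketch in plain Java that works with any java.util.concurrent.Future; Kafka's KafkaFuture implements that interface, so the value returned by newTopics.getResult().all() can be passed in. FutureAwaitSketch and completedWithin are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Micronaut Kafka.
final class FutureAwaitSketch {

    // Returns true only if the future completed successfully within the timeout;
    // returns false on timeout, failure, cancellation or interruption.
    static boolean completedWithin(Future<?> future, Duration timeout) {
        try {
            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException | CancellationException e) {
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

For example, completedWithin(newTopics.getResult().all(), Duration.ofSeconds(10)) would wait up to ten seconds for all topics to be created.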

7.5 Disabling Micronaut-Kafka

If you want to disable micronaut-kafka entirely, you can set kafka.enabled to false in application.yml.

This will prevent the instantiation of all kafka-related beans.

You must, however, provide your own replacement implementations of any @KafkaClient interfaces:

Creating Replacement KafkaClient Implementations
import io.micronaut.context.annotation.Replaces;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.StringUtils;
import jakarta.inject.Singleton;

@Requires(property = "kafka.enabled", notEquals = StringUtils.TRUE, defaultValue = StringUtils.TRUE) // (1)
@Replaces(MessageClient.class) // (2)
@Singleton
public class MessageClientFallback implements MessageClient { // (3)

    @Override
    public void send(String message) {
        throw new UnsupportedOperationException(); // (4)
    }
}
Creating Replacement KafkaClient Implementations
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires
import io.micronaut.core.util.StringUtils
import jakarta.inject.Singleton

@Requires(property = 'kafka.enabled', notEquals = StringUtils.TRUE, defaultValue = StringUtils.TRUE) // (1)
@Replaces(MessageClient.class) // (2)
@Singleton
class MessageClientFallback implements MessageClient { // (3)

    @Override
    void send(String message) {
        throw new UnsupportedOperationException() // (4)
    }
}
Creating Replacement KafkaClient Implementations
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires
import io.micronaut.core.util.StringUtils
import jakarta.inject.Singleton

@Requires(property = "kafka.enabled", notEquals = StringUtils.TRUE, defaultValue = StringUtils.TRUE) // (1)
@Replaces(MessageClient::class) // (2)
@Singleton
class MessageClientFallback : MessageClient { // (3)

    override fun send(message: String) {
        throw UnsupportedOperationException() // (4)
    }
}
1 Only instantiate when kafka.enabled is set to false
2 Replace the @KafkaClient interface
3 Implement the interface
4 Provide an alternative implementation for all client methods

8 Kafka Streams

Using the CLI

If you are creating your project using the Micronaut CLI, supply the kafka-streams feature to include a simple Kafka Streams configuration in your project:

$ mn create-app my-app --features kafka-streams

Kafka Streams is a platform for building real time streaming applications.

When using Micronaut with Kafka Streams, your application gains all of the features from Micronaut (configuration management, AOP, DI, health checks etc.), simplifying the construction of Kafka Streams applications.

Since Micronaut’s DI and AOP are performed at compile time, you can build low overhead stream applications with ease.

Defining Kafka Streams

To define Kafka Streams, you should first add the kafka-streams dependency to your build.

implementation("io.micronaut.kafka:micronaut-kafka-streams")
<dependency>
    <groupId>io.micronaut.kafka</groupId>
    <artifactId>micronaut-kafka-streams</artifactId>
</dependency>

The minimum configuration required is to set the Kafka bootstrap servers:

Configuring Kafka
kafka.bootstrap.servers=localhost:9092
kafka:
    bootstrap:
        servers: localhost:9092
[kafka]
  [kafka.bootstrap]
    servers="localhost:9092"
kafka {
  bootstrap {
    servers = "localhost:9092"
  }
}
{
  kafka {
    bootstrap {
      servers = "localhost:9092"
    }
  }
}
{
  "kafka": {
    "bootstrap": {
      "servers": "localhost:9092"
    }
  }
}

You should then define a @Factory for your streams that defines beans that return a KStream. For example, to implement the Word Count example from the Kafka Streams documentation:

Kafka Streams Word Count
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

@Factory
public class WordCountStream {

    @Singleton
    @Named("word-count")
    KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { // (1)
        // set default serdes
        Properties props = builder.getConfiguration();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500");

        KStream<String, String> source = builder.stream("streams-plaintext-input"); // (2)

        KTable<String, Long> groupedByWord = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            //Store the result in a store for lookup later
            .count(Materialized.as("word-count-store-java")); // (3)

        groupedByWord
            //convert to stream
            .toStream()
            //send to output using specific serdes
            .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); // (4)

        return source;
    }
}
Kafka Streams Word Count
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import jakarta.inject.Named
import jakarta.inject.Singleton
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced

@Factory
class WordCountStream {

    @Singleton
    @Named('word-count')
    KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { // (1)
        // set default serdes
        Properties props = builder.getConfiguration()
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 'earliest')
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, '500')

        KStream<String, String> source = builder.stream('streams-plaintext-input') // (2)

        KTable<String, Long> groupedByWord = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
            //Store the result in a store for lookup later
            .count(Materialized.as('word-count-store-groovy')) // (3)

        groupedByWord
            //convert to stream
            .toStream()
            //send to output using specific serdes
            .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())) // (4)

        return source
    }
}
Kafka Streams Word Count
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import jakarta.inject.Named
import jakarta.inject.Singleton
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced
import java.util.*

@Factory
class WordCountStream {

    @Singleton
    @Named("word-count")
    fun wordCountStream(builder: ConfiguredStreamBuilder): KStream<String, String> { // (1)
        // set default serdes
        val props = builder.configuration
        props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.getName()
        props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.getName()
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = "500"
        val source = builder.stream<String, String>("streams-plaintext-input") // (2)
        val groupedByWord = source
            .flatMapValues { value: String ->
                Arrays.asList(
                    *value.lowercase(Locale.getDefault()).split("\\W+".toRegex()).dropLastWhile { it.isEmpty() }
                        .toTypedArray())
            }
            .groupBy(
                { key: String?, word: String? -> word },
                Grouped.with(Serdes.String(), Serdes.String())
            )
            //Store the result in a store for lookup later
            .count(Materialized.`as`("word-count-store-kotlin")) // (3)
        groupedByWord
            //convert to stream
            .toStream()
            //send to output using specific serdes
            .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())) // (4)
        return source
    }
}
1 An instance of ConfiguredStreamBuilder is injected that allows mutating the configuration
2 The input topic
3 Materialize the count stream and save to a state store
4 The output topic
With Kafka Streams, the key and value Serdes (serializer/deserializer) must be classes with a zero-argument constructor. If you wish to use JSON (de)serialization, you can subclass JsonObjectSerde to define your Serdes.

You can use the @KafkaClient annotation to send a sentence to be processed by the above stream:

Defining a Kafka Client
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

@KafkaClient
public interface WordCountClient {

    @Topic("streams-plaintext-input")
    void publishSentence(String sentence);
}
Defining a Kafka Client
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires

@KafkaClient
interface WordCountClient {

    @Topic("streams-plaintext-input")
    void publishSentence(String sentence)
}
Defining a Kafka Client
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires

@KafkaClient
interface WordCountClient {

    @Topic("streams-plaintext-input")
    fun publishSentence(sentence: String?)
}

You can also define a @KafkaListener to listen for the result of the word count stream:

Defining a Kafka Listener
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;

@KafkaListener(offsetReset = EARLIEST, groupId = "WordCountListener")
public class WordCountListener {

    private final Map<String, Long> wordCounts = new ConcurrentHashMap<>();

    @Topic("streams-wordcount-output")
    void count(@KafkaKey String word, long count) {
        wordCounts.put(word, count);
    }

    public long getCount(String word) {
        Long num = wordCounts.get(word);
        return num != null ? num : 0;
    }

    public Map<String, Long> getWordCounts() {
        return Collections.unmodifiableMap(wordCounts);
    }
}
Defining a Kafka Listener
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires

import java.util.concurrent.ConcurrentHashMap

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST

@KafkaListener(offsetReset = EARLIEST, groupId = 'WordCountListener')
class WordCountListener {

    private final Map<String, Long> wordCounts = new ConcurrentHashMap<>()

    @Topic("streams-wordcount-output")
    void count(@KafkaKey String word, long count) {
        wordCounts.put(word, count)
    }

    long getCount(String word) {
        Long num = wordCounts.get(word)
        num ?: 0
    }

    Map<String, Long> getWordCounts() {
        Collections.unmodifiableMap(wordCounts)
    }
}
Defining a Kafka Listener
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import java.util.*
import java.util.concurrent.ConcurrentHashMap

@KafkaListener(offsetReset = OffsetReset.EARLIEST, groupId = "WordCountListener")
class WordCountListener {

    private val wordCounts: MutableMap<String, Long> = ConcurrentHashMap()

    @Topic("streams-wordcount-output")
    fun count(@KafkaKey word: String, count: Long) {
        wordCounts[word] = count
    }

    fun getCount(word: String): Long {
        val num = wordCounts[word]
        return num ?: 0
    }

    fun getWordCounts(): Map<String, Long> {
        return Collections.unmodifiableMap(wordCounts)
    }
}

Configuring Kafka Streams

Configuring a single Stream on a Micronaut application.

If you have a single Kafka Stream configured in your Micronaut service, use the default key in the configuration.

kafka.streams.default.processing.guarantee=exactly_once
kafka.streams.default.auto.offset.reset=earliest
kafka:
    streams:
       default:
          processing.guarantee: "exactly_once"
          auto.offset.reset: "earliest"
[kafka]
  [kafka.streams]
    [kafka.streams.default]
      "processing.guarantee"="exactly_once"
      "auto.offset.reset"="earliest"
kafka {
  streams {
    'default' {
      processing.guarantee = "exactly_once"
      auto.offset.reset = "earliest"
    }
  }
}
{
  kafka {
    streams {
      default {
        "processing.guarantee" = "exactly_once"
        "auto.offset.reset" = "earliest"
      }
    }
  }
}
{
  "kafka": {
    "streams": {
      "default": {
        "processing.guarantee": "exactly_once",
        "auto.offset.reset": "earliest"
      }
    }
  }
}

The above configuration example sets the processing.guarantee and auto.offset.reset settings of the default Stream.

Configuring multiple Stream definitions on the same Micronaut Service.

You can define multiple Kafka Streams in the same Micronaut application, each with its own configuration, by placing the configuration under kafka.streams.[STREAM-NAME]. If a single service has two or more Kafka Streams definitions, use the default key for at least one of them and define kafka.streams.[STREAM-NAME] for the rest.

For example in application.yml:

kafka.streams.default.num.stream.threads=1
kafka.streams.my-other-stream.num.stream.threads=10
kafka:
    streams:
        default:
          num:
            stream:
              threads: 1
        my-other-stream:
          num:
            stream:
              threads: 10
[kafka]
  [kafka.streams]
    [kafka.streams.default]
      [kafka.streams.default.num]
        [kafka.streams.default.num.stream]
          threads=1
    [kafka.streams.my-other-stream]
      [kafka.streams.my-other-stream.num]
        [kafka.streams.my-other-stream.num.stream]
          threads=10
kafka {
  streams {
    'default' {
      num {
        stream {
          threads = 1
        }
      }
    }
    myOtherStream {
      num {
        stream {
          threads = 10
        }
      }
    }
  }
}
{
  kafka {
    streams {
      default {
        num {
          stream {
            threads = 1
          }
        }
      }
      my-other-stream {
        num {
          stream {
            threads = 10
          }
        }
      }
    }
  }
}
{
  "kafka": {
    "streams": {
      "default": {
        "num": {
          "stream": {
            "threads": 1
          }
        }
      },
      "my-other-stream": {
        "num": {
          "stream": {
            "threads": 10
          }
        }
      }
    }
  }
}

The above configuration sets the num.stream.threads setting of the Kafka StreamsConfig to 1 for the default stream, and the same setting to 10 for a stream named my-other-stream.
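
One way to picture this mapping: whatever follows kafka.streams.[STREAM-NAME]. is handed to that stream as a Kafka Streams property. The stand-alone Java sketch below imitates the grouping for the two streams configured above, using only java.util. The class StreamPropertySketch and its method forStream are hypothetical names chosen for illustration; this is not Micronaut's actual implementation.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration only: not how Micronaut resolves configuration internally.
final class StreamPropertySketch {

    private static final String PREFIX = "kafka.streams.";

    // Collects the entries under kafka.streams.<streamName>. and strips that prefix,
    // leaving plain Kafka Streams property names such as num.stream.threads.
    static Properties forStream(Map<String, String> appConfig, String streamName) {
        String streamPrefix = PREFIX + streamName + ".";
        Properties result = new Properties();
        for (Map.Entry<String, String> entry : appConfig.entrySet()) {
            if (entry.getKey().startsWith(streamPrefix)) {
                String kafkaKey = entry.getKey().substring(streamPrefix.length());
                result.setProperty(kafkaKey, entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> appConfig = Map.of(
            "kafka.streams.default.num.stream.threads", "1",
            "kafka.streams.my-other-stream.num.stream.threads", "10");
        System.out.println(forStream(appConfig, "default"));
        System.out.println(forStream(appConfig, "my-other-stream"));
    }
}
```

Each stream ends up with its own num.stream.threads value, which is why the two streams can be tuned independently.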

You can then inject a ConfiguredStreamBuilder specifically for the above configuration using jakarta.inject.Named:

@Singleton
@Named("my-other-stream")
KStream<String, String> myOtherKStream(ConfiguredStreamBuilder builder)  {
    return builder.stream("my-other-stream");
}
@Singleton
@Named('my-other-stream')
KStream<String, String> myOtherKStream(ConfiguredStreamBuilder builder)  {
    return builder.stream('my-other-stream')
}
@Singleton
@Named("my-other-stream")
fun myOtherKStream(builder: ConfiguredStreamBuilder): KStream<String, String> {
    return builder.stream("my-other-stream")
}
If you do not provide a @Named on the ConfiguredStreamBuilder, all the KStreams you define share the default configuration, such as the client id and application id. When using multiple streams in a single application, it is advisable to provide a @Named instance of ConfiguredStreamBuilder for each stream.
Kafka Streams Word Count
@Singleton
@Named("my-stream")
KStream<String, String> myStream(ConfiguredStreamBuilder builder) {
Kafka Streams Word Count
@Singleton
@Named('my-stream')
KStream<String, String> myStream(ConfiguredStreamBuilder builder) {
Kafka Streams Word Count
@Singleton
@Named("my-stream")
fun myStream(builder: ConfiguredStreamBuilder): KStream<String, String> {
Configuring Kafka Streams for testing

When writing a test without starting the actual Kafka server, you can instruct Micronaut not to start Kafka Streams. To do this, create a configuration file suffixed with an environment name, such as application-test.yml, and set kafka.streams.[STREAM-NAME].start-kafka-streams to false.

For example in application-test.yml:

kafka.streams.default.start-kafka-streams=false
kafka:
    streams:
        default:
            start-kafka-streams: false
[kafka]
  [kafka.streams]
    [kafka.streams.default]
      start-kafka-streams=false
kafka {
  streams {
    'default' {
      startKafkaStreams = false
    }
  }
}
{
  kafka {
    streams {
      default {
        start-kafka-streams = false
      }
    }
  }
}
{
  "kafka": {
    "streams": {
      "default": {
        "start-kafka-streams": false
      }
    }
  }
}

8.1 Interactive Query Service

When using streams, you can attach a state store to your stream by using a store builder and telling the stream to materialize its data. In the Kafka Streams Word Count example above, the output is materialized to a named store that can later be retrieved via the Interactive Query Service. See the Apache Kafka documentation for details.

You can inject the InteractiveQueryService and use the method getQueryableStore(String storeName, QueryableStoreType<T> storeType) to get values from a state store.

An example service that wraps the InteractiveQueryService is included below. This is here to illustrate that when calling the getQueryableStore method you must provide the store name and preferably the type of key and value you are trying to retrieve.

import io.micronaut.configuration.kafka.streams.InteractiveQueryService;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Optional;

/**
 * Example service that uses the InteractiveQueryService in a reusable way.  This is only intended as an example.
 */
@Singleton
public class InteractiveQueryServiceExample {

    private final InteractiveQueryService interactiveQueryService;

    public InteractiveQueryServiceExample(InteractiveQueryService interactiveQueryService) {
        this.interactiveQueryService = interactiveQueryService;
    }

    /**
     * Method to get the word state store and word count from the store using the interactive query service.
     *
     * @param stateStore the name of the state store ie "foo-store"
     * @param word       the key to get, in this case the word as the stream and ktable have been grouped by word
     * @return the Long count of the word in the store
     */
    public Long getWordCount(String stateStore, String word) {
        Optional<ReadOnlyKeyValueStore<String, Long>> queryableStore = interactiveQueryService.getQueryableStore(
            stateStore, QueryableStoreTypes.keyValueStore());
        return queryableStore.map(kvReadOnlyKeyValueStore ->
            kvReadOnlyKeyValueStore.get(word)).orElse(0L);
    }

    /**
     * Method to get byte array from a state store using the interactive query service.
     *
     * @param stateStore the name of the state store ie "bar-store"
     * @param blobName   the key to get, in this case the name of the blob
     * @return the byte[] stored in the state store
     */
    public byte[] getBytes(String stateStore, String blobName) {
        Optional<ReadOnlyKeyValueStore<String, byte[]>> queryableStore = interactiveQueryService.getQueryableStore(
            stateStore, QueryableStoreTypes.keyValueStore());
        return queryableStore.map(stringReadOnlyKeyValueStore ->
            stringReadOnlyKeyValueStore.get(blobName)).orElse(null);
    }

    /**
     * Method to get value V by key K.
     *
     * @param stateStore the name of the state store ie "baz-store"
     * @param name       the key to get
     * @return the value of type V stored in the state store
     */
    public <K, V> V getGenericKeyValue(String stateStore, K name) {
        Optional<ReadOnlyKeyValueStore<K, V>> queryableStore = interactiveQueryService.getQueryableStore(
            stateStore, QueryableStoreTypes.<K, V>keyValueStore());
        return queryableStore.map(kvReadOnlyKeyValueStore ->
            kvReadOnlyKeyValueStore.get(name)).orElse(null);
    }
}
import io.micronaut.configuration.kafka.streams.InteractiveQueryService
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

/**
 * Example service that uses the InteractiveQueryService in a reusable way.  This is only intended as an example.
 */
@Singleton
class InteractiveQueryServiceExample {

    private final InteractiveQueryService interactiveQueryService;

    InteractiveQueryServiceExample(InteractiveQueryService interactiveQueryService) {
        this.interactiveQueryService = interactiveQueryService;
    }

    /**
     * Method to get the word state store and word count from the store using the interactive query service.
     *
     * @param stateStore the name of the state store ie "foo-store"
     * @param word       the key to get, in this case the word as the stream and ktable have been grouped by word
     * @return the Long count of the word in the store
     */
    Long getWordCount(String stateStore, String word) {
        Optional<ReadOnlyKeyValueStore<String, Long>> queryableStore = interactiveQueryService.getQueryableStore(
                stateStore, QueryableStoreTypes.keyValueStore());
        return queryableStore.map(kvReadOnlyKeyValueStore ->
                kvReadOnlyKeyValueStore.get(word)).orElse(0L);
    }

    /**
     * Method to get byte array from a state store using the interactive query service.
     *
     * @param stateStore the name of the state store ie "bar-store"
     * @param blobName   the key to get, in this case the name of the blob
     * @return the byte[] stored in the state store
     */
    byte[] getBytes(String stateStore, String blobName) {
        Optional<ReadOnlyKeyValueStore<String, byte[]>> queryableStore = interactiveQueryService.getQueryableStore(
                stateStore, QueryableStoreTypes.keyValueStore());
        return queryableStore.map(stringReadOnlyKeyValueStore ->
                stringReadOnlyKeyValueStore.get(blobName)).orElse(null);
    }

    /**
     * Method to get value V by key K.
     *
     * @param stateStore the name of the state store ie "baz-store"
     * @param name       the key to get
     * @return the value of type V stored in the state store
     */
    <K, V> V getGenericKeyValue(String stateStore, K name) {
        Optional<ReadOnlyKeyValueStore<K, V>> queryableStore = interactiveQueryService.getQueryableStore(
                stateStore, QueryableStoreTypes.<K, V>keyValueStore());
        return queryableStore.map(kvReadOnlyKeyValueStore ->
                kvReadOnlyKeyValueStore.get(name)).orElse(null);
    }
}
import io.micronaut.configuration.kafka.streams.InteractiveQueryService
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
import org.apache.kafka.streams.state.QueryableStoreTypes
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

/**
 * Example service that uses the InteractiveQueryService in a reusable way.  This is only intended as an example.
 */
@Singleton
class InteractiveQueryServiceExample(private val interactiveQueryService: InteractiveQueryService) {
    /**
     * Method to get the word state store and word count from the store using the interactive query service.
     *
     * @param stateStore the name of the state store ie "foo-store"
     * @param word       the key to get, in this case the word as the stream and ktable have been grouped by word
     * @return the Long count of the word in the store
     */
    fun getWordCount(stateStore: String, word: String): Long {
        val queryableStore = interactiveQueryService.getQueryableStore(
            stateStore, QueryableStoreTypes.keyValueStore<String, Long>())
        return queryableStore.map { kvReadOnlyKeyValueStore: ReadOnlyKeyValueStore<String, Long> ->
            kvReadOnlyKeyValueStore[word] }.orElse(0L)
    }

    /**
     * Method to get byte array from a state store using the interactive query service.
     *
     * @param stateStore the name of the state store ie "bar-store"
     * @param blobName   the key to get, in this case the name of the blob
     * @return the byte[] stored in the state store
     */
    fun getBytes(stateStore: String, blobName: String): ByteArray? {
        val queryableStore = interactiveQueryService.getQueryableStore(
            stateStore, QueryableStoreTypes.keyValueStore<String, ByteArray>())
        return queryableStore.map { stringReadOnlyKeyValueStore: ReadOnlyKeyValueStore<String, ByteArray> ->
            stringReadOnlyKeyValueStore[blobName] }.orElse(null)
    }

    /**
     * Method to get value V by key K.
     *
     * @param stateStore the name of the state store ie "baz-store"
     * @param name       the key to get
     * @return the value of type V stored in the state store
     */
    fun <K, V> getGenericKeyValue(stateStore: String, name: K): V {
        val queryableStore = interactiveQueryService.getQueryableStore(
            stateStore, QueryableStoreTypes.keyValueStore<K, V>())
        return queryableStore.map { kvReadOnlyKeyValueStore: ReadOnlyKeyValueStore<K, V> ->
            kvReadOnlyKeyValueStore[name] }.orElse(null)
    }
}
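
A detail worth noting in the examples above: Optional.map yields an empty Optional when the mapping function returns null, so the orElse default covers both a store that is not available and a key that is absent from the store. The sketch below demonstrates this with plain java.util types. The Map is only a stand-in for ReadOnlyKeyValueStore, and OptionalLookupSketch is a hypothetical name.

```java
import java.util.Map;
import java.util.Optional;

// Stand-alone sketch: a Map plays the role of the read-only state store.
final class OptionalLookupSketch {

    // Mirrors the shape of getWordCount: map over the optional store, then fall back to 0.
    static long wordCount(Optional<Map<String, Long>> store, String word) {
        return store.map(s -> s.get(word)).orElse(0L);
    }

    public static void main(String[] args) {
        Optional<Map<String, Long>> available = Optional.of(Map.of("kafka", 3L));
        System.out.println(wordCount(available, "kafka"));        // key present: prints 3
        System.out.println(wordCount(available, "missing"));      // key absent: prints 0
        System.out.println(wordCount(Optional.empty(), "kafka")); // store absent: prints 0
    }
}
```

If callers need to tell "store not ready" apart from "key not found", returning the Optional itself instead of a default value is one possible design choice.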

8.2 Kafka Stream Health Checks

If the management dependency is added alongside http-server-netty, Micronaut’s Health Endpoint can be used to expose the health status of the Kafka Streams application.

For example, the /health endpoint will report the stream health as follows:

{
  "status": "UP",
  "details": {
    "kafkaStreams": {
      "name": "my-application",
      "status": "UP",
      "details": {
        "named-stream": {
          "name": "my-application",
          "status": "UP",
          "details": {
            "adminClientId": "my-consumer-id-admin",
            "restoreConsumerClientId": "my-consumer-id-StreamThread-1-restore-consumer",
            "threadState": "RUNNING",
            "producerClientIds": [
              "my-consumer-id-StreamThread-1-producer"
            ],
            "consumerClientId": "my-consumer-id-StreamThread-1-consumer",
            "threadName": "my-consumer-id-StreamThread-1"
          }
        }
      }
    },
    ...
  }
}
By default, the details visible above are only shown to authenticated users. See the Health Endpoint documentation for how to configure that setting.
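
As a pointer, in Micronaut's management module the visibility of health details is typically governed by the endpoints.health.details-visible property. The fragment below is a sketch that assumes your Micronaut version supports the ANONYMOUS value; confirm the supported values in the Health Endpoint documentation before relying on it.

```properties
# Assumption: exposes health details to unauthenticated callers as well
endpoints.health.details-visible=ANONYMOUS
```

Exposing details anonymously reveals client ids and thread names, so it is usually better suited to development environments.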

If you wish to disable the Kafka streams health check while still using the management dependency you can set the property kafka.health.streams.enabled to false in your application configuration.

kafka.health.streams.enabled=false
kafka:
    health:
        streams:
            enabled: false
[kafka]
  [kafka.health]
    [kafka.health.streams]
      enabled=false
kafka {
  health {
    streams {
      enabled = false
    }
  }
}
{
  kafka {
    health {
      streams {
        enabled = false
      }
    }
  }
}
{
  "kafka": {
    "health": {
      "streams": {
        "enabled": false
      }
    }
  }
}

8.3 Handling Uncaught Exceptions

Since version 2.8.0, Kafka allows you to handle uncaught exceptions that may be thrown from your streams. The handler must return the action to take, depending on the thrown exception.

There are three possible responses: REPLACE_THREAD, SHUTDOWN_CLIENT, or SHUTDOWN_APPLICATION.

You can find more details about this mechanism here.

If you just want to take the same action every time, you can set the application property kafka.streams.[STREAM-NAME].uncaught-exception-handler to a valid action, such as REPLACE_THREAD.

For example in application.yml:

kafka.streams.my-stream.uncaught-exception-handler=REPLACE_THREAD
kafka:
    streams:
        my-stream:
            uncaught-exception-handler: REPLACE_THREAD
[kafka]
  [kafka.streams]
    [kafka.streams.my-stream]
      uncaught-exception-handler="REPLACE_THREAD"
kafka {
  streams {
    myStream {
      uncaughtExceptionHandler = "REPLACE_THREAD"
    }
  }
}
{
  kafka {
    streams {
      my-stream {
        uncaught-exception-handler = "REPLACE_THREAD"
      }
    }
  }
}
{
  "kafka": {
    "streams": {
      "my-stream": {
        "uncaught-exception-handler": "REPLACE_THREAD"
      }
    }
  }
}

To implement your own handler, you can listen to the application event BeforeKafkaStreamStart and configure the streams with your own business logic:

import io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventListener;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

@Singleton
public class MyStreamsUncaughtExceptionHandler
    implements ApplicationEventListener<BeforeKafkaStreamStart>, StreamsUncaughtExceptionHandler {

    boolean dangerAvoided = false;

    @Override
    public void onApplicationEvent(BeforeKafkaStreamStart event) {
        event.getKafkaStreams().setUncaughtExceptionHandler(this);
    }

    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        // MyException is an application-specific exception type that is not shown here.
        if (exception.getCause() instanceof MyException) {
            this.dangerAvoided = true;
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        }
        return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
    }
}
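
The handler above inspects only the direct cause of the thrown exception. If the exception of interest may be wrapped more deeply, one option is to walk the cause chain. The sketch below uses only the JDK so that it compiles without Kafka on the classpath: the Response enum is a local stand-in for Kafka's StreamThreadExceptionResponse, and RecoverableException is a hypothetical application exception playing the role of MyException.

```java
// Stand-alone sketch of the decision logic; not tied to the Kafka Streams API.
final class CauseChainSketch {

    // Local stand-in for the three responses Kafka Streams accepts.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Hypothetical application exception that is considered safe to recover from.
    static class RecoverableException extends RuntimeException {
        RecoverableException(String message) {
            super(message);
        }
    }

    // Upper bound on the walk, as a guard against pathological (cyclic) cause chains.
    private static final int MAX_DEPTH = 32;

    // Returns REPLACE_THREAD if any exception in the cause chain is recoverable,
    // otherwise SHUTDOWN_APPLICATION.
    static Response decide(Throwable exception) {
        int depth = 0;
        for (Throwable t = exception; t != null && depth < MAX_DEPTH; t = t.getCause()) {
            if (t instanceof RecoverableException) {
                return Response.REPLACE_THREAD;
            }
            depth++;
        }
        return Response.SHUTDOWN_APPLICATION;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException(
            new IllegalStateException(new RecoverableException("transient failure")));
        System.out.println(decide(wrapped));                      // prints REPLACE_THREAD
        System.out.println(decide(new RuntimeException("boom"))); // prints SHUTDOWN_APPLICATION
    }
}
```

In a real handler, the same loop could sit inside handle(Throwable) and return the corresponding StreamThreadExceptionResponse constant.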

9 Guides

See the following list of guides to learn more about working with Kafka in the Micronaut Framework:

10 Repository

You can find the source code of this project in this repository: