$ mn create-app my-kafka-app --features kafka
Table of Contents
Micronaut Kafka
Integration between Micronaut and Kafka Messaging
Version: 5.7.1-SNAPSHOT
1 Introduction
Apache Kafka is a distributed stream processing platform that can be used for a range of messaging requirements in addition to stream processing and real-time data handling.
Micronaut features dedicated support for defining both Kafka Producer
and Consumer
instances. Micronaut applications built with Kafka can be deployed with or without the presence of an HTTP server.
With Micronaut’s efficient compile-time AOP and cloud native features, writing efficient Kafka consumer applications that use very little resources is a breeze.
2 Release History
For this project, you can find a list of releases (with release notes) here:
Upgrading to Micronaut Kafka 5.0
Micronaut Kafka 5.0 is a significant major version which includes a number of changes you will need to consider when upgrading.
Micronaut 4, Kafka 3 & Java 17 baseline
Micronaut Kafka 5.0 requires the following minimum set of dependencies:
-
Java 17 or above
-
Kafka 3
-
Micronaut 4 or above
@KafkaClient
no longer recoverable by default
Previous versions of Micronaut Kafka used the meta-annotation @Recoverable on the @KafkaClient
annotation allowing you to define fallbacks in the case of failure. Micronaut Kafka 5 no longer includes this meta annotation and if you use fallbacks you should explicitly declare a dependency on io.micronaut:micronaut-retry
and declare the @Recoverable
explicitly.
Open Tracing No Longer Supported
Micronaut Kafka 5 no longer supports Open Tracing (which is deprecated and no longer maintained) and if you need distributed tracing you should instead use Open Telemetry.
3 Using the Micronaut CLI
To create a project with Kafka support using the Micronaut CLI, supply the kafka
feature to the features
flag.
This will create a project with the minimum necessary configuration for Kafka.
Kafka Messaging Application
The Micronaut CLI includes the ability to create Kafka-based messaging applications designed to implement message-driven microservices.
To create a Message-Driven Microservice with Micronaut + Kafka use the create-messaging-app
command:
$ mn create-messaging-app my-kafka-app --features kafka
As you’d expect, you can start the application with ./gradlew run
(for Gradle) or ./mvnw compile exec:exec
(Maven). The application will (with the default config) attempt to connect to Kafka at http://localhost:9092
, and will continue to run without starting up an HTTP server. All communication to/from the service will take place via Kafka producers and/or listeners.
Within the new project, you can now run the Kafka-specific code generation commands:
$ mn create-kafka-producer MessageProducer | Rendered template Producer.java to destination src/main/java/my/kafka/app/MessageProducer.java $ mn create-kafka-listener MessageListener | Rendered template Listener.java to destination src/main/java/my/kafka/app/MessageListener.java
See the guide for Kafka and the Micronaut Framework - Event-driven Applications to learn more. |
4 Kafka Quick Start
To add support for Kafka to an existing project, you should first add the Micronaut Kafka configuration to your build configuration. For example in Gradle:
implementation("io.micronaut.kafka:micronaut-kafka")
<dependency>
<groupId>io.micronaut.kafka</groupId>
<artifactId>micronaut-kafka</artifactId>
</dependency>
Configuring Kafka
The minimum requirement to configure Kafka is set the value of the kafka.bootstrap.servers
property in application.yml
or bootstrap.yml
:
kafka.bootstrap.servers=localhost:9092
kafka:
bootstrap:
servers: localhost:9092
[kafka]
[kafka.bootstrap]
servers="localhost:9092"
kafka {
bootstrap {
servers = "localhost:9092"
}
}
{
kafka {
bootstrap {
servers = "localhost:9092"
}
}
}
{
"kafka": {
"bootstrap": {
"servers": "localhost:9092"
}
}
}
The value can also be list of available servers:
kafka.bootstrap.servers[0]=foo:9092
kafka.bootstrap.servers[1]=bar:9092
kafka:
bootstrap:
servers:
- foo:9092
- bar:9092
[kafka]
[kafka.bootstrap]
servers=[
"foo:9092",
"bar:9092"
]
kafka {
bootstrap {
servers = ["foo:9092", "bar:9092"]
}
}
{
kafka {
bootstrap {
servers = ["foo:9092", "bar:9092"]
}
}
}
{
"kafka": {
"bootstrap": {
"servers": ["foo:9092", "bar:9092"]
}
}
}
You can also set the environment variable KAFKA_BOOTSTRAP_SERVERS to a comma separated list of values to externalize configuration.
|
You may also add any Apache Kafka configuration options directly under the kafka node. These configurations will apply to consumers, producers and streams:
kafka.bootstrap.servers=localhost:9092
kafka.reconnect.backoff.ms=30000
kafka.retry.backoff.ms=32000
kafka:
bootstrap:
servers: localhost:9092
reconnect.backoff.ms: 30000
retry.backoff.ms: 32000
[kafka]
"reconnect.backoff.ms"=30000
"retry.backoff.ms"=32000
[kafka.bootstrap]
servers="localhost:9092"
kafka {
bootstrap {
servers = "localhost:9092"
}
reconnect.backoff.ms = 30000
retry.backoff.ms = 32000
}
{
kafka {
bootstrap {
servers = "localhost:9092"
}
"reconnect.backoff.ms" = 30000
"retry.backoff.ms" = 32000
}
}
{
"kafka": {
"bootstrap": {
"servers": "localhost:9092"
},
"reconnect.backoff.ms": 30000,
"retry.backoff.ms": 32000
}
}
If your broker needs to setup SSL, it can be configured this way:
kafka.bootstrap.servers=localhost:9093
kafka.ssl.keystore.location=/path/to/client.keystore.p12
kafka.ssl.keystore.password=secret
kafka.ssl.keystore.type=PKCS12
kafka.ssl.truststore.location=/path/to/client.truststore.p12
kafka.ssl.truststore.password=secret
kafka.ssl.truststore.type=PKCS12
kafka.security.protocol=ssl
kafka:
bootstrap:
servers: localhost:9093
ssl:
keystore:
location: /path/to/client.keystore.p12
password: secret
type: PKCS12
truststore:
location: /path/to/client.truststore.p12
password: secret
type: PKCS12
security:
protocol: ssl
[kafka]
[kafka.bootstrap]
servers="localhost:9093"
[kafka.ssl]
[kafka.ssl.keystore]
location="/path/to/client.keystore.p12"
password="secret"
type="PKCS12"
[kafka.ssl.truststore]
location="/path/to/client.truststore.p12"
password="secret"
type="PKCS12"
[kafka.security]
protocol="ssl"
kafka {
bootstrap {
servers = "localhost:9093"
}
ssl {
keystore {
location = "/path/to/client.keystore.p12"
password = "secret"
type = "PKCS12"
}
truststore {
location = "/path/to/client.truststore.p12"
password = "secret"
type = "PKCS12"
}
}
security {
protocol = "ssl"
}
}
{
kafka {
bootstrap {
servers = "localhost:9093"
}
ssl {
keystore {
location = "/path/to/client.keystore.p12"
password = "secret"
type = "PKCS12"
}
truststore {
location = "/path/to/client.truststore.p12"
password = "secret"
type = "PKCS12"
}
}
security {
protocol = "ssl"
}
}
}
{
"kafka": {
"bootstrap": {
"servers": "localhost:9093"
},
"ssl": {
"keystore": {
"location": "/path/to/client.keystore.p12",
"password": "secret",
"type": "PKCS12"
},
"truststore": {
"location": "/path/to/client.truststore.p12",
"password": "secret",
"type": "PKCS12"
}
},
"security": {
"protocol": "ssl"
}
}
}
Creating a Kafka Producer with @KafkaClient
To create a Kafka Producer
that sends messages you can simply define an interface that is annotated with @KafkaClient.
For example the following is a trivial @KafkaClient
interface:
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
@KafkaClient // (1)
public interface ProductClient {
@Topic("my-products") // (2)
void sendProduct(@KafkaKey String brand, String name); // (3)
void sendProduct(@Topic String topic, @KafkaKey String brand, String name); // (4)
}
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
@KafkaClient // (1)
public interface ProductClient {
@Topic('my-products') // (2)
void sendProduct(@KafkaKey String brand, String name) // (3)
void sendProduct(@Topic String topic, @KafkaKey String brand, String name) // (4)
}
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
@KafkaClient // (1)
interface ProductClient {
@Topic("my-products") // (2)
fun sendProduct(@KafkaKey brand: String, name: String) // (3)
fun sendProduct(@Topic topic: String, @KafkaKey brand: String, name: String) // (4)
}
1 | The @KafkaClient annotation is used to designate this interface as a client |
2 | The @Topic annotation indicates which topics the ProducerRecord should be published to |
3 | The method defines two parameters: The parameter that is the Kafka key and the value. |
4 | It is also possible for the topic to be dynamic by making it a method argument |
You can omit the key, however this will result in a null key which means Kafka will not know how to partition the record.
|
At compile time Micronaut will produce an implementation of the above interface. You can retrieve an instance of ProductClient
either by looking up the bean from the ApplicationContext or by injecting the bean with @Inject
:
ProductClient client = applicationContext.getBean(ProductClient.class);
client.sendProduct("Nike", "Blue Trainers");
val client = beanContext.getBean(ProductClient::class.java)
client.sendProduct("Nike", "Blue Trainers")
Note that since the sendProduct
method returns void
this means the method will send the ProducerRecord
and block until the response is received. You can specify an executor and return either a CompletableFuture
or Publisher to support non-blocking message delivery.
Creating a Kafka Consumer with @KafkaListener
To listen to Kafka messages you can use the @KafkaListener annotation to define a message listener.
The following example will listen for messages published by the ProductClient
in the previous section:
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
@KafkaListener(offsetReset = OffsetReset.EARLIEST) // (1)
public class ProductListener {
private static final Logger LOG = getLogger(ProductListener.class);
@Topic("my-products") // (2)
public void receive(@KafkaKey String brand, String name) { // (3)
LOG.info("Got Product - {} by {}", name, brand);
}
}
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
@Slf4j
@KafkaListener(offsetReset = OffsetReset.EARLIEST) // (1)
class ProductListener {
@Topic('my-products') // (2)
void receive(@KafkaKey String brand, String name) { // (3)
log.info("Got Product - {} by {}", name, brand)
}
}
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import org.slf4j.LoggerFactory
@KafkaListener(offsetReset = OffsetReset.EARLIEST) // (1)
class ProductListener {
companion object {
private val LOG = LoggerFactory.getLogger(ProductListener::class.java)
}
@Topic("my-products") // (2)
fun receive(@KafkaKey brand: String, name: String) { // (3)
LOG.info("Got Product - {} by {}", name, brand)
}
}
1 | The @KafkaListener is used with offsetReset set to EARLIEST which makes the listener start listening to messages from the beginning of the partition. |
2 | The @Topic annotation is again used to indicate which topic(s) to subscribe to. |
3 | The receive method defines 2 arguments: The argument that will receive the key and the argument that will receive the value. |
Disabling Kafka
If for some reason, you need to disable the creation of kafka consumers, or producers, you can through configuration:
kafka.enabled=false
kafka:
enabled: false
[kafka]
enabled=false
kafka {
enabled = false
}
{
kafka {
enabled = false
}
}
{
"kafka": {
"enabled": false
}
}
5 Kafka Producers Using @KafkaClient
5.1 Defining @KafkaClient Methods
Specifying the Key and the Value
The Kafka key can be specified by providing a parameter annotated with @KafkaKey
. If no such parameter is specified the record is sent with a null
key.
The value to send is resolved by selecting the argument annotated with @MessageBody, otherwise the first argument with no specific binding annotation is used. For example:
@Topic("my-products")
void sendProduct(@KafkaKey String brand, String name);
@Topic('my-products')
void sendProduct(@KafkaKey String brand, String name)
@Topic("my-products")
fun sendProduct(@KafkaKey brand: String, name: String)
The method above will use the parameter brand
as the key and the parameter name
as the value.
Including Message Headers
There are a number of ways you can include message headers. One way is to annotate an argument with the @MessageHeader annotation and include a value when calling the method:
@Topic("my-products")
void sendProduct(@KafkaKey String brand, String name, @MessageHeader("My-Header") String myHeader);
@Topic('my-products')
void sendProduct(@KafkaKey String brand, String name, @MessageHeader('My-Header') String myHeader)
@Topic("my-products")
fun sendProduct(@KafkaKey brand: String, name: String, @MessageHeader("My-Header") myHeader: String)
The example above will include the value of the myHeader
argument as a header called My-Header
.
Another way to include headers is at the type level with the values driven from configuration:
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.messaging.annotation.MessageHeader;
@KafkaClient(id="product-client")
@MessageHeader(name = "X-Token", value = "${my.application.token}")
public interface ProductClient {
// define client API
}
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.messaging.annotation.MessageHeader
@KafkaClient(id='product-client')
@MessageHeader(name = 'X-Token', value = '${my.application.token}')
interface ProductClient {
// define client API
}
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.messaging.annotation.MessageHeader
@KafkaClient(id = "product-client")
@MessageHeader(name = "X-Token", value = "\${my.application.token}")
interface ProductClient {
// define client API
}
The above example will send a header called X-Token
with the value read from the setting my.application.token
in application.yml
(or the environnment variable MY_APPLICATION_TOKEN
).
If the my.application.token
is not set then an error will occur creating the client.
It is also possible to pass Collection<Header>
or Headers
object as method arguments as seen below.
@Topic("my-bicycles")
void sendBicycle(@KafkaKey String brand, String model, Collection<Header> headers);
@Topic('my-bicycles')
void sendBicycle(@KafkaKey String brand, String model, Collection<Header> headers)
@Topic("my-bicycles")
fun sendBicycle(@KafkaKey brand: String, model: String, headers: Collection<Header>)
@Topic("my-bicycles")
void sendBicycle(@KafkaKey String brand, String model, Headers headers);
@Topic('my-bicycles')
void sendBicycle(@KafkaKey String brand, String model, Headers headers)
@Topic("my-bicycles")
fun sendBicycle(@KafkaKey brand: String, model: String, headers: Headers)
In the above examples, all of the key/value pairs in headers
will be added to the list of headers produced to the topic. Header
and Headers
are
part of the kafka-clients
library:
Reactive and Non-Blocking Method Definitions
The @KafkaClient annotation supports the definition of reactive return types (such as Flowable or Reactor Flux
) as well as Futures.
The KafkaProducer used internally to implement @KafkaClient support is inherently blocking, even though some of its methods describe themselves as "asynchronous". Configuring an executor (as shown in the following examples) is required in order to guarantee that a returned reactive type or Future will not block the calling thread. |
Add the library Micronaut Reactor or Micronaut RxJava 3 to your application’s dependencies.
The following sections, which use Micronaut Reactor, cover advised configuration and possible method signatures and behaviour:
Configuring An Executor
As the send
method of KafkaProducer
can block the calling thread, it is recommended that you specify an executor to be used when returning either reactive types or CompletableFuture
. This will ensure that the send
logic is executed on a separate thread from that of the caller, and avoid undesirable conditions such as blocking of the Micronaut server’s event loop.
The executor to be used may be specified via configuration properties as in the following example:
kafka.producers.default.executor=blocking
kafka.producers.my-named-producer.executor=io
kafka:
producers:
default:
executor: blocking
my-named-producer:
executor: io
[kafka]
[kafka.producers]
[kafka.producers.default]
executor="blocking"
[kafka.producers.my-named-producer]
executor="io"
kafka {
producers {
'default' {
executor = "blocking"
}
myNamedProducer {
executor = "io"
}
}
}
{
kafka {
producers {
default {
executor = "blocking"
}
my-named-producer {
executor = "io"
}
}
}
}
{
"kafka": {
"producers": {
"default": {
"executor": "blocking"
},
"my-named-producer": {
"executor": "io"
}
}
}
}
Alternatively, the executor may be specified via the executor
property of the @KafkaClient annotation:
@KafkaClient(value = "product-client", executor = TaskExecutors.BLOCKING)
public interface BookClient {
Note that an executor
specified in the annotation will take precedent over that specified in the application configuration properties.
Mono Value and Return Type
@Topic("my-books")
Mono<Book> sendBook(@KafkaKey String author, Mono<Book> book);
@Topic('my-books')
Mono<Book> sendBook(@KafkaKey String author, Mono<Book> book);
@Topic("my-books")
fun sendBook(@KafkaKey author: String, book: Mono<Book>): Mono<Book>
The implementation will return a Mono
that when subscribed to will subscribe to the passed Mono
and send the emitted item as a ProducerRecord
emitting the item again if successful or an error otherwise.
Flux Value and Return Type
@Topic("my-books")
Flux<RecordMetadata> sendBooks(@KafkaKey String author, Flux<Book> book);
@Topic('my-books')
Flux<RecordMetadata> sendBooks(@KafkaKey String author, Flux<Book> book);
@Topic("my-books")
fun sendBooks(@KafkaKey author: String, book: Flux<Book>): Flux<RecordMetadata>
The implementation will return a Reactor Flux
that when subscribed to will subscribe to the passed Flux
and for each emitted item will send a ProducerRecord
emitting the resulting Kafka RecordMetadata
if successful or an error otherwise.
Available Annotations
There are a number of annotations available that allow you to specify how a method argument is treated.
The following table summarizes the annotations and their purpose, with an example:
Annotation | Description | Example |
---|---|---|
Allows explicitly indicating the body of the message to sent |
|
|
Allows specifying a parameter that should be sent as a header |
|
|
Allows specifying the parameter that is the Kafka key |
|
|
Allows specifying the parameter that is the partition number |
|
|
Allows specifying the parameter that is used to compute a partition number independently from the Message Key. |
|
For example, you can use the @MessageHeader annotation to bind a parameter value to a header in the ProducerRecord
.
5.2 Configuring @KafkaClient beans
@KafkaClient and Producer Properties
There are a number of ways to pass configuration properties to the KafkaProducer. You can set default producer properties using kafka.producers.default
in application.yml
:
kafka.producers.default.retries=5
kafka.producers.default.bootstrap.servers=localhost:9096
kafka:
producers:
default:
retries: 5
bootstrap:
servers: localhost:9096
[kafka]
[kafka.producers]
[kafka.producers.default]
retries=5
[kafka.producers.default.bootstrap]
servers="localhost:9096"
kafka {
producers {
'default' {
retries = 5
bootstrap {
servers = "localhost:9096"
}
}
}
}
{
kafka {
producers {
default {
retries = 5
bootstrap {
servers = "localhost:9096"
}
}
}
}
}
{
"kafka": {
"producers": {
"default": {
"retries": 5,
"bootstrap": {
"servers": "localhost:9096"
}
}
}
}
}
Any property in the ProducerConfig class can be set, including any overrides over the global Micronaut Kafka configs. The above example will set the default number of times to retry sending a record as well as override kafka.bootstrap.servers
.
Per @KafkaClient Producer Properties
To configure different properties for each client, you should set a @KafkaClient
id using the annotation:
@KafkaClient("product-client")
@KafkaClient('product-client')
@KafkaClient("product-client")
This serves 2 purposes. Firstly it sets the value of the client.id
setting used to build the Producer
. Secondly, it allows you to apply per producer configuration in application.yml
:
kafka.producers.product-client.retries=5
kafka.producers.product-client.bootstrap.servers=localhost:9097
kafka.producers.product-client-2.bootstrap.servers=localhost:9098
kafka:
producers:
product-client:
retries: 5
bootstrap:
servers: localhost:9097
product-client-2:
bootstrap:
servers: localhost:9098
[kafka]
[kafka.producers]
[kafka.producers.product-client]
retries=5
[kafka.producers.product-client.bootstrap]
servers="localhost:9097"
[kafka.producers.product-client-2]
[kafka.producers.product-client-2.bootstrap]
servers="localhost:9098"
kafka {
producers {
productClient {
retries = 5
bootstrap {
servers = "localhost:9097"
}
}
productClient2 {
bootstrap {
servers = "localhost:9098"
}
}
}
}
{
kafka {
producers {
product-client {
retries = 5
bootstrap {
servers = "localhost:9097"
}
}
product-client-2 {
bootstrap {
servers = "localhost:9098"
}
}
}
}
}
{
"kafka": {
"producers": {
"product-client": {
"retries": 5,
"bootstrap": {
"servers": "localhost:9097"
}
},
"product-client-2": {
"bootstrap": {
"servers": "localhost:9098"
}
}
}
}
}
Finally, the @KafkaClient annotation itself provides a properties
member that you can use to set producer specific properties:
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import org.apache.kafka.clients.producer.ProducerConfig;
@KafkaClient(
id = "product-client",
acks = KafkaClient.Acknowledge.ALL,
properties = @Property(name = ProducerConfig.RETRIES_CONFIG, value = "5")
)
public interface ProductClient {
// define client API
}
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.producer.ProducerConfig
@KafkaClient(
id = "product-client",
acks = KafkaClient.Acknowledge.ALL,
properties = @Property(name = ProducerConfig.RETRIES_CONFIG, value = '5')
)
interface ProductClient {
// define client API
}
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.producer.ProducerConfig
@KafkaClient(
id = "product-client",
acks = KafkaClient.Acknowledge.ALL,
properties = [Property(name = ProducerConfig.RETRIES_CONFIG, value = "5")]
)
interface ProductClient {
// define client API
}
@KafkaClient and Serializers
When serializing keys and values Micronaut will by default attempt to automatically pick a Serializer to use. This is done via the CompositeSerdeRegistry bean.
You can replace the default SerdeRegistry bean with your own implementation by defining a bean that uses @Replaces(CompositeSerdeRegistry.class) . See the section on Bean Replacement.
|
All common java.lang
types (String
, Integer
, primitives etc.) are supported and for POJOs by default a Jackson based JSON serializer is used.
You can, however, explicitly override the Serializer
used by providing the appropriate configuration in application.yml
:
kafka.producers.product-client.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
kafka:
producers:
product-client:
value:
serializer: org.apache.kafka.common.serialization.ByteArraySerializer
[kafka]
[kafka.producers]
[kafka.producers.product-client]
[kafka.producers.product-client.value]
serializer="org.apache.kafka.common.serialization.ByteArraySerializer"
kafka {
producers {
productClient {
value {
serializer = "org.apache.kafka.common.serialization.ByteArraySerializer"
}
}
}
}
{
kafka {
producers {
product-client {
value {
serializer = "org.apache.kafka.common.serialization.ByteArraySerializer"
}
}
}
}
}
{
"kafka": {
"producers": {
"product-client": {
"value": {
"serializer": "org.apache.kafka.common.serialization.ByteArraySerializer"
}
}
}
}
}
You may want to do this if for example you choose an alternative serialization format such as Avro or Protobuf.
5.3 Sending Records in Batch
By default if you define a method that takes a container type such as a List the list will be serialized using the specified value.serializer
(the default will result in a JSON array).
For example the following two methods will both send serialized arrays:
@Topic("books")
void sendList(List<Book> books);
@Topic("books")
void sendBooks(Book...books);
@Topic('books')
void sendList(List<Book> books)
@Topic('books')
void sendBooks(Book...books)
@Topic("books")
fun sendList(books: List<Book>)
@Topic("books")
fun sendBooks(vararg books: Book)
Instead of a sending a serialized array you may wish to instead send batches of ProducerRecord either synchronously or asynchronously.
To do this you can specify a value of true
to the batch
member of the @KafkaClient annotation:
ProducerRecord
batches@KafkaClient(batch = true)
public interface BookClient {
@Topic("books")
void sendList(List<Book> books);
ProducerRecord
batches@KafkaClient(batch = true)
interface BookClient {
@Topic('books')
void sendList(List<Book> books)
ProducerRecord
batches@KafkaClient(batch = true)
interface BookClient {
@Topic("books")
fun sendList(books: List<Book>)
In the above case instead of sending a serialized array the client implementation will iterate over each item in the list and send a ProducerRecord
for each. The previous example is blocking, however you can return a reactive type if desired:
ProducerRecord
batches Reactively@KafkaClient(batch = true)
public interface BookClient {
@Topic("books")
Flux<RecordMetadata> send(List<Book> books);
ProducerRecord
batches Reactively@KafkaClient(batch = true)
interface BookClient {
@Topic('books')
Flux<RecordMetadata> send(List<Book> books)
ProducerRecord
batches Reactively@KafkaClient(batch = true)
interface BookClient {
@Topic("books")
fun send(books: List<Book>): Flux<RecordMetadata>
You can also use an unbound reactive type such as Flux
as the source of your batch data:
ProducerRecord
batches from a Flux@KafkaClient(batch = true)
public interface BookClient {
@Topic("books")
Flux<RecordMetadata> send(Flux<Book> books);
ProducerRecord
batches from a Flux@KafkaClient(batch = true)
interface BookClient {
@Topic('books')
Flux<RecordMetadata> send(Flux<Book> books)
ProducerRecord
batches from a Flux@KafkaClient(batch = true)
interface BookClient {
@Topic("books")
fun send(books: Flux<Book>): Flux<RecordMetadata>
5.4 Injecting Kafka Producer Beans
If you need maximum flexibility and don’t want to use the @KafkaClient support you can use the @KafkaClient
annotation as qualifier for dependency injection of KafkaProducer instances.
Consider the following example:
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.concurrent.Future;
@Singleton
public class BookSender {
private final Producer<String, Book> kafkaProducer;
public BookSender(@KafkaClient("book-producer") Producer<String, Book> kafkaProducer) { // (1)
this.kafkaProducer = kafkaProducer;
}
public Future<RecordMetadata> send(String author, Book book) {
return kafkaProducer.send(new ProducerRecord<>("books", author, book)); // (2)
}
}
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import java.util.concurrent.Future
@Singleton
class BookSender {
private final Producer<String, Book> kafkaProducer
BookSender(@KafkaClient('book-producer') Producer<String, Book> kafkaProducer) { // (1)
this.kafkaProducer = kafkaProducer
}
Future<RecordMetadata> send(String author, Book book) {
kafkaProducer.send(new ProducerRecord<>('books', author, book)) // (2)
}
}
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import java.util.concurrent.Future
@Requires(property = "spec.name", value = "BookSenderTest")
@Singleton
class BookSender(
@param:KafkaClient("book-producer") private val kafkaProducer: Producer<String, Book>) { // (1)
fun send(author: String, book: Book): Future<RecordMetadata> {
return kafkaProducer.send(ProducerRecord("books", author, book)) // (2)
}
}
1 | The Producer is dependency injected into the constructor. If not specified in configuration, the key and value serializer are inferred from the generic type arguments. |
2 | The Producer is used to send records |
Note that there is no need to call the close()
method to shut down the KafkaProducer
, it is fully managed by Micronaut and will be shutdown when the application shuts down.
The previous example can be tested in JUnit with the following test:
@Test
void testBookSender() {
try (ApplicationContext ctx = ApplicationContext.run( // (1)
Map.of("kafka.enabled", "true", "spec.name", "BookSenderTest")
)) {
BookSender bookSender = ctx.getBean(BookSender.class); // (2)
Book book = new Book("The Stand");
Future<RecordMetadata> stephenKing = bookSender.send("Stephen King", book);
assertDoesNotThrow(() -> {
RecordMetadata recordMetadata = stephenKing.get();
assertEquals("books", recordMetadata.topic());
});
}
}
void "test Book Sender"() {
given:
ApplicationContext ctx = ApplicationContext.run( // (1)
'kafka.enabled': true, 'spec.name': 'BookSenderTest'
)
BookSender bookSender = ctx.getBean(BookSender) // (2)
Book book = new Book('The Stand')
when:
bookSender.send('Stephen King', book)
Future<RecordMetadata> stephenKing = bookSender.send('Stephen King', book);
def recordMetadata = stephenKing.get();
then:
noExceptionThrown()
recordMetadata.topic() == 'books'
cleanup:
ctx.close()
}
@Test
fun testBookSender() {
ApplicationContext.run(mapOf( // (1)
"kafka.enabled" to StringUtils.TRUE, "spec.name" to "BookSenderTest")).use { ctx ->
val bookSender = ctx.getBean(BookSender::class.java) // (2)
val book = Book("The Stand")
bookSender.send("Stephen King", book)
val stephenKing = bookSender.send("Stephen King", book)
Assertions.assertDoesNotThrow {
val recordMetadata = stephenKing.get()
Assertions.assertEquals("books", recordMetadata.topic())
}
}
1 | A Kafka docker container is used |
2 | The BookSender is retrieved from the ApplicationContext and a ProducerRecord sent |
By using the KafkaProducer API directly you open up even more options if you require transactions (exactly-once delivery) or want control over when records are flushed etc.
5.5 Transactions
Transaction processing can be enabled by defining transactionalId
on @KafkaClient
, which will initialize the producer for transactional usage and wrap any send operation with a transaction demarcation.
@KafkaClient(id = "my-client", transactionalId = "my-tx-id")
public interface TransactionalClient {
// define client API
}
@KafkaClient(id = "my-client", transactionalId = "my-tx-id")
interface TransactionalClient {
// define client API
}
@KafkaClient(id = "my-client", transactionalId = "my-tx-id")
interface TransactionalClient {
// define client API
}
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(...);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
Random transactional ID
Random transactional ID
Random transactional ID
|
5.6 Running Kafka while testing and developing
Micronaut Test Resources simplifies running Kafka for local development and testing.
Micronaut Test Resources Kafka support will automatically start a Kafka container and provide the value of the
kafka.bootstrap.servers
property.
Micronaut Launch and CLI already apply Test Resources to your build when you select the kafka
feature.
Micronaut Test Resources uses Test Containers under the hood. If you prefer to use Test Containers directly, you can create a Singleton Container and combine it with Micronaut Test TestPropertyProvider
:
package io.micronaut.kafka.docs;
import io.micronaut.test.support.TestPropertyProvider;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import java.util.Collections;
import java.util.Map;
/**
* @see <a href="https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers">Singleton containers</a>
*/
public abstract class AbstractKafkaTest implements TestPropertyProvider {
static protected final KafkaContainer MY_KAFKA = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:latest")
);
@Override
public Map<String, String> getProperties() {
if (!MY_KAFKA.isRunning()) {
MY_KAFKA.start();
}
return Collections.singletonMap(
"kafka.bootstrap.servers", MY_KAFKA.getBootstrapServers()
);
}
}
package io.micronaut.kafka.docs
import io.micronaut.test.support.TestPropertyProvider
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
abstract class AbstractKafkaTest extends Specification implements TestPropertyProvider {
@Shared
@AutoCleanup
KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
@Override
Map<String, String> getProperties() {
kafkaContainer.start()
["kafka.bootstrap.servers": kafkaContainer.getBootstrapServers()]
}
}
package io.micronaut.kafka.docs
import io.micronaut.test.support.TestPropertyProvider
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
/**
* @see <a href="https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers">Singleton containers</a>
*/
abstract class AbstractKafkaTest : TestPropertyProvider {
companion object {
var MY_KAFKA: KafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
}
override fun getProperties(): MutableMap<String, String> {
if(!MY_KAFKA.isRunning) {
MY_KAFKA.start()
}
return mutableMapOf(
"kafka.bootstrap.servers" to MY_KAFKA.bootstrapServers
)
}
}
And then test:
package io.micronaut.kafka.docs;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
@Property(name = "spec.name", value = "MyTest")
@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyTest extends AbstractKafkaTest {
@Test
void testKafkaRunning(MyProducer producer, MyConsumer consumer) {
final String message = "hello";
producer.produce(message);
await().atMost(5, SECONDS).until(() -> message.equals(consumer.consumed));
}
@Requires(property = "spec.name", value = "MyTest")
@KafkaClient
interface MyProducer {
@Topic("my-topic")
void produce(String message);
}
@Requires(property = "spec.name", value = "MyTest")
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
static class MyConsumer {
String consumed;
@Topic("my-topic")
public void consume(String message) {
consumed = message;
}
}
}
package io.micronaut.kafka.docs
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import spock.util.concurrent.PollingConditions
@MicronautTest
@Property(name = "spec.name", value = "MyTest")
class MyTest extends AbstractKafkaTest {
@Inject
MyProducer producer
@Inject
MyConsumer consumer
PollingConditions conditions = new PollingConditions()
void "test kafka running"() {
given:
String message = "hello"
when:
producer.produce(message)
then:
conditions.within(5) {
consumer.consumed == message
}
}
@Requires(property = "spec.name", value = "MyTest")
@KafkaClient
static interface MyProducer {
@Topic("my-topic")
void produce(String message)
}
@Requires(property = "spec.name", value = "MyTest")
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
static class MyConsumer {
String consumed
@Topic("my-topic")
void consume(String message) {
consumed = message
}
}
}
package io.micronaut.kafka.docs
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import java.util.concurrent.TimeUnit
@Property(name = "spec.name", value = "MyTest")
@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
internal class MyTest : AbstractKafkaTest() {
@Test
fun testKafkaRunning(producer: MyProducer, consumer: MyConsumer) {
val message = "hello"
producer.produce(message)
await().atMost(5, TimeUnit.SECONDS).until { consumer.consumed == message }
}
@Requires(property = "spec.name", value = "MyTest")
@KafkaClient
interface MyProducer {
@Topic("my-topic")
fun produce(message: String)
}
@Requires(property = "spec.name", value = "MyTest")
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class MyConsumer {
var consumed: String? = null
@Topic("my-topic")
fun consume(message: String) {
consumed = message
}
}
}
6 Kafka Consumers Using @KafkaListener
The quick start section presented a trivial example of what is possible with the @KafkaListener annotation.
Using the @KafkaListener
annotation Micronaut will build a KafkaConsumer and start the poll
loop by running the KafkaConsumer
in a special consumer
thread pool. You can configure the size of the thread pool based on the number of consumers in your application in application.yml
as desired:
consumer
thread poolmicronaut.executors.consumer.type=fixed
micronaut.executors.consumer.nThreads=25
micronaut:
executors:
consumer:
type: fixed
nThreads: 25
[micronaut]
[micronaut.executors]
[micronaut.executors.consumer]
type="fixed"
nThreads=25
micronaut {
executors {
consumer {
type = "fixed"
nThreads = 25
}
}
}
{
micronaut {
executors {
consumer {
type = "fixed"
nThreads = 25
}
}
}
}
{
"micronaut": {
"executors": {
"consumer": {
"type": "fixed",
"nThreads": 25
}
}
}
}
KafkaConsumer
instances are single threaded, hence for each @KafkaListener
method you define a new thread is created to execute the poll
loop.
You may wish to scale the number of consumers you have listening on a particular topic. There are several ways you may achieve this. You could for example run multiple instances of your application each containing a single consumer in each JVM.
Alternatively, you can also scale via threads. By setting the number of threads
a particular consumer bean will create:
@KafkaListener(groupId = "myGroup", threads = 10)
@KafkaListener(groupId='myGroup', threads = 10)
@KafkaListener(groupId = "myGroup", threads = 10)
The above example will create 10 KafkaConsumer instances, each running in a unique thread and participating in the myGroup
consumer group.
@KafkaListener beans are by default singleton. When using multiple threads you must either synchronize access to local state or declare the bean as @Prototype .
|
You can also make your number of threads configurable by using threadsValue
:
@KafkaListener(groupId = "myGroup", threadsValue = "${my.thread.count}")
@KafkaListener(groupId = 'myGroup', threadsValue = '${my.thread.count}')
@KafkaListener(groupId = "myGroup", threadsValue = "\${my.thread.count}")
threads will be overridden by threadsValue if they are both set.
|
By default Micronaut will inspect the method signature of the method annotated with @Topic
that will listen for ConsumerRecord
instances and from the types infer an appropriate key and value Deserializer.
kafka.consumers.default.allow.auto.create.topics=true
kafka.consumers.product.bootstrap.servers=localhost:9098
kafka:
consumers:
default:
allow.auto.create.topics: true
product:
bootstrap:
servers: localhost:9098
[kafka]
[kafka.consumers]
[kafka.consumers.default]
"allow.auto.create.topics"=true
[kafka.consumers.product]
[kafka.consumers.product.bootstrap]
servers="localhost:9098"
kafka {
consumers {
'default' {
allow.auto.create.topics = true
}
product {
bootstrap {
servers = "localhost:9098"
}
}
}
}
{
kafka {
consumers {
default {
"allow.auto.create.topics" = true
}
product {
bootstrap {
servers = "localhost:9098"
}
}
}
}
}
{
"kafka": {
"consumers": {
"default": {
"allow.auto.create.topics": true
},
"product": {
"bootstrap": {
"servers": "localhost:9098"
}
}
}
}
}
Any property in the ConsumerConfig class can be set for all @KafkaListener
beans based on the . The above example will enable the consumer to create a topic if it doesn’t exist for the default
(@KafkaListener
) client and set a custom bootstrap server for the product
client (@KafkaListener(value = "product")
)
See the guide for Testing Kafka Listener using Testcontainers with the Micronaut Framework to learn more. |
6.1 Defining @KafkaListener Methods
The @KafkaListener annotation examples up until now have been relatively trivial, but Micronaut offers a lot of flexibility when it comes to the types of method signatures you can define.
The following sections detail examples of supported use cases.
Specifying Topics
The @Topic annotation can be used at the method or the class level to specify which topics to be listened for.
Care needs to be taken when using @Topic at the class level because every public method of the class annotated with @KafkaListener will become a Kafka consumer, which may be undesirable.
You can make the topic name configurable using a placeholder: @Topic("${my.topic.name:myTopic}")
|
You can specify multiple topics to listen for:
@Topic({"fun-products", "awesome-products"})
public void receiveMultiTopics(@KafkaKey String brand, String name) {
LOG.info("Got Product - {} by {}", name, brand);
}
@Topic(['fun-products', 'awesome-products'])
void receiveMultiTopics(@KafkaKey String brand, String name) {
log.info("Got Product - {} by {}", name, brand)
}
@Topic("fun-products", "awesome-products")
fun receiveMultiTopics(@KafkaKey brand: String, name: String) {
LOG.info("Got Product - {} by {}", name, brand)
}
You can also specify one or many regular expressions to listen for:
@Topic(patterns="products-\\w+")
public void receivePatternTopics(@KafkaKey String brand, String name) {
LOG.info("Got Product - {} by {}", name, brand);
}
@Topic(patterns='products-\\w+')
void receivePatternTopics(@KafkaKey String brand, String name) {
log.info("Got Product - {} by {}", name, brand)
}
@Topic(patterns = ["products-\\w+"])
fun receivePatternTopics(@KafkaKey brand: String, name: String) {
LOG.info("Got Product - {} by {}", name, brand)
}
Available Annotations
There are a number of annotations available that allow you to specify how a method argument is bound.
The following table summarizes the annotations and their purpose, with an example:
Annotation | Description | Example |
---|---|---|
Allows explicitly indicating the body of the message |
|
|
Allows binding a parameter to a message header |
|
|
Allows binding a parameter to the message key |
|
|
Allows binding a parameter to the partition the message was received from |
|
For example, you can use the @MessageHeader annotation to bind a parameter value from a header contained within a ConsumerRecord
.
Topics, Partitions and Offsets
If you want a reference to the topic, partition or offset it is a simple matter of defining a parameter for each.
The following table summarizes example parameters and how they related to the ConsumerRecord
being processed:
Parameter | Description |
---|---|
|
The name of the topic |
|
The offset of the |
|
The partition of the |
|
The timestamp of the |
As an example, following listener method will receive all of the above mentioned parameters:
@Topic("awesome-products")
public void receive(@KafkaKey String brand, // (1)
Product product, // (2)
long offset, // (3)
int partition, // (4)
String topic, // (5)
long timestamp) { // (6)
LOG.info("Got Product - {} by {}",product.name(), brand);
}
@Topic('awesome-products')
void receive(@KafkaKey String brand, // (1)
Product product, // (2)
long offset, // (3)
int partition, // (4)
String topic, // (5)
long timestamp) { // (6)
log.info("Got Product - {} by {}", product.name, brand)
}
@Topic("awesome-products")
fun receive(
@KafkaKey brand: String, // (1)
product: Product, // (2)
offset: Long, // (3)
partition: Int, // (4)
topic: String?, // (5)
timestamp: Long // (6)
) {
LOG.info("Got Product - {} by {}", product.name, brand)
}
1 | The Kafka key |
2 | The message body |
3 | The offset of the ConsumerRecord |
4 | The partition of the ConsumerRecord |
5 | The topic. Note that the @Topic annotation supports multiple topics. |
6 | The timestamp of the ConsumerRecord |
Receiving a ConsumerRecord
If you prefer you can also receive the entire ConsumerRecord
object being listened for. In this case you should specify appropriate generic types for the key and value of the ConsumerRecord
so that Micronaut can pick the correct deserializer for each.
Consider the following example:
@Topic("awesome-products")
public void receive(ConsumerRecord<String, Product> record) { // (1)
Product product = record.value(); // (2)
String brand = record.key(); // (3)
LOG.info("Got Product - {} by {}",product.name(), brand);
}
@Topic("awesome-products")
public void receive(ConsumerRecord<String, Product> record) { // (1)
Product product = record.value() // (2)
String brand = record.key() // (3)
log.info("Got Product - {} by {}", product.name, brand)
}
@Topic("awesome-products")
fun receive(record: ConsumerRecord<String, Product>) { // (1)
val name = record.value() // (2)
val brand = record.key() // (3)
LOG.info("Got Product - $name by $brand")
}
1 | The method signature accepts a ConsumerRecord that specifies a String for the key type and a POJO (Product ) for the value type. |
2 | The value() method is used to retrieve the value |
3 | The key() method is used to retrieve the key |
Receiving and returning Reactive Types
In addition to common Java types and POJOs you can also define listener methods that receive a Reactive type such as a Single or a Reactor Mono
.
Add the library Micronaut Reactor or Micronaut RxJava 3 to your application’s dependencies.
For example, using Reactor:
@Topic("reactive-products")
public Mono<Product> receive(@KafkaKey String brand, // (1)
Mono<Product> productPublisher) { // (2)
return productPublisher.doOnSuccess((product) ->
LOG.info("Got Product - {} by {}", product.name(), brand) // (3)
);
}
@Topic('reactive-products')
Mono<Product> receive(@KafkaKey String brand, // (1)
Mono<Product> productPublisher) { // (2)
return productPublisher.doOnSuccess(product ->
log.info("Got Product - {} by {}", product.name, brand) // (3)
)
}
@Topic("reactive-products")
fun receive(@KafkaKey brand: String, // (1)
product: Mono<Product>): Mono<Product> { // (2)
return product.doOnSuccess { (name): Product ->
LOG.info("Got Product - {} by {}", name, brand) // (3)
}
}
1 | The @KafkaKey annotation is used to indicate the key |
2 | A Mono is used to receive the message body |
3 | The doOnSuccess method is used to process the result |
Note that in this case the method returns a Mono
that indicates to Micronaut the poll
loop should continue, and if enable.auto.commit
is set to true
(the default) the offsets will be committed, potentially before the doOnSuccess
is called.
The idea here is that you are able to write consumers that don’t block, however care must be taken in the case where an error occurs in the doOnSuccess
method otherwise the message could be lost. You could for example re-deliver the message in case of an error.
Alternatively, you can use the @Blocking annotation to tell Micronaut to subscribe to the returned reactive type in a blocking manner which will result in blocking the poll
loop, preventing offsets from being committed automatically:
@Blocking
@Topic("reactive-products")
public Mono<Product> receiveBlocking(@KafkaKey String brand, Mono<Product> productPublisher) {
return productPublisher.doOnSuccess((product) ->
LOG.info("Got Product - {} by {}", product.name(), brand)
);
}
@Blocking
@Topic('reactive-products')
Mono<Product> receiveBlocking(@KafkaKey String brand, Mono<Product> productPublisher) {
return productPublisher.doOnSuccess(product ->
log.info("Got Product - {} by {}", product.name, brand)
)
}
@Blocking
@Topic("reactive-products")
fun receiveBlocking(@KafkaKey brand: String, product: Mono<Product>): Mono<Product> {
return product.doOnSuccess { (name): Product ->
LOG.info("Got Product - {} by {}", name, brand)
}
}
6.2 Configuring @KafkaListener beans
@KafkaListener and Consumer Groups
Kafka consumers created with @KafkaListener
will by default run within a consumer group that is the value of micronaut.application.name
unless you explicitly specify a value to the @KafkaListener
annotation. For example:
@KafkaListener("myGroup")
@KafkaListener('myGroup')
@KafkaListener("myGroup")
or
@KafkaListener(groupId = "myGroup")
@KafkaListener(groupId = 'myGroup')
@KafkaListener(groupId = "myGroup")
The above examples will run the consumer within a consumer group called myGroup
.
In this case, each record will be consumed by one consumer instance of the consumer group.
You can make the consumer group configurable using a placeholder: @KafkaListener("${my.consumer.group:myGroup}")
|
To allow the records to be consumed by all the consumer instances (each instance will be part of a unique consumer group), uniqueGroupId
can be set to true
:
@KafkaListener(groupId = "myGroup", uniqueGroupId = true)
@KafkaListener(groupId = 'myGroup', uniqueGroupId = true)
@KafkaListener(groupId = "myGroup", uniqueGroupId = true)
For more information, see for example https://kafka.apache.org/intro#intro_consumers
@KafkaListener and Consumer Properties
There are a number of ways to pass configuration properties to the KafkaConsumer
. You can set default consumer properties using kafka.consumers.default
in application.yml
:
kafka.consumers.default.session.timeout.ms=30000
kafka:
consumers:
default:
session:
timeout:
ms: 30000
[kafka]
[kafka.consumers]
[kafka.consumers.default]
[kafka.consumers.default.session]
[kafka.consumers.default.session.timeout]
ms=30000
kafka {
consumers {
'default' {
session {
timeout {
ms = 30000
}
}
}
}
}
{
kafka {
consumers {
default {
session {
timeout {
ms = 30000
}
}
}
}
}
}
{
"kafka": {
"consumers": {
"default": {
"session": {
"timeout": {
"ms": 30000
}
}
}
}
}
}
The above example will set the default session.timeout.ms
that Kafka uses to decide whether a consumer is alive or not and applies it to all created KafkaConsumer
instances.
You can also provide configuration specific to a consumer group. For example consider the following configuration:
kafka.consumers.myGroup.session.timeout.ms=30000
kafka:
consumers:
myGroup:
session:
timeout:
ms: 30000
[kafka]
[kafka.consumers]
[kafka.consumers.myGroup]
[kafka.consumers.myGroup.session]
[kafka.consumers.myGroup.session.timeout]
ms=30000
kafka {
consumers {
myGroup {
session {
timeout {
ms = 30000
}
}
}
}
}
{
kafka {
consumers {
myGroup {
session {
timeout {
ms = 30000
}
}
}
}
}
}
{
"kafka": {
"consumers": {
"myGroup": {
"session": {
"timeout": {
"ms": 30000
}
}
}
}
}
}
The above configuration will pass properties to only the @KafkaListener
beans that apply to the consumer group myGroup
.
Finally, the @KafkaListener annotation itself provides a properties
member that you can use to set consumer specific properties:
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.Product;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
@KafkaListener(
groupId = "products",
pollTimeout = "500ms",
properties = @Property(name = ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, value = "10000")
)
public class ProductListener {
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
@KafkaListener(
groupId = 'products',
pollTimeout = '500ms',
properties = @Property(name = ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, value = '10000')
)
class ProductListener {
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import io.micronaut.kafka.docs.consumer.topics.ProductListener
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
@KafkaListener(
groupId = "products",
pollTimeout = "500ms",
properties = [Property(name = ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, value = "10000")]
)
class ProductListener {
@KafkaListener and Deserializers
As mentioned previously when defining @KafkaListener
methods, Micronaut will attempt to pick an appropriate deserializer for the method signature. This is done via the CompositeSerdeRegistry bean.
You can replace the default SerdeRegistry bean with your own implementation by defining a bean that uses @Replaces(CompositeSerdeRegistry.class) . See the section on Bean Replacement.
|
All common java.lang
types (String
, Integer
, primitives etc.) are supported and for POJOs by default a Jackson based JSON deserializer is used.
You can, however, explicitly override the Deserializer
used by providing the appropriate configuration in application.yml
:
kafka.consumers.myGroup.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka:
consumers:
myGroup:
value:
deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
[kafka]
[kafka.consumers]
[kafka.consumers.myGroup]
[kafka.consumers.myGroup.value]
deserializer="org.apache.kafka.common.serialization.ByteArrayDeserializer"
kafka {
consumers {
myGroup {
value {
deserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}
}
}
}
{
kafka {
consumers {
myGroup {
value {
deserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}
}
}
}
}
{
"kafka": {
"consumers": {
"myGroup": {
"value": {
"deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
}
}
}
}
}
You may want to do this if for example you choose an alternative deserialization format such as Avro or Protobuf.
Transactional properties
There are a few options that can be enabled for only in the transactional processing:
Isolation
Use isolation
member to define if you want to receive messages that haven’t been committed yet.
Custom offset strategy
There is a special offset strategy OffsetStrategy.SEND_TO_TRANSACTION
that can only be used with an associated producer, only applicable when SendTo
is used.
6.3 Commiting Kafka Offsets
Automatically Committing Offsets
The way offsets are handled by a @KafkaListener bean is defined by the OffsetStrategy enum.
The following table summarizes the enum values and behaviour:
Value | Description |
---|---|
Automatically commit offsets. Sets |
|
Disables automatically committing offsets. Sets |
|
Commits offsets manually at the end of each |
|
Asynchronously commits offsets manually at the end of each |
|
Commits offsets manually after each |
|
Commits offsets asynchronously after each |
|
Only available when the transactional producer is enabled for |
Depending on the your level of paranoia or durability requirements you can choose to tune how and when offsets are committed.
Manually Committing Offsets
If you set the OffsetStrategy
to DISABLED it becomes your responsibility to commit offsets.
There are a couple of ways that can be achieved.
The simplest way is to define an argument of type Acknowledge and call the ack()
method to commit offsets synchronously:
ack()
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
public class ProductListener {
@Topic("awesome-products")
public void receive(Product product, Acknowledgement acknowledgement) { // (2)
// process product record
acknowledgement.ack(); // (3)
}
}
ack()
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
class ProductListener {
@Topic('awesome-products')
void receive(Product product, Acknowledgement acknowledgement) { // (2)
// process product record
acknowledgement.ack() // (3)
}
}
ack()
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
class ProductListener {
@Topic("awesome-products")
fun receive(product: Product, acknowledgement: Acknowledgement) { // (2)
// process product record
acknowledgement.ack() // (3)
}
}
1 | Committing offsets automatically is disabled |
2 | The listener method specifies a parameter of type Acknowledge |
3 | The ack() method is called once the record has been processed |
Alternatively, you an supply a KafkaConsumer
method argument and then call commitSync
(or commitAsync
) yourself when you are ready to commit offsets:
KafkaConsumer
APIimport io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.Product;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED;
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
public class ProductListener {
@Topic("awesome-products")
public void receive(Product product, long offset, int partition, String topic, Consumer kafkaConsumer) { // (2)
// process product record
// commit offsets
kafkaConsumer.commitSync(Collections.singletonMap( // (3)
new TopicPartition(topic, partition),
new OffsetAndMetadata(offset + 1, "my metadata")
));
}
}
KafkaConsumer
APIimport io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import static io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
class ProductListener {
@Topic('awesome-products')
void receive(Product product, long offset, int partition, String topic, Consumer kafkaConsumer) { // (2)
// process product record
// commit offsets
kafkaConsumer.commitSync(Collections.singletonMap( // (3)
new TopicPartition(topic, partition),
new OffsetAndMetadata(offset + 1, 'my metadata')
))
}
}
KafkaConsumer
APIimport io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import java.util.*
import io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import io.micronaut.configuration.kafka.annotation.OffsetStrategy.DISABLED
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED) // (1)
class ProductListener {
@Topic("awesome-products")
fun receive(product: Product, offset: Long, partition: Int, topic: String, kafkaConsumer: Consumer<*, *>) { // (2)
// process product record
// commit offsets
kafkaConsumer.commitSync(Collections.singletonMap( // (3)
TopicPartition(topic, partition),
OffsetAndMetadata(offset + 1, "my metadata")
)
)
}
}
1 | Committing offsets automatically is disabled |
2 | The listener method specifies that it receives the offset data and a KafkaConsumer |
3 | The commitSync() method is called once the record has been processed |
6.4 Assigning Kafka Offsets
6.4.1 Manually Assigning Offsets to a Consumer Bean
Sometimes you may wish to control exactly the position you wish to resume consuming messages from.
For example if you store offsets in a database you may wish to read the offsets from the database when the consumer starts and start reading from the position stored in the database.
To support this use case your consumer bean can implement the ConsumerSeekAware interface:
ConsumerSeekAware
APIpackage io.micronaut.kafka.docs.seek.aware;
import io.micronaut.configuration.kafka.ConsumerSeekAware;
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.configuration.kafka.seek.*;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.Product;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
@KafkaListener
@Requires(property = "spec.name", value = "ConsumerSeekAwareTest")
public class ProductListener implements ConsumerSeekAware { // (1)
List<Product> processed = new ArrayList<>();
public ProductListener(ProductListenerConfiguration config) {
// ...
}
@Topic("wonderful-products")
void receive(Product product) {
processed.add(product);
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // (2)
// save offsets here
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) { // (3)
// seek to offset here
partitions.stream().map(tp -> KafkaSeekOperation.seek(tp, 1)).forEach(seeker::perform);
}
}
ConsumerSeekAware
APIpackage io.micronaut.kafka.docs.seek.aware
import io.micronaut.configuration.kafka.ConsumerSeekAware
import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.configuration.kafka.seek.*
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import org.apache.kafka.common.TopicPartition
@KafkaListener
@Requires(property = "spec.name", value = "ConsumerSeekAwareSpec")
class ProductListener implements ConsumerSeekAware { // (1)
List<Product> processed = []
ProductListener(ProductListenerConfiguration config) {
// ...
}
@Topic("wonderful-products")
void receive(Product product) {
processed << product
}
@Override
void onPartitionsRevoked(Collection<TopicPartition> partitions) { // (2)
// save offsets here
}
@Override
void onPartitionsAssigned(Collection<TopicPartition> partitions, KafkaSeeker seeker) { // (3)
// seek to offset here
partitions.collect { KafkaSeekOperation.seek(it, 1) }.each(seeker.&perform)
}
}
ConsumerSeekAware
APIpackage io.micronaut.kafka.docs.seek.aware
import io.micronaut.configuration.kafka.ConsumerSeekAware
import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.configuration.kafka.seek.*
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import org.apache.kafka.common.TopicPartition
@KafkaListener
@Requires(property = "spec.name", value = "ConsumerSeekAwareTest")
class ProductListener constructor(config: ProductListenerConfiguration) : ConsumerSeekAware { // (1)
var processed: MutableList<Product> = mutableListOf()
@Topic("wonderful-products")
fun receive(product: Product) {
processed.add(product)
}
override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) { // (2)
// save offsets here
}
override fun onPartitionsAssigned(partitions: Collection<TopicPartition>, seeker: KafkaSeeker) { // (3)
// seek to offset here
partitions.stream().map { tp -> KafkaSeekOperation.seek(tp, 1) }.forEach(seeker::perform)
}
}
1 | Implement the interface ConsumerSeekAware |
2 | The onPartitionsRevoked can be used to save offsets |
3 | The onPartitionsAssigned can use used to read offsets and seek to a specific position. In this trivial example we just seek to the offset 1 (skipping the first record). |
ConsumerSeekAware provides a convenient KafkaSeeker object that can be used to perform KafkaSeekOperations immediately on the underlying consumer. |
Alternatively, when more fine-grained access to the Kafka consumer is required, your consumer bean can instead implement the ConsumerRebalanceListener and ConsumerAware interfaces:
KafkaConsumer
APIpackage io.micronaut.kafka.docs.seek.rebalance;
import io.micronaut.configuration.kafka.ConsumerAware;
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.context.annotation.Requires;
import io.micronaut.kafka.docs.Product;
import io.micronaut.core.annotation.NonNull;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
@Requires(property = "spec.name", value = "ConsumerRebalanceListenerTest")
public class ProductListener implements ConsumerRebalanceListener, ConsumerAware {
List<Product> processed = new ArrayList<>();
private Consumer consumer;
public ProductListener(ProductListenerConfiguration config) {
// ...
}
@Override
public void setKafkaConsumer(@NonNull Consumer consumer) { // (1)
this.consumer = consumer;
}
@Topic("fantastic-products")
void receive(Product product) {
processed.add(product);
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // (2)
// save offsets here
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // (3)
// seek to offset here
for (TopicPartition partition : partitions) {
consumer.seek(partition, 1);
}
}
}
KafkaConsumer
APIpackage io.micronaut.kafka.docs.seek.rebalance
import io.micronaut.configuration.kafka.ConsumerAware
import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.context.annotation.Requires
import io.micronaut.core.annotation.NonNull
import io.micronaut.kafka.docs.Product
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.TopicPartition
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
@Requires(property = "spec.name", value = "ConsumerRebalanceListenerSpec")
class ProductListener implements ConsumerRebalanceListener, ConsumerAware {
List<Product> processed = []
private Consumer consumer
ProductListener(ProductListenerConfiguration config) {
// ...
}
@Override
void setKafkaConsumer(@NonNull Consumer consumer) { // (1)
this.consumer = consumer
}
@Topic("fantastic-products")
void receive(Product product) {
processed << product
}
@Override
void onPartitionsRevoked(Collection<TopicPartition> partitions) { // (2)
// save offsets here
}
@Override
void onPartitionsAssigned(Collection<TopicPartition> partitions) { // (3)
// seek to offset here
for (TopicPartition partition : partitions) {
consumer.seek(partition, 1)
}
}
}
KafkaConsumer
APIpackage io.micronaut.kafka.docs.seek.rebalance
import io.micronaut.configuration.kafka.ConsumerAware
import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.context.annotation.Requires
import io.micronaut.kafka.docs.Product
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.TopicPartition
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
@Requires(property = "spec.name", value = "ConsumerRebalanceListenerTest")
class ProductListener constructor(config: ProductListenerConfiguration) : ConsumerRebalanceListener, ConsumerAware<Any?, Any?> {
var processed: MutableList<Product> = mutableListOf()
private var consumer: Consumer<*, *>? = null
override fun setKafkaConsumer(consumer: Consumer<Any?, Any?>?) { // (1)
this.consumer = consumer
}
@Topic("fantastic-products")
fun receive(product: Product) {
processed.add(product)
}
override fun onPartitionsRevoked(partitions: Collection<TopicPartition>) { // (2)
// save offsets here
}
override fun onPartitionsAssigned(partitions: Collection<TopicPartition>) { // (3)
// seek to offset here
for (partition in partitions) {
consumer!!.seek(partition, 1)
}
}
}
1 | The setKafkaConsumer of the ConsumerAware allows access to the underlying consumer |
2 | The onPartitionsRevoked can be used to save offsets |
3 | The onPartitionsAssigned can use used to read offsets and seek to a specific position. In this trivial example we just seek to the offset 1 (skipping the first record). |
6.4.2 Manual Offsets with Multiple Topics
It is possible for a single @KafkaListener
bean to represent multiple consumers. If you have more than one method annotated with @Topic
then setKafkaConsumer
will be called multiple times for each backing consumer.
It is recommended in the case of manually seeking offsets that you use a single listener bean per consumer, the alternative is to store an internal Set
of all consumers associated with a particular listener and manually search for the correct listener in the onPartitionsAssigned
using the partition assignment data.
Not doing so will lead to a ConcurrentModificationException error.
|
6.4.3 Manually Assigning Offsets from a Consumer Method
There may be some scenarios where you realize you need to seek
to a different offset while consuming another one.
To support this use case, your consumer method can receive a KafkaSeekOperations instance as a parameter:
package io.micronaut.kafka.docs.seek.ops;
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.configuration.kafka.seek.*;
import io.micronaut.context.annotation.*;
import io.micronaut.kafka.docs.Product;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
@KafkaListener(offsetReset = OffsetReset.EARLIEST, properties = @Property(name = "max.poll.records", value = "1"))
@Requires(property = "spec.name", value = "KafkaSeekOperationsTest")
public class ProductListener {
List<Product> processed = new ArrayList<>();
public ProductListener(ProductListenerConfiguration config) {
// ...
}
@Topic("amazing-products")
void receive(Product product, KafkaSeekOperations ops) { // (1)
processed.add(product);
ops.defer(KafkaSeekOperation.seekToEnd(new TopicPartition("amazing-products", 0))); // (2)
}
}
package io.micronaut.kafka.docs.seek.ops
import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.configuration.kafka.seek.*
import io.micronaut.context.annotation.*
import io.micronaut.kafka.docs.Product
import org.apache.kafka.common.TopicPartition
@KafkaListener(offsetReset = OffsetReset.EARLIEST, properties = @Property(name = "max.poll.records", value = "1"))
@Requires(property = "spec.name", value = "KafkaSeekOperationsSpec")
class ProductListener {
List<Product> processed = []
ProductListener(ProductListenerConfiguration config) {
// ...
}
@Topic("amazing-products")
void receive(Product product, KafkaSeekOperations ops) { // (1)
processed << product
ops.defer(KafkaSeekOperation.seekToEnd(new TopicPartition("amazing-products", 0))); // (2)
}
}
package io.micronaut.kafka.docs.seek.ops
import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.configuration.kafka.seek.*
import io.micronaut.context.annotation.*
import io.micronaut.kafka.docs.Product
import org.apache.kafka.common.TopicPartition
@KafkaListener(offsetReset = OffsetReset.EARLIEST, properties = [Property(name = "max.poll.records", value = "1")])
@Requires(property = "spec.name", value = "KafkaSeekOperationsTest")
class ProductListener constructor(config: ProductListenerConfiguration) {
var processed: MutableList<Product> = mutableListOf()
@Topic("amazing-products")
fun receive(product: Product, ops: KafkaSeekOperations) { // (1)
processed.add(product)
ops.defer(KafkaSeekOperation.seekToBeginning(TopicPartition("amazing-products", 0))) // (2)
}
}
1 | An instance of KafkaSeekOperations will be injected to the method |
2 | Any number of seek operations can be deferred. In this trivial example we just seek to the end of the partition. |
The seek
operations will be performed by Micronaut automatically, when the consumer method completes successfully, possibly after committing offsets via OffsetStrategy.AUTO
.
These operations determine the next offset retrieved by poll . Take into account that, even if the seek operation performs successfully, your consumer method may keep receiving records that were cached by the previous call. You can configure max.poll.records to control the maximum number of records returned by a single call to poll .
|
6.4.4 Creating Kafka Seek Operations
The interface KafkaSeekOperation.KafkaSeekOperation provides several static methods to create seek
operations:
-
seek
: Creates an absolute seek operation. -
seekRelativeToBeginning
: Creates a seek operation relative to the beginning. -
seekToBeginning
: Creates a seek to the beginning operation. -
seekRelativeToEnd
: Creates a seek operation relative to the end. -
seekToEnd
: Creates a seek to the end operation. -
seekForward
: Creates a forward seek operation. -
seekBackward
: Creates a backward seek operation. -
seekToTimestamp
: Creates a seek to the timestamp operation.
6.5 Kafka Batch Processing
By default @KafkaListener listener methods will receive each ConsumerRecord one by one.
There may be cases where you prefer to receive all of the ConsumerRecord data from the ConsumerRecords holder object in one go.
To achieve this you can set the batch
member of the @KafkaListener to true
and specify a container type (typically List
) to receive all of the data:
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import java.util.List;
import static org.slf4j.LoggerFactory.getLogger;
@KafkaListener(batch = true) // (1)
public class BookListener {
@Topic("all-the-books")
public void receiveList(List<Book> books) { // (2)
for (Book book : books) {
LOG.info("Got Book = {}", book.title()); // (3)
}
}
}
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import reactor.core.publisher.Flux
@KafkaListener(batch = true) // (1)
@Slf4j
class BookListener {
@Topic("all-the-books")
void receiveList(List<Book> books) { // (2)
for (Book book : books) {
log.info("Got Book = {}", book.title) // (3)
}
}
}
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import org.slf4j.LoggerFactory.getLogger
import reactor.core.publisher.Flux
import java.util.*
@KafkaListener(batch = true) // (1)
class BookListener {
@Topic("all-the-books")
fun receiveList(books: List<Book>) { // (2)
for (book in books) {
LOG.info("Got Book = {}", book.title) // (3)
}
}
}
1 | The @KafkaListener annotation’s batch member is set to true |
2 | The method defines that it receives a list of Book instances |
3 | The method processes the entire batch |
Note in the previous case offsets will automatically be committed for the whole batch by default when the method returns without error.
Manually Committing Offsets with Batch
As with one by one message processing, if you set the OffsetStrategy
to DISABLED it becomes your responsibility to commit offsets.
If you want to commit the entire batch of offsets at once during the course of processing, then the simplest approach is to add an argument of type Acknowledgement and call the ack()
method to commit the batch of offsets synchronously:
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
public void receive(List<Book> books, Acknowledgement acknowledgement) { // (2)
//process the books
acknowledgement.ack(); // (3)
}
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
void receive(List<Book> books, Acknowledgement acknowledgement) { // (2)
//process the books
acknowledgement.ack() // (3)
}
@KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true) // (1)
@Topic("all-the-books")
fun receive(books: List<Book?>?, acknowledgement: Acknowledgement) { // (2)
//process the books
acknowledgement.ack() // (3)
}
1 | Committing offsets automatically is disabled |
2 | The listener method specifies a parameter of type Acknowledgement |
3 | The ack() method is called once the records have been processed |
You can also take more control of committing offsets when doing batch processing by specifying a method that receives the offsets in addition to the batch:
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
public void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) { // (2)
for (int i = 0; i < records.size(); i++) {
ConsumerRecord<String, Book> record = records.get(i); // (3)
// process the book
Book book = record.value();
// commit offsets
String topic = record.topic();
int partition = record.partition();
long offset = record.offset(); // (4)
kafkaConsumer.commitSync(Collections.singletonMap( // (5)
new TopicPartition(topic, partition),
new OffsetAndMetadata(offset + 1, "my metadata")
));
}
}
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
void receive(List<ConsumerRecord<String, Book>> records, Consumer kafkaConsumer) { // (2)
for (int i = 0; i < records.size(); i++) {
ConsumerRecord<String, Book> record = records.get(i) // (3)
// process the book
Book book = record.value()
// commit offsets
String topic = record.topic()
int partition = record.partition()
long offset = record.offset() // (4)
kafkaConsumer.commitSync(Collections.singletonMap( // (5)
new TopicPartition(topic, partition),
new OffsetAndMetadata(offset + 1, "my metadata")
))
}
}
@KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true) // (1)
@Topic("all-the-books")
fun receive(records: List<ConsumerRecord<String?, Book?>>, kafkaConsumer: Consumer<*, *>) { // (2)
for (i in records.indices) {
val record = records[i] // (3)
// process the book
val book = record.value()
// commit offsets
val topic = record.topic()
val partition = record.partition()
val offset = record.offset() // (4)
kafkaConsumer.commitSync(
Collections.singletonMap( // (5)
TopicPartition(topic, partition),
OffsetAndMetadata(offset + 1, "my metadata")
)
)
}
}
1 | Committing offsets automatically is disabled |
2 | The method receives the batch of books as a list of consumer records |
3 | Each record is processed |
4 | The offset, partition and topic is read for the record |
5 | Offsets are committed |
This example is fairly trivial in that it commits offsets after processing each record in a batch, but you can for example commit after processing every 10, or every 100 or whatever makes sense for your application.
Receiving a ConsumerRecords
When batching you can receive the entire ConsumerRecords
object being listened for. In this case you should specify appropriate generic types for the key and value of the ConsumerRecords
so that Micronaut can pick the correct deserializer for each.
This is useful when the need is to process or commit the records by partition, as the ConsumerRecords
object already groups records by partition:
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
public void receiveConsumerRecords(ConsumerRecords<String, Book> consumerRecords, Consumer kafkaConsumer) { // (2)
for (TopicPartition partition : consumerRecords.partitions()) { // (3)
long offset = Long.MIN_VALUE;
// process partition records
for (ConsumerRecord<String, Book> record : consumerRecords.records(partition)) { // (4)
// process the book
Book book = record.value();
// keep last offset
offset = record.offset(); // (5)
}
// commit partition offset
kafkaConsumer.commitSync(Collections.singletonMap( // (6)
partition,
new OffsetAndMetadata(offset + 1, "my metadata")
));
}
}
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // (1)
@Topic("all-the-books")
void receiveConsumerRecords(ConsumerRecords<String, Book> consumerRecords, Consumer kafkaConsumer) { // (2)
for (TopicPartition partition : consumerRecords.partitions()) { // (3)
long offset = Long.MIN_VALUE;
// process partition records
for (ConsumerRecord<String, Book> record : consumerRecords.records(partition)) { // (4)
// process the book
Book book = record.value();
// keep last offset
offset = record.offset(); // (5)
}
// commit partition offset
kafkaConsumer.commitSync(Collections.singletonMap( // (6)
partition,
new OffsetAndMetadata(offset + 1, "my metadata")
));
}
}
@KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true) // (1)
@Topic("all-the-books")
fun receiveConsumerRecords(consumerRecords: ConsumerRecords<String?, Book?>, kafkaConsumer: Consumer<*, *>) { // (2)
for (partition in consumerRecords.partitions()) { // (3)
var offset = Long.MIN_VALUE
// process partition records
for (record in consumerRecords.records(partition)) { // (4)
// process the book
val book = record.value()
// keep last offset
offset = record.offset() // (5)
}
// commit partition offset
kafkaConsumer.commitSync(
Collections.singletonMap( // (6)
partition,
OffsetAndMetadata(offset + 1, "my metadata")
)
)
}
}
1 | Committing offsets automatically is disabled |
2 | The method receives the batch of books as a ConsumerRecords holder object |
3 | Each partition is iterated over |
4 | Each record for the partition is processed |
5 | The last read offset for the partition is stored |
6 | The offset is committed once for each partition |
Reactive Batch Processing
Batch listeners also support defining reactive types (Reactor Flux
or RxJava Flowable) as the method argument.
Add the library Micronaut Reactor or Micronaut RxJava 3 to your application’s dependencies.
In this case the method will be passed a reactive type that can be returned from the method allowing non-blocking processing of the batch:
@Topic("all-the-books")
public Flux<Book> receiveFlux(Flux<Book> books) {
return books.doOnNext(book ->
LOG.info("Got Book = {}", book.title())
);
}
@Topic("all-the-books")
Flux<Book> receiveFlux(Flux<Book> books) {
books.doOnNext(book ->
log.info("Got Book = {}", book.title)
)
}
@Topic("all-the-books")
fun receiveFlux(books: Flux<Book>): Flux<Book> {
return books.doOnNext { book: Book ->
LOG.info("Got Book = {}", book.title)
}
}
Remember that as with non batch processing, the reactive type will be subscribed to on a different thread and offsets will be committed automatically likely prior to the point when the reactive type is subscribed to.
This means that you should only use reactive processing if message durability is not a requirement and you may wish to implement message re-delivery upon failure.
6.6 Forwarding Messages with @SendTo
On any @KafkaListener
method that returns a value, you can use the @SendTo annotation to forward the return value to the topic or topics specified by the @SendTo
annotation.
The key of the original ConsumerRecord
will be used as the key when forwarding the message.
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
public int receive(@KafkaKey String brand, Product product) {
LOG.info("Got Product - {} by {}", product.name(), brand);
return product.quantity(); // (3)
}
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
int receive(@KafkaKey String brand, Product product) {
log.info("Got Product - {} by {}", product.name, brand)
product.quantity // (3)
}
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
fun receive(@KafkaKey brand: String?, product: Product): Int {
LOG.info("Got Product - {} by {}", product.name, brand)
return product.quantity // (3)
}
1 | The topic subscribed to is awesome-products |
2 | The topic to send the result to is product-quantities |
3 | The return value is used to indicate the value to forward |
You can also do the same using Reactive programming:
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
public Mono<Integer> receiveProduct(@KafkaKey String brand,
Mono<Product> productSingle) {
return productSingle.map(product -> {
LOG.info("Got Product - {} by {}", product.name(), brand);
return product.quantity(); // (3)
});
}
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
Mono<Integer> receiveProduct(@KafkaKey String brand, Mono<Product> productSingle) {
productSingle.map(product -> {
log.info("Got Product - {} by {}", product.name, brand)
product.quantity // (3)
})
}
@Topic("sendto-products") // (1)
@SendTo("product-quantities") // (2)
fun receiveProduct(@KafkaKey brand: String?, productSingle: Mono<Product>): Mono<Int> {
return productSingle.map(Function<Product, Int> { product: Product ->
LOG.info("Got Product - {} by {}", product.name, brand)
product.quantity // (3)
})
}
1 | The topic subscribed to is awesome-products |
2 | The topic to send the result to is product-quantities |
3 | The return is mapped from the single to the value of the quantity |
In the reactive case the poll
loop will continue and will not wait for the record to be sent unless you specifically annotate the method with @Blocking.
To enable transactional sending of the messages you need to define producerTransactionalId
in @KafkaListener
.
@KafkaListener(
offsetReset = OffsetReset.EARLIEST,
producerClientId = "word-counter-producer", // (1)
producerTransactionalId = "tx-word-counter-id", // (2)
offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // (3)
isolation = IsolationLevel.READ_COMMITTED // (4)
)
public class WordCounter {
@Topic("tx-incoming-strings")
@SendTo("my-words-count")
List<KafkaMessage<byte[], Integer>> wordsCounter(String string) {
return Stream.of(string.split("\\s+"))
.collect(Collectors.groupingBy(Function.identity(), Collectors.summingInt(i -> 1)))
.entrySet()
.stream()
.map(e -> KafkaMessage.Builder.<byte[], Integer>withBody(e.getValue()).key(e.getKey().getBytes()).build())
.toList();
}
}
@KafkaListener(
offsetReset = OffsetReset.EARLIEST,
producerClientId = 'word-counter-producer', // (1)
producerTransactionalId = 'tx-word-counter-id', // (2)
offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // (3)
isolation = IsolationLevel.READ_COMMITTED // (4)
)
class WordCounter {
@Topic('tx-incoming-strings')
@SendTo('my-words-count')
List<KafkaMessage<byte[], Integer>> wordsCounter(String string) {
string.split("\\s+")
.groupBy()
.collect { key, instanceList ->
KafkaMessage.Builder.withBody(instanceList.size()).key(key.bytes).build()
}
}
}
@KafkaListener(
offsetReset = OffsetReset.EARLIEST,
producerClientId = "word-counter-producer", // (1)
producerTransactionalId = "tx-word-counter-id", // (2)
offsetStrategy = OffsetStrategy.SEND_TO_TRANSACTION, // (3)
isolation = IsolationLevel.READ_COMMITTED // (4)
)
class WordCounter {
@Topic("tx-incoming-strings")
@SendTo("my-words-count")
fun wordsCounter(string: String) = string
.split(Regex("\\s+"))
.groupBy { it }
.map { KafkaMessage.Builder.withBody<ByteArray, Int>(it.value.size).key(it.key.toByteArray()).build() }
}
1 | The id of the producer to load additional config properties |
2 | The transactional id that is required to enable transactional processing |
3 | Enable offset strategy to commit the offsets to the transaction |
4 | Consumer read messages isolation |
6.7 Handling Consumer Exceptions
Consumer error strategies
It’s possible to define a different error strategy for @KafkaListener using errorStrategy
attribute:
@KafkaListener(
value = "myGroup",
errorStrategy = @ErrorStrategy(
value = ErrorStrategyValue.RETRY_ON_ERROR,
retryDelay = "50ms",
retryCount = 3
)
)
@KafkaListener(
value = 'myGroup',
errorStrategy = @ErrorStrategy(
value = ErrorStrategyValue.RETRY_ON_ERROR,
retryDelay = '50ms',
retryCount = 3
)
)
@KafkaListener(
value = "myGroup",
errorStrategy = ErrorStrategy(
value = ErrorStrategyValue.RETRY_ON_ERROR,
retryDelay = "50ms",
retryCount = 3
)
)
Setting the error strategy allows you to resume at the next offset or to seek the consumer (stop on error) to the failed offset so that it can retry if an error occurs.
You can choose one of the error strategies:
-
RETRY_ON_ERROR
- This strategy will stop consuming subsequent records in the case of an error and by default will attempt to re-consume the current record once. Possible retry delay can be defined byretryDelay
and retry count byretryCount
. -
RETRY_EXPONENTIALLY_ON_ERROR
- This strategy will stop consuming subsequent records in the case of an error and by default will attempt to re-consume the current record once. The exponentially growing time breaks between consumption attempts is computed using then * 2^(k - 1)
formula where the initial delayn
isretryDelay
and the number of retries isretryCount
. -
RETRY_CONDITIONALLY_ON_ERROR
- This strategy will stop consuming subsequent records in the case of an error and by default will attempt to re-consume the current record once. The retry behaviour can be overridden. Possible retry delay can be defined byretryDelay
and retry count byretryCount
. -
RETRY_CONDITIONALLY_EXPONENTIALLY_ON_ERROR
- This strategy will stop consuming subsequent records in the case of an error and by default will attempt to re-consume the current record once. The retry behaviour can be overridden. The exponentially growing time breaks between consumption attempts is computed using then * 2^(k - 1)
formula where the initial delayn
isretryDelay
and the number of retries isretryCount
. -
RESUME_AT_NEXT_RECORD
- This strategy will ignore the current error and will resume at the next offset, in this case it’s recommended to have a custom exception handler that moves the failed message into an error queue. -
NONE
- This error strategy will skip over all records from the current offset in the current poll when the consumer encounters an error. This option is deprecated and kept for consistent behaviour with previous versions of Micronaut Kafka that do not support error strategy.
The error strategies apply only for non-batch messages processing. |
When using retry error strategies in combination with reactive consumer methods, it is necessary to add the @Blocking annotation to the reactive consumer method.
|
You can also make the number of retries configurable by using retryCountValue
:
@KafkaListener(
value = "myGroup",
errorStrategy = @ErrorStrategy(
value = ErrorStrategyValue.RETRY_ON_ERROR,
retryCountValue = "${my.retry.count}"
)
)
@KafkaListener(
value = 'myGroup',
errorStrategy = @ErrorStrategy(
value = ErrorStrategyValue.RETRY_ON_ERROR,
retryCountValue = '${my.retry.count}'
)
)
@KafkaListener(
value = "myGroup",
errorStrategy = ErrorStrategy(
value = ErrorStrategyValue.RETRY_ON_ERROR,
retryCountValue = "\${my.retry.count}"
)
)
retryCountValue will be overridden by retryCount if they are both set.
|
Specify exceptions to retry
It’s possible to define only exceptions from which the retry will occur.
@KafkaListener(
value = "myGroup",
errorStrategy = @ErrorStrategy(
value = ErrorStrategyValue.RETRY_ON_ERROR,
retryDelay = "50ms",
retryCount = 3,
exceptionTypes = { MyException.class, MySecondException.class }
)
)
@KafkaListener(
value = 'myGroup',
errorStrategy = @ErrorStrategy(
value = ErrorStrategyValue.RETRY_ON_ERROR,
retryDelay = '50ms',
retryCount = 3,
exceptionTypes = [ MyException, MySecondException ]
)
)
@KafkaListener(
value = "myGroup",
errorStrategy = ErrorStrategy(
value = ErrorStrategyValue.RETRY_ON_ERROR,
retryDelay = "50ms",
retryCount = 3,
exceptionTypes = [MyException::class, MySecondException::class]
)
)
Specify exception to retry apply only for RETRY_ON_ERROR and RETRY_EXPONENTIALLY_ON_ERROR error strategies.
|
Conditional retries
It is possible to conditionally retry a message based on the exception thrown when the error strategy is RETRY_CONDITIONALLY_ON_ERROR
or RETRY_CONDITIONALLY_EXPONENTIALLY_ON_ERROR
.
@KafkaListener(
value = "myGroup",
errorStrategy = @ErrorStrategy(
value = ErrorStrategyValue.RETRY_CONDITIONALLY_ON_ERROR
)
)
public class ConditionalRetryListener implements ConditionalRetryBehaviourHandler {
@Override
public ConditionalRetryBehaviour conditionalRetryBehaviour(KafkaListenerException exception) {
return shouldRetry(exception) ? ConditionalRetryBehaviour.RETRY : ConditionalRetryBehaviour.SKIP;
}
// ...
@KafkaListener(
value = 'myGroup',
errorStrategy = @ErrorStrategy(
value = ErrorStrategyValue.RETRY_CONDITIONALLY_ON_ERROR
)
)
class ConditionalRetryListener implements ConditionalRetryBehaviourHandler {
@Override
ConditionalRetryBehaviour conditionalRetryBehaviour(KafkaListenerException exception) {
return shouldRetry(exception) ? ConditionalRetryBehaviour.RETRY : ConditionalRetryBehaviour.SKIP
}
// ...
@KafkaListener(
value = "myGroup",
errorStrategy = ErrorStrategy(
value = ErrorStrategyValue.RETRY_CONDITIONALLY_ON_ERROR
)
)
class ConditionalRetryListener :
ConditionalRetryBehaviourHandler {
override fun conditionalRetryBehaviour(exception: KafkaListenerException): ConditionalRetryBehaviour {
return if (shouldRetry(exception)) {
ConditionalRetryBehaviour.RETRY
} else {
ConditionalRetryBehaviour.SKIP
}
}
// ...
When a @KafkaListener does not implement @ConditionalRetryBehaviourHandler, the @DefaultConditionalRetryBehaviourHandler will be used and all messages that failed processing will be retried.
If you wish to apply the same conditional retry strategy for all of your @KafkaListener you can define a bean that implements ConditionalRetryBehaviourHandler
and use Micronaut’s Bean Replacement feature to replace the default bean: @Replaces(DefaultConditionalRetryBehaviourHandler.class)
.
Conditional retry behaviour only applies to RETRY_CONDITIONALLY_ON_ERROR and RETRY_CONDITIONALLY_EXPONENTIALLY_ON_ERROR error strategies.
|
Exception handlers
When an exception occurs in a @KafkaListener method by default the exception is simply logged. This is handled by DefaultKafkaListenerExceptionHandler.
The following options are available to configure the default Kafka listener exception handler:
Property | Type | Description |
---|---|---|
|
boolean |
Whether to skip record on deserialization failure. Default value true |
|
boolean |
Whether to commit record on deserialization failure. Default value false |
If you wish to replace this default exception handling with another implementation you can use the Micronaut’s Bean Replacement feature to define a bean that replaces it: @Replaces(DefaultKafkaListenerExceptionHandler.class)
.
You can also define per bean exception handling logic by implementing the KafkaListenerExceptionHandler interface in your @KafkaListener class.
The KafkaListenerExceptionHandler receives an exception of type KafkaListenerException which allows access to the original ConsumerRecord
, if available.
7 Running Kafka Applications
You can run a Micronaut Kafka application with or without the presence of an HTTP server.
If you run your application without the http-server-netty
dependency you will see output like the following on startup:
11:06:22.638 [main] INFO io.micronaut.runtime.Micronaut - Startup completed in 402ms. Server Running: 4 active message listeners.
No port is exposed, but the Kafka consumers are active and running. The process registers a shutdown hook such that the KafkaConsumer
instances are closed correctly when the server is shutdown.
7.1 Kafka Health Checks
In addition to http-server-netty
, if the management
dependency is added, then Micronaut’s Health Endpoint can be used to expose the health status of the Kafka consumer application.
For example if Kafka is not available the /health
endpoint will return:
{
"status": "DOWN",
"details": {
...
"kafka": {
"status": "DOWN",
"details": {
"error": "java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment."
}
}
}
}
By default, the details visible above are only shown to authenticated users. See the Health Endpoint documentation for how to configure that setting. |
The following options are available to configure the Kafka Health indicator:
Property | Type | Description |
---|---|---|
|
boolean |
Whether the Kafka health check is enabled. Default value true. |
|
boolean |
By default, the health check requires cluster-wide permissions in order to get information about the nodes in the Kafka cluster. If your application doesn’t have admin privileges (for example, this might happen in multi-tenant scenarios), you can switch to a "restricted" version of the health check which only validates basic connectivity but doesn’t require any additional permissions.. Default value false |
|
java.time.Duration |
The health check timeout. |
7.2 Kafka Metrics
You can enable Kafka Metrics collection by enabling Micrometer Metrics.
If you do not wish to collect Kafka metrics, you can set micronaut.metrics.binders.kafka.enabled
to false
in application.yml
.
In the case of Kafka Streams metrics, you can use micronaut.metrics.binders.kafka.streams.enabled
instead.
7.3 Kafka Distributed Tracing
Distributed tracing is supported via the Micronaut Tracing module using Open Telemetry.
7.4 Creating New Topics
You can automatically add topics to the broker when your application starts. To do so, add a bean of type a NewTopic
for each topic you want to create. NewTopic
instances let you specify the name, the number of partitions, the replication factor, the replicas assignments and the configuration properties you want to associate with the new topic. Additionally, you can add a bean of type CreateTopicsOptions
that will be used when the new topics are created.
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
@Requires(bean = AdminClient.class)
@Factory
public class MyTopicFactory {
@Bean
CreateTopicsOptions options() {
return new CreateTopicsOptions().timeoutMs(5000).validateOnly(true).retryOnQuotaViolation(false);
}
@Bean
NewTopic topic1() {
return new NewTopic("my-new-topic-1", 1, (short) 1);
}
@Bean
NewTopic topic2() {
return new NewTopic("my-new-topic-2", 2, (short) 1);
}
}
import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.NewTopic
@Requires(bean = AdminClient)
@Factory
class MyTopicFactory {
@Bean
CreateTopicsOptions options() {
new CreateTopicsOptions().timeoutMs(5000).validateOnly(true).retryOnQuotaViolation(false)
}
@Bean
NewTopic topic1() {
new NewTopic("my-new-topic-1", 1, (short) 1)
}
@Bean
NewTopic topic2() {
new NewTopic("my-new-topic-2", 2, (short) 1)
}
}
import io.micronaut.context.annotation.Bean
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.NewTopic
@Requires(bean = AdminClient::class)
@Factory
class MyTopicFactory {
@Bean
fun options(): CreateTopicsOptions {
return CreateTopicsOptions().timeoutMs(5000).validateOnly(true).retryOnQuotaViolation(false)
}
@Bean
fun topic1(): NewTopic {
return NewTopic("my-new-topic-1", 1, 1)
}
@Bean
fun topic2(): NewTopic {
return NewTopic("my-new-topic-2", 2, 1)
}
}
Creating topics is not a transactional operation, so it may succeed for some topics while fail for others. This operation also executes asynchronously, so it may take several seconds until all the brokers become aware that the topics have been created. |
If you ever need to check if the operation has completed, you can @Inject
or retrieve the KafkaNewTopics bean from the application context and then retrieve the operation result that Kafka returned when the topics were created.
boolean areNewTopicsDone(KafkaNewTopics newTopics) {
return newTopics.getResult().all().isDone();
}
boolean areNewTopicsDone(KafkaNewTopics newTopics) {
newTopics.result.all().done
}
fun areNewTopicsDone(newTopics: KafkaNewTopics): Boolean {
return newTopics.result.all().isDone
}
7.5 Disabling Micronaut-Kafka
If you want to disable micronaut-kafka entirely, you can set kafka.enabled
to false
in application.yml
.
This will prevent the instantiation of all kafka-related beans.
You must, however, provide your own replacement implementations of any @KafkaClient
interfaces:
import io.micronaut.context.annotation.Replaces;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.StringUtils;
import jakarta.inject.Singleton;
@Requires(property = "kafka.enabled", notEquals = StringUtils.TRUE, defaultValue = StringUtils.TRUE) // (1)
@Replaces(MessageClient.class) // (2)
@Singleton
public class MessageClientFallback implements MessageClient { // (3)
@Override
public void send(String message) {
throw new UnsupportedOperationException(); // (4)
}
}
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires
import io.micronaut.core.util.StringUtils
import jakarta.inject.Singleton
@Requires(property = 'kafka.enabled', notEquals = StringUtils.TRUE, defaultValue = StringUtils.TRUE) // (1)
@Replaces(MessageClient.class) // (2)
@Singleton
class MessageClientFallback implements MessageClient { // (3)
@Override
void send(String message) {
throw new UnsupportedOperationException() // (4)
}
}
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires
import io.micronaut.core.util.StringUtils
import jakarta.inject.Singleton
@Requires(property = "kafka.enabled", notEquals = StringUtils.TRUE, defaultValue = StringUtils.TRUE) // (1)
@Replaces(MessageClient::class) // (2)
@Singleton
class MessageClientFallback : MessageClient { // (3)
override fun send(message: String) {
throw UnsupportedOperationException() // (4)
}
}
1 | Only instantiate when kafka.enabled is set to false |
2 | Replace the @KafkaClient interface |
3 | Implement the interface |
4 | Provide an alternative implementation for all client methods |
8 Kafka Streams
Using the CLI
If you are creating your project using the Micronaut CLI, supply the $ mn create-app my-app --features kafka-streams |
Kafka Streams is a platform for building real time streaming applications.
When using Micronaut with Kafka Stream, your application gains all of the features from Micronaut (configuration management, AOP, DI, health checks etc.), simplifying the construction of Kafka Stream applications.
Since Micronaut’s DI and AOP is compile time, you can build low overhead stream applications with ease.
Defining Kafka Streams
To define Kafka Streams you should first add the kafka-streams
configuration to your build.
implementation("io.micronaut.kafka:micronaut-kafka-streams")
<dependency>
<groupId>io.micronaut.kafka</groupId>
<artifactId>micronaut-kafka-streams</artifactId>
</dependency>
The minimum configuration required is to set the Kafka bootstrap servers:
kafka.bootstrap.servers=localhost:9092
kafka:
bootstrap:
servers: localhost:9092
[kafka]
[kafka.bootstrap]
servers="localhost:9092"
kafka {
bootstrap {
servers = "localhost:9092"
}
}
{
kafka {
bootstrap {
servers = "localhost:9092"
}
}
}
{
"kafka": {
"bootstrap": {
"servers": "localhost:9092"
}
}
}
You should then define a @Factory for your streams that defines beans that return a KStream
. For example to implement the Word Count example from the Kafka Streams documentation:
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
@Factory
public class WordCountStream {
@Singleton
@Named("word-count")
KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { // (1)
// set default serdes
Properties props = builder.getConfiguration();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500");
KStream<String, String> source = builder.stream("streams-plaintext-input"); // (2)
KTable<String, Long> groupedByWord = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
//Store the result in a store for lookup later
.count(Materialized.as("word-count-store-java")); // (3)
groupedByWord
//convert to stream
.toStream()
//send to output using specific serdes
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); // (4)
return source;
}
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import jakarta.inject.Named
import jakarta.inject.Singleton
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced
@Factory
class WordCountStream {
@Singleton
@Named('word-count')
KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { // (1)
// set default serdes
Properties props = builder.getConfiguration();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 'earliest')
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, '500')
KStream<String, String> source = builder.stream('streams-plaintext-input') // (2)
KTable<String, Long> groupedByWord = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
//Store the result in a store for lookup later
.count(Materialized.as('word-count-store-groovy')) // (3)
groupedByWord
//convert to stream
.toStream()
//send to output using specific serdes
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())) // (4)
return source
}
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import jakarta.inject.Named
import jakarta.inject.Singleton
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced
import java.util.*
@Factory
class WordCountStream {
@Singleton
@Named("word-count")
fun wordCountStream(builder: ConfiguredStreamBuilder): KStream<String, String> { // (1)
// set default serdes
val props = builder.configuration
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.getName()
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.getName()
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = "500"
val source = builder.stream<String, String>("streams-plaintext-input") // (2)
val groupedByWord = source
.flatMapValues { value: String ->
Arrays.asList(
*value.lowercase(Locale.getDefault()).split("\\W+".toRegex()).dropLastWhile { it.isEmpty() }
.toTypedArray())
}
.groupBy(
{ key: String?, word: String? -> word },
Grouped.with(Serdes.String(), Serdes.String())
)
//Store the result in a store for lookup later
.count(Materialized.`as`("word-count-store-kotlin")) // (3)
groupedByWord
//convert to stream
.toStream()
//send to output using specific serdes
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())) // (4)
return source
}
1 | An instance of ConfiguredStreamBuilder is injected that allows mutating the configuration |
2 | The input topic |
3 | Materialize the count stream and save to a state store |
4 | The output topic |
With Kafka streams the key and value Serdes (serializer/deserializer) must be classes with a zero argument constructor. If you wish to use JSON (de)serialization you can subclass JsonObjectSerde to define your Serdes
|
You can use the @KafkaClient annotation to send a sentence to be processed by the above stream:
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
@KafkaClient
public interface WordCountClient {
@Topic("streams-plaintext-input")
void publishSentence(String sentence);
}
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
@KafkaClient
interface WordCountClient {
@Topic("streams-plaintext-input")
void publishSentence(String sentence)
}
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
@KafkaClient
interface WordCountClient {
@Topic("streams-plaintext-input")
fun publishSentence(sentence: String?)
}
You can also define a @KafkaListener to listen for the result of the word count stream:
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
@KafkaListener(offsetReset = EARLIEST, groupId = "WordCountListener")
public class WordCountListener {
private final Map<String, Long> wordCounts = new ConcurrentHashMap<>();
@Topic("streams-wordcount-output")
void count(@KafkaKey String word, long count) {
wordCounts.put(word, count);
}
public long getCount(String word) {
Long num = wordCounts.get(word);
return num != null ? num : 0;
}
public Map<String, Long> getWordCounts() {
return Collections.unmodifiableMap(wordCounts);
}
}
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import java.util.concurrent.ConcurrentHashMap
import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
@KafkaListener(offsetReset = EARLIEST, groupId = 'WordCountListener')
class WordCountListener {
private final Map<String, Long> wordCounts = new ConcurrentHashMap<>()
@Topic("streams-wordcount-output")
void count(@KafkaKey String word, long count) {
wordCounts.put(word, count)
}
long getCount(String word) {
Long num = wordCounts.get(word)
num ?: 0
}
Map<String, Long> getWordCounts() {
Collections.unmodifiableMap(wordCounts)
}
}
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import java.util.*
import java.util.concurrent.ConcurrentHashMap
@KafkaListener(offsetReset = OffsetReset.EARLIEST, groupId = "WordCountListener")
class WordCountListener {
private val wordCounts: MutableMap<String, Long> = ConcurrentHashMap()
@Topic("streams-wordcount-output")
fun count(@KafkaKey word: String, count: Long) {
wordCounts[word] = count
}
fun getCount(word: String): Long {
val num = wordCounts[word]
return num ?: 0
}
fun getWordCounts(): Map<String, Long> {
return Collections.unmodifiableMap(wordCounts)
}
}
Configuring Kafka Streams
If you have a single Kafka Stream configured on your Micronaut service, then you should use the default
key on the configuration.
kafka.streams.default.processing.guarantee=exactly_once
kafka.streams.default.auto.offset.reset=earliest
kafka:
streams:
default:
processing.guarantee: "exactly_once"
auto.offset.reset: "earliest"
[kafka]
[kafka.streams]
[kafka.streams.default]
"processing.guarantee"="exactly_once"
"auto.offset.reset"="earliest"
kafka {
streams {
'default' {
processing.guarantee = "exactly_once"
auto.offset.reset = "earliest"
}
}
}
{
kafka {
streams {
default {
"processing.guarantee" = "exactly_once"
"auto.offset.reset" = "earliest"
}
}
}
}
{
"kafka": {
"streams": {
"default": {
"processing.guarantee": "exactly_once",
"auto.offset.reset": "earliest"
}
}
}
}
The above configuration example sets the processing.guarantee
and auto.offset.reset
setting of the default
Stream. Most of the configuration properties pass directly through to the KafkaStreams
instance being initialized.
In addition to those standard properties, you may want to customize how long you wait for Kafka Streams to shut down (this is mostly useful during testing), with close-timeout
.
For example, this will make Micronaut Kafka wait for up to 10 seconds to shut down the default stream:
kafka.streams.default.close-timeout=10s
kafka:
streams:
default:
close-timeout: 10s
[kafka]
[kafka.streams]
[kafka.streams.default]
close-timeout="10s"
kafka {
streams {
'default' {
closeTimeout = "10s"
}
}
}
{
kafka {
streams {
default {
close-timeout = "10s"
}
}
}
}
{
"kafka": {
"streams": {
"default": {
"close-timeout": "10s"
}
}
}
}
You can define multiple Kafka Streams on the same Micronaut application, each with their own unique configuration.
To do this you should define the configuration with kafka.streams.[STREAM-NAME]
.
Assuming you have 2 or more Kafka Streams definitions on a single service, you will need to use the default
key for at least one of them and then define kafka.streams.[STREAM-NAME]
for the rest.
For example in application.yml
:
kafka.streams.default.num.stream.threads=1
kafka.streams.my-other-stream.num.stream.threads=10
kafka:
streams:
default:
num:
stream:
threads: 1
my-other-stream:
num:
stream:
threads: 10
[kafka]
[kafka.streams]
[kafka.streams.default]
[kafka.streams.default.num]
[kafka.streams.default.num.stream]
threads=1
[kafka.streams.my-other-stream]
[kafka.streams.my-other-stream.num]
[kafka.streams.my-other-stream.num.stream]
threads=10
kafka {
streams {
'default' {
num {
stream {
threads = 1
}
}
}
myOtherStream {
num {
stream {
threads = 10
}
}
}
}
}
{
kafka {
streams {
default {
num {
stream {
threads = 1
}
}
}
my-other-stream {
num {
stream {
threads = 10
}
}
}
}
}
}
{
"kafka": {
"streams": {
"default": {
"num": {
"stream": {
"threads": 1
}
}
},
"my-other-stream": {
"num": {
"stream": {
"threads": 10
}
}
}
}
}
}
The above configuration sets the num.stream.threads
setting of the Kafka StreamsConfig
to 1
for the default
stream, and the same setting to 10
for a stream named my-stream
.
You can then inject an ConfiguredStreamBuilder
specifically for the above configuration using jakarta.inject.Named
:
@Singleton
@Named("my-other-stream")
KStream<String, String> myOtherKStream(ConfiguredStreamBuilder builder) {
return builder.stream("my-other-stream");
}
@Singleton
@Named('my-other-stream')
KStream<String, String> myOtherKStream(ConfiguredStreamBuilder builder) {
return builder.stream('my-other-stream')
}
@Singleton
@Named("my-other-stream")
fun myOtherKStream(builder: ConfiguredStreamBuilder): KStream<String, String> {
return builder.stream("my-other-stream")
}
If you do not provide a @Named on the ConfiguredStreamBuilder you have multiple KStreams defined that share the default configurations like client id, application id, etc. It is advisable when using multiple streams in a single app to provide a @Named instance of ConfiguredStreamBuilder for each stream.
|
@Singleton
@Named("my-stream")
KStream<String, String> myStream(ConfiguredStreamBuilder builder) {
@Singleton
@Named('my-stream')
KStream<String, String> myStream(ConfiguredStreamBuilder builder) {
@Singleton
@Named("my-stream")
fun myStream(builder: ConfiguredStreamBuilder): KStream<String, String> {
When writing a test without starting the actual Kafka server, you can instruct Micronaut not to start Kafka Streams. To do this create a config file suffixed with an environment name, such as application-test.yml
and set the kafka.streams.[STREAM-NAME].start-kafka-streams
to false
.
For example in application-test.yml
:
kafka.streams.default.start-kafka-streams=false
kafka:
streams:
default:
start-kafka-streams: false
[kafka]
[kafka.streams]
[kafka.streams.default]
start-kafka-streams=false
kafka {
streams {
'default' {
startKafkaStreams = false
}
}
}
{
kafka {
streams {
default {
start-kafka-streams = false
}
}
}
}
{
"kafka": {
"streams": {
"default": {
"start-kafka-streams": false
}
}
}
}
8.1 Interactive Query Service
When using streams you can set a state store for your stream using a store builder and telling the stream to store its data. In the above example for the Kafka Streams Word Count, the output is materialized to a named store that can later be retrieved via the Interactive Query Service. Apache Kafka docs available here.
You can inject the InteractiveQueryService and use the method getQueryableStore(String storeName, QueryableStoreType<T> storeType)
to get values from a state store.
An example service that wraps the InteractiveQueryService
is included below. This is here to illustrate that when calling the getQueryableStore
method you must provide the store name and preferably the type of key and value you are trying to retrieve.
import io.micronaut.configuration.kafka.streams.InteractiveQueryService;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Optional;
/**
* Example service that uses the InteractiveQueryService in a reusable way. This is only intended as an example.
*/
@Singleton
public class InteractiveQueryServiceExample {
private final InteractiveQueryService interactiveQueryService;
public InteractiveQueryServiceExample(InteractiveQueryService interactiveQueryService) {
this.interactiveQueryService = interactiveQueryService;
}
/**
* Method to get the word state store and word count from the store using the interactive query service.
*
* @param stateStore the name of the state store ie "foo-store"
* @param word the key to get, in this case the word as the stream and ktable have been grouped by word
* @return the Long count of the word in the store
*/
public Long getWordCount(String stateStore, String word) {
Optional<ReadOnlyKeyValueStore<String, Long>> queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.keyValueStore());
return queryableStore.map(kvReadOnlyKeyValueStore ->
kvReadOnlyKeyValueStore.get(word)).orElse(0L);
}
/**
* Method to get byte array from a state store using the interactive query service.
*
* @param stateStore the name of the state store ie "bar-store"
* @param blobName the key to get, in this case the name of the blob
* @return the byte[] stored in the state store
*/
public byte[] getBytes(String stateStore, String blobName) {
Optional<ReadOnlyKeyValueStore<String, byte[]>> queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.keyValueStore());
return queryableStore.map(stringReadOnlyKeyValueStore ->
stringReadOnlyKeyValueStore.get(blobName)).orElse(null);
}
/**
* Method to get value V by key K.
*
* @param stateStore the name of the state store ie "baz-store"
* @param name the key to get
* @return the value of type V stored in the state store
*/
public <K, V> V getGenericKeyValue(String stateStore, K name) {
Optional<ReadOnlyKeyValueStore<K, V>> queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.<K, V>keyValueStore());
return queryableStore.map(kvReadOnlyKeyValueStore ->
kvReadOnlyKeyValueStore.get(name)).orElse(null);
}
}
import io.micronaut.configuration.kafka.streams.InteractiveQueryService
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
/**
* Example service that uses the InteractiveQueryService in a reusable way. This is only intended as an example.
*/
@Singleton
class InteractiveQueryServiceExample {
private final InteractiveQueryService interactiveQueryService;
InteractiveQueryServiceExample(InteractiveQueryService interactiveQueryService) {
this.interactiveQueryService = interactiveQueryService;
}
/**
* Method to get the word state store and word count from the store using the interactive query service.
*
* @param stateStore the name of the state store ie "foo-store"
* @param word the key to get, in this case the word as the stream and ktable have been grouped by word
* @return the Long count of the word in the store
*/
Long getWordCount(String stateStore, String word) {
Optional<ReadOnlyKeyValueStore<String, Long>> queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.keyValueStore());
return queryableStore.map(kvReadOnlyKeyValueStore ->
kvReadOnlyKeyValueStore.get(word)).orElse(0L);
}
/**
* Method to get byte array from a state store using the interactive query service.
*
* @param stateStore the name of the state store ie "bar-store"
* @param blobName the key to get, in this case the name of the blob
* @return the byte[] stored in the state store
*/
byte[] getBytes(String stateStore, String blobName) {
Optional<ReadOnlyKeyValueStore<String, byte[]>> queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.keyValueStore());
return queryableStore.map(stringReadOnlyKeyValueStore ->
stringReadOnlyKeyValueStore.get(blobName)).orElse(null);
}
/**
* Method to get value V by key K.
*
* @param stateStore the name of the state store ie "baz-store"
* @param name the key to get
* @return the value of type V stored in the state store
*/
<K, V> V getGenericKeyValue(String stateStore, K name) {
Optional<ReadOnlyKeyValueStore<K, V>> queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.<K, V>keyValueStore());
return queryableStore.map(kvReadOnlyKeyValueStore ->
kvReadOnlyKeyValueStore.get(name)).orElse(null);
}
}
import io.micronaut.configuration.kafka.streams.InteractiveQueryService
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
import org.apache.kafka.streams.state.QueryableStoreTypes
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
/**
* Example service that uses the InteractiveQueryService in a reusable way. This is only intended as an example.
*/
@Singleton
class InteractiveQueryServiceExample(private val interactiveQueryService: InteractiveQueryService) {
/**
* Method to get the word state store and word count from the store using the interactive query service.
*
* @param stateStore the name of the state store ie "foo-store"
* @param word the key to get, in this case the word as the stream and ktable have been grouped by word
* @return the Long count of the word in the store
*/
fun getWordCount(stateStore: String, word: String): Long {
val queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.keyValueStore<String, Long>())
return queryableStore.map { kvReadOnlyKeyValueStore: ReadOnlyKeyValueStore<String, Long> ->
kvReadOnlyKeyValueStore[word] }.orElse(0L)
}
/**
* Method to get byte array from a state store using the interactive query service.
*
* @param stateStore the name of the state store ie "bar-store"
* @param blobName the key to get, in this case the name of the blob
* @return the byte[] stored in the state store
*/
fun getBytes(stateStore: String, blobName: String): ByteArray? {
val queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.keyValueStore<String, ByteArray>())
return queryableStore.map { stringReadOnlyKeyValueStore: ReadOnlyKeyValueStore<String, ByteArray> ->
stringReadOnlyKeyValueStore[blobName] }.orElse(null)
}
/**
* Method to get value V by key K.
*
* @param stateStore the name of the state store ie "baz-store"
* @param name the key to get
* @return the value of type V stored in the state store
*/
fun <K, V> getGenericKeyValue(stateStore: String, name: K): V {
val queryableStore = interactiveQueryService.getQueryableStore(
stateStore, QueryableStoreTypes.keyValueStore<K, V>())
return queryableStore.map { kvReadOnlyKeyValueStore: ReadOnlyKeyValueStore<K, V> ->
kvReadOnlyKeyValueStore[name] }.orElse(null)
}
}
8.2 Kafka Stream Health Checks
In addition to http-server-netty
, if the management
dependency is added, then Micronaut’s Health Endpoint can be used to expose the health status of the Kafka streams application.
For example stream health at the /health
endpoint will return:
{
"status": "UP",
"details": {
"kafkaStreams": {
"name": "my-application",
"status": "UP",
"details": {
"named-stream": {
"name": "my-application",
"status": "UP",
"details": {
"adminClientId": "my-consumer-id-admin",
"restoreConsumerClientId": "my-consumer-id-StreamThread-1-restore-consumer",
"threadState": "RUNNING",
"producerClientIds": [
"my-consumer-id-StreamThread-1-producer"
],
"consumerClientId": "my-consumer-id-StreamThread-1-consumer",
"threadName": "my-consumer-id-StreamThread-1"
}
}
}
},
...
}
}
By default, the details visible above are only shown to authenticated users. See the Health Endpoint documentation for how to configure that setting. |
If you wish to disable the Kafka streams health check while still using the management
dependency you can set the property kafka.health.streams.enabled
to false
in your application configuration.
kafka.health.streams.enabled=false
kafka:
health:
streams:
enabled: false
[kafka]
[kafka.health]
[kafka.health.streams]
enabled=false
kafka {
health {
streams {
enabled = false
}
}
}
{
kafka {
health {
streams {
enabled = false
}
}
}
}
{
"kafka": {
"health": {
"streams": {
"enabled": false
}
}
}
}
8.3 Handling Uncaught Exceptions
Since version 2.8.0, Kafka allows you to handle uncaught exceptions that may be thrown from your streams. This handler must return the action that must be taken, depending on the thrown exception.
There are three possible responses: REPLACE_THREAD
, SHUTDOWN_CLIENT
, or SHUTDOWN_APPLICATION
.
You can find more details about this mechanism here. |
If you just want to take the same action every time, you can set the application property kafka.streams.[STREAM-NAME].uncaught-exception-handler
to a valid action, such as REPLACE_THREAD
.
For example in application-test.yml
:
kafka.streams.my-stream.uncaught-exception-handler=REPLACE_THREAD
kafka:
streams:
my-stream:
uncaught-exception-handler: REPLACE_THREAD
[kafka]
[kafka.streams]
[kafka.streams.my-stream]
uncaught-exception-handler="REPLACE_THREAD"
kafka {
streams {
myStream {
uncaughtExceptionHandler = "REPLACE_THREAD"
}
}
}
{
kafka {
streams {
my-stream {
uncaught-exception-handler = "REPLACE_THREAD"
}
}
}
}
{
"kafka": {
"streams": {
"my-stream": {
"uncaught-exception-handler": "REPLACE_THREAD"
}
}
}
}
To implement your own handler, you can listen to the application event BeforeKafkaStreamStart and configure the streams with your own business logic:
import io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventListener;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
@Singleton
public class MyStreamsUncaughtExceptionHandler
implements ApplicationEventListener<BeforeKafkaStreamStart>, StreamsUncaughtExceptionHandler {
boolean dangerAvoided = false;
@Override
public void onApplicationEvent(BeforeKafkaStreamStart event) {
event.getKafkaStreams().setUncaughtExceptionHandler(this);
}
@Override
public StreamThreadExceptionResponse handle(Throwable exception) {
if (exception.getCause() instanceof MyException) {
this.dangerAvoided = true;
return StreamThreadExceptionResponse.REPLACE_THREAD;
}
return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
}
}
9 Guides
See the following list of guides to learn more about working with Kafka in the Micronaut Framework:
10 Repository
You can find the source code of this project in this repository: