Micronaut Kafka

Integration between Micronaut and Kafka Messaging

Version: 1.2.0

1 Introduction

Apache Kafka is a distributed stream processing platform that can be used for a range of messaging requirements in addition to stream processing and real-time data handling.

Micronaut features dedicated support for defining both Kafka Producer and Consumer instances. Micronaut applications built with Kafka can be deployed with or without the presence of an HTTP server.

With Micronaut’s efficient compile-time AOP and cloud native features, writing efficient Kafka consumer applications that use very few resources is a breeze.

Release History

1.0.4

  • Upgrade to Kafka 2.0.1

1.1.0

  • Upgrade to Kafka 2.1.1

  • Support for Open Tracing

  • Support for Pausing / Resuming Consumers via ConsumerRegistry interface

1.2.0

  • Micronaut 1.1.x minimum version

  • Upgrade to Kafka 2.3.0

  • GraalVM Native Support (with Micronaut 1.2+)

  • Better Exception Handling

2 Using the Micronaut CLI

To create a project with Kafka support using the Micronaut CLI, supply the kafka feature to the features flag.

$ mn create-app my-kafka-app --features kafka

This will create a project with the minimum necessary configuration for Kafka.

Kafka Profile

The Micronaut CLI includes a specialized profile for Kafka-based messaging applications. This profile will create a Micronaut app with Kafka support, and without an HTTP server (although you can add one if you desire). The profile also provides a couple of commands for generating Kafka listeners and producers.

To create a project using the Kafka profile, use the profile flag:

$ mn create-app my-kafka-service --profile kafka

As you’d expect, you can start the application with ./gradlew run (for Gradle) or ./mvnw compile exec:exec (Maven). The application will (with the default config) attempt to connect to Kafka at localhost:9092, and will continue to run without starting up an HTTP server. All communication to/from the service will take place via Kafka producers and/or listeners.

Within the new project, you can now run the Kafka-specific code generation commands:

$ mn create-kafka-producer Message
| Rendered template Producer.java to destination src/main/java/my/kafka/app/MessageProducer.java

$ mn create-kafka-listener Message
| Rendered template Listener.java to destination src/main/java/my/kafka/app/MessageListener.java

3 Kafka Quick Start

To add support for Kafka to an existing project, you should first add the Micronaut Kafka configuration to your build configuration. For example, in Gradle:

compile 'io.micronaut.configuration:micronaut-kafka'

Or in Maven:

<dependency>
    <groupId>io.micronaut.configuration</groupId>
    <artifactId>micronaut-kafka</artifactId>
</dependency>

Configuring Kafka

The minimum requirement to configure Kafka is to set the value of the kafka.bootstrap.servers property in application.yml:

Configuring Kafka
kafka:
    bootstrap:
        servers: localhost:9092

The value can also be a list of available servers:

Configuring Kafka
kafka:
    bootstrap:
        servers:
            - foo:9092
            - bar:9092

You can also set the environment variable KAFKA_BOOTSTRAP_SERVERS to a comma-separated list of values to externalize configuration.
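
As a rough illustration of what a comma-separated server list looks like once split into entries, here is a minimal plain-Java sketch. The class and method names are hypothetical and this is not how Micronaut itself reads the variable; it only shows the shape of the value.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Micronaut or Kafka.
class BootstrapServersSketch {

    // Splits a value such as "foo:9092,bar:9092" into individual host:port entries.
    static List<String> parse(String value) {
        if (value == null) {
            return Collections.emptyList();
        }
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Falls back to the single local broker used in the examples above.
        String raw = System.getenv().getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");
        System.out.println(parse(raw));
    }
}
```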

If your broker requires SSL, it can be configured this way:

Configuring Kafka
kafka:
  bootstrap:
    servers: localhost:9092
  ssl:
    keystore:
      location: /path/to/client.keystore.p12
      password: secret
    truststore:
      location: /path/to/client.truststore.jks
      password: secret
      type: PKCS12
  security:
    protocol: ssl

Creating a Kafka Producer with @KafkaClient

To create a Kafka Producer that sends messages you can simply define an interface that is annotated with @KafkaClient.

For example the following is a trivial @KafkaClient interface:

ProductClient.java
import io.micronaut.configuration.kafka.annotation.*;

@KafkaClient (1)
public interface ProductClient {

    @Topic("my-products") (2)
    void sendProduct(@KafkaKey String brand, String name); (3)

    void sendProduct(@Topic String topic, @KafkaKey String brand, String name); (4)
}
1 The @KafkaClient annotation is used to designate this interface as a client
2 The @Topic annotation indicates which topics the ProducerRecord should be published to
3 The method defines two parameters: The parameter that is the Kafka key and the value.
4 It is also possible for the topic to be dynamic by making it a method argument
You can omit the key; however, this will result in a null key, which means Kafka cannot use the key to choose a partition for the record.
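
To see why the key matters, the following plain-Java sketch shows the general idea of key-based partitioning: the key bytes are hashed and the hash is reduced modulo the partition count, so equal keys always land on the same partition. The hash used here is a stand-in chosen for brevity (the Kafka client uses a murmur2 hash), and the class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of key-based partitioning; not the Kafka partitioner.
class KeyPartitioningSketch {

    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        // Stand-in hash; Kafka's default partitioner uses murmur2 instead.
        int hash = Arrays.hashCode(bytes);
        // Clear the sign bit so the result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key maps to the same partition on every call.
        System.out.println(partitionFor("Nike", 6));
        System.out.println(partitionFor("Nike", 6));
    }
}
```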

At compile time Micronaut will produce an implementation of the above interface. You can retrieve an instance of ProductClient either by looking up the bean from the ApplicationContext or by injecting the bean with @Inject:

Using ProductClient
ProductClient client = applicationContext.getBean(ProductClient.class);
client.sendProduct("Nike", "Blue Trainers");

Note that because the sendProduct method returns void, the method will send the ProducerRecord and block until the response is received. To support non-blocking message delivery, return a Future or Publisher instead.

Creating a Kafka Consumer with @KafkaListener

To listen to Kafka messages you can use the @KafkaListener annotation to define a message listener.

The following example will listen for messages published by the ProductClient in the previous section:

ProductListener.java
import io.micronaut.configuration.kafka.annotation.*;

@KafkaListener(offsetReset = OffsetReset.EARLIEST) (1)
public class ProductListener {

    @Topic("my-products") (2)
    public void receive(@KafkaKey String brand, String name) { (3)
        System.out.println("Got Product - " + name + " by " + brand);
    }
}
1 The @KafkaListener is used with offsetReset set to EARLIEST which makes the listener start listening to messages from the beginning of the partition.
2 The @Topic annotation is again used to indicate which topic(s) to subscribe to.
3 The receive method defines 2 arguments: The argument that will receive the key and the argument that will receive the value.

4 Kafka Producers Using @KafkaClient

The example in the quick start presented a trivial definition of an interface that is implemented automatically for you using the @KafkaClient annotation.

The implementation that powers @KafkaClient (defined by the KafkaClientIntroductionAdvice class) is, however, very flexible and offers a range of options for defining Kafka clients.

4.1 Defining @KafkaClient Methods

Specifying the Key and the Value

The Kafka key can be specified by providing a parameter annotated with @KafkaKey. If no such parameter is specified the record is sent with a null key.

The value to send is resolved by selecting the argument annotated with @Body, otherwise the first argument with no specific binding annotation is used. For example:

@Topic("my-products")
void sendProduct(@KafkaKey String brand, String name);

The method above will use the parameter brand as the key and the parameter name as the value.
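
The selection rule described above (prefer the argument marked as the body, otherwise take the first argument without a binding annotation) can be sketched with plain reflection. The annotations declared below are local stand-ins with the same simple names, not the Micronaut types, and the resolver is a hypothetical simplification rather than the actual implementation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

class ValueResolutionSketch {

    // Local stand-ins for the binding annotations (assumptions for this sketch).
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER) @interface KafkaKey {}
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER) @interface Body {}

    interface Samples {
        void implicitValue(@KafkaKey String brand, String name);
        void explicitValue(@KafkaKey String brand, String note, @Body String name);
    }

    // Returns the index of the argument that would become the record value, or -1.
    static int valueIndex(Method method) {
        Parameter[] params = method.getParameters();
        for (int i = 0; i < params.length; i++) {
            if (params[i].isAnnotationPresent(Body.class)) {
                return i; // an explicit body annotation wins
            }
        }
        for (int i = 0; i < params.length; i++) {
            if (params[i].getAnnotations().length == 0) {
                return i; // otherwise the first argument without a binding annotation
            }
        }
        return -1;
    }

    static int valueIndexOf(String methodName) {
        for (Method method : Samples.class.getMethods()) {
            if (method.getName().equals(methodName)) {
                return valueIndex(method);
            }
        }
        throw new IllegalArgumentException("No such method: " + methodName);
    }

    public static void main(String[] args) {
        System.out.println(valueIndexOf("implicitValue")); // prints 1
        System.out.println(valueIndexOf("explicitValue")); // prints 2
    }
}
```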

Including Message Headers

There are a number of ways you can include message headers. One way is to annotate an argument with the @Header annotation and include a value when calling the method:

@Topic("my-products")
void sendProduct(
    @KafkaKey String brand,
    String name,
    @Header("My-Header") String myHeader);

The example above will include the value of the myHeader argument as a header called My-Header.

Another way to include headers is at the type level with the values driven from configuration:

Declaring @KafkaClient Headers
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.messaging.annotation.Header;

@KafkaClient(id="product-client")
@Header(name = "X-Token", value = "${my.application.token}")
public interface ProductClient {
    ...
}

The above example will send a header called X-Token with the value read from the setting my.application.token in application.yml (or the environment variable MY_APPLICATION_TOKEN).

If the my.application.token is not set then an error will occur creating the client.
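
The naming relationship between the property and the environment variable mentioned above can be sketched as a simple string transformation. This is only an illustration of the convention visible in the two names; the helper is hypothetical and is not the lookup code Micronaut uses.

```java
import java.util.Locale;

// Hypothetical helper illustrating the property-to-variable naming convention.
class EnvironmentNameSketch {

    // Upper-cases the property name and replaces dots with underscores.
    static String toEnvironmentVariable(String property) {
        return property.replace('.', '_').toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toEnvironmentVariable("my.application.token")); // prints MY_APPLICATION_TOKEN
    }
}
```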

Reactive and Non-Blocking Method Definitions

The @KafkaClient annotation supports the definition of reactive return types (such as Flowable or Reactor Flux) as well as Futures.

The following sections cover possible method signatures and behaviour:

Single Value and Return Type

Single<Book> sendBook(
    @KafkaKey String author,
    Single<Book> book
);

The implementation will return a Single that when subscribed to will subscribe to the passed Single and send the emitted item as a ProducerRecord emitting the item again if successful or an error otherwise.

Flowable Value and Return Type

Flowable<Book> sendBooks(
    @KafkaKey String author,
    Flowable<Book> book
);

The implementation will return a Flowable that when subscribed to will subscribe to the passed Flowable and for each emitted item will send a ProducerRecord emitting the item again if successful or an error otherwise.

Flux Value and Return Type

Flux<RecordMetadata> sendBooks(
    @KafkaKey String author,
    Flux<Book> book
);

The implementation will return a Reactor Flux that when subscribed to will subscribe to the passed Flux and for each emitted item will send a ProducerRecord emitting the resulting Kafka RecordMetadata if successful or an error otherwise.

Available Annotations

There are a number of annotations available that allow you to specify how a method argument is treated.

The following table summarizes the annotations and their purpose, with an example:

Table 1. Kafka Messaging Annotations

Annotation | Description | Example
@Body | Allows explicitly indicating the body of the message to be sent | @Body Product product
@Header | Allows specifying a parameter that should be sent as a header | @Header("X-My-Header") String myHeader
@KafkaKey | Allows specifying the parameter that is the Kafka key | @KafkaKey String key

For example, you can use the @Header annotation to bind a parameter value to a header in the ProducerRecord.

4.2 Configuring @KafkaClient beans

@KafkaClient and Producer Properties

There are a number of ways to pass configuration properties to the KafkaProducer. You can set default producer properties using kafka.producers.default in application.yml:

Applying Default Configuration
kafka:
    producers:
        default:
            retries: 5

Any property in the ProducerConfig class can be set. The above example will set the default number of times to retry sending a record.

Per @KafkaClient Producer Properties

To configure different properties for each client, you should set a @KafkaClient id using the annotation:

Using a Client ID
@KafkaClient("product-client")

This serves 2 purposes. Firstly it sets the value of the client.id setting used to build the KafkaProducer. Secondly, it allows you to apply per producer configuration in application.yml:

Applying Default Configuration
kafka:
    producers:
        product-client:
            retries: 5

Finally, the @KafkaClient annotation itself provides a properties member that you can use to set producer specific properties:

Configuring Producer Properties with @KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.context.annotation.Property;
import org.apache.kafka.clients.producer.ProducerConfig;

@KafkaClient(
    id="product-client",
    acks = KafkaClient.Acknowledge.ALL,
    properties = @Property(name = ProducerConfig.RETRIES_CONFIG, value = "5")
)
public interface ProductClient {
    ...
}
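
One way to picture the three configuration sources described in this section is as layers of properties merged into a single map. The sketch below assumes that more specific sources override more general ones (defaults first, then per-client settings, then annotation properties); that ordering is an assumption of this example, and the class is hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of layered producer properties.
class ProducerConfigLayersSketch {

    // Later layers overwrite earlier ones for the same property name.
    @SafeVarargs
    static Map<String, String> merge(Map<String, String>... layers) {
        Map<String, String> merged = new LinkedHashMap<>();
        for (Map<String, String> layer : layers) {
            merged.putAll(layer);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Collections.singletonMap("retries", "1");
        Map<String, String> perClient = Collections.singletonMap("retries", "3");
        Map<String, String> annotation = Collections.singletonMap("acks", "all");
        System.out.println(merge(defaults, perClient, annotation)); // prints {retries=3, acks=all}
    }
}
```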

@KafkaClient and Serializers

When serializing keys and values Micronaut will by default attempt to automatically pick a Serializer to use. This is done via the CompositeSerdeRegistry bean.

You can replace the default SerdeRegistry bean with your own implementation by defining a bean that uses @Replaces(CompositeSerdeRegistry.class). See the section on Bean Replacement.

All common java.lang types (String, Integer, primitives etc.) are supported and for POJOs by default a Jackson based JSON serializer is used.

You can, however, explicitly override the Serializer used by providing the appropriate configuration in application.yml:

Applying Default Configuration
kafka:
    producers:
        product-client:
            value:
                serializer: org.apache.kafka.common.serialization.ByteArraySerializer

You may want to do this if for example you choose an alternative serialization format such as Avro or Protobuf.

4.3 Sending Records in Batch

By default if you define a method that takes a container type such as a List the list will be serialized using the specified value.serializer (the default will result in a JSON array).

For example the following two methods will both send serialized arrays:

Sending Arrays and Lists
@Topic("books")
void sendList(List<Book> books);

@Topic("books")
void sendBooks(Book...books);

Instead of sending a serialized array you may wish to instead send batches of ProducerRecord either synchronously or asynchronously.

To do this you can specify a value of true to the batch member of the @KafkaClient annotation:

Sending ProducerRecord batches
@KafkaClient(batch=true)
@Topic("books")
void send(List<Book> books);

In the above case instead of sending a serialized array the client implementation will iterate over each item in the list and send a ProducerRecord for each. The previous example is blocking, however you can return a reactive type if desired:

Sending ProducerRecord batches Reactively
@KafkaClient(batch=true)
@Topic("books")
Flowable<RecordMetadata> send(List<Book> books);

You can also use an unbounded reactive type such as Flowable as the source of your batch data:

Sending ProducerRecord batches from a Flowable
@KafkaClient(batch=true)
@Topic("books")
Flowable<RecordMetadata> send(Flowable<Book> books);
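
The difference between the default behaviour and batch mode described in this section can be sketched in plain Java: without batching the whole list becomes one record value, while with batching each item becomes its own record. The class below is a hypothetical simplification that works on strings and does not talk to Kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of list-as-one-record versus one-record-per-item.
class BatchSendSketch {

    // Returns the record values that would be handed to the producer.
    static List<String> toRecordValues(List<String> titles, boolean batch) {
        List<String> values = new ArrayList<>();
        if (batch) {
            // Batch mode: one record per list item.
            values.addAll(titles);
        } else {
            // Default: the whole list is sent as a single array-like value.
            values.add("[" + String.join(", ", titles) + "]");
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> titles = Arrays.asList("The Stand", "It", "Misery");
        System.out.println(toRecordValues(titles, false).size()); // prints 1
        System.out.println(toRecordValues(titles, true).size());  // prints 3
    }
}
```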

4.4 Injecting Kafka Producer Beans

If you need maximum flexibility and don’t want to use the @KafkaClient support you can use the @KafkaClient annotation as a qualifier for dependency injection of KafkaProducer instances.

Consider the following example:

Using a KafkaProducer directly
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import org.apache.kafka.clients.producer.*;

import javax.inject.Singleton;
import java.util.concurrent.Future;

@Singleton
public class BookSender {

    private final Producer<String, Book> kafkaProducer;

    public BookSender(
            @KafkaClient("book-producer") Producer<String, Book> kafkaProducer) { (1)
        this.kafkaProducer = kafkaProducer;
    }

    public Future<RecordMetadata> send(String author, Book book) {
        return kafkaProducer.send(new ProducerRecord<>("books", author, book)); (2)
    }

}
1 The KafkaProducer is dependency injected into the constructor. If not specified in configuration, the key and value serializer are inferred from the generic type arguments.
2 The KafkaProducer is used to send records

Note that there is no need to call the close() method to shut down the KafkaProducer. It is fully managed by Micronaut and will be shut down when the application shuts down.

The previous example can be tested in JUnit with the following test:

Using a KafkaProducer directly
@Test
public void testBookSender() throws IOException {
    Map<String, Object> config = Collections.singletonMap( (1)
            AbstractKafkaConfiguration.EMBEDDED, true
    );

    try (ApplicationContext ctx = ApplicationContext.run(config)) {
        BookSender bookSender = ctx.getBean(BookSender.class); (2)
        Book book = new Book();
        book.setTitle("The Stand");
        bookSender.send("Stephen King", book);
    }
}
1 An embedded version of Kafka is used
2 The BookSender is retrieved from the ApplicationContext and a ProducerRecord sent

By using the KafkaProducer API directly you open up even more options if you require transactions (exactly-once delivery) or want control over when records are flushed etc.

4.5 Embedding Kafka

The previous section introduced the ability to embed Kafka for your tests. This is possible in Micronaut by setting kafka.embedded.enabled to true and adding the following dependencies to your test classpath:

Kafka Gradle Test Dependencies
testCompile 'org.apache.kafka:kafka-clients:2.3.0:test'
testCompile 'org.apache.kafka:kafka_2.12:2.3.0'
testCompile 'org.apache.kafka:kafka_2.12:2.3.0:test'

or

Kafka Maven Test Dependencies
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.3.0</version>
  <classifier>test</classifier>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>2.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>2.3.0</version>
  <classifier>test</classifier>
</dependency>

Note that because of the distributed nature of Kafka it is relatively slow to start up. It is therefore generally better to do the initialization with @BeforeClass (or setupSpec in Spock) and have a large number of test methods rather than many test classes; otherwise your test execution performance will suffer.

5 Kafka Consumers Using @KafkaListener

The quick start section presented a trivial example of what is possible with the @KafkaListener annotation.

Using the @KafkaListener annotation Micronaut will build a KafkaConsumer and start the poll loop by running the KafkaConsumer in a special consumer thread pool. You can configure the size of the thread pool based on the number of consumers in your application in application.yml as desired:

Configuring the consumer thread pool
micronaut:
    executors:
        consumer:
            type: fixed
            nThreads: 25

KafkaConsumer instances are single threaded, hence for each @KafkaListener method you define a new thread is created to execute the poll loop.

You may wish to scale the number of consumers you have listening on a particular topic. There are several ways you may achieve this. You could for example run multiple instances of your application each containing a single consumer in each JVM.

Alternatively, you can scale via threads by setting the number of threads a particular consumer bean will create:

Scaling with Threads
@KafkaListener(groupId="myGroup", threads=10)

The above example will create 10 KafkaConsumer instances, each running in a unique thread and participating in the myGroup consumer group.

@KafkaListener beans are by default singleton. When using multiple threads you must either synchronize access to local state or declare the bean as @Prototype.
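
The need to guard local state can be illustrated without Kafka at all. In the sketch below, several worker threads stand in for the consumer threads and all call the same listener instance; the counter is an AtomicLong so that concurrent increments are not lost. All names are hypothetical and the loop only simulates record delivery.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simulation of several consumer threads sharing one listener.
class ThreadedConsumersSketch {

    // Stand-in for a singleton listener bean with local state.
    static final class Listener {
        final AtomicLong received = new AtomicLong();

        void receive(String value) {
            received.incrementAndGet(); // thread-safe update of shared state
        }
    }

    // Runs `threads` simulated poll loops, each delivering `recordsPerThread` records.
    static long run(int threads, int recordsPerThread) {
        Listener listener = new Listener();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < recordsPerThread; i++) {
                    listener.receive("record-" + i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Simulated consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return listener.received.get();
    }

    public static void main(String[] args) {
        System.out.println(run(10, 1000)); // prints 10000
    }
}
```

Replacing the AtomicLong with a plain long field would make the final count unreliable, which is the kind of problem the note above warns about.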

By default Micronaut will inspect the method signature of the method annotated with @Topic that will listen for ConsumerRecord instances and from the types infer an appropriate key and value Deserializer.

5.1 Defining @KafkaListener Methods

The @KafkaListener annotation examples up until now have been relatively trivial, but Micronaut offers a lot of flexibility when it comes to the types of method signatures you can define.

The following sections detail examples of supported use cases.

Specifying Topics

The @Topic annotation can be used at the method or the class level to specify which topics to listen to.

Care needs to be taken when using @Topic at the class level because every public method of the class annotated with @KafkaListener will become a Kafka consumer, which may be undesirable.

You can specify multiple topics to listen for:

Specifying Multiple Topics
@Topic({"fun-products", "awesome-products"})

You can also specify one or many regular expressions to listen for:

Using regular expressions to match Topics
@Topic(patterns="products-\\w+")
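
To get a feel for which topic names a pattern such as the one above would select, the expression can be tried with java.util.regex directly. The sketch assumes that the whole topic name has to match the expression; the topic names and the helper class are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper for trying out a topic pattern against sample names.
class TopicPatternSketch {

    // Keeps only the topic names that match the expression as a whole.
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("products-shoes", "products-", "orders", "products-hats-2");
        System.out.println(matching("products-\\w+", topics)); // prints [products-shoes]
    }
}
```

Note that \w does not match a hyphen, so a name like products-hats-2 is not selected by this expression.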

Available Annotations

There are a number of annotations available that allow you to specify how a method argument is bound.

The following table summarizes the annotations and their purpose, with an example:

Table 1. Kafka Messaging Annotations

Annotation | Description | Example
@Body | Allows explicitly indicating the body of the message | @Body Product product
@Header | Allows binding a parameter to a message header | @Header("X-My-Header") String myHeader
@KafkaKey | Allows specifying the parameter that is the key | @KafkaKey String key

For example, you can use the @Header annotation to bind a parameter value from a header contained within a ConsumerRecord.

Topics, Partitions and Offsets

If you want a reference to the topic, partition or offset it is a simple matter of defining a parameter for each.

The following table summarizes example parameters and how they relate to the ConsumerRecord being processed:

Table 2. @KafkaListener Method Parameters

Parameter | Description
String topic | The name of the topic
long offset | The offset of the ConsumerRecord
int partition | The partition of the ConsumerRecord
long timestamp | The timestamp of the ConsumerRecord

As an example, the following listener method will receive all of the above mentioned parameters:

Specifying Parameters for offset, topic etc.
@Topic("awesome-products")
public void receive(
        @KafkaKey String brand, (1)
        Product product, (2)
        long offset, (3)
        int partition, (4)
        String topic, (5)
        long timestamp) { (6)
    System.out.println("Got Product - " + product.getName() + " by " + brand);
}
1 The Kafka key
2 The message body
3 The offset of the ConsumerRecord
4 The partition of the ConsumerRecord
5 The topic. Note that the @Topic annotation supports multiple topics.
6 The timestamp of the ConsumerRecord

Receiving a ConsumerRecord

If you prefer you can also receive the entire ConsumerRecord object being listened for. In this case you should specify appropriate generic types for the key and value of the ConsumerRecord so that Micronaut can pick the correct deserializer for each.

Consider the following example:

Receiving a ConsumerRecord
@Topic("awesome-products")
public void receive(ConsumerRecord<String, Product> record) { (1)
    Product product = record.value(); (2)
    String brand = record.key(); (3)
    System.out.println("Got Product - " + product.getName() + " by " + brand);
}
1 The method signature accepts a ConsumerRecord that specifies a String for the key type and a POJO (Product) for the value type.
2 The value() method is used to retrieve the value
3 The key() method is used to retrieve the key

Receiving and returning Reactive Types

In addition to common Java types and POJOs you can also define listener methods that receive a Reactive type such as a Single or a Reactor Mono. For example:

Using Reactive Types
@Topic("reactive-products")
public Single<Product> receive(
        @KafkaKey String brand,  (1)
        Single<Product> productSingle) { (2)
    return productSingle.doOnSuccess((product) ->
            System.out.println("Got Product - " + product.getName() + " by " + brand) (3)
    );
}
1 The @KafkaKey annotation is used to indicate the key
2 A Single is used to receive the message body
3 The doOnSuccess method is used to process the result

Note that in this case the method returns a Single. This indicates to Micronaut that the poll loop should continue, and if enable.auto.commit is set to true (the default) the offsets may be committed before doOnSuccess is called.

The idea here is that you are able to write consumers that don’t block, however care must be taken in the case where an error occurs in the doOnSuccess method otherwise the message could be lost. You could for example re-deliver the message in case of an error.

Alternatively, you can use the @Blocking annotation to tell Micronaut to subscribe to the returned reactive type in a blocking manner which will result in blocking the poll loop, preventing offsets from being committed automatically:

Blocking with Reactive Consumers
@Blocking
@Topic("reactive-products")
public Single<Product> receive(
    ...
}

5.2 Configuring @KafkaListener beans

@KafkaListener and Consumer Groups

Kafka consumers created with @KafkaListener will by default run within a consumer group that is the value of micronaut.application.name unless you explicitly specify a value to the @KafkaListener annotation. For example:

Specifying a Consumer Group
@KafkaListener("myGroup")

The above example will run the consumer within a consumer group called myGroup.

You can make the consumer group configurable using a placeholder: @KafkaListener("${my.consumer.group:myGroup}")

@KafkaListener and Consumer Properties

There are a number of ways to pass configuration properties to the KafkaConsumer. You can set default consumer properties using kafka.consumers.default in application.yml:

Applying Default Configuration
kafka:
    consumers:
        default:
            session:
                timeout:
                    ms: 30000

The above example will set the default session.timeout.ms that Kafka uses to decide whether a consumer is alive or not and applies it to all created KafkaConsumer instances.

You can also provide configuration specific to a consumer group. For example consider the following configuration:

Applying Consumer Group Specific config
kafka:
    consumers:
        myGroup:
            session:
                timeout:
                    ms: 30000

The above configuration will pass properties to only the @KafkaListener beans that apply to the consumer group myGroup.

Finally, the @KafkaListener annotation itself provides a properties member that you can use to set consumer specific properties:

Configuring Consumer Properties with @KafkaListener
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.context.annotation.Property;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;

@KafkaListener(
    groupId = "products",
    pollTimeout = "500ms",
    properties = @Property(name = ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, value = "10000")
)
public class ProductListener {
    ...
}

@KafkaListener and Deserializers

As mentioned previously when defining @KafkaListener methods, Micronaut will attempt to pick an appropriate deserializer for the method signature. This is done via the CompositeSerdeRegistry bean.

You can replace the default SerdeRegistry bean with your own implementation by defining a bean that uses @Replaces(CompositeSerdeRegistry.class). See the section on Bean Replacement.

All common java.lang types (String, Integer, primitives etc.) are supported and for POJOs by default a Jackson based JSON deserializer is used.

You can, however, explicitly override the Deserializer used by providing the appropriate configuration in application.yml:

Applying Default Configuration
kafka:
    consumers:
        myGroup:
            value:
                deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

You may want to do this if for example you choose an alternative deserialization format such as Avro or Protobuf.

5.3 Committing Kafka Offsets

Automatically Committing Offsets

The way offsets are handled by a @KafkaListener bean is defined by the OffsetStrategy enum.

The following table summarizes the enum values and behaviour:

Table 1. OffsetStrategy Values

Value | Description
AUTO | Automatically commits offsets. Sets enable.auto.commit to true
DISABLED | Disables automatically committing offsets. Sets enable.auto.commit to false
SYNC | Commits offsets manually at the end of each poll() loop if no exceptions occur. Sets enable.auto.commit to false
ASYNC | Asynchronously commits offsets manually at the end of each poll() loop if no exceptions occur. Sets enable.auto.commit to false
SYNC_PER_RECORD | Commits offsets manually after each ConsumerRecord is processed. Sets enable.auto.commit to false
ASYNC_PER_RECORD | Commits offsets asynchronously after each ConsumerRecord is processed. Sets enable.auto.commit to false

Depending on your level of paranoia or durability requirements you can choose to tune how and when offsets are committed.
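
The practical difference between committing once per poll and once per record is the number of commit calls, and therefore how much work may be redelivered after a failure. The following plain-Java sketch simply counts commits for a few simulated polls. The enum and class are hypothetical, and the sketch simplifies by ignoring errors and the synchronous versus asynchronous distinction.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation that counts commit calls under two commit timings.
class CommitTimingSketch {

    enum Timing { PER_POLL, PER_RECORD }

    static int countCommits(List<List<String>> polls, Timing timing) {
        int commits = 0;
        for (List<String> batch : polls) {
            for (int i = 0; i < batch.size(); i++) {
                // The record batch.get(i) would be processed here.
                if (timing == Timing.PER_RECORD) {
                    commits++; // commit after every record
                }
            }
            if (timing == Timing.PER_POLL) {
                commits++; // commit once at the end of the poll
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        List<List<String>> polls = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d", "e"));
        System.out.println(countCommits(polls, Timing.PER_POLL));   // prints 2
        System.out.println(countCommits(polls, Timing.PER_RECORD)); // prints 5
    }
}
```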

Manually Committing Offsets

If you set the OffsetStrategy to DISABLED it becomes your responsibility to commit offsets.

There are a couple of ways that can be achieved.

The simplest way is to define an argument of type Acknowledgement and call the ack() method to commit offsets synchronously:

Committing offsets with ack()
@KafkaListener(
    offsetReset = OffsetReset.EARLIEST,
    offsetStrategy = OffsetStrategy.DISABLED (1)
)
@Topic("awesome-products")
void receive(
        Product product,
        Acknowledgement acknowledgement) { (2)
    // process product record

    acknowledgement.ack(); (3)
}
1 Committing offsets automatically is disabled
2 The listener method specifies a parameter of type Acknowledgement
3 The ack() method is called once the record has been processed

Alternatively, you can supply a KafkaConsumer method argument and then call commitSync (or commitAsync) yourself when you are ready to commit offsets:

Committing offsets with the KafkaConsumer API
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.configuration.kafka.docs.consumer.config.Product;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import java.util.Collections;

@KafkaListener(
        offsetReset = OffsetReset.EARLIEST,
        offsetStrategy = OffsetStrategy.DISABLED (1)
)
@Topic("awesome-products")
void receive(
        Product product,
        long offset,
        int partition,
        String topic,
        Consumer kafkaConsumer) { (2)
    // process product record

    // commit offsets
    kafkaConsumer.commitSync(Collections.singletonMap( (3)
            new TopicPartition(topic, partition),
            new OffsetAndMetadata(offset + 1, "my metadata")
    ));

}
1 Committing offsets automatically is disabled
2 The listener method specifies that it receives the offset data and a KafkaConsumer
3 The commitSync() method is called once the record has been processed
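
The offset + 1 in the example above reflects that a committed offset names the next record to read, not the last one processed. The plain-Java sketch below models that bookkeeping with an in-memory map. The class is hypothetical, and returning 0 when nothing has been committed is a simplification of this sketch (a real consumer falls back to its offset reset setting).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of committed offsets per topic partition.
class CommittedOffsetSketch {

    private final Map<String, Long> committed = new HashMap<>();

    // Records that the record at `offset` has been fully processed.
    void commitProcessed(String topic, int partition, long offset) {
        // The committed value is the position of the next record to consume.
        committed.put(topic + "-" + partition, offset + 1);
    }

    // Position from which consumption would resume (0 if nothing was committed, by assumption).
    long resumePosition(String topic, int partition) {
        return committed.getOrDefault(topic + "-" + partition, 0L);
    }

    public static void main(String[] args) {
        CommittedOffsetSketch offsets = new CommittedOffsetSketch();
        offsets.commitProcessed("awesome-products", 0, 41L);
        System.out.println(offsets.resumePosition("awesome-products", 0)); // prints 42
    }
}
```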

Manually Assigning Offsets to a Consumer Bean

Sometimes you may wish to control exactly the position you wish to resume consuming messages from.

For example if you store offsets in a database you may wish to read the offsets from the database when the consumer starts and start reading from the position stored in the database.

To support this use case your consumer bean can implement the ConsumerRebalanceListener and ConsumerAware interfaces:

Manually seeking offsets with the KafkaConsumer API
import io.micronaut.configuration.kafka.annotation.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import javax.annotation.Nonnull;
import java.util.Collection;

@KafkaListener
public class ProductListener implements ConsumerRebalanceListener, ConsumerAware {

    private Consumer consumer;

    @Override
    public void setKafkaConsumer(@Nonnull Consumer consumer) { (1)
        this.consumer = consumer;
    }

    @Topic("awesome-products")
    void receive(Product product) {
        // process product
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { (2)
        // save offsets here
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { (3)
        // seek to offset here
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, 1);
        }
    }
}
1 The setKafkaConsumer method of ConsumerAware allows access to the underlying consumer
2 The onPartitionsRevoked method can be used to save offsets
3 The onPartitionsAssigned method can be used to read offsets and seek to a specific position. In this trivial example we just seek to the offset 1 (skipping the first record).
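
The save-on-revoke and restore-on-assign flow can be modelled without Kafka to make the sequence clearer. In the sketch below a HashMap stands in for the database, partitions are plain integers, and the method names merely mirror the rebalance callbacks; everything here is a hypothetical simplification rather than code that runs against a broker.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of storing positions externally across rebalances.
class RebalanceOffsetSketch {

    private final Map<Integer, Long> store = new HashMap<>();     // stand-in for a database
    private final Map<Integer, Long> positions = new HashMap<>(); // positions of owned partitions

    // On assignment, resume each partition from its stored position (0 if none).
    void onPartitionsAssigned(Collection<Integer> partitions) {
        for (Integer partition : partitions) {
            positions.put(partition, store.getOrDefault(partition, 0L));
        }
    }

    // On revocation, persist the current position of each partition.
    void onPartitionsRevoked(Collection<Integer> partitions) {
        for (Integer partition : partitions) {
            Long position = positions.remove(partition);
            if (position != null) {
                store.put(partition, position);
            }
        }
    }

    // Advances the position after one record has been processed.
    void recordProcessed(int partition) {
        positions.merge(partition, 1L, Long::sum);
    }

    long position(int partition) {
        return positions.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        RebalanceOffsetSketch consumer = new RebalanceOffsetSketch();
        consumer.onPartitionsAssigned(Collections.singletonList(0));
        consumer.recordProcessed(0);
        consumer.recordProcessed(0);
        consumer.onPartitionsRevoked(Collections.singletonList(0));
        consumer.onPartitionsAssigned(Collections.singletonList(0));
        System.out.println(consumer.position(0)); // prints 2
    }
}
```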

Manual Offsets with Multiple Topics

It is possible for a single @KafkaListener bean to represent multiple consumers. If you have more than one method annotated with @Topic, setKafkaConsumer will be called multiple times, once for each backing consumer.

When manually seeking offsets, it is recommended that you use a single listener bean per consumer. The alternative is to store an internal Set of all consumers associated with a particular listener and manually search for the correct consumer in onPartitionsAssigned using the partition assignment data.

Not doing so will lead to a ConcurrentModificationException error.

5.4 Kafka Batch Processing

By default @KafkaListener listener methods will receive each ConsumerRecord one by one.

There may be cases where you prefer to receive all of the ConsumerRecord data from the ConsumerRecords holder object in one go.

To achieve this you can set the batch member of the @KafkaListener to true and specify a container type (typically List) to receive all of the data:

Receiving a Batch of Records
import io.micronaut.configuration.kafka.annotation.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;

import java.util.Collections;
import java.util.List;

@KafkaListener(batch = true) (1)
public class BookListener {

  @Topic("all-the-books")
  public void receiveList(List<Book> books) { (2)
      for (Book book : books) {
          System.out.println("Got Book = " + book.getTitle()); (3)
      }
  }

}
1 The @KafkaListener annotation’s batch member is set to true
2 The method defines that it receives a list of Book instances
3 The method processes the entire batch

Note that in the previous case offsets will, by default, be committed automatically for the whole batch when the method returns without error.

Manually Committing Offsets with Batch

You can also take more control of committing offsets when doing batch processing by specifying a method that receives the offsets in addition to the batch:

Committing Offsets Manually with Batch
@Topic("all-the-books")
public void receive(
        List<Book> books,
        List<Long> offsets,
        List<Integer> partitions,
        List<String> topics,
        Consumer kafkaConsumer) { (1)
    for (int i = 0; i < books.size(); i++) {

        // process the book
        Book book = books.get(i); (2)

        // commit offsets
        String topic = topics.get(i);
        int partition = partitions.get(i);
        long offset = offsets.get(i); (3)

        kafkaConsumer.commitSync(Collections.singletonMap( (4)
                new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset + 1, "my metadata")
        ));

    }
}
1 The method receives the batch of records as well as the offsets, partitions and topics
2 Each record is processed
3 The offset, partition and topic are read for the record
4 Offsets are committed

This example is fairly trivial in that it commits offsets after processing each record in a batch. You can instead commit after every 10 or every 100 records, or at whatever interval makes sense for your application.
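
Committing every N records mostly comes down to tracking, per partition, the highest offset processed since the last commit. The sketch below shows that bookkeeping on its own, assuming a hypothetical CommitPlanner helper that is not part of Micronaut or Kafka. It uses plain "topic-partition" strings in place of TopicPartition so that it runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Micronaut or Kafka.
final class CommitPlanner {

    // Splits a batch into commits of at most `every` records. Each returned map holds,
    // per "topic-partition" key, the next offset to read (last processed offset + 1).
    static List<Map<String, Long>> plan(List<String> topics,
                                        List<Integer> partitions,
                                        List<Long> offsets,
                                        int every) {
        if (every <= 0) {
            throw new IllegalArgumentException("every must be positive");
        }
        List<Map<String, Long>> commits = new ArrayList<>();
        Map<String, Long> pending = new LinkedHashMap<>();
        for (int i = 0; i < offsets.size(); i++) {
            // the record at index i would be processed here
            String key = topics.get(i) + "-" + partitions.get(i);
            pending.merge(key, offsets.get(i) + 1, Long::max);

            boolean lastRecord = i == offsets.size() - 1;
            if ((i + 1) % every == 0 || lastRecord) {
                // this is the point where commitSync would be called
                commits.add(pending);
                pending = new LinkedHashMap<>();
            }
        }
        return commits;
    }
}
```

In a real listener each map entry would become one TopicPartition to OffsetAndMetadata pair passed to commitSync, as in the listing above. The final partial group is committed as well so that no processed record is left uncommitted.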

Reactive Batch Processing

Batch listeners also support defining reactive types (either Flowable or Reactor Flux) as the method argument.

In this case the method will be passed a reactive type that can be returned from the method allowing non-blocking processing of the batch:

Reactive Processing of Batch Records
@Topic("all-the-books")
public Flux<Book> receiveFlux(Flux<Book> books) {
    return books.doOnNext(book ->
            System.out.println("Got Book = " + book.getTitle())
    );
}

Remember that, as with non-batch processing, the reactive type will be subscribed to on a different thread, and offsets will be committed automatically, likely before the reactive type is subscribed to.

This means that you should only use reactive processing if message durability is not a requirement. You may also wish to implement message re-delivery upon failure.

5.5 Forwarding Messages with @SendTo

On any @KafkaListener method that returns a value, you can use the @SendTo annotation to forward the return value to the topic or topics specified by the @SendTo annotation.

The key of the original ConsumerRecord will be used as the key when forwarding the message.

Forwarding Messages with @SendTo
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.configuration.kafka.docs.consumer.config.Product;
import io.micronaut.messaging.annotation.SendTo;
import io.reactivex.Single;
import io.reactivex.functions.Function;

@Topic("awesome-products") (1)
@SendTo("product-quantities") (2)
public int receive(
        @KafkaKey String brand,
        Product product) {
    System.out.println("Got Product - " + product.getName() + " by " + brand);

    return product.getQuantity(); (3)
}
1 The topic subscribed to is awesome-products
2 The topic to send the result to is product-quantities
3 The return value is used to indicate the value to forward

You can also do the same using Reactive programming:

Forwarding Messages Reactively with @SendTo
@Topic("awesome-products") (1)
@SendTo("product-quantities") (2)
public Single<Integer> receiveProduct(
        @KafkaKey String brand,
        Single<Product> productSingle) {

    return productSingle.map(product -> {
        System.out.println("Got Product - " + product.getName() + " by " + brand);
        return product.getQuantity(); (3)
    });
}
1 The topic subscribed to is awesome-products
2 The topic to send the result to is product-quantities
3 The return is mapped from the single to the value of the quantity

In the reactive case the poll loop will continue and will not wait for the record to be sent unless you specifically annotate the method with @Blocking.

5.6 Handling Consumer Exceptions

When an exception occurs in a @KafkaListener method, by default the exception is simply logged. This is handled by DefaultKafkaListenerExceptionHandler.

If you wish to replace this default exception handling with another implementation, you can use Micronaut’s Bean Replacement feature to define a bean that replaces it: @Replaces(DefaultKafkaListenerExceptionHandler.class).

You can also define per-bean exception handling logic by implementing the KafkaListenerExceptionHandler interface in your @KafkaListener class.

The KafkaListenerExceptionHandler receives an exception of type KafkaListenerException which allows access to the original ConsumerRecord, if available.

6 Running Kafka Applications

You can run a Micronaut Kafka application with or without the presence of an HTTP server.

If you run your application without the http-server-netty dependency you will see output like the following on startup:

11:06:22.638 [main] INFO  io.micronaut.runtime.Micronaut - Startup completed in 402ms. Server Running: 4 active message listeners.

No port is exposed, but the Kafka consumers are active and running. The process registers a shutdown hook so that the KafkaConsumer instances are closed correctly when the server is shut down.

6.1 Kafka Health Checks

In addition to http-server-netty, if the management dependency is added, then Micronaut’s Health Endpoint can be used to expose the health status of the Kafka consumer application.

For example if Kafka is not available the /health endpoint will return:

{
    "status": "DOWN",
    "details": {
        ...
        "kafka": {
            "status": "DOWN",
            "details": {
                "error": "java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment."
            }
        }
    }
}
By default, the details visible above are only shown to authenticated users. See the Health Endpoint documentation for how to configure that setting.

If you wish to disable the kafka health check while still using the management dependency you can set the property kafka.health.enabled to false in your application configuration.

kafka:
    health:
        enabled: false

6.2 Kafka Metrics

You can enable Kafka Metrics collection by enabling Micrometer Metrics.

If you do not wish to collect Kafka metrics, you can set micronaut.metrics.binders.kafka.enabled to false in application.yml.

6.3 Kafka Distributed Tracing

You can enable distributed tracing with either Jaeger or Zipkin by first configuring Micronaut Tracing, then adding the following dependency:

runtime 'io.opentracing.contrib:opentracing-kafka-client:0.0.16'
<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.0.16</version>
    <scope>runtime</scope>
</dependency>

7 Building Kafka Stream Applications

Using the CLI

If you are creating your project using the Micronaut CLI, supply the kafka-streams feature to include a simple Kafka Streams configuration in your project:

$ mn create-app my-app --features kafka-streams

Kafka Streams is a platform for building real-time streaming applications.

When using Micronaut with Kafka Streams, your application gains all of the features of Micronaut (configuration management, AOP, DI, health checks, etc.), simplifying the construction of Kafka Streams applications.

Since Micronaut’s DI and AOP are performed at compile time, you can build low-overhead stream applications with ease.

Defining Kafka Streams

To define Kafka Streams you should first add the kafka-streams configuration to your build.

compile 'io.micronaut.configuration:micronaut-kafka-streams'
<dependency>
    <groupId>io.micronaut.configuration</groupId>
    <artifactId>micronaut-kafka-streams</artifactId>
</dependency>

The minimum configuration required is to set the Kafka bootstrap servers:

Configuring Kafka
kafka:
    bootstrap:
        servers: localhost:9092

You should then define a @Factory for your streams that defines beans that return a KStream. For example to implement the Word Count example from the Kafka Streams documentation:

Kafka Streams Word Count
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

@Factory
public class WordCountStream {

    public static final String STREAM_WORD_COUNT = "word-count";
    public static final String INPUT = "streams-plaintext-input"; (1)
    public static final String OUTPUT = "streams-wordcount-output"; (2)
    public static final String WORD_COUNT_STORE = "word-count-store";


    @Singleton
    @Named(STREAM_WORD_COUNT)
    KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { (3)
        // set default serdes
        Properties props = builder.getConfiguration();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStream<String, String> source = builder
                .stream(INPUT);

        KTable<String, Long> groupedByWord = source
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                //Store the result in a store for lookup later
                .count(Materialized.as(WORD_COUNT_STORE)); (4)

        groupedByWord
                //convert to stream
                .toStream()
                //send to output using specific serdes
                .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));

        return source;
    }

}
1 The input topic
2 The output topic
3 An instance of ConfiguredStreamBuilder is injected that allows mutating the configuration
4 Materialize the count stream and save to a state store
With Kafka Streams, the key and value Serdes (serializer/deserializer) must be classes with a zero-argument constructor. If you wish to use JSON (de)serialization, you can subclass JsonSerde to define your Serdes.
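
To see what the word count topology computes, the same split, group and count steps can be reproduced on an in-memory list with java.util.stream. This is only an illustration of the logic, using a hypothetical WordCountSketch class. Kafka Streams applies these steps continuously to incoming records and emits an updated count each time a word is seen, whereas the sketch returns only the final totals.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical class for illustration; it does not use Kafka Streams.
final class WordCountSketch {

    static Map<String, Long> count(List<String> sentences) {
        return sentences.stream()
                // same lower-casing and split as flatMapValues in the topology
                .flatMap(value -> Arrays.stream(value.toLowerCase().split("\\W+")))
                // same grouping by word and counting as groupBy(...).count(...)
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }
}
```

For the sentences "The quick brown fox" and "the lazy dog the end", the resulting map contains seven distinct words, with "the" counted three times.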

You can use the @KafkaClient annotation to send a sentence to be processed by the above stream:

Defining a Kafka Client
/*
 * Copyright 2017-2019 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient
public interface WordCountClient {

    @Topic(WordCountStream.INPUT)
    void publishSentence(String sentence);
}

You can also define a @KafkaListener to listen for the result of the word count stream:

Defining a Kafka Listener
package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.annotation.*;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class WordCountListener {

    private final Map<String, Long> wordCounts = new ConcurrentHashMap<>();

    @Topic(WordCountStream.OUTPUT)
    void count(@KafkaKey String word, long count) {
        System.out.println("word = " + word);
        wordCounts.put(word, count);
    }

    public long getCount(String word) {
        Long num = wordCounts.get(word);
        if (num != null) {
            return num;
        }
        return 0;
    }

    public Map<String, Long> getWordCounts() {
        return Collections.unmodifiableMap(wordCounts);
    }
}

Configuring Kafka Streams

You can define multiple Kafka streams each with their own unique configuration. To do this you should define the configuration with kafka.streams.[STREAM-NAME]. For example in application.yml:

Defining Per Stream Configuration
kafka:
    streams:
        my-stream:
            num:
                stream:
                    threads: 10

The above configuration sets the num.stream.threads setting of the Kafka StreamsConfig to 10 for a stream named my-stream.
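
The relationship between the nested YAML keys and the dotted Kafka property name can be pictured as a simple flattening of the keys below kafka.streams.my-stream. The sketch below illustrates only that naming convention, using a hypothetical NestedKeys class. It is not how Micronaut resolves configuration internally.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical class for illustration; not Micronaut's actual configuration handling.
final class NestedKeys {

    // Flattens nested maps into dotted property names, e.g. num -> stream -> threads
    // becomes "num.stream.threads".
    static Properties flatten(Map<String, Object> nested) {
        Properties props = new Properties();
        flattenInto("", nested, props);
        return props;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> node, Properties out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                flattenInto(key, (Map<String, Object>) entry.getValue(), out);
            } else {
                out.setProperty(key, String.valueOf(entry.getValue()));
            }
        }
    }
}
```

Applied to the keys under my-stream in the example, this yields a single property, num.stream.threads, with the value 10.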

You can then inject a ConfiguredStreamBuilder specifically for the above configuration using javax.inject.Named:

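Injecting a Named ConfiguredStreamBuilder
public static final String MY_STREAM = "my-stream";
public static final String NAMED_WORD_COUNT_INPUT = "named-word-count-input";
public static final String NAMED_WORD_COUNT_OUTPUT = "named-word-count-output";

@Singleton
@Named(MY_STREAM)
KStream<String, String> myStream(
        @Named(MY_STREAM) ConfiguredStreamBuilder builder) {
    // define the topology here using the builder configured from kafka.streams.my-stream;
    // as a minimal placeholder, this simply reads the input topic
    return builder.stream(NAMED_WORD_COUNT_INPUT);
}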

Interactive Query Service

When using streams you can set a state store for your stream by using a store builder and telling the stream to store its data. In the Kafka Streams Word Count example above, the output is materialized to a named store that can later be retrieved via the Interactive Query Service. See the Apache Kafka documentation on interactive queries for details.

You can inject the InteractiveQueryService and use the method getQueryableStore to get values from a state store.


When calling the getQueryableStore method you must provide the store name and, preferably, the types of the key and value you are trying to retrieve.