$ mn create-app my-kafka-app --features kafka
Micronaut Kafka
Integration between Micronaut and Kafka Messaging
Version: 1.1.1
1 Introduction
Apache Kafka is a distributed stream processing platform that can be used for a range of messaging requirements in addition to stream processing and real-time data handling.
Micronaut features dedicated support for defining both Kafka Producer and Consumer instances. Micronaut applications built with Kafka can be deployed with or without the presence of an HTTP server.
With Micronaut’s efficient compile-time AOP and cloud native features, writing efficient Kafka consumer applications that use very little resources is a breeze.
Release History
1.0.4
- Upgrade to Kafka 2.0.1
1.1.0
- Upgrade to Kafka 2.1.1
- Support for Open Tracing
- Support for Pausing / Resuming Consumers via ConsumerRegistry interface
2 Using the Micronaut CLI
To create a project with Kafka support using the Micronaut CLI, supply the kafka feature to the features flag:
$ mn create-app my-kafka-app --features kafka
This will create a project with the minimum necessary configuration for Kafka.
Kafka Profile
The Micronaut CLI includes a specialized profile for Kafka-based messaging applications. This profile will create a Micronaut app with Kafka support and without an HTTP server (although you can add one if you desire). The profile also provides a couple of commands for generating Kafka listeners and producers.
To create a project using the Kafka profile, use the profile flag:
$ mn create-app my-kafka-service --profile kafka
As you’d expect, you can start the application with ./gradlew run (for Gradle) or ./mvnw compile exec:exec (Maven). With the default config, the application will attempt to connect to Kafka at localhost:9092 and will continue to run without starting up an HTTP server. All communication to and from the service will take place via Kafka producers and/or listeners.
Within the new project, you can now run the Kafka-specific code generation commands:
$ mn create-kafka-producer Message
| Rendered template Producer.java to destination src/main/java/my/kafka/app/MessageProducer.java
$ mn create-kafka-listener Message
| Rendered template Listener.java to destination src/main/java/my/kafka/app/MessageListener.java
3 Kafka Quick Start
To add support for Kafka to an existing project, you should first add the Micronaut Kafka configuration to your build configuration. For example in Gradle:
compile 'io.micronaut.configuration:micronaut-kafka'
Or in Maven:
<dependency>
    <groupId>io.micronaut.configuration</groupId>
    <artifactId>micronaut-kafka</artifactId>
</dependency>
Configuring Kafka
The minimum requirement to configure Kafka is to set the value of the kafka.bootstrap.servers property in application.yml:
kafka:
    bootstrap:
        servers: localhost:9092
The value can also be a list of available servers:
kafka:
    bootstrap:
        servers:
            - foo:9092
            - bar:9092
| You can also set the environment variable KAFKA_BOOTSTRAP_SERVERS to a comma separated list of values to externalize configuration. |
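To make the comma separated form concrete, the following is a minimal sketch in plain Java of splitting such a value into individual host:port entries. The BootstrapServers class and its parse method are hypothetical names used only for illustration. They are not part of Micronaut or Kafka, and Micronaut performs its own conversion internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Micronaut or Kafka.
final class BootstrapServers {

    private BootstrapServers() {
    }

    // Splits a value such as "foo:9092,bar:9092" into its host:port entries,
    // trimming whitespace and skipping empty entries.
    static List<String> parse(String value) {
        List<String> servers = new ArrayList<>();
        if (value == null) {
            return servers;
        }
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }
}
```

For example, the value foo:9092,bar:9092 yields the same two entries as the YAML list shown above.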
Creating a Kafka Producer with @KafkaClient
To create a Kafka Producer that sends messages you can simply define an interface that is annotated with @KafkaClient.
For example the following is a trivial @KafkaClient interface:
import io.micronaut.configuration.kafka.annotation.*;
@KafkaClient (1)
public interface ProductClient {
    @Topic("my-products") (2)
    void sendProduct(@KafkaKey String brand, String name); (3)
}
| 1 | The @KafkaClient annotation is used to designate this interface as a client |
| 2 | The @Topic annotation indicates which topics the ProducerRecord should be published to |
| 3 | The method defines two parameters: the parameter that is the Kafka key and the value. |
| You can omit the key, however this will result in a null key which means Kafka will not know how to partition the record. |
At compile time Micronaut will produce an implementation of the above interface. You can retrieve an instance of ProductClient either by looking up the bean from the ApplicationContext or by injecting the bean with @Inject:
ProductClient client = applicationContext.getBean(ProductClient.class);
client.sendProduct("Nike", "Blue Trainers");
Note that since the sendProduct method returns void, the method will send the ProducerRecord and block until the response is received. You can return a Future or Publisher to support non-blocking message delivery.
Creating a Kafka Consumer with @KafkaListener
To listen to Kafka messages you can use the @KafkaListener annotation to define a message listener.
The following example will listen for messages published by the ProductClient in the previous section:
import io.micronaut.configuration.kafka.annotation.*;
@KafkaListener(offsetReset = OffsetReset.EARLIEST) (1)
public class ProductListener {
    @Topic("my-products") (2)
    public void receive(@KafkaKey String brand, String name) { (3)
        System.out.println("Got Product - " + name + " by " + brand);
    }
}
| 1 | The @KafkaListener is used with offsetReset set to EARLIEST which makes the listener start listening to messages from the beginning of the partition. |
| 2 | The @Topic annotation is again used to indicate which topic(s) to subscribe to. |
| 3 | The receive method defines 2 arguments: the argument that will receive the key and the argument that will receive the value. |
4 Kafka Producers Using @KafkaClient
The example in the quick start presented a trivial definition of an interface that will be implemented automatically for you using the @KafkaClient annotation.
The implementation that powers @KafkaClient (defined by the KafkaClientIntroductionAdvice class) is, however, very flexible and offers a range of options for defining Kafka clients.
4.1 Defining @KafkaClient Methods
Specifying the Key and the Value
The Kafka key can be specified by providing a parameter annotated with @KafkaKey. If no such parameter is specified the record is sent with a null key.
The value to send is resolved by selecting the argument annotated with @Body, otherwise the first argument with no specific binding annotation is used. For example:
@Topic("my-products")
void sendProduct(@KafkaKey String brand, String name);
The method above will use the parameter brand as the key and the parameter name as the value.
Including Message Headers
There are a number of ways you can include message headers. One way is to annotate an argument with the @Header annotation and include a value when calling the method:
@Topic("my-products")
void sendProduct(
    @KafkaKey String brand,
    String name,
    @Header("My-Header") String myHeader);
The example above will include the value of the myHeader argument as a header called My-Header.
Another way to include headers is at the type level with the values driven from configuration:
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.messaging.annotation.Header;
@KafkaClient(id="product-client")
@Header(name = "X-Token", value = "${my.application.token}")
public interface ProductClient {
    ...
}
The above example will send a header called X-Token with the value read from the setting my.application.token in application.yml (or the environment variable MY_APPLICATION_TOKEN).
If my.application.token is not set then an error will occur when creating the client.
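The behaviour described above (a value is looked up by key, and a missing value is an error) can be sketched in plain Java. This is only an illustration of the idea, not Micronaut's property resolver: the PlaceholderSketch class, its resolve method and the Map standing in for application.yml are all hypothetical. The sketch also handles the ${key:default} form that this guide uses for configurable consumer groups.

```java
import java.util.Map;

// Hypothetical illustration of ${...} placeholder resolution; not Micronaut code.
final class PlaceholderSketch {

    private PlaceholderSketch() {
    }

    // Resolves "${key}" or "${key:default}" against the given settings.
    // Anything that is not a placeholder is returned unchanged.
    static String resolve(String expression, Map<String, String> settings) {
        if (!expression.startsWith("${") || !expression.endsWith("}")) {
            return expression;
        }
        String body = expression.substring(2, expression.length() - 1);
        int colon = body.indexOf(':');
        String key = colon < 0 ? body : body.substring(0, colon);
        String fallback = colon < 0 ? null : body.substring(colon + 1);

        String value = settings.get(key);
        if (value != null) {
            return value;
        }
        if (fallback != null) {
            return fallback;
        }
        // Mirrors the documented behaviour: an unset value without a default is an error.
        throw new IllegalStateException("No value for placeholder: " + key);
    }
}
```

Supplying a default in the placeholder is one way to avoid the startup error when the setting may legitimately be absent.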
Reactive and Non-Blocking Method Definitions
The @KafkaClient annotation supports the definition of reactive return types (such as Flowable or Reactor Flux) as well as Futures.
The following sections cover possible method signatures and behaviour:
Single Value and Return Type
Single<Book> sendBook(
    @KafkaKey String author,
    Single<Book> book
);
Flowable Value and Return Type
Flowable<Book> sendBooks(
    @KafkaKey String author,
    Flowable<Book> book
);
Flux Value and Return Type
Flux<RecordMetadata> sendBooks(
    @KafkaKey String author,
    Flux<Book> book
);
The implementation will return a Reactor Flux that, when subscribed to, will subscribe to the passed Flux and for each emitted item will send a ProducerRecord, emitting the resulting Kafka RecordMetadata if successful or an error otherwise.
Available Annotations
There are a number of annotations available that allow you to specify how a method argument is treated.
The following table summarizes the annotations and their purpose, with an example:
| Annotation | Description | Example |
|---|---|---|
| @Body | Allows explicitly indicating the body of the message to send | @Body String name |
| @Header | Allows specifying a parameter that should be sent as a header | @Header("My-Header") String myHeader |
| @KafkaKey | Allows specifying the parameter that is the Kafka key | @KafkaKey String brand |
For example, you can use the @Header annotation to bind a parameter value to a header in the ProducerRecord.
4.2 Configuring @KafkaClient beans
@KafkaClient and Producer Properties
There are a number of ways to pass configuration properties to the KafkaProducer. You can set default producer properties using kafka.producers.default in application.yml:
kafka:
    producers:
        default:
            retries: 5
Any property in the ProducerConfig class can be set. The above example will set the default number of times to retry sending a record.
Per @KafkaClient Producer Properties
To configure different properties for each client, you should set a @KafkaClient id using the annotation:
@KafkaClient("product-client")
This serves 2 purposes. Firstly, it sets the value of the client.id setting used to build the KafkaProducer. Secondly, it allows you to apply per-producer configuration in application.yml:
kafka:
    producers:
        product-client:
            retries: 5
Finally, the @KafkaClient annotation itself provides a properties member that you can use to set producer specific properties:
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.context.annotation.Property;
import org.apache.kafka.clients.producer.ProducerConfig;
@KafkaClient(
    id="product-client",
    acks = KafkaClient.Acknowledge.ALL,
    properties = @Property(name = ProducerConfig.RETRIES_CONFIG, value = "5")
)
public interface ProductClient {
    ...
}
@KafkaClient and Serializers
When serializing keys and values Micronaut will by default attempt to automatically pick a Serializer to use. This is done via the CompositeSerdeRegistry bean.
| You can replace the default SerdeRegistry bean with your own implementation by defining a bean that uses @Replaces(CompositeSerdeRegistry.class). See the section on Bean Replacement. | 
All common java.lang types (String, Integer, primitives etc.) are supported and for POJOs by default a Jackson based JSON serializer is used.
You can, however, explicitly override the Serializer used by providing the appropriate configuration in application.yml:
kafka:
    producers:
        product-client:
            value:
                serializer: org.apache.kafka.common.serialization.ByteArraySerializer
You may want to do this if, for example, you choose an alternative serialization format such as Avro or Protobuf.
4.3 Sending Records in Batch
By default if you define a method that takes a container type such as a List the list will be serialized using the specified value.serializer (the default will result in a JSON array).
For example the following two methods will both send serialized arrays:
@Topic("books")
void sendList(List<Book> books);
@Topic("books")
void sendBooks(Book...books);
Instead of sending a serialized array you may wish to send batches of ProducerRecord either synchronously or asynchronously.
To do this you can specify a value of true to the batch member of the @KafkaClient annotation:
ProducerRecord batches
@KafkaClient(batch=true)
@Topic("books")
void send(List<Book> books);
In the above case, instead of sending a serialized array, the client implementation will iterate over each item in the list and send a ProducerRecord for each. The previous example is blocking, however you can return a reactive type if desired:
ProducerRecord batches Reactively
@KafkaClient(batch=true)
@Topic("books")
Flowable<RecordMetadata> send(List<Book> books);
You can also use an unbounded reactive type such as Flowable as the source of your batch data:
ProducerRecord batches from a Flowable
@KafkaClient(batch=true)
@Topic("books")
Flowable<RecordMetadata> send(Flowable<Book> books);
4.4 Injecting Kafka Producer Beans
If you need maximum flexibility and don’t want to use the @KafkaClient support, you can use the @KafkaClient annotation as a qualifier for dependency injection of KafkaProducer instances.
Consider the following example:
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import org.apache.kafka.clients.producer.*;
import javax.inject.Singleton;
import java.util.concurrent.Future;
@Singleton
public class BookSender {
    private final Producer<String, Book> kafkaProducer;
    public BookSender(
            @KafkaClient("book-producer") Producer<String, Book> kafkaProducer) { (1)
        this.kafkaProducer = kafkaProducer;
    }
    public Future<RecordMetadata> send(String author, Book book) {
        return kafkaProducer.send(new ProducerRecord<>("books", author, book)); (2)
    }
}
| 1 | The KafkaProducer is dependency injected into the constructor. If not specified in configuration, the key and value serializer are inferred from the generic type arguments. |
| 2 | The KafkaProducer is used to send records |
Note that there is no need to call the close() method to shut down the KafkaProducer. It is fully managed by Micronaut and will be shut down when the application shuts down.
The previous example can be tested in JUnit with the following test:
@Test
public void testBookSender() throws IOException {
    Map<String, Object> config = Collections.singletonMap( (1)
            AbstractKafkaConfiguration.EMBEDDED, true
    );
    try (ApplicationContext ctx = ApplicationContext.run(config)) {
        BookSender bookSender = ctx.getBean(BookSender.class); (2)
        Book book = new Book();
        book.setTitle("The Stand");
        bookSender.send("Stephen King", book);
    }
}
| 1 | An embedded version of Kafka is used |
| 2 | The BookSender is retrieved from the ApplicationContext and a ProducerRecord sent |
By using the KafkaProducer API directly you open up even more options if you require transactions (exactly-once delivery) or want control over when records are flushed etc.
4.5 Embedding Kafka
The previous section introduced the ability to embed Kafka for your tests. This is possible in Micronaut by setting kafka.embedded.enabled to true and adding the following dependencies to your test classpath:
testCompile 'org.apache.kafka:kafka-clients:2.1.1:test'
testCompile 'org.apache.kafka:kafka_2.12:2.1.1'
testCompile 'org.apache.kafka:kafka_2.12:2.1.1:test'
or
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.1.1</version>
  <classifier>test</classifier>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>2.1.1</version>
  <classifier>test</classifier>
</dependency>
Note that because of the distributed nature of Kafka it is relatively slow to start up. It is generally better to do the initialization with @BeforeClass (or setupSpec in Spock) and have a large number of test methods rather than many test classes, otherwise your test execution performance will suffer.
5 Kafka Consumers Using @KafkaListener
The quick start section presented a trivial example of what is possible with the @KafkaListener annotation.
Using the @KafkaListener annotation Micronaut will build a KafkaConsumer and start the poll loop by running the KafkaConsumer in a special consumer thread pool. You can configure the size of the thread pool based on the number of consumers in your application in application.yml as desired:
consumer thread pool
micronaut:
    executors:
        consumer:
            type: fixed
            nThreads: 25
KafkaConsumer instances are single threaded, hence for each @KafkaListener method you define a new thread is created to execute the poll loop.
You may wish to scale the number of consumers you have listening on a particular topic. There are several ways you may achieve this. You could for example run multiple instances of your application each containing a single consumer in each JVM.
Alternatively, you can scale via threads by setting the number of threads a particular consumer bean will create:
@KafkaListener(groupId="myGroup", threads=10)
The above example will create 10 KafkaConsumer instances, each running in a unique thread and participating in the myGroup consumer group.
| @KafkaListener beans are by default singleton. When using multiple threads you must either synchronize access to local state or declare the bean as @Prototype. | 
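To illustrate the point about local state, here is a minimal plain-Java sketch that simulates several consumer threads calling into one shared listener instance. It does not use Micronaut or Kafka: CountingListenerSketch, its receive method and the simulate helper are hypothetical stand-ins, and an AtomicLong is just one possible way to make the shared counter safe.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a singleton listener bean shared by several consumer threads.
final class CountingListenerSketch {

    // AtomicLong keeps the counter correct when receive() is called concurrently.
    private final AtomicLong received = new AtomicLong();

    void receive(String brand, String name) {
        received.incrementAndGet();
    }

    long count() {
        return received.get();
    }

    // Simulates `threads` consumer threads, each delivering `messagesPerThread` records
    // to the same listener instance, and returns the total number counted.
    static long simulate(int threads, int messagesPerThread) {
        CountingListenerSketch listener = new CountingListenerSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    listener.receive("Nike", "Blue Trainers");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Simulation did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Simulation was interrupted", e);
        }
        return listener.count();
    }
}
```

With a plain long field and an unsynchronized increment, the same simulation could lose updates. That is the kind of problem the note above warns about.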
By default Micronaut will inspect the method signature of the method annotated with @Topic that will listen for ConsumerRecord instances and from the types infer an appropriate key and value Deserializer.
5.1 Defining @KafkaListener Methods
The @KafkaListener annotation examples up until now have been relatively trivial, but Micronaut offers a lot of flexibility when it comes to the types of method signatures you can define.
The following sections detail examples of supported use cases.
Specifying Topics
The @Topic annotation can be used at the method or the class level to specify which topics to listen to.
Care needs to be taken when using @Topic at the class level because every public method of the class annotated with @KafkaListener will become a Kafka consumer, which may be undesirable.
You can specify multiple topics to listen for:
@Topic({"fun-products", "awesome-products"})
You can also specify one or many regular expressions to listen for:
@Topic(patterns="products-\\w+")
Available Annotations
There are a number of annotations available that allow you to specify how a method argument is bound.
The following table summarizes the annotations and their purpose, with an example:
| Annotation | Description | Example |
|---|---|---|
| @Body | Allows explicitly indicating the body of the message | @Body Product product |
| @Header | Allows binding a parameter to a message header | @Header("My-Header") String myHeader |
| @KafkaKey | Allows specifying the parameter that is the key | @KafkaKey String brand |
For example, you can use the @Header annotation to bind a parameter value from a header contained within a ConsumerRecord.
Topics, Partitions and Offsets
If you want a reference to the topic, partition or offset it is a simple matter of defining a parameter for each.
The following table summarizes example parameters and how they relate to the ConsumerRecord being processed:
| Parameter | Description |
|---|---|
| String topic | The name of the topic |
| long offset | The offset of the ConsumerRecord |
| int partition | The partition of the ConsumerRecord |
| long timestamp | The timestamp of the ConsumerRecord |
As an example, the following listener method will receive all of the above mentioned parameters:
@Topic("awesome-products")
public void receive(
        @KafkaKey String brand, (1)
        Product product, (2)
        long offset, (3)
        int partition, (4)
        String topic, (5)
        long timestamp) { (6)
    System.out.println("Got Product - " + product.getName() + " by " + brand);
}
| 1 | The Kafka key |
| 2 | The message body |
| 3 | The offset of the ConsumerRecord |
| 4 | The partition of the ConsumerRecord |
| 5 | The topic. Note that the @Topic annotation supports multiple topics. |
| 6 | The timestamp of the ConsumerRecord |
Receiving a ConsumerRecord
If you prefer you can also receive the entire ConsumerRecord object being listened for. In this case you should specify appropriate generic types for the key and value of the ConsumerRecord so that Micronaut can pick the correct deserializer for each.
Consider the following example:
@Topic("awesome-products")
public void receive(ConsumerRecord<String, Product> record) { (1)
    Product product = record.value(); (2)
    String brand = record.key(); (3)
    System.out.println("Got Product - " + product.getName() + " by " + brand);
}
| 1 | The method signature accepts a ConsumerRecord that specifies a String for the key type and a POJO (Product) for the value type. |
| 2 | The value() method is used to retrieve the value |
| 3 | The key() method is used to retrieve the key |
Receiving and returning Reactive Types
In addition to common Java types and POJOs you can also define listener methods that receive a Reactive type such as a Single or a Reactor Mono. For example:
@Topic("reactive-products")
public Single<Product> receive(
        @KafkaKey String brand,  (1)
        Single<Product> productFlowable) { (2)
    return productFlowable.doOnSuccess((product) ->
            System.out.println("Got Product - " + product.getName() + " by " + brand) (3)
    );
}
| 1 | The @KafkaKey annotation is used to indicate the key |
| 2 | A Single is used to receive the message body |
| 3 | The doOnSuccess method is used to process the result |
Note that in this case the method returns a Single. This indicates to Micronaut that the poll loop should continue and, if enable.auto.commit is set to true (the default), the offsets will be committed potentially before doOnSuccess is called.
The idea here is that you are able to write consumers that don’t block, however care must be taken in the case where an error occurs in the doOnSuccess method otherwise the message could be lost. You could for example re-deliver the message in case of an error.
Alternatively, you can use the @Blocking annotation to tell Micronaut to subscribe to the returned reactive type in a blocking manner which will result in blocking the poll loop, preventing offsets from being committed automatically:
@Blocking
@Topic("reactive-products")
public Single<Product> receive(
    ...
}
5.2 Configuring @KafkaListener beans
@KafkaListener and Consumer Groups
Kafka consumers created with @KafkaListener will by default run within a consumer group that is the value of micronaut.application.name unless you explicitly specify a value to the @KafkaListener annotation. For example:
@KafkaListener("myGroup")
The above example will run the consumer within a consumer group called myGroup.
| You can make the consumer group configurable using a placeholder: @KafkaListener("${my.consumer.group:myGroup}") | 
@KafkaListener and Consumer Properties
There are a number of ways to pass configuration properties to the KafkaConsumer. You can set default consumer properties using kafka.consumers.default in application.yml:
kafka:
    consumers:
        default:
            session:
                timeout:
                    ms: 30000
The above example will set the default session.timeout.ms, which Kafka uses to decide whether a consumer is alive or not, and apply it to all created KafkaConsumer instances.
You can also provide configuration specific to a consumer group. For example consider the following configuration:
kafka:
    consumers:
        myGroup:
            session:
                timeout:
                    ms: 30000
The above configuration will pass properties only to the @KafkaListener beans that apply to the consumer group myGroup.
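As a rough mental model of the two configuration levels, the nested YAML keys under a consumer entry correspond to a dotted Kafka property name (session, timeout and ms become session.timeout.ms). The plain-Java sketch below flattens such a nested map and overlays group-specific values on top of the defaults. The ConsumerPropertiesSketch class is hypothetical. That group-specific values take precedence over the defaults is an assumption made for illustration, not something this guide states.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; not how Micronaut is implemented.
final class ConsumerPropertiesSketch {

    private ConsumerPropertiesSketch() {
    }

    // Turns {session: {timeout: {ms: 30000}}} into {"session.timeout.ms": 30000}.
    static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<String, Object> source, Map<String, Object> target) {
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                flattenInto(key, child, target);
            } else {
                target.put(key, value);
            }
        }
    }

    // Assumption: values configured for a consumer group override the defaults.
    static Map<String, Object> effective(Map<String, Object> defaults, Map<String, Object> group) {
        Map<String, Object> merged = new LinkedHashMap<>(flatten(defaults));
        merged.putAll(flatten(group));
        return merged;
    }
}
```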
Finally, the @KafkaListener annotation itself provides a properties member that you can use to set consumer specific properties:
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.context.annotation.Property;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@KafkaListener(
    groupId = "products",
    pollTimeout = "500ms",
    properties = @Property(name = ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, value = "10000")
)
public class ProductListener {
    ...
}
@KafkaListener and Deserializers
As mentioned previously when defining @KafkaListener methods, Micronaut will attempt to pick an appropriate deserializer for the method signature. This is done via the CompositeSerdeRegistry bean.
| You can replace the default SerdeRegistry bean with your own implementation by defining a bean that uses @Replaces(CompositeSerdeRegistry.class). See the section on Bean Replacement. | 
All common java.lang types (String, Integer, primitives etc.) are supported and for POJOs by default a Jackson based JSON deserializer is used.
You can, however, explicitly override the Deserializer used by providing the appropriate configuration in application.yml:
kafka:
    consumers:
        myGroup:
            value:
                deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
You may want to do this if, for example, you choose an alternative deserialization format such as Avro or Protobuf.
5.3 Committing Kafka Offsets
Automatically Committing Offsets
The way offsets are handled by a @KafkaListener bean is defined by the OffsetStrategy enum.
The following table summarizes the enum values and behaviour:
| Value | Description |
|---|---|
| AUTO | Automatically commit offsets. Sets enable.auto.commit to true |
| DISABLED | Disables automatically committing offsets. Sets enable.auto.commit to false |
| SYNC | Commits offsets manually at the end of each poll() loop |
| ASYNC | Asynchronously commits offsets manually at the end of each poll() loop |
| SYNC_PER_RECORD | Commits offsets manually after each ConsumerRecord is consumed |
| ASYNC_PER_RECORD | Commits offsets asynchronously after each ConsumerRecord is consumed |
Depending on your level of paranoia or durability requirements you can choose to tune how and when offsets are committed.
Manually Committing Offsets
If you set the OffsetStrategy to DISABLED it becomes your responsibility to commit offsets.
There are a couple of ways that can be achieved.
The simplest way is to define an argument of type Acknowledgement and call the ack() method to commit offsets synchronously:
ack()
@KafkaListener(
    offsetReset = OffsetReset.EARLIEST,
    offsetStrategy = OffsetStrategy.DISABLED (1)
)
@Topic("awesome-products")
void receive(
        Product product,
        Acknowledgement acknowledgement) { (2)
    // process product record
    acknowledgement.ack(); (3)
}
| 1 | Committing offsets automatically is disabled |
| 2 | The listener method specifies a parameter of type Acknowledgement |
| 3 | The ack() method is called once the record has been processed |
Alternatively, you can supply a KafkaConsumer method argument and then call commitSync (or commitAsync) yourself when you are ready to commit offsets:
KafkaConsumer API
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.configuration.kafka.docs.consumer.config.Product;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import java.util.Collections;
@KafkaListener(
        offsetReset = OffsetReset.EARLIEST,
        offsetStrategy = OffsetStrategy.DISABLED (1)
)
@Topic("awesome-products")
void receive(
        Product product,
        long offset,
        int partition,
        String topic,
        Consumer kafkaConsumer) { (2)
    // process product record
    // commit offsets
    kafkaConsumer.commitSync(Collections.singletonMap( (3)
            new TopicPartition(topic, partition),
            new OffsetAndMetadata(offset + 1, "my metadata")
    ));
}
| 1 | Committing offsets automatically is disabled |
| 2 | The listener method specifies that it receives the offset data and a KafkaConsumer |
| 3 | The commitSync() method is called once the record has been processed |
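The offset + 1 in the example above reflects the Kafka convention that a committed offset names the next record to be read, not the last one processed. The following plain-Java sketch tracks the highest processed offset per partition and derives the positions that would be committed. CommitPositionSketch and its methods are hypothetical names, and a simple "topic-partition" string key stands in for Kafka's TopicPartition so that the sketch has no dependencies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping helper; it does not talk to Kafka.
final class CommitPositionSketch {

    // Highest processed offset per "topic-partition" key.
    private final Map<String, Long> lastProcessed = new HashMap<>();

    // Records that the given offset has been processed, keeping the highest one seen.
    void markProcessed(String topic, int partition, long offset) {
        lastProcessed.merge(topic + "-" + partition, offset, Long::max);
    }

    // The position to commit is the offset of the next record to consume,
    // that is the last processed offset plus one.
    Map<String, Long> offsetsToCommit() {
        Map<String, Long> commits = new HashMap<>();
        for (Map.Entry<String, Long> entry : lastProcessed.entrySet()) {
            commits.put(entry.getKey(), entry.getValue() + 1);
        }
        return commits;
    }
}
```

Committing the processed offset itself, without adding one, would cause that record to be delivered again after a restart.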
Manually Assigning Offsets to a Consumer Bean
Sometimes you may wish to control exactly the position you wish to resume consuming messages from.
For example if you store offsets in a database you may wish to read the offsets from the database when the consumer starts and start reading from the position stored in the database.
To support this use case your consumer bean can implement the ConsumerRebalanceListener and ConsumerAware interfaces:
KafkaConsumer API
import io.micronaut.configuration.kafka.ConsumerAware;
import io.micronaut.configuration.kafka.annotation.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import javax.annotation.Nonnull;
import java.util.Collection;
@KafkaListener
public class ProductListener implements ConsumerRebalanceListener, ConsumerAware {
    private Consumer consumer;
    @Override
    public void setKafkaConsumer(@Nonnull Consumer consumer) { (1)
        this.consumer = consumer;
    }
    @Topic("awesome-products")
    void receive(Product product) {
        // process product
    }
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { (2)
        // save offsets here
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { (3)
        // seek to offset here
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, 1);
        }
    }
}
| 1 | The setKafkaConsumer method of ConsumerAware allows access to the underlying consumer |
| 2 | The onPartitionsRevoked method can be used to save offsets |
| 3 | The onPartitionsAssigned method can be used to read offsets and seek to a specific position. In this trivial example we just seek to the offset 1 (skipping the first record). |
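The "save offsets here" and "seek to offset here" placeholders in the listener above need somewhere to keep positions. As a minimal sketch, the class below is an in-memory stand-in for the database mentioned earlier. OffsetStoreSketch and its methods are hypothetical, and a real application would persist the values durably. The idea is that onPartitionsRevoked would call save for each partition, and onPartitionsAssigned would pass the result of positionFor to the consumer's seek method.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for an external offset store such as a database table.
final class OffsetStoreSketch {

    // Next offset to read, per "topic-partition" key.
    private final Map<String, Long> positions = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Would be called when partitions are revoked, to remember where to resume.
    void save(String topic, int partition, long nextOffset) {
        positions.put(key(topic, partition), nextOffset);
    }

    // Would be called when partitions are assigned; returns the stored position,
    // or the fallback when nothing has been stored for the partition yet.
    long positionFor(String topic, int partition, long fallback) {
        return positions.getOrDefault(key(topic, partition), fallback);
    }
}
```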
Manual Offsets with Multiple Topics
It is possible for a single @KafkaListener bean to represent multiple consumers. If you have more than one method annotated with @Topic then setKafkaConsumer will be called multiple times for each backing consumer.
It is recommended in the case of manually seeking offsets that you use a single listener bean per consumer. The alternative is to store an internal Set of all consumers associated with a particular listener and manually search for the correct listener in the onPartitionsAssigned using the partition assignment data.
| Not doing so will lead to a ConcurrentModificationException error. |
5.4 Kafka Batch Processing
By default @KafkaListener listener methods will receive each ConsumerRecord one by one.
There may be cases where you prefer to receive all of the ConsumerRecord data from the ConsumerRecords holder object in one go.
To achieve this you can set the batch member of the @KafkaListener to true and specify a container type (typically List) to receive all of the data:
import io.micronaut.configuration.kafka.annotation.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import java.util.Collections;
import java.util.List;
@KafkaListener(batch = true) (1)
public class BookListener {
  @Topic("all-the-books")
  public void receiveList(List<Book> books) { (2)
      for (Book book : books) {
          System.out.println("Got Book = " + book.getTitle()); (3)
      }
  }
}
| 1 | The @KafkaListener annotation’s batch member is set to true |
| 2 | The method defines that it receives a list of Book instances |
| 3 | The method processes the entire batch |
Note in the previous case offsets will automatically be committed for the whole batch by default when the method returns without error.
Manually Committing Offsets with Batch
You can also take more control of committing offsets when doing batch processing by specifying a method that receives the offsets in addition to the batch:
@Topic("all-the-books")
public void receive(
        List<Book> books,
        List<Long> offsets,
        List<Integer> partitions,
        List<String> topics,
        Consumer kafkaConsumer) { (1)
    for (int i = 0; i < books.size(); i++) {
        // process the book
        Book book = books.get(i); (2)
        // commit offsets
        String topic = topics.get(i);
        int partition = partitions.get(i);
        long offset = offsets.get(i); (3)
        kafkaConsumer.commitSync(Collections.singletonMap( (4)
                new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset + 1, "my metadata")
        ));
    }
}
| 1 | The method receives the batch of records as well as the offsets, partitions and topics |
| 2 | Each record is processed |
| 3 | The offset, partition and topic are read for the record |
| 4 | Offsets are committed |
This example is deliberately simple: it commits offsets after processing each record in the batch. You can instead commit after every 10 or 100 records, or at whatever interval makes sense for your application.
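One way to commit less often is to work out, for the whole batch, the next offset to commit for each topic partition and then commit once at the end. The following plain-Java sketch shows only that bookkeeping, using the same parallel lists the listener method receives. It uses no Kafka classes; BatchOffsets, nextOffsets and the "topic-partition" string key are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Micronaut or Kafka.
class BatchOffsets {

    // For each "topic-partition" key, returns the offset to commit:
    // the highest offset seen in the batch for that partition, plus one.
    static Map<String, Long> nextOffsets(List<String> topics,
                                         List<Integer> partitions,
                                         List<Long> offsets) {
        Map<String, Long> next = new LinkedHashMap<>();
        for (int i = 0; i < offsets.size(); i++) {
            String key = topics.get(i) + "-" + partitions.get(i);
            // keep the larger of the stored value and this record's offset + 1
            next.merge(key, offsets.get(i) + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, Long> next = nextOffsets(
                Arrays.asList("all-the-books", "all-the-books", "all-the-books"),
                Arrays.asList(0, 1, 0),
                Arrays.asList(41L, 7L, 42L));
        System.out.println(next); // {all-the-books-0=43, all-the-books-1=8}
    }
}
```

In a real listener, each entry would be turned into a TopicPartition and an OffsetAndMetadata and passed to a single commitSync call after the loop. The trade-off is that a failure part-way through the batch causes the whole batch to be redelivered.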
Reactive Batch Processing
Batch listeners also support defining reactive types (either Flowable or Reactor Flux) as the method argument.
In this case the method will be passed a reactive type that can be returned from the method allowing non-blocking processing of the batch:
@Topic("all-the-books")
public Flux<Book> receiveFlux(Flux<Book> books) {
    return books.doOnNext(book ->
            System.out.println("Got Book = " + book.getTitle())
    );
}
Remember that, as with non-batch processing, the reactive type will be subscribed to on a different thread, and offsets will be committed automatically, likely before the reactive type is subscribed to.
This means that you should only use reactive processing if message durability is not a requirement, and you may wish to implement message re-delivery upon failure.
5.5 Forwarding Messages with @SendTo
On any @KafkaListener method that returns a value, you can use the @SendTo annotation to forward the return value to the topic or topics specified by the @SendTo annotation.
The key of the original ConsumerRecord will be used as the key when forwarding the message.
KafkaConsumer API
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.configuration.kafka.docs.consumer.config.Product;
import io.micronaut.messaging.annotation.SendTo;
import io.reactivex.Single;
import io.reactivex.functions.Function;
@Topic("awesome-products") (1)
@SendTo("product-quantities") (2)
public int receive(
        @KafkaKey String brand,
        Product product) {
    System.out.println("Got Product - " + product.getName() + " by " + brand);
    return product.getQuantity(); (3)
}
| 1 | The topic subscribed to is awesome-products | 
| 2 | The topic to send the result to is product-quantities | 
| 3 | The return value is used to indicate the value to forward | 
You can also do the same using Reactive programming:
KafkaConsumer API
@Topic("awesome-products") (1)
@SendTo("product-quantities") (2)
public Single<Integer> receiveProduct(
        @KafkaKey String brand,
        Single<Product> productSingle) {
    return productSingle.map(product -> {
        System.out.println("Got Product - " + product.getName() + " by " + brand);
        return product.getQuantity(); (3)
    });
}
| 1 | The topic subscribed to is awesome-products | 
| 2 | The topic to send the result to is product-quantities | 
| 3 | The return is mapped from the single to the value of the quantity | 
In the reactive case the poll loop will continue and will not wait for the record to be sent unless you specifically annotate the method with @Blocking.
5.6 Handling Consumer Exceptions
When an exception occurs in a @KafkaListener method, by default the exception is simply logged. This is handled by DefaultKafkaListenerExceptionHandler.
If you wish to replace this default exception handling with another implementation, you can use Micronaut’s Bean Replacement feature to define a bean that replaces it: @Replaces(DefaultKafkaListenerExceptionHandler.class).
You can also define per-bean exception handling logic by implementing the KafkaListenerExceptionHandler interface in your @KafkaListener class.
The KafkaListenerExceptionHandler receives an exception of type KafkaListenerException which allows access to the original ConsumerRecord, if available.
6 Running Kafka Applications
You can run a Micronaut Kafka application with or without the presence of an HTTP server.
If you run your application without the http-server-netty dependency you will see output like the following on startup:
11:06:22.638 [main] INFO  io.micronaut.runtime.Micronaut - Startup completed in 402ms. Server Running: 4 active message listeners.
No port is exposed, but the Kafka consumers are active and running. The process registers a shutdown hook so that the KafkaConsumer instances are closed correctly when the server is shut down.
6.1 Kafka Health Checks
In addition to http-server-netty, if the management dependency is added, then Micronaut’s Health Endpoint can be used to expose the health status of the Kafka consumer application.
For example if Kafka is not available the /health endpoint will return:
{
    "status": "DOWN",
    "details": {
        ...
        "kafka": {
            "status": "DOWN",
            "details": {
                "error": "java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment."
            }
        }
    }
}
| By default, the details visible above are only shown to authenticated users. See the Health Endpoint documentation for how to configure that setting. | 
If you wish to disable the kafka health check while still using the management dependency you can set the property kafka.health.enabled to false in your application configuration.
kafka:
    health:
        enabled: false
6.2 Kafka Metrics
You can enable Kafka Metrics collection by enabling Micrometer Metrics.
If you do not wish to collect metrics you can set micronaut.metrics.kafka.enabled to false in application.yml.
6.3 Kafka Distributed Tracing
You can enable distributed tracing with either Jaeger or Zipkin by first configuring Micronaut Tracing, then adding the following dependency:
runtime 'io.opentracing.contrib:opentracing-kafka-client:0.0.16'
<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.0.16</version>
    <scope>runtime</scope>
</dependency>
7 Building Kafka Stream Applications
| Using the CLI: if you are creating your project using the Micronaut CLI, supply the kafka-streams feature: $ mn create-app my-app --features kafka-streams | 
Kafka Streams is a platform for building real time streaming applications.
When using Micronaut with Kafka Streams, your application gains all of the features of Micronaut (configuration management, AOP, DI, health checks etc.), simplifying the construction of Kafka Streams applications.
Since Micronaut’s DI and AOP are applied at compile time, you can build low-overhead stream applications with ease.
Defining Kafka Streams
To define Kafka Streams you should first add the kafka-streams configuration to your build.
compile 'io.micronaut.configuration:micronaut-kafka-streams'
<dependency>
    <groupId>io.micronaut.configuration</groupId>
    <artifactId>micronaut-kafka-streams</artifactId>
</dependency>
The minimum configuration required is to set the Kafka bootstrap servers:
kafka:
    bootstrap:
        servers: localhost:9092
You should then define a @Factory for your streams that defines beans that return a KStream. For example, to implement the Word Count example from the Kafka Streams documentation:
import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import javax.inject.Named;
import javax.inject.Singleton;
import java.util.*;
@Factory
public class WordCountStream {
    public static final String INPUT = "streams-plaintext-input"; (1)
    public static final String OUTPUT = "streams-wordcount-output"; (2)
    @Singleton
    @Named("wordCount")
    KStream<String, String> wordCountStream(ConfiguredStreamBuilder builder) { (3)
        // set default serdes
        Properties props = builder.getConfiguration();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KStream<String, String> source = builder.stream(INPUT);
        KTable<String, Long> counts = source
                .flatMapValues( value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
                .groupBy((key, value) -> value)
                .count();
        // need to override value serde to Long type
        counts.toStream().to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));
        return source;
    }
}
| 1 | The input topic | 
| 2 | The output topic | 
| 3 | An instance of ConfiguredStreamBuilder is injected that allows mutating the configuration | 
| With Kafka Streams, the key and value Serdes (serializer/deserializer) must be classes with a zero-argument constructor. If you wish to use JSON (de)serialization, you can subclass JsonSerde to define your Serdes. | 
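To make the transformation in the word count topology above concrete, the following plain-Java sketch applies the same steps (lower-case and split each value on spaces, group by word, count) to an in-memory list of sentences. It involves no Kafka at all; WordCountSketch and count are hypothetical names used only for illustration. Unlike the stream, which emits an updated count each time a word arrives, the sketch only computes the final totals.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical in-memory illustration; not part of the Kafka Streams API.
class WordCountSketch {

    static Map<String, Long> count(List<String> sentences) {
        return sentences.stream()
                // mirrors flatMapValues: one lower-cased word per element
                .flatMap(s -> Arrays.stream(s.toLowerCase(Locale.getDefault()).split(" ")))
                // mirrors groupBy((key, value) -> value) followed by count()
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // The TreeMap keeps the words sorted when printed
        System.out.println(count(Arrays.asList("The quick brown fox", "the lazy dog")));
    }
}
```

The counts are Long values, which is why the topology overrides the value serde with Serdes.Long() when writing to the output topic.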
You can use the @KafkaClient annotation to send a sentence to be processed by the above stream:
/*
 * Copyright 2017-2019 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.micronaut.configuration.kafka.streams;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
@KafkaClient
public interface WordCountClient {
    @Topic(WordCountStream.INPUT)
    void publishSentence(String sentence);
}
You can also define a @KafkaListener to listen for the result of the word count stream:
package io.micronaut.configuration.kafka.streams;
import io.micronaut.configuration.kafka.annotation.*;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class WordCountListener {
    private final Map<String, Long> wordCounts = new ConcurrentHashMap<>();
    @Topic(WordCountStream.OUTPUT)
    void count(@KafkaKey String word, long count) {
        System.out.println("word = " + word);
        wordCounts.put(word, count);
    }
    public long getCount(String word) {
        Long num = wordCounts.get(word);
        if (num != null) {
            return num;
        }
        return 0;
    }
    public Map<String, Long> getWordCounts() {
        return Collections.unmodifiableMap(wordCounts);
    }
}
Configuring Kafka Streams
You can define multiple Kafka streams each with their own unique configuration. To do this you should define the configuration with kafka.streams.[STREAM-NAME]. For example in application.yml:
kafka:
    streams:
        my-stream:
            num:
                stream:
                    threads: 10
The above configuration sets the num.stream.threads setting of the Kafka StreamsConfig to 10 for a stream named my-stream.
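The relationship between the nested YAML keys and the dotted Kafka property name can be pictured with a small plain-Java sketch that joins the keys below kafka.streams.my-stream with dots. This only illustrates the naming, not how Micronaut resolves configuration internally; NestedConfigSketch and flatten are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of the key naming; not Micronaut's implementation.
class NestedConfigSketch {

    // Joins nested map keys with dots: {num={stream={threads=10}}} becomes num.stream.threads=10
    static Properties flatten(Map<String, Object> nested) {
        Properties props = new Properties();
        collect("", nested, props);
        return props;
    }

    private static void collect(String prefix, Map<String, Object> node, Properties out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                collect(key, child, out); // descend one level
            } else {
                out.setProperty(key, String.valueOf(e.getValue()));
            }
        }
    }

    public static void main(String[] args) {
        // The keys found below kafka.streams.my-stream in the YAML above
        Map<String, Object> stream = new LinkedHashMap<>();
        stream.put("threads", 10);
        Map<String, Object> num = new LinkedHashMap<>();
        num.put("stream", stream);
        Map<String, Object> myStream = new LinkedHashMap<>();
        myStream.put("num", num);
        System.out.println(flatten(myStream).getProperty("num.stream.threads")); // 10
    }
}
```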
You can then inject a ConfiguredStreamBuilder specifically for the above configuration using javax.inject.Named:
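@Singleton
@Named("myStream")
KStream<String, String> myStream(
        @Named("my-stream") ConfiguredStreamBuilder builder) {
    // Build the topology with the builder configured under kafka.streams.my-stream.
    // "my-input-topic" is a placeholder topic name used only for illustration.
    return builder.stream("my-input-topic");
}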