mn create-app --features=kafka,reactor,graalvm,serialization-jackson example.micronaut.books --build=gradle --lang=groovy
Kafka and the Micronaut Framework - Event-Driven Applications
Use Kafka to communicate between your Micronaut applications.
Authors: Burt Beckwith
Micronaut Version: 3.9.2
1. Getting Started
In this guide, we will create a Micronaut application written in Groovy.
In this guide, we will create two microservices that will use Kafka to communicate with each other in an asynchronous and decoupled way.
2. What you will need
To complete this guide, you will need the following:
-
Some time on your hands
-
A decent text editor or IDE
-
JDK 1.8 or greater installed with
JAVA_HOME
configured appropriately -
Docker and Docker Compose installed if you will be running Kafka in Docker, and for running tests.
3. Solution
We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.
-
Download and unzip the source
4. Writing the application
Let’s describe the microservices you will build through the guide.
-
books
- It returns a list of books. It uses a domain consisting of a book name and ISBN. It also publishes a message in Kafka every time a book is accessed. -
analytics
- It connects to Kafka to update the analytics for every book (a counter). It also exposes an endpoint to get the analytics.
4.1. Books Microservice
Create the books
microservice using the Micronaut Command Line Interface or with Micronaut Launch.
If you don’t specify the --build argument, Gradle is used as the build tool. If you don’t specify the --lang argument, Java is used as the language.
|
If you use Micronaut Launch, select Micronaut Application as application type and add the kafka
, reactor
, graalvm
, and serialization-jackson
features.
The previous command creates a directory named books
and a Micronaut application inside it with default package example.micronaut
.
In addition to the dependencies added by the above features, we also need a test dependency for the Awaitility library:
testImplementation("org.awaitility:awaitility:4.2.0")
Create a Book
POJO:
package example.micronaut
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import io.micronaut.serde.annotation.Serdeable
@Canonical
@CompileStatic
@Serdeable
class Book {
String isbn
String name
}
To keep this guide simple there is no database persistence - BookService
keeps the list of books in memory:
package example.micronaut
import groovy.transform.CompileStatic
import javax.annotation.PostConstruct
import jakarta.inject.Singleton
@CompileStatic
@Singleton
class BookService {
private final List<Book> bookStore = []
@PostConstruct
void init() {
bookStore << new Book('1491950358', 'Building Microservices')
bookStore << new Book('1680502395', 'Release It!')
bookStore << new Book('0321601912', 'Continuous Delivery')
}
List<Book> listAll() {
bookStore
}
Optional<Book> findByIsbn(String isbn) {
Optional.ofNullable(bookStore.find { it.isbn == isbn })
}
}
Create a BookController
class to handle incoming HTTP requests to the books
microservice:
package example.micronaut
import groovy.transform.CompileStatic
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
@CompileStatic
@Controller('/books') (1)
class BookController {
private final BookService bookService
BookController(BookService bookService) { (2)
this.bookService = bookService
}
@Get (3)
List<Book> listAll() {
bookService.listAll()
}
@Get('/{isbn}') (4)
Optional<Book> findBook(String isbn) {
bookService.findByIsbn(isbn)
}
}
1 | The @Controller annotation defines the class as a controller mapped to the root URI /books |
2 | Use constructor injection to inject a bean of type BookService . |
3 | The @Get annotation maps the listAll method to an HTTP GET request on /books . |
4 | The @Get annotation maps the findBook method to an HTTP GET request on /books/{isbn} . |
4.2. Analytics Microservice
Create the analytics
microservice using the Micronaut Command Line Interface or with Micronaut Launch.
mn create-app --features=kafka,graalvm,serialization-jackson example.micronaut.analytics --build=gradle --lang=groovy
If you don’t specify the --build argument, Gradle is used as the build tool. If you don’t specify the --lang argument, Java is used as the language.
|
If you use Micronaut Launch, select Micronaut Application as application type and add the kafka
and graalvm
features.
Create a Book
POJO:
package example.micronaut
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import io.micronaut.serde.annotation.Serdeable
@Canonical
@CompileStatic
@Serdeable
class Book {
String isbn
String name
}
This Book POJO is the same as the one in the books microservice. In a real application this would be in a shared library but to keep things simple we’ll just duplicate it.
|
Create a BookAnalytics
POJO:
package example.micronaut
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import io.micronaut.serde.annotation.Serdeable
@Canonical
@CompileStatic
@Serdeable
class BookAnalytics {
String bookIsbn
long count
}
To keep this guide simple there is no database persistence - AnalyticsService
keeps book analytics in memory:
package example.micronaut
import groovy.transform.CompileStatic
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
@CompileStatic
@Singleton
class AnalyticsService {
private final Map<Book, Long> bookAnalytics = new ConcurrentHashMap<>() (1)
void updateBookAnalytics(Book book) { (2)
bookAnalytics.compute(book, (k, v) -> {
v == null ? 1L : v + 1
})
}
List<BookAnalytics> listAnalytics() { (3)
bookAnalytics.collect { e -> new BookAnalytics(e.key.isbn, e.value) }
}
}
1 | Keep the book analytics in memory |
2 | Initialize and update the analytics for the book passed as parameter |
3 | Return all the analytics |
Write a test for AnalyticsService
:
package example.micronaut
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import spock.lang.Specification
import jakarta.inject.Inject
@MicronautTest
class AnalyticsServiceSpec extends Specification {
@Inject
AnalyticsService analyticsService
void 'test update book analytics and get analytics'() {
given:
Book b1 = new Book('1491950358', 'Building Microservices')
Book b2 = new Book('1680502395', 'Release It!')
when:
analyticsService.updateBookAnalytics b1
analyticsService.updateBookAnalytics b1
analyticsService.updateBookAnalytics b1
analyticsService.updateBookAnalytics b2
List<BookAnalytics> analytics = analyticsService.listAnalytics()
then:
2 == analytics.size()
3 == findBookAnalytics(b1, analytics).count
1 == findBookAnalytics(b2, analytics).count
}
private BookAnalytics findBookAnalytics(Book b, List<BookAnalytics> analytics) {
BookAnalytics bookAnalytics = analytics.find { it.bookIsbn == b.isbn }
if (!bookAnalytics) {
throw new RuntimeException('Book not found')
}
bookAnalytics
}
}
Create a Controller to expose the analytics:
package example.micronaut
import groovy.transform.CompileStatic
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
@CompileStatic
@Controller('/analytics')
class AnalyticsController {
private final AnalyticsService analyticsService
AnalyticsController(AnalyticsService analyticsService) {
this.analyticsService = analyticsService
}
@Get (1)
List<BookAnalytics> listAnalytics() {
analyticsService.listAnalytics()
}
}
1 | Just expose the analytics |
The application doesn’t expose the method |
To run the tests:
./gradlew test
Modify the Application
class to use dev
as a default environment:
The Micronaut framework supports the concept of one or many default environments. A default environment is one that is only applied if no other environments are explicitly specified or deduced.
package example.micronaut
import groovy.transform.CompileStatic
import io.micronaut.runtime.Micronaut
import static io.micronaut.context.env.Environment.DEVELOPMENT
@CompileStatic
class Application {
static void main(String[] args) {
Micronaut.build(args)
.mainClass(Application)
.defaultEnvironments(DEVELOPMENT)
.start()
}
}
Create src/main/resources/application-dev.yml
. The Micronaut framework applies this configuration file only for the dev
environment.
micronaut:
server:
port: 8081 (1)
1 | Start the analytics microservice on port 8081 |
5. Running the application
Start the books
microservice:
./gradlew run
16:35:55.614 [main] INFO io.micronaut.runtime.Micronaut - Startup completed in 576ms. Server Running: http://localhost:8080
Start the analytics
microservice:
./gradlew run
16:35:55.614 [main] INFO io.micronaut.runtime.Micronaut - Startup completed in 623ms. Server Running: http://localhost:8081
You can use curl
to test the application:
curl http://localhost:8080/books
[{"isbn":"1491950358","name":"Building Microservices"},{"isbn":"1680502395","name":"Release It!"},{"isbn":"0321601912","name":"Continuous Delivery"}]
curl http://localhost:8080/books/1491950358
{"isbn":"1491950358","name":"Building Microservices"}
curl http://localhost:8081/analytics
[]
Note that getting the analytics returns an empty list because the applications are not communicating with each other (yet).
6. Test Resources
When the application is started locally — either under test or by running the application — resolution of the property kafka.bootstrap.servers
is detected and the Test Resources service will start a local Kafka docker container, and inject the properties required to use this as the broker.
When running under production, you should replace this property with the location of your production Kafka instance via an environment variable.
KAFKA_BOOTSTRAP_SERVERS=production-server:9092
For more information, see the Kafka section of the Test Resources documentation.
7. Kafka and the Micronaut Framework
7.1. Install Kafka
A fast way to start using Kafka is via Docker. Create this docker-compose.yml
file:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper
ports:
- 2181:2181 (1)
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka
depends_on:
- zookeeper
ports:
- 9092:9092 (2)
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
1 | Zookeeper uses port 2181 by default, but you can change the value if necessary. |
2 | Kafka uses port 9092 by default, but you can change the value if necessary. |
Start Zookeeper and Kafka (use CTRL-C to stop both):
docker-compose up
Alternatively you can install and run a local Kafka instance.
7.2. Books Microservice
The generated code will use the Test Resources plugin to start a local Kafka broker inside Docker, and configure the connection URL.
7.2.1. Create Kafka client (producer)
Let’s create an interface to send messages to Kafka. The Micronaut framework will implement the interface at compilation time:
package example.micronaut
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
@KafkaClient
interface AnalyticsClient {
@Topic('analytics') (1)
Mono<Book> updateAnalytics(Book book) (2)
}
1 | Set the topic name |
2 | Send the Book POJO. The Framework will automatically convert it to JSON before sending it |
7.2.2. Create Tests
We could use mocks to test the message sending logic between BookController
, AnalyticsFilter
, and AnalyticsClient
, but it’s more realistic to use a running Kafka broker this is why Test Resources are used to run Kafka inside a Docker container.
Write a test for BookController
to verify the interaction with AnalyticsService
:
package example.micronaut
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.core.type.Argument
import io.micronaut.http.HttpRequest
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import spock.lang.Specification
import spock.util.concurrent.PollingConditions
import jakarta.inject.Inject
import java.util.concurrent.ConcurrentLinkedDeque
import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
@MicronautTest (1)
class BookControllerSpec extends Specification {
private static final Collection<Book> received = new ConcurrentLinkedDeque<>()
@Inject
AnalyticsListener analyticsListener (2)
@Inject
@Client('/')
HttpClient client (3)
void 'test message is published to Kafka when book found'() {
when:
String isbn = '1491950358'
Optional<Book> result = retrieveGet('/books/' + isbn) (4)
then:
result != null
result.present
isbn == result.get().isbn
new PollingConditions(timeout: 5).eventually { (5)
!received.isEmpty()
1 == received.size() (6)
}
when:
Book bookFromKafka = received[0]
then:
bookFromKafka
isbn == bookFromKafka.isbn
}
void 'test message is not published to Kafka when book not found'() {
when:
retrieveGet '/books/INVALID'
then:
thrown HttpClientResponseException
when:
sleep 5000 (7)
then:
0 == received.size()
}
void cleanup() {
received.clear()
}
@KafkaListener(offsetReset = EARLIEST)
static class AnalyticsListener {
@Topic('analytics')
void updateAnalytics(Book book) {
received << book
}
}
private Optional<Book> retrieveGet(String url) {
client.toBlocking().retrieve(HttpRequest.GET(url), Argument.of(Optional, Book))
}
}
1 | Classes that implement TestPropertyProvider must use this annotation to create a single class instance for all tests (not necessary in Spock tests). |
2 | Dependency injection for the AnalyticsListener class declared below, a Kafka listener class that replicates the functionality of the class of the same name in the analytics microservice |
3 | Dependency injection for an HTTP client that the Micronaut framework will implement at compile to make calls to BookController |
4 | Use the HttpClient to retrieve a Book , which will trigger sending a message with Kafka |
5 | Wait a few seconds for the message to arrive; it should happen very quickly, but the message will be sent on a separate thread |
6 | Verify that the message was received and has the correct data |
7 | Wait a few seconds to make sure no message is sent |
7.2.3. Send Analytics information automatically
Sending a message to Kafka is as simple as injecting AnalyticsClient
and calling the updateAnalytics
method. The goal is to do it automatically every time a book is returned, i.e., every time there is a call to http://localhost:8080/books/{isbn}
.
To achieve this we will create an Http Server Filter.
Create the AnalyticsFilter
class:
package example.micronaut
import io.micronaut.http.HttpRequest
import io.micronaut.http.MutableHttpResponse
import io.micronaut.http.annotation.Filter
import io.micronaut.http.filter.HttpServerFilter
import io.micronaut.http.filter.ServerFilterChain
import reactor.core.publisher.Flux
import org.reactivestreams.Publisher
@Filter('/books/?*') (1)
class AnalyticsFilter implements HttpServerFilter { (2)
private final AnalyticsClient analyticsClient
AnalyticsFilter(AnalyticsClient analyticsClient) { (3)
this.analyticsClient = analyticsClient
}
@Override
Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request,
ServerFilterChain chain) { (4)
return Flux
.from(chain.proceed(request)) (5)
.flatMap(response -> {
Book book = response.getBody(Book).orElse(null) (6)
if (book) {
Flux.from(analyticsClient.updateAnalytics(book)).map(b -> response) (7)
}
else {
Flux.just response
}
})
}
}
1 | Annotate the class with @Filter and define the Ant-style matcher pattern to intercept all calls to the desired URIs |
2 | The class must implement HttpServerFilter |
3 | Dependency injection for the Kafka AnalyticsClient |
4 | Implement the doFilter method |
5 | Execute the request; this will invoke the controller action |
6 | Get the response from the controller and return the body as a Book |
7 | If the book is found, use the Kafka client to send a message |
7.3. Analytics Microservice
7.3.1. Create Kafka consumer
Create a new class to act as a consumer of the messages sent to Kafka by the books
microservice. The Micronaut framework will implement logic to invoke the consumer at compile time. Create the AnalyticsListener
class:
package example.micronaut
import groovy.transform.CompileStatic
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.context.env.Environment
@CompileStatic
@Requires(notEnv = Environment.TEST) (1)
@KafkaListener (2)
class AnalyticsListener {
private final AnalyticsService analyticsService (3)
AnalyticsListener(AnalyticsService analyticsService) { (3)
this.analyticsService = analyticsService
}
@Topic('analytics') (4)
void updateAnalytics(Book book) {
analyticsService.updateBookAnalytics(book) (5)
}
}
1 | Do not load this bean for the test environment - this lets us run the tests without having Kafka running |
2 | Annotate the class with @KafkaListener to indicate that this bean will consume messages from Kafka |
3 | Constructor injection for AnalyticsService |
4 | Annotate the method with @Topic and specify the topic name to use |
5 | Call AnalyticsService to update the analytics for the book |
7.4. Running the application
Start the books
microservice:
./gradlew run
16:35:55.614 [main] INFO io.micronaut.runtime.Micronaut - Startup completed in 576ms. Server Running: http://localhost:8080
Execute a curl
request to get one book:
curl http://localhost:8080/books/1491950358
{"isbn":"1491950358","name":"Building Microservices"}
Start the analytics
microservice:
./gradlew run
16:35:55.614 [main] INFO io.micronaut.runtime.Micronaut - Startup completed in 623ms. Server Running: http://localhost:8081
The application will consume and process the message automatically after startup.
Now, use curl
to see the analytics:
curl http://localhost:8081/analytics
[{"bookIsbn":"1491950358","count":1}]
Update the curl
command to the books
microservice to retrieve other books and repeat the invocations, then re-run the curl
command to the analytics
microservice to see that the counts increase.
8. Next steps
Read more about Kafka support in Micronaut framework.
Read more about Test Resources in Micronaut.