Micronaut Nats

Integration between Micronaut and nats.io

Version:

1 Introduction

This project includes integration between Micronaut and nats.io. The standard Java Client is used to do the actual publishing and consuming.

2 Release History

For this project, you can find a list of releases (with release notes) here:

3 Using the Micronaut CLI

To create a project with NATS support using the Micronaut CLI, supply the nats feature to the features flag.

$ mn create-app my-nats-app --features nats

This will create a project with the minimum necessary configuration for NATS.

Messaging Application

The Micronaut CLI can generate messaging applications. This will create a Micronaut app with NATS support, and without an HTTP server (although you can add one if you desire). The profile also provides a couple commands for generating NATS consumers and producers.

To create a NATS messaging application, use the the following command:

$ mn create-messaging-app my-nats-service --features nats

As you’d expect, you can start the application with ./gradlew run (for Gradle) or ./mvnw compile exec:exec (Maven). The application will (with the default config) attempt to connect to NATS at nats://localhost:4222, and will continue to run without starting up an HTTP server. All communication to/from the service will take place via NATS producers and/or consumers.

Within the new project, you can now run the NATS specific code generation commands:

$ mn create-nats-producer Message
| Rendered template Producer.java to destination src/main/java/my/nats/app/MessageProducer.java

$ mn create-nats-listener Message
| Rendered template Listener.java to destination src/main/java/my/nats/app/MessageListener.java

4 NATS Quick Start

To add support for NATS.io to an existing project, you should first add the Micronaut NATS configuration to your build configuration. For example:

implementation("io.micronaut.nats:micronaut-nats")
<dependency>
    <groupId>io.micronaut.nats</groupId>
    <artifactId>micronaut-nats</artifactId>
</dependency>

Creating a NATS Producer with @NatsClient

To create a NATS Producer that sends messages you can simply define an interface that is annotated with @NatsClient.

For example the following is a trivial @NatsClient interface:

import io.micronaut.nats.annotation.Subject;
import io.micronaut.nats.annotation.NatsClient;

@NatsClient // (1)
public interface ProductClient {

    @Subject("product") // (2)
    void send(byte[] data); // (3)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject

@NatsClient // (1)
interface ProductClient {

    @Subject("product") // (2)
    void send(byte[] data) // (3)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject

@NatsClient // (1)
interface ProductClient {

    @Subject("product") // (2)
    fun send(data: ByteArray) // (3)
}
1 The @NatsClient annotation is used to designate this interface as a client
2 The @Subject annotation indicates which subject the Message should be published to
3 It is also possible for the subject to be dynamic by making it a method argument

At compile time Micronaut will produce an implementation of the above interface. You can retrieve an instance of ProductClient either by looking up the bean from the ApplicationContext or by injecting the bean with @Inject:

Creating a NATS Consumer with @NatsListener

To listen to NATS messages you can use the @NatsListener annotation to define a message listener.

The following example will listen for messages published by the ProductClient in the previous section:

1 The @NatsListener is used to designate this class as a listener.
2 The @Subject annotation is again used to indicate which subject to subscribe to.
3 The receive method defines one argument, which will receive the value.

5 Configuring the connection

All properties on the Options are available to be modified, either through configuration or a BeanCreatedEventListener.

The properties that can be converted from the string values in a configuration file can be configured directly.

🔗
Table 1. Configuration Properties for SingleNatsConnectionFactoryConfig
Property Type Description

nats.addresses

java.util.List

The list of addresses

nats.username

java.lang.String

the username

nats.password

java.lang.String

the password

nats.token

java.lang.String

the token

nats.max-reconnect

int

times to try reconnect

nats.reconnect-wait

java.time.Duration

time to wait

nats.connection-timeout

java.time.Duration

maximumTime for inital connection

nats.ping-interval

java.time.Duration

time between server pings

nats.reconnect-buffer-size

long

size of the buffer, in bytes, used to store publish messages during reconnect

nats.inbox-prefix

java.lang.String

custom prefix for request/reply inboxes

nats.no-echo

boolean

enable or disable echo messages, messages that are sent by this connection back to this connection

nats.utf8-support

boolean

whether or not the client should support for UTF8 subject names

nats.credentials

java.lang.String

path to the credentials file to use for authentication with an account enabled server

🔗
Table 2. Configuration Properties for ClusterNatsConnectionFactoryConfig
Property Type Description

nats.servers.*.addresses

java.util.List

The list of addresses

nats.servers.*.username

java.lang.String

the username

nats.servers.*.password

java.lang.String

the password

nats.servers.*.token

java.lang.String

the token

nats.servers.*.max-reconnect

int

times to try reconnect

nats.servers.*.reconnect-wait

java.time.Duration

time to wait

nats.servers.*.connection-timeout

java.time.Duration

maximumTime for inital connection

nats.servers.*.ping-interval

java.time.Duration

time between server pings

nats.servers.*.reconnect-buffer-size

long

size of the buffer, in bytes, used to store publish messages during reconnect

nats.servers.*.inbox-prefix

java.lang.String

custom prefix for request/reply inboxes

nats.servers.*.no-echo

boolean

enable or disable echo messages, messages that are sent by this connection back to this connection

nats.servers.*.utf8-support

boolean

whether or not the client should support for UTF8 subject names

nats.servers.*.credentials

java.lang.String

path to the credentials file to use for authentication with an account enabled server

Without any configuration the defaults in the Options will be used.
It is also possible to disable the integration entirely with nats.enabled: false

Connections

It is possible to configure multiple connections to the same server, different servers, or a single connection to one of a list of servers.

nats:
    server1:
      addresses:
        - "nats://localhost:4222"
      username: guest
      password: guest
    server2:
      addresses:
        - "nats://randomServer:4222"
      username: guest
      password: guest

NATS also supports a fail over connection strategy where the first server that connects successfully will be used among a list of servers. To use this option in Micronaut, simply supply a list of host:port addresses.

nats:
    addresses:
      - "nats://localhost:4222"
      - "nats://randomServer:4222"
    username: guest
    password: guest
When the configuration option nats.servers is used, no other options underneath nats are read; for example nats.username.

If you need to setup TLS, it can be configured this way:

nats:
    addresses:
      - "nats://localhost:4222" (1)
    tls:
      trust-store-path:  /path/to/client.truststore.jks (2)
      trust-store-password: secret
      certificate-path: /path/to/certificate.crt (3)
1 You can either use nats://localhost:4222 or tls://localhost:4222 as protocol.
2 You can configure a complete truststore
3 Or ou can use a single certificate for connecting to NATS securely.

6 NATS Producers

The example in the quick start presented a trivial definition of an interface that be implemented automatically for you using the @NatsClient annotation.

The implementation that powers @NatsClient (defined by the NatsIntroductionAdvice class) is, however, very flexible and offers a range of options for defining NATS clients.

6.1 Defining @NatsClient Methods

All methods that publish messages to NATS must meet the following conditions:

  • The method must reside in an interface annotated with @NatsClient.

  • The method or a method parameter must be annotated with @Subject.

  • The method must contain an argument representing the body of the message.

If a body argument cannot be found, an exception will be thrown.
In order for all of the functionality to work as designed in this guide your classes must be compiled with the parameters flag set to true. If your application was created with the Micronaut CLI, then that has already been configured for you.
Unless a reactive type is returned from the publishing method, the action is blocking.

6.1.1 Publishing Parameters

All options are available to be set for publishing messages. The publish method is used by the NatsIntroductionAdvice to publish messages and all arguments can be set through annotations or method arguments.

6.1.1.1 Subject

If you need to specify the subject of the message, apply the @Subject annotation to the method or an argument of the method. Apply the annotation to the method itself if the value is static for every execution. Apply the annotation to an argument of the method if the value should be set per execution.

import io.micronaut.nats.annotation.NatsClient;
import io.micronaut.nats.annotation.Subject;

@NatsClient
public interface ProductClient {

    @Subject("product") // (1)
    void send(byte[] data);

    void send(@Subject String subject, byte[] data); // (2)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject

@NatsClient
interface ProductClient {

    @Subject("product") // (1)
    void send(byte[] data)

    void send(@Subject String subject, byte[] data) // (2)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject

@NatsClient
interface ProductClient {

    @Subject("product") // (1)
    fun send(data: ByteArray)

    fun send(@Subject subject: String, data:ByteArray) // (2)
}
1 The subject is static
2 The subject must be set per execution

Producer Connection

If multiple Nats servers have been configured, the name of the server can be set in the @Subject annotation to designate which connection should be used to publish messages.

import io.micronaut.nats.annotation.NatsClient;
import io.micronaut.nats.annotation.Subject;

@NatsClient
public interface ProductClient {

    @Subject(value = "product", connection = "product-cluster") // (1)
    void send(byte[] data);
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject

@NatsClient // (1)
interface ProductClient {

    @Subject(value = "product", connection = "product-cluster") // (2)
    void send(byte[] data) // (3)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject

@NatsClient
interface ProductClient {

    @Subject(value = "product", connection = "product-cluster") // (1)
    fun send(data: ByteArray)

}
1 The connection is set on the subject annotation.
The connection option is also available to be set on the @NatsClient annotation.

Queues

The NATS server will route the message to the queue and select a message receiver.

6.1.1.2 Headers

Headers can be set on the message with the @MessageHeader annotation applied to the method or an argument of the method. Apply the annotation to the method itself if the value is static for every execution. Apply the annotation to an argument of the method if the value should be set per execution.

import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.MessageHeader;
import io.micronaut.nats.annotation.NatsClient;
import io.micronaut.nats.annotation.Subject;
import io.nats.client.impl.Headers;

@NatsClient
@MessageHeader(name = "x-product-sealed", value = "true") // (1)
@MessageHeader(name = "productSize", value = "large")
public interface ProductClient {

    @Subject("product")
    @MessageHeader(name = "x-product-count", value = "10") // (2)
    @MessageHeader(name = "productSize", value = "small")
    void send(byte[] data);

    @Subject("product")
    void send(@MessageHeader String productSize, // (3)
              @MessageHeader("x-product-count") Long count,
              byte[] data);

    @Subject("products")
    @MessageHeader(name = "x-product-count", value = "20")
    void send(@MessageBody byte[] data, @MessageHeader List<String> productSizes);// (4)

    @Subject("productHeader")
    void send(@MessageBody byte[] data, Headers headers);// (5)

}
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject
import io.nats.client.impl.Headers

@NatsClient
@MessageHeader(name = "x-product-sealed", value = "true") // (1)
@MessageHeader(name = "productSize", value = "large")
interface ProductClient {


    @Subject("product")
    @MessageHeader(name = "x-product-count", value = "10") // (2)
    @MessageHeader(name = "productSize", value = "small")
    void send(byte[] data)

    @Subject("product")
    void send(@MessageHeader String productSize, // (3)
              @MessageHeader("x-product-count") Long count,
              byte[] data)

    @Subject("products")
    @MessageHeader(name = "x-product-count", value = "20")
    void send(@MessageBody byte[] data, @MessageHeader List<String> productSizes) // (4)

    @Subject("productHeader")
    void send(@MessageBody byte[] data, Headers headers) // (5)
}
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.messaging.annotation.MessageHeaders
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject
import io.nats.client.impl.Headers

@NatsClient
@MessageHeaders(
    MessageHeader(name = "x-product-sealed", value = "true"), // (1)
    MessageHeader(name = "productSize", value = "large")
)
interface ProductClient {

    @Subject("product")
    @MessageHeaders(
        MessageHeader(name = "x-product-count", value = "10"), // (2)
        MessageHeader(name = "productSize", value = "small")
    )
    fun send(data: ByteArray)

    @Subject("product")
    fun send(@MessageHeader productSize: String?, // (3)
             @MessageHeader("x-product-count") count: Long,
             data: ByteArray)

    @Subject("products")
    @MessageHeader(name = "x-product-count", value = "20")
    fun send(@MessageBody data:ByteArray, @MessageHeader productSizes: List<String>) // (4)

    @Subject("productHeader")
    fun send(@MessageBody data: ByteArray, headers: Headers) // (5)

}
1 Headers can be defined at the class level and will apply to all methods. If a header is defined on the method with the same name as one on the class, the value on the method will be used.
2 Multiple annotations can be used to set multiple headers on the method or class level.
3 Headers can be set per execution. The name is inferred from the argument if the annotation value is not set. Null values will be ignored.
4 You can also use a List as header.
5 A Headers argument can be used to pass custom headers.

6.1.1.3 Message Body

Most examples up to this point have been using a byte[] as the body type for simplicity. This library supports most standard Java types and JSON serialization (using Jackson) by default. The functionality is extensible and it is possible to add support for additional types and serialization strategies. See the section on Message Serialization/Deserialization for more information.

7 NATS Consumers

The quick start section presented a trivial example of what is possible with the @NatsListener annotation.

The implementation that powers @NatsListener (defined by the NatsConsumerAdvice class) is, however, very flexible and offers a range of options for consuming NATS message.

7.1 Defining @NatsListener Methods

All methods that consume messages from NATS must meet the following conditions:

  • The method must reside in a class annotated with @NatsListener.

  • The method must be annotated with @Subject.

In order for all of the functionality to work as designed in this guide your classes must be compiled with the parameters flag set to true. If your application was created with the Micronaut CLI, then that has already been configured for you.

7.1.1 Consumer Parameters

The createDispatcher method is used by the NatsConsumerAdvice to consume messages. Some of the options can be directly configured through annotations.

In order for the consumer method to be invoked, all arguments must be satisfied. To allow execution of the method with a null value, the argument must be declared as nullable. If the arguments cannot be satisfied, the message will be rejected.

7.1.1.1 Subject

A @Subject annotation is required for a method to be a consumer of messages from Nats. Simply apply the annotation to the method and supply the name of the subject you would like to listen to.

import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;

@NatsListener
public class ProductListener {

    List<Integer> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @Subject("product") // (1)
    public void receive(byte[] data) {
        messageLengths.add(data.length);
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject

import java.util.concurrent.CopyOnWriteArrayList

@NatsListener
class ProductListener {

    CopyOnWriteArrayList<Integer> messageLengths = []

    @Subject("product") // (1)
    void receive(byte[] data) {
        messageLengths << data.length
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import java.util.Collections

@NatsListener
class ProductListener {

    val messageLengths: MutableList<Int> = Collections.synchronizedList(ArrayList())

    @Subject("product") // (1)
    fun receive(data: ByteArray) {
        messageLengths.add(data.size)
    }
}
1 The subject annotation is set per method. Multiple methods may be defined with different subjects in the same class.

Queue Support

Subscribers may specify queue groups at subscription time. When a message is published to the group, NATS will deliver it to a one-and-only-one subscriber.

import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;

@NatsListener
public class ProductListener {

    List<String> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @Subject(value = "product", queue = "product-queue") // (1)
    public void receiveByQueue1(byte[] data) {
        messageLengths.add(new String(data));
        System.out.println("Java received " + data.length + " bytes from Nats");
    }

    @Subject(value = "product", queue = "product-queue")
    public void receiveByQueue2(byte[] data) {
        messageLengths.add(new String(data));
        System.out.println("Java received " + data.length + " bytes from Nats");
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject

import java.util.concurrent.CopyOnWriteArrayList

@NatsListener
class ProductListener {

    CopyOnWriteArrayList<String> messageLengths = []

    @Subject(value = "product", queue = "product-queue") // (1)
    void receiveByQueue1(byte[] data) {
        messageLengths << new String(data)
    }

    @Subject(value = "product", queue = "product-queue")
    public void receiveByQueue2(byte[] data) {
        messageLengths << new String(data)
    }
}
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import io.nats.client.impl.Headers
import java.util.Collections

@NatsListener
class ProductListener {

    var messageLengths: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Subject(value = "product", queue = "product-queue") // (1)
    fun receiveByQueue1(data: ByteArray) {
        messageLengths.add(String(data))
    }

    @Subject(value = "product", queue = "product-queue")
    fun receiveByQueue2(data: ByteArray) {
        messageLengths.add(String(data))
    }

}
1 Defining a queue can be done in the @Subject
Queue groups do not persist messages. If no listeners are available, the message is discarded.

Other Options

If multiple Nats servers have been configured, the name of the server can be set in the @Subject annotation to designate which connection should be used to listen for messages.

import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;

@NatsListener
public class ProductListener {

    List<String> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @Subject(value = "product", connection = "product-cluster") // (1)
    public void receive(byte[] data) {
        messageLengths.add(new String(data));
        System.out.println("Java received " + data.length + " bytes from Nats");
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject

@NatsListener
class ProductListener {

    List<String> messageLengths = Collections.synchronizedList([])

    @Subject(value = "product", connection = "product-cluster") // (1)
    void receive(byte[] data) {
        messageLengths << new String(data)
    }
}
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import io.nats.client.impl.Headers
import java.util.Collections

@NatsListener
class ProductListener {

    var messageLengths: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Subject(value = "product", connection = "product-cluster") // (1)
    fun receive(data: ByteArray) {
        messageLengths.add(String(data))
    }

}
1 The connection is set on the subject annotation.
The connection option is also available to be set on the @NatsListener annotation.

7.1.1.2 Headers

Headers can be retrieved with the @MessageHeader annotation applied to the arguments of the method.

import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.MessageHeader;
import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;
import io.nats.client.impl.Headers;

@NatsListener
public class ProductListener {

    Set<String> messageProperties = Collections.synchronizedSet(new HashSet<>());

    @Subject("product")
    public void receive(byte[] data,
            @MessageHeader("x-product-sealed") Boolean sealed, // (1)
            @MessageHeader("x-product-count") Long count, // (2)
            @Nullable @MessageHeader String productSize) { // (3)
        messageProperties.add(sealed + "|" + count + "|" + productSize);
    }

    @Subject("products")
    public void receive(@MessageBody byte[] data, @MessageHeader("x-product-sealed") Boolean sealed,
            @MessageHeader("x-product-count") Long count, @MessageHeader List<String> productSizes) { // (4)
        for (String productSize : productSizes) {
            messageProperties.add(sealed + "|" + count + "|" + productSize);
        }
    }

    @Subject("productHeader")
    public void receive(@MessageBody byte[] data, Headers headers) { // (5)
        String productSize = headers.get("productSize").get(0);
        messageProperties.add(
                headers.get("x-product-sealed").get(0) + "|" +
                        headers.get("x-product-count").get(0) + "|" +
                        productSize);
    }
}
import io.micronaut.core.annotation.Nullable
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import io.nats.client.impl.Headers

import java.util.concurrent.CopyOnWriteArrayList

@NatsListener
class ProductListener {

    CopyOnWriteArrayList<String> messageProperties = []

    @Subject("product")
    void receive(byte[] data,
                 @MessageHeader("x-product-sealed") Boolean sealed, // (1)
                 @MessageHeader("x-product-count") Long count, // (2)
                 @Nullable @MessageHeader String productSize) { // (3)
        messageProperties << sealed.toString() + "|" + count + "|" + productSize
    }

    @Subject("products")
    void receive(@MessageBody byte[] data, @MessageHeader("x-product-sealed") Boolean sealed,
                 @MessageHeader("x-product-count") Long count, @MessageHeader List<String> productSizes) { // (4)
        productSizes.forEach {
            messageProperties << sealed.toString() + "|" + count + "|" + it
        }
    }

    @Subject("productHeader")
    void receive(@MessageBody byte[] data, Headers headers) { // (5)
        String productSize = headers.get("productSize").get(0)
        messageProperties << headers.get("x-product-sealed").get(0) + "|" +
                        headers.get("x-product-count").get(0) + "|" +
                        productSize
    }
}
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import io.nats.client.impl.Headers
import java.util.Collections

@NatsListener
class ProductListener {

    var messageProperties: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Subject("product")
    fun receive(data: ByteArray,
                @MessageHeader("x-product-sealed") sealed: Boolean, // (1)
                @MessageHeader("x-product-count") count: Long, // (2)
                @MessageHeader productSize: String?) { // (3)
        messageProperties.add(sealed.toString() + "|" + count + "|" + productSize)
    }

    @Subject("products")
    fun receive(@MessageBody data: ByteArray,
                @MessageHeader("x-product-sealed") sealed: Boolean,
                @MessageHeader("x-product-count") count: Long,
                @MessageHeader productSizes: List<String>?) { // (4)
        productSizes?.forEach {
            messageProperties.add("${sealed}|${count}|${it}")
        }
    }

    @Subject("productHeader")
    fun receive(@MessageBody data: ByteArray, headers: Headers) { // (5)
        messageProperties.add("${headers["x-product-sealed"][0]}|${headers["x-product-count"][0]}|${headers["productSize"][0]}")
    }
}
1 The header name comes from the annotation and the value is retrieved and converted to a Boolean.
2 The header name comes from the annotation and the value is retrieved and converted to a Long.
3 The header name comes from the argument name. This argument allows null values.
4 The header can also be a list representing multiple values.
5 All headers can be bound to a Headers argument.

7.1.1.3 Nats Types

Arguments can also be bound based on their type. Several types are supported by default and each type has a corresponding NatsTypeArgumentBinder. The argument binders are covered in detail in the section on Custom Parameter Binding.

There is only type that is supported for retrieving data about the Message.

import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;
import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Subscription;
import io.nats.client.impl.Headers;

@NatsListener
public class ProductListener {

    List<String> messages = Collections.synchronizedList(new ArrayList<>());

    @Subject("product")
    public void receive(byte[] data,
            Message message,
            Connection connection,
            Subscription subscription,
            Headers headers) { // (1)
        messages.add(String.format("subject: [%s], maxPayload: [%s], pendingMessageCount: [%s], x-productCount: [%s]",
                message.getSubject(),
                connection.getMaxPayload(), subscription.getPendingMessageCount(),
                headers.get("x-product-count").get(0)));
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject

import io.nats.client.Connection
import io.nats.client.Message
import io.nats.client.Subscription
import io.nats.client.impl.Headers

import java.util.concurrent.CopyOnWriteArrayList

@NatsListener
class ProductListener {

    CopyOnWriteArrayList<String> messages = []

    @Subject("product")
    void receive(byte[] data,
                 Message message,
                 Connection connection,
                 Subscription subscription,
                 Headers headers) { // (1)
        def count = headers.get("x-product-count").get(0)
        messages << "subject: [$message.subject], maxPayload: [$connection.maxPayload], pendingMessageCount: [$subscription.pendingMessageCount], x-productCount: [$count]".toString()

    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import io.nats.client.Connection
import io.nats.client.Message
import io.nats.client.Subscription
import io.nats.client.impl.Headers
import java.util.Collections


@NatsListener
class ProductListener {

    var messages: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Subject(value = "product")
    fun receive(data: ByteArray,
                message: Message,
                connection: Connection,
                subscription: Subscription,
                headers: Headers) { // (1)
        messages.add("subject: [${message.subject}], maxPayload: [${connection.maxPayload}], pendingMessageCount: [${subscription.pendingMessageCount}], x-productCount: [${headers["x-product-count"][0]}]")
    }

}
1 The arguments are bound from the Message.

7.1.1.4 Message Body

Most examples up to this point have been using a byte[] as the body type for simplicity. This library supports most standard Java types and JSON deserialization (using Jackson) by default. The functionality is extensible and it is possible to add support for additional types and deserialization strategies. See the section on Message Serialization/Deserialization for more information.

7.1.1.5 Custom Parameter Binding

Default Binding Functionality

Consumer argument binding is achieved through an ArgumentBinderRegistry that is specific for binding consumers from Nats messages. The class responsible for this is the NatsBinderRegistry.

The registry supports argument binders that are used based on an annotation applied to the argument or the argument type. All argument binders must implement either NatsAnnotatedArgumentBinder or NatsTypeArgumentBinder. The exception to that rule is the NatsDefaultBinder which is used when no other binders support a given argument.

When an argument needs bound, the Message is used as the source of all of the available data. The binder registry follows a small sequence of steps to attempt to find a binder that supports the argument.

  1. Search the annotation based binders for one that matches any annotation on the argument that is annotated with @Bindable.

  2. Search the type based binders for one that matches or is a subclass of the argument type.

  3. Return the default binder.

The default binder binds the body of the message to the argument.

Custom Binding

To inject your own argument binding behavior, it is as simple as registering a bean. The existing binder registry will inject it and include it in the normal processing.

Annotation Binding

A custom annotation can be created to bind consumer arguments. A custom binder can then be created to use that annotation and the Message to supply a value for the argument. The value may in fact come from anywhere, however for the purposes of this documentation, the replyTo in the message is used.

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import io.micronaut.core.bind.annotation.Bindable;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
@Bindable // (1)
public @interface SID {
}
import io.micronaut.core.bind.annotation.Bindable

import java.lang.annotation.Documented
import java.lang.annotation.ElementType
import java.lang.annotation.Retention
import java.lang.annotation.RetentionPolicy
import java.lang.annotation.Target

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target([ElementType.PARAMETER])
@Bindable // (1)
@interface SID {
}
import io.micronaut.core.bind.annotation.Bindable

@MustBeDocumented
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.VALUE_PARAMETER)
@Bindable // (1)
annotation class SID
1 The @Bindable annotation is required for the annotation to be considered for binding.
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.nats.bind.NatsAnnotatedArgumentBinder;
import io.nats.client.Message;
import jakarta.inject.Singleton;

@Singleton // (1)
public class SIDAnnotationBinder implements NatsAnnotatedArgumentBinder<SID> { // (2)

    private final ConversionService<?> conversionService;

    public SIDAnnotationBinder(ConversionService<?> conversionService) { // (3)
        this.conversionService = conversionService;
    }

    @Override
    public Class<SID> getAnnotationType() {
        return SID.class;
    }

    @Override
    public BindingResult<Object> bind(ArgumentConversionContext<Object> context, Message source) {
        String sid = source.getSID(); // (4)
        return () -> conversionService.convert(sid, context); // (5)
    }
}
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import io.micronaut.nats.bind.NatsAnnotatedArgumentBinder
import io.nats.client.Message

import jakarta.inject.Singleton

@Singleton // (1)
class SIDAnnotationBinder implements NatsAnnotatedArgumentBinder<SID> { // (2)

    private final ConversionService conversionService

    SIDAnnotationBinder(ConversionService conversionService) { // (3)
        this.conversionService = conversionService
    }

    @Override
    Class<SID> getAnnotationType() {
        SID
    }

    @Override
    BindingResult<Object> bind(ArgumentConversionContext<Object> context, Message source) {
        String sid = source.getSID() // (4)
        return { -> conversionService.convert(sid, context) } // (5)
    }
}
import io.micronaut.core.bind.ArgumentBinder
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import io.micronaut.nats.bind.NatsAnnotatedArgumentBinder
import io.nats.client.Message
import jakarta.inject.Singleton

@Singleton // (1)
class SIDAnnotationBinder(private val conversionService: ConversionService<*>) // (3)
    : NatsAnnotatedArgumentBinder<SID> { // (2)

    override fun getAnnotationType(): Class<SID> {
        return SID::class.java
    }

    override fun bind(context: ArgumentConversionContext<Any>, source: Message): ArgumentBinder.BindingResult<Any> {
        val sid = source.sid // (4)
        return ArgumentBinder.BindingResult { conversionService.convert(sid, context) } // (5)
    }
}
1 The class is made a bean by annotating with @Singleton.
2 The custom annotation is used as the generic type for the interface.
3 The conversion service is injected into the instance.
4 The replyTo is retrieved from the message state.
5 The replyTo is converted to the argument type.

The annotation can now be used on the argument in a consumer method.

import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;

@NatsListener
public class ProductListener {

    List<String> messages = Collections.synchronizedList(new ArrayList<>());

    @Subject("product")
    public void receive(byte[] data, @SID String sid) { // (1)
        messages.add(sid);
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import io.micronaut.nats.docs.consumer.custom.type.ProductInfo

import java.util.concurrent.CopyOnWriteArrayList

@NatsListener
class ProductListener {

    CopyOnWriteArrayList<ProductInfo> messages = []

    @Subject("product")
    void receive(byte[] data, @SID String sid) {
        messages << sid
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import io.micronaut.nats.docs.consumer.custom.type.ProductInfo
import java.util.Collections

@NatsListener
class ProductListener {

    var messages: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Subject("product")
    fun receive(data: ByteArray, @SID sid: String)  {// (1)
        messages.add(sid)
    }

}

Type Binding

A custom binder can be created to support any argument type. For example the following class could be created to bind values from the headers. This functionality could allow the work of retrieving and converting the headers to occur in a single place instead of multiple times in your code.

import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;

public class ProductInfo {

    private String size;
    private Long count;
    private Boolean sealed;

    public ProductInfo(@Nullable String size, // (1)
                       @NonNull Long count, // (2)
                       @NonNull Boolean sealed) { // (3)
        this.size = size;
        this.count = count;
        this.sealed = sealed;
    }

    public String getSize() {
        return size;
    }

    public Long getCount() {
        return count;
    }

    public Boolean getSealed() {
        return sealed;
    }
}
import io.micronaut.core.annotation.NonNull
import io.micronaut.core.annotation.Nullable

class ProductInfo {

    private String size
    private Long count
    private Boolean sealed

    ProductInfo(@Nullable String size, // (1)
                @NonNull Long count, // (2)
                @NonNull Boolean sealed) { // (3)
        this.size = size
        this.count = count
        this.sealed = sealed
    }

    String getSize() {
        size
    }

    Long getCount() {
        count
    }

    Boolean getSealed() {
        sealed
    }
}
class ProductInfo(val size: String?, // (1)
                  val count: Long, // (2)
                  val sealed: Boolean)// (3)
1 The size argument is not required
2 The count argument is required
3 The sealed argument is required

A type argument binder can then be created to create the ProductInfo instance to bind to your consumer method argument.

import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionError;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.nats.bind.NatsHeaderConvertibleValues;
import io.micronaut.nats.bind.NatsTypeArgumentBinder;
import io.nats.client.Message;
import io.nats.client.impl.Headers;
import jakarta.inject.Singleton;

@Singleton // (1)
public class ProductInfoTypeBinder implements NatsTypeArgumentBinder<ProductInfo> { //(2)

    private final ConversionService<?> conversionService;

    ProductInfoTypeBinder(ConversionService<?> conversionService) { //(3)
        this.conversionService = conversionService;
    }

    @Override
    public Argument<ProductInfo> argumentType() {
        return Argument.of(ProductInfo.class);
    }

    @Override
    public BindingResult<ProductInfo> bind(ArgumentConversionContext<ProductInfo> context, Message source) {
        Headers rawHeaders = source.getHeaders(); //(4)

        if (rawHeaders == null) {
            return BindingResult.EMPTY;
        }

        NatsHeaderConvertibleValues headers = new NatsHeaderConvertibleValues(rawHeaders, conversionService);

        String size = headers.get("productSize", String.class).orElse(null);  //(5)
        Optional<Long> count = headers.get("x-product-count", Long.class); //(6)
        Optional<Boolean> sealed = headers.get("x-product-sealed", Boolean.class); // (7)

        if (headers.getConversionErrors().isEmpty() && count.isPresent() && sealed.isPresent()) {
            return () -> Optional.of(new ProductInfo(size, count.get(), sealed.get())); //(8)
        } else {
            return new BindingResult<ProductInfo>() {
                @Override
                public Optional<ProductInfo> getValue() {
                    return Optional.empty();
                }

                @Override
                public List<ConversionError> getConversionErrors() {
                    return headers.getConversionErrors(); //(9)
                }
            };
        }
    }
}
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionError
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.nats.bind.NatsHeaderConvertibleValues
import io.micronaut.nats.bind.NatsTypeArgumentBinder
import io.nats.client.Message
import io.nats.client.impl.Headers

import jakarta.inject.Singleton

@Singleton // (1)
class ProductInfoTypeBinder implements NatsTypeArgumentBinder<ProductInfo> { //(2)

    private final ConversionService conversionService

    ProductInfoTypeBinder(ConversionService conversionService) { //(3)
        this.conversionService = conversionService
    }

    @Override
    Argument<ProductInfo> argumentType() {
        return Argument.of(ProductInfo)
    }

    @Override
    BindingResult<ProductInfo> bind(ArgumentConversionContext<ProductInfo> context, Message source) {
        Headers rawHeaders = source.headers //(4)

        if (rawHeaders == null) {
            return BindingResult.EMPTY
        }

        def headers = new NatsHeaderConvertibleValues(rawHeaders, conversionService)

        String size = headers.get("productSize", String).orElse(null)  //(5)
        Optional<Long> count = headers.get("x-product-count", Long) //(6)
        Optional<Boolean> sealed = headers.get("x-product-sealed", Boolean) // (7)

        if (headers.conversionErrors.isEmpty() && count.isPresent() && sealed.isPresent()) {
            { -> Optional.of(new ProductInfo(size, count.get(), sealed.get())) } //(8)
        } else {
            new BindingResult<ProductInfo>() {
                @Override
                Optional<ProductInfo> getValue() {
                    Optional.empty()
                }

                @Override
                List<ConversionError> getConversionErrors() {
                    headers.conversionErrors //(9)
                }
            }
        }
    }
}
import io.micronaut.core.bind.ArgumentBinder.BindingResult
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionError
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.nats.bind.NatsHeaderConvertibleValues
import io.micronaut.nats.bind.NatsTypeArgumentBinder
import io.micronaut.nats.docs.consumer.custom.type.ProductInfo
import io.nats.client.Message
import jakarta.inject.Singleton
import java.util.Optional

@Singleton // (1)
class ProductInfoTypeBinder constructor(private val conversionService: ConversionService<*>) //(3)
    : NatsTypeArgumentBinder<ProductInfo> { // (2)

    override fun argumentType(): Argument<ProductInfo> {
        return Argument.of(ProductInfo::class.java)
    }

    override fun bind(context: ArgumentConversionContext<ProductInfo>, source: Message): BindingResult<ProductInfo> {
        val rawHeaders = source.headers ?: return BindingResult { Optional.empty<ProductInfo>() } //(4)

        val headers = NatsHeaderConvertibleValues(rawHeaders, conversionService)

        val size = headers.get("productSize", String::class.java).orElse(null)  //(5)
        val count = headers.get("x-product-count", Long::class.java) //(6)
        val sealed = headers.get("x-product-sealed", Boolean::class.java) // (7)

        if (headers.conversionErrors.isEmpty() && count.isPresent && sealed.isPresent) {
            return BindingResult<ProductInfo> { Optional.of(ProductInfo(size, count.get(), sealed.get())) } //(8)
        } else {
            return object : BindingResult<ProductInfo> {
                override fun getValue(): Optional<ProductInfo> {
                    return Optional.empty()
                }

                override fun getConversionErrors(): List<ConversionError> {
                    return headers.conversionErrors //(9)
                }
            }
        }
    }
}
1 The class is made a bean by annotating with @Singleton.
2 The custom type is used as the generic type for the interface.
3 The conversion service is injected into the instance.
4 The headers are retrieved from the message state.
5 The productSize header is retrieved, defaulting to null if the value was not found or could not be converted.
6 The x-product-count header is retrieved and converted with a new argument context that is used to retrieve conversion errors later.
7 The x-product-sealed header is retrieved and converted with a new argument context that is used to retrieve conversion errors later.
8 There are no conversion errors and the two required arguments are present, so the instance can be constructed.
9 There are conversion errors or one of the required arguments is not present so a custom BindingResult is returned that allows the conversion errors to be handled appropriately.

8 Request-Reply (RPC)

This library supports RPC through the usage of Request-Reply. Both blocking and non blocking variations are supported.

The following is an example direct reply to where the consumer is converting the body to upper case and replying with the converted string.

Client Side

The "client side" in this case starts by publishing a message. A consumer somewhere will then receive the message and reply with a new value.

import io.micronaut.nats.annotation.NatsClient;
import io.micronaut.nats.annotation.Subject;
import org.reactivestreams.Publisher;

@NatsClient
public interface ProductClient {

    @Subject("product")
    String send(String data); // (1)

    @Subject("product")
    Publisher<String> sendReactive(String data); // (2)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject
import org.reactivestreams.Publisher

@NatsClient
interface ProductClient {

    @Subject("product")
    String send(String data) // (1)

    @Subject("product")
    Publisher<String> sendReactive(String data) // (2)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject
import org.reactivestreams.Publisher

@NatsClient
interface ProductClient {

    @Subject("product")
    fun send(data: String): String // (1)

    @Subject("product")
    fun sendReactive(data: String): Publisher<String> // (2)
}
1 The send method is blocking and will return when the response is received.
2 The sendReactive method returns a reactive type that will complete when the response is received. Reactive methods will be executed on the IO thread pool.
In order for the publisher to assume RPC should be used instead of just completing when the publish is confirmed, the data type must not be Void. In both cases above, the data type is String.

Server Side

The "server side" in this case starts with the consumption of a message, and then a new message is published by returning the result

import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;

@NatsListener
public class ProductListener {

    @Subject("product")
    public String toUpperCase(String data) { // (1)
        return data.toUpperCase(); // (2)
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject

@NatsListener
class ProductListener {

    @Subject("product")
    String toUpperCase(String data) { // (1)
        data.toUpperCase() // (2)
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import java.util.Collections

@NatsListener
class ProductListener {

    @Subject("product")
    fun receive(data: String): String { // (1)
        return data.uppercase() // (2)
    }
}
1 The data from message is injected.
2 The converted message is returned.
If the reply publish fails for any reason, the original message will be rejected.
RPC consumer methods must never return a reactive type. Because the resulting publish needs to occur on the same thread and only a single item can be emitted, there is no value in doing so.

9 Message Serialization/Deserialization (SerDes)

The serialization and deserialization of message bodies is handled through instances of NatsMessageSerDes. The ser-des (Serializer/Deserializer) is responsible for both serialization and deserialization of Nats message bodies into the message body types defined in your clients and consumers methods.

The ser-des are managed by a NatsMessageSerDesRegistry. All ser-des beans are injected in order into the registry and then searched for when serialization or deserialization is needed. The first ser-des that returns true for supports-java.lang.Class- is returned and used.

By default, standard Java lang types and JSON format (with Jackson) are supported. You can supply your own ser-des by simply registering a bean of type NatsMessageSerDes. All ser-des implement the Ordered interface, so custom implementations can come before, after, or in between the default implementations.

9.1 Custom SerDes

A custom serializer/deserializer would be necessary to support custom data formats. In the section on Custom Consumer Binding an example was demonstrated that allowed binding a ProductInfo type from the headers of the message. If instead that object should represent the body of the message with a custom data format, you could register your own serializer/deserializer to do so.

In this example a simple data format of the string representation of the fields are concatenated together with a pipe character.

import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.nats.serdes.NatsMessageSerDes;
import io.nats.client.Message;
import jakarta.inject.Singleton;

@Singleton // (1)
public class ProductInfoSerDes implements NatsMessageSerDes<ProductInfo> { // (2)

    private final ConversionService<?> conversionService;

    public ProductInfoSerDes(ConversionService<?> conversionService) { // (3)
        this.conversionService = conversionService;
    }

    @Override
    public ProductInfo deserialize(Message message, Argument<ProductInfo> argument) { // (4)
        String body = new String(message.getData(), StandardCharsets.UTF_8);
        String[] parts = body.split("\\|");
        if (parts.length == 3) {
            String size = parts[0];
            if (size.equals("null")) {
                size = null;
            }

            Optional<Long> count = conversionService.convert(parts[1], Long.class);
            Optional<Boolean> sealed = conversionService.convert(parts[2], Boolean.class);

            if (count.isPresent() && sealed.isPresent()) {
                return new ProductInfo(size, count.get(), sealed.get());
            }
        }
        return null;
    }

    @Override
    public byte[] serialize(ProductInfo data) { // (5)
        if (data == null) {
            return null;
        }
        return (data.getSize() + "|" + data.getCount() + "|" + data.getSealed()).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public boolean supports(Argument<ProductInfo> argument) { // (6)
        return argument.getType().isAssignableFrom(ProductInfo.class);
    }
}
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.nats.serdes.NatsMessageSerDes
import io.nats.client.Message

import jakarta.inject.Singleton

@Singleton // (1)
class ProductInfoSerDes implements NatsMessageSerDes<ProductInfo> { // (2)

    private static final Charset UTF8 = Charset.forName("UTF-8")

    private final ConversionService conversionService

    ProductInfoSerDes(ConversionService conversionService) { // (3)
        this.conversionService = conversionService
    }

    @Override
    ProductInfo deserialize(Message message, Argument<ProductInfo> argument) { // (4)
        String body = new String(message.data, UTF8)
        String[] parts = body.split("\\|")
        if (parts.length == 3) {
            String size = parts[0]
            if (size == "null") {
                size = null
            }

            Optional<Long> count = conversionService.convert(parts[1], Long)
            Optional<Boolean> sealed = conversionService.convert(parts[2], Boolean)

            if (count.isPresent() && sealed.isPresent()) {
                return new ProductInfo(size, count.get(), sealed.get())
            }
        }
        null
    }

    @Override
    byte[] serialize(ProductInfo data) { // (5)
        if (data == null) {
            return null
        }
        (data.size + "|" + data.count + "|" + data.sealed).getBytes(UTF8)
    }

    @Override
    boolean supports(Argument<ProductInfo> argument) { // (6)
        argument.type.isAssignableFrom(ProductInfo)
    }
}
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.nats.serdes.NatsMessageSerDes
import io.nats.client.Message
import jakarta.inject.Singleton
import java.nio.charset.Charset

@Singleton // (1)
class ProductInfoSerDes(private val conversionService: ConversionService<*>)// (3)
    : NatsMessageSerDes<ProductInfo> { // (2)

    override fun deserialize(message: Message, argument: Argument<ProductInfo>): ProductInfo? { // (4)
        val body = String(message.data, CHARSET)
        val parts = body.split("\\|".toRegex())
        if (parts.size == 3) {
            var size: String? = parts[0]
            if (size == "null") {
                size = null
            }

            val count = conversionService.convert(parts[1], Long::class.java)
            val sealed = conversionService.convert(parts[2], Boolean::class.java)

            if (count.isPresent && sealed.isPresent) {
                return ProductInfo(size, count.get(), sealed.get())
            }
        }
        return null
    }

    override fun serialize(data: ProductInfo?): ByteArray { // (5)
        return (data?.size + "|" + data?.count + "|" + data?.sealed).toByteArray(CHARSET)
    }

    override fun supports(argument: Argument<ProductInfo>): Boolean { // (6)
        return argument.type.isAssignableFrom(ProductInfo::class.java)
    }

    companion object {
        private val CHARSET = Charset.forName("UTF-8")
    }
}
1 The class is declared as a singleton so it will be registered with the context
2 The generic specifies what type we want to accept and return
3 The conversion service is injected to convert the parts of the message to the required types
4 The deserialize method takes the bytes from the message and constructs a ProductInfo
5 The serialize method takes the ProductInfo and returns the bytes to publish. A mutable version of the properties is also provided so properties such as the content type can be set before publishing.
6 The supports method ensures only the correct body types are processed by this ser-des
Because the getOrder method was not overridden, the default order of 0 is used. All default ser-des have a lower precedent than the default order which means this ser-des will be checked before the others.

10 NATS Health Indicator

This library comes with a health indicator for applications that are using the management module in Micronaut. See the Health Endpoint documentation for more information about the endpoint itself.

The information reported from the health indicator is under the nats key.

"nats": {
  "status": "UP",
  "details": {
    "servers": ["nats://localhost:4222"]
  }
}
To disable the NATS health indicator entirely, add endpoints.health.nats.enabled: false.

11 Repository

You can find the source code of this project in this repository: