Micronaut Nats

Integration between Micronaut and nats.io

Version: 4.3.0

1 Introduction

This project includes integration between Micronaut and nats.io. The standard Java Client is used to do the actual publishing and consuming.

2 Release History

For this project, you can find a list of releases (with release notes) here:

3 Using the Micronaut CLI

To create a project with NATS support using the Micronaut CLI, supply the nats feature to the features flag.

$ mn create-app my-nats-app --features nats

This will create a project with the minimum necessary configuration for NATS.

Messaging Application

The Micronaut CLI can generate messaging applications. This will create a Micronaut app with NATS support, and without an HTTP server (although you can add one if you desire). The profile also provides a couple of commands for generating NATS consumers and producers.

To create a NATS messaging application, use the following command:

$ mn create-messaging-app my-nats-service --features nats

As you’d expect, you can start the application with ./gradlew run (for Gradle) or ./mvnw compile exec:exec (Maven). The application will (with the default config) attempt to connect to NATS at nats://localhost:4222, and will continue to run without starting up an HTTP server. All communication to/from the service will take place via NATS producers and/or consumers.

Within the new project, you can now run the NATS-specific code generation commands:

$ mn create-nats-producer Message
| Rendered template Producer.java to destination src/main/java/my/nats/app/MessageProducer.java

$ mn create-nats-listener Message
| Rendered template Listener.java to destination src/main/java/my/nats/app/MessageListener.java

4 NATS Quick Start

To add support for NATS.io to an existing project, you should first add the Micronaut NATS configuration to your build configuration. For example:

implementation("io.micronaut.nats:micronaut-nats")
<dependency>
    <groupId>io.micronaut.nats</groupId>
    <artifactId>micronaut-nats</artifactId>
</dependency>

Creating a NATS Producer with @NatsClient

To create a NATS Producer that sends messages you can simply define an interface that is annotated with @NatsClient.

For example, the following is a trivial @NatsClient interface:

import io.micronaut.nats.annotation.Subject;
import io.micronaut.nats.annotation.NatsClient;

@NatsClient // (1)
public interface ProductClient {

    @Subject("product") // (2)
    void send(byte[] data); // (3)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject

@NatsClient // (1)
interface ProductClient {

    @Subject("product") // (2)
    void send(byte[] data) // (3)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject

@NatsClient // (1)
interface ProductClient {

    @Subject("product") // (2)
    fun send(data: ByteArray) // (3)
}
1 The @NatsClient annotation is used to designate this interface as a client
2 The @Subject annotation indicates which subject the Message should be published to
3 It is also possible for the subject to be dynamic by making it a method argument

At compile time Micronaut will produce an implementation of the above interface. You can retrieve an instance of ProductClient either by looking up the bean from the ApplicationContext or by injecting the bean with @Inject:

Creating a NATS Consumer with @NatsListener

To listen to NATS messages you can use the @NatsListener annotation to define a message listener.

The following example will listen for messages published by the ProductClient in the previous section:

1 The @NatsListener is used to designate this class as a listener.
2 The @Subject annotation is again used to indicate which subject to subscribe to.
3 The receive method defines one argument, which will receive the value.

5 Configuring the connection

All properties of the NATS Java client's Options can be modified, either through configuration or through a BeanCreatedEventListener.

The properties that can be converted from the string values in a configuration file can be configured directly.

Table 1. Configuration Properties for NatsConnectionFactoryConfig

Property                        Type
nats.*.addresses                java.util.List
nats.*.username                 java.lang.String
nats.*.password                 java.lang.String
nats.*.token                    java.lang.String
nats.*.max-reconnect            int
nats.*.reconnect-wait           java.time.Duration
nats.*.connection-timeout       java.time.Duration
nats.*.ping-interval            java.time.Duration
nats.*.reconnect-buffer-size    long
nats.*.inbox-prefix             java.lang.String
nats.*.no-echo                  boolean
nats.*.utf8-support             boolean
nats.*.credentials              java.lang.String

Without any configuration, the defaults in the Options will be used.
It is also possible to disable the integration entirely with nats.enabled: false.

Connections

It is possible to configure multiple connections to the same server, different servers, or a single connection to one of a list of servers.

nats.server1.addresses[0]=nats://localhost:4222
nats.server1.username=guest
nats.server1.password=guest
nats.server2.addresses[0]=nats://randomServer:4222
nats.server2.username=guest
nats.server2.password=guest
nats:
  server1:
    addresses:
      - "nats://localhost:4222"
    username: guest
    password: guest
  server2:
    addresses:
      - "nats://randomServer:4222"
    username: guest
    password: guest
[nats]
  [nats.server1]
    addresses=[
      "nats://localhost:4222"
    ]
    username="guest"
    password="guest"
  [nats.server2]
    addresses=[
      "nats://randomServer:4222"
    ]
    username="guest"
    password="guest"
nats {
  server1 {
    addresses = ["nats://localhost:4222"]
    username = "guest"
    password = "guest"
  }
  server2 {
    addresses = ["nats://randomServer:4222"]
    username = "guest"
    password = "guest"
  }
}
{
  nats {
    server1 {
      addresses = ["nats://localhost:4222"]
      username = "guest"
      password = "guest"
    }
    server2 {
      addresses = ["nats://randomServer:4222"]
      username = "guest"
      password = "guest"
    }
  }
}
{
  "nats": {
    "server1": {
      "addresses": ["nats://localhost:4222"],
      "username": "guest",
      "password": "guest"
    },
    "server2": {
      "addresses": ["nats://randomServer:4222"],
      "username": "guest",
      "password": "guest"
    }
  }
}

NATS also supports a failover connection strategy in which the first server in a list that connects successfully is used. To use this option in Micronaut, supply a list of addresses.

nats.default.addresses[0]=nats://localhost:4222
nats.default.addresses[1]=nats://randomServer:4222
nats.default.username=guest
nats.default.password=guest
nats:
  default:
    addresses:
      - "nats://localhost:4222"
      - "nats://randomServer:4222"
    username: guest
    password: guest
[nats]
  [nats.default]
    addresses=[
      "nats://localhost:4222",
      "nats://randomServer:4222"
    ]
    username="guest"
    password="guest"
nats {
  'default' {
    addresses = ["nats://localhost:4222", "nats://randomServer:4222"]
    username = "guest"
    password = "guest"
  }
}
{
  nats {
    default {
      addresses = ["nats://localhost:4222", "nats://randomServer:4222"]
      username = "guest"
      password = "guest"
    }
  }
}
{
  "nats": {
    "default": {
      "addresses": ["nats://localhost:4222", "nats://randomServer:4222"],
      "username": "guest",
      "password": "guest"
    }
  }
}
When the configuration option nats.servers is used, no other options underneath nats are read; for example nats.username.
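
The failover strategy described above can be pictured with a small plain-Java sketch. It is not the NATS client's implementation: FailoverSketch, firstReachable and the tryConnect predicate are hypothetical names used only for illustration, and the real client may try the servers in a different order than the one listed.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Illustrative only: returns the first address for which a connection attempt succeeds.
final class FailoverSketch {

    // tryConnect is a hypothetical stand-in for a real connection attempt.
    static Optional<String> firstReachable(List<String> addresses, Predicate<String> tryConnect) {
        for (String address : addresses) {
            if (tryConnect.test(address)) {
                return Optional.of(address); // stop at the first server that accepts the connection
            }
        }
        return Optional.empty(); // none of the configured servers could be reached
    }

    public static void main(String[] args) {
        List<String> addresses = List.of("nats://localhost:4222", "nats://randomServer:4222");
        // Pretend that only the second server is currently up.
        Optional<String> chosen = firstReachable(addresses, a -> a.contains("randomServer"));
        System.out.println(chosen.orElse("no server reachable"));
    }
}
```

The point of the sketch is that a single logical connection is configured with several candidate addresses, rather than several connections being opened at once.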

If you need to set up TLS, it can be configured this way:

nats.default.addresses[0]=nats://localhost:4222
nats.default.tls.trust-store-path=/path/to/client.truststore.jks
nats.default.tls.trust-store-password=secret
nats.default.tls.certificate-path=/path/to/certificate.crt
nats:
  default:
    addresses:
      - "nats://localhost:4222" # (1)
    tls:
      trust-store-path:  /path/to/client.truststore.jks # (2)
      trust-store-password: secret
      certificate-path: /path/to/certificate.crt # (3)
[nats]
  [nats.default]
    addresses=[
      "nats://localhost:4222"
    ]
    [nats.default.tls]
      trust-store-path="/path/to/client.truststore.jks"
      trust-store-password="secret"
      certificate-path="/path/to/certificate.crt"
nats {
  'default' {
    addresses = ["nats://localhost:4222"]
    tls {
      trustStorePath = "/path/to/client.truststore.jks"
      trustStorePassword = "secret"
      certificatePath = "/path/to/certificate.crt"
    }
  }
}
{
  nats {
    default {
      addresses = ["nats://localhost:4222"]
      tls {
        trust-store-path = "/path/to/client.truststore.jks"
        trust-store-password = "secret"
        certificate-path = "/path/to/certificate.crt"
      }
    }
  }
}
{
  "nats": {
    "default": {
      "addresses": ["nats://localhost:4222"],
      "tls": {
        "trust-store-path": "/path/to/client.truststore.jks",
        "trust-store-password": "secret",
        "certificate-path": "/path/to/certificate.crt"
      }
    }
  }
}
1 You can use either nats://localhost:4222 or tls://localhost:4222 as the protocol.
2 You can configure a complete truststore.
3 Or you can use a single certificate for connecting to NATS securely.

6 NATS Producers

The example in the quick start presented a trivial definition of an interface that is implemented automatically for you using the @NatsClient annotation.

The implementation that powers @NatsClient (defined by the NatsIntroductionAdvice class) is, however, very flexible and offers a range of options for defining NATS clients.

6.1 Defining @NatsClient Methods

All methods that publish messages to NATS must meet the following conditions:

  • The method must reside in an interface annotated with @NatsClient.

  • The method or a method parameter must be annotated with @Subject.

  • The method must contain an argument representing the body of the message.

If a body argument cannot be found, an exception will be thrown.
In order for all of the functionality to work as designed in this guide your classes must be compiled with the parameters flag set to true. If your application was created with the Micronaut CLI, then that has already been configured for you.
Unless a reactive type is returned from the publishing method, the action is blocking.
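
As background for the parameters flag mentioned above, the following plain-Java sketch shows what the javac -parameters option changes at the JDK level: whether the declared parameter names are stored in the class file. It only illustrates the compiler flag and does not show how Micronaut itself reads names; ParameterNamesSketch and Sender are hypothetical names.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

// Illustrative only: inspects whether parameter names were kept by the compiler.
final class ParameterNamesSketch {

    // Hypothetical interface, shaped like a publishing method from this guide.
    interface Sender {
        void send(String productSize, byte[] data);
    }

    // Returns the name reported for the first parameter of Sender.send.
    static String firstParameterName() {
        try {
            Method method = Sender.class.getMethod("send", String.class, byte[].class);
            Parameter first = method.getParameters()[0];
            // With -parameters this is the declared name; without it the JDK
            // synthesizes a positional name such as arg0.
            return first.getName();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("first parameter name: " + firstParameterName());
    }
}
```

Compiling this class with and without -parameters and running it shows the difference in the reported name.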

6.1.1 Publishing Parameters

All options are available to be set for publishing messages. The publish method is used by the NatsIntroductionAdvice to publish messages and all arguments can be set through annotations or method arguments.

6.1.1.1 Subject

If you need to specify the subject of the message, apply the @Subject annotation to the method or an argument of the method. Apply the annotation to the method itself if the value is static for every execution. Apply the annotation to an argument of the method if the value should be set per execution.

import io.micronaut.nats.annotation.NatsClient;
import io.micronaut.nats.annotation.Subject;

@NatsClient
public interface ProductClient {

    @Subject("product") // (1)
    void send(byte[] data);

    void send(@Subject String subject, byte[] data); // (2)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject

@NatsClient
interface ProductClient {

    @Subject("product") // (1)
    void send(byte[] data)

    void send(@Subject String subject, byte[] data) // (2)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject

@NatsClient
interface ProductClient {

    @Subject("product") // (1)
    fun send(data: ByteArray)

    fun send(@Subject subject: String, data:ByteArray) // (2)
}
1 The subject is static
2 The subject must be set per execution

Producer Connection

If multiple NATS servers have been configured, the name of the server can be set in the @Subject annotation to designate which connection should be used to publish messages.

import io.micronaut.nats.annotation.NatsClient;
import io.micronaut.nats.annotation.Subject;

@NatsClient
public interface ProductClient {

    @Subject(value = "product", connection = "product-cluster") // (1)
    void send(byte[] data);
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject

@NatsClient
interface ProductClient {

    @Subject(value = "product", connection = "product-cluster") // (1)
    void send(byte[] data)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject

@NatsClient
interface ProductClient {

    @Subject(value = "product", connection = "product-cluster") // (1)
    fun send(data: ByteArray)

}
1 The connection is set on the subject annotation.
The connection option is also available to be set on the @NatsClient annotation.

Queues

The NATS server will route the message to the queue and select a message receiver.

6.1.1.2 Headers

Headers can be set on the message with the @MessageHeader annotation applied to the method or an argument of the method. Apply the annotation to the method itself if the value is static for every execution. Apply the annotation to an argument of the method if the value should be set per execution.

import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.MessageHeader;
import io.micronaut.nats.annotation.NatsClient;
import io.micronaut.nats.annotation.Subject;
import io.nats.client.impl.Headers;
import java.util.List;

@NatsClient
@MessageHeader(name = "x-product-sealed", value = "true") // (1)
@MessageHeader(name = "productSize", value = "large")
public interface ProductClient {

    @Subject("product")
    @MessageHeader(name = "x-product-count", value = "10") // (2)
    @MessageHeader(name = "productSize", value = "small")
    void send(byte[] data);

    @Subject("product")
    void send(@MessageHeader String productSize, // (3)
              @MessageHeader("x-product-count") Long count,
              byte[] data);

    @Subject("products")
    @MessageHeader(name = "x-product-count", value = "20")
    void send(@MessageBody byte[] data, @MessageHeader List<String> productSizes);// (4)

    @Subject("productHeader")
    void send(@MessageBody byte[] data, Headers headers);// (5)

}
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject
import io.nats.client.impl.Headers

@NatsClient
@MessageHeader(name = "x-product-sealed", value = "true") // (1)
@MessageHeader(name = "productSize", value = "large")
interface ProductClient {


    @Subject("product")
    @MessageHeader(name = "x-product-count", value = "10") // (2)
    @MessageHeader(name = "productSize", value = "small")
    void send(byte[] data)

    @Subject("product")
    void send(@MessageHeader String productSize, // (3)
              @MessageHeader("x-product-count") Long count,
              byte[] data)

    @Subject("products")
    @MessageHeader(name = "x-product-count", value = "20")
    void send(@MessageBody byte[] data, @MessageHeader List<String> productSizes) // (4)

    @Subject("productHeader")
    void send(@MessageBody byte[] data, Headers headers) // (5)
}
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.messaging.annotation.MessageHeaders
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject
import io.nats.client.impl.Headers

@NatsClient
@MessageHeaders(
    MessageHeader(name = "x-product-sealed", value = "true"), // (1)
    MessageHeader(name = "productSize", value = "large")
)
interface ProductClient {

    @Subject("product")
    @MessageHeaders(
        MessageHeader(name = "x-product-count", value = "10"), // (2)
        MessageHeader(name = "productSize", value = "small")
    )
    fun send(data: ByteArray)

    @Subject("product")
    fun send(@MessageHeader productSize: String?, // (3)
             @MessageHeader("x-product-count") count: Long,
             data: ByteArray)

    @Subject("products")
    @MessageHeader(name = "x-product-count", value = "20")
    fun send(@MessageBody data:ByteArray, @MessageHeader productSizes: List<String>) // (4)

    @Subject("productHeader")
    fun send(@MessageBody data: ByteArray, headers: Headers) // (5)

}
1 Headers can be defined at the class level and will apply to all methods. If a header is defined on the method with the same name as one on the class, the value on the method will be used.
2 Multiple annotations can be used to set multiple headers on the method or class level.
3 Headers can be set per execution. The name is inferred from the argument if the annotation value is not set. A null value results in the header not being set.
4 You can also use a List as a header. An empty list or a null value does not set the header.
5 A Headers argument can be used to pass custom headers. Note: if the @MessageHeader is used on a method argument, the Headers argument will be ignored.
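
The precedence rule in callout 1 (a method-level header replaces a class-level header of the same name) can be sketched in plain Java. This is only a model of the rule, not the library's code; HeaderPrecedenceSketch and effectiveHeaders are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: models how class-level and method-level headers combine.
final class HeaderPrecedenceSketch {

    static Map<String, String> effectiveHeaders(Map<String, String> classLevel,
                                                Map<String, String> methodLevel) {
        Map<String, String> merged = new LinkedHashMap<>(classLevel);
        // Method-level values replace class-level values with the same name.
        merged.putAll(methodLevel);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> classLevel = new LinkedHashMap<>();
        classLevel.put("x-product-sealed", "true");
        classLevel.put("productSize", "large");

        Map<String, String> methodLevel = new LinkedHashMap<>();
        methodLevel.put("x-product-count", "10");
        methodLevel.put("productSize", "small");

        // productSize resolves to "small" because the method-level value wins.
        System.out.println(effectiveHeaders(classLevel, methodLevel));
    }
}
```

The values mirror the first send method in the example above: the message carries x-product-sealed from the class, and x-product-count and productSize from the method.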

6.1.1.3 Message Body

Most examples up to this point have been using a byte[] as the body type for simplicity. This library supports most standard Java types and JSON serialization (using Jackson) by default. The functionality is extensible and it is possible to add support for additional types and serialization strategies. See the section on Message Serialization/Deserialization for more information.

7 NATS Consumers

The quick start section presented a trivial example of what is possible with the @NatsListener annotation.

The implementation that powers @NatsListener (defined by the NatsConsumerAdvice class) is, however, very flexible and offers a range of options for consuming NATS messages.

7.1 Defining @NatsListener Methods

All methods that consume messages from NATS must meet the following conditions:

  • The method must reside in a class annotated with @NatsListener.

  • The method must be annotated with @Subject.

In order for all of the functionality to work as designed in this guide your classes must be compiled with the parameters flag set to true. If your application was created with the Micronaut CLI, then that has already been configured for you.

7.1.1 Consumer Parameters

The createDispatcher method is used by the NatsConsumerAdvice to consume messages. Some of the options can be directly configured through annotations.

In order for the consumer method to be invoked, all arguments must be satisfied. To allow execution of the method with a null value, the argument must be declared as nullable. If the arguments cannot be satisfied, the message will be rejected.
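
The rule above can be modelled with a short plain-Java sketch: a consumer method may run only if every argument either has a value or is declared nullable. The types and names here (ArgumentCheckSketch, BoundArgument, canInvoke) are hypothetical and exist only for this illustration; the library performs the equivalent check internally.

```java
import java.util.List;

// Illustrative only: decides whether a consumer method could be invoked.
final class ArgumentCheckSketch {

    // Hypothetical description of one argument after binding was attempted.
    static final class BoundArgument {
        final String name;
        final boolean nullable;
        final Object value;

        BoundArgument(String name, boolean nullable, Object value) {
            this.name = name;
            this.nullable = nullable;
            this.value = value;
        }
    }

    // True when every argument is satisfied: a value is present, or null is allowed.
    static boolean canInvoke(List<BoundArgument> arguments) {
        for (BoundArgument argument : arguments) {
            if (argument.value == null && !argument.nullable) {
                return false; // unsatisfied argument: the message would be rejected
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<BoundArgument> arguments = List.of(
                new BoundArgument("data", false, new byte[0]),
                new BoundArgument("productSize", true, null)); // nullable, so null is acceptable
        System.out.println("invoke: " + canInvoke(arguments));
    }
}
```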

7.1.1.1 Subject

A @Subject annotation is required for a method to be a consumer of messages from NATS. Simply apply the annotation to the method and supply the name of the subject you would like to listen to.

import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@NatsListener
public class ProductListener {

    List<Integer> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @Subject("product") // (1)
    public void receive(byte[] data) {
        messageLengths.add(data.length);
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject

import java.util.concurrent.CopyOnWriteArrayList

@NatsListener
class ProductListener {

    CopyOnWriteArrayList<Integer> messageLengths = []

    @Subject("product") // (1)
    void receive(byte[] data) {
        messageLengths << data.length
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import java.util.Collections

@NatsListener
class ProductListener {

    val messageLengths: MutableList<Int> = Collections.synchronizedList(ArrayList())

    @Subject("product") // (1)
    fun receive(data: ByteArray) {
        messageLengths.add(data.size)
    }
}
1 The subject annotation is set per method. Multiple methods may be defined with different subjects in the same class.

Queue Support

Subscribers may specify queue groups at subscription time. When a message is published to the group, NATS will deliver it to one and only one subscriber.

import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@NatsListener
public class ProductListener {

    List<String> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @Subject(value = "product", queue = "product-queue") // (1)
    public void receiveByQueue1(byte[] data) {
        messageLengths.add(new String(data));
        System.out.println("Java received " + data.length + " bytes from Nats");
    }

    @Subject(value = "product", queue = "product-queue")
    public void receiveByQueue2(byte[] data) {
        messageLengths.add(new String(data));
        System.out.println("Java received " + data.length + " bytes from Nats");
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject

import java.util.concurrent.CopyOnWriteArrayList

@NatsListener
class ProductListener {

    CopyOnWriteArrayList<String> messageLengths = []

    @Subject(value = "product", queue = "product-queue") // (1)
    void receiveByQueue1(byte[] data) {
        messageLengths << new String(data)
    }

    @Subject(value = "product", queue = "product-queue")
    public void receiveByQueue2(byte[] data) {
        messageLengths << new String(data)
    }
}
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import io.nats.client.impl.Headers
import java.util.Collections

@NatsListener
class ProductListener {

    var messageLengths: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Subject(value = "product", queue = "product-queue") // (1)
    fun receiveByQueue1(data: ByteArray) {
        messageLengths.add(String(data))
    }

    @Subject(value = "product", queue = "product-queue")
    fun receiveByQueue2(data: ByteArray) {
        messageLengths.add(String(data))
    }

}
1 A queue can be defined in the @Subject annotation.
Queue groups do not persist messages. If no listeners are available, the message is discarded.
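
The delivery behavior of a queue group can be simulated in plain Java. The sketch below is a model, not NATS itself: each message goes to exactly one member of the group, and a message is dropped when the group has no members. QueueGroupSketch and deliver are hypothetical names, and picking the member at random is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Illustrative only: simulates one-and-only-one delivery within a queue group.
final class QueueGroupSketch {

    // Delivers messageCount messages and returns how many each member received.
    static Map<String, Integer> deliver(List<String> members, int messageCount, Random random) {
        Map<String, Integer> received = new LinkedHashMap<>();
        for (String member : members) {
            received.put(member, 0);
        }
        for (int i = 0; i < messageCount; i++) {
            if (members.isEmpty()) {
                continue; // no listener available: the message is discarded
            }
            // Exactly one member of the group receives this message.
            String chosen = members.get(random.nextInt(members.size()));
            received.merge(chosen, 1, Integer::sum);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> members = List.of("receiveByQueue1", "receiveByQueue2");
        Map<String, Integer> counts = deliver(members, 10, new Random());
        // The per-member counts always add up to the number of published messages.
        System.out.println(counts);
    }
}
```

In terms of the example above, receiveByQueue1 and receiveByQueue2 share the product-queue group, so each published message reaches only one of the two methods.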

Other Options

If multiple NATS servers have been configured, the name of the server can be set in the @Subject annotation to designate which connection should be used to listen for messages.

import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@NatsListener
public class ProductListener {

    List<String> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @Subject(value = "product", connection = "product-cluster") // (1)
    public void receive(byte[] data) {
        messageLengths.add(new String(data));
        System.out.println("Java received " + data.length + " bytes from Nats");
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject

@NatsListener
class ProductListener {

    List<String> messageLengths = Collections.synchronizedList([])

    @Subject(value = "product", connection = "product-cluster") // (1)
    void receive(byte[] data) {
        messageLengths << new String(data)
    }
}
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import io.nats.client.impl.Headers
import java.util.Collections

@NatsListener
class ProductListener {

    var messageLengths: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Subject(value = "product", connection = "product-cluster") // (1)
    fun receive(data: ByteArray) {
        messageLengths.add(String(data))
    }

}
1 The connection is set on the subject annotation.
The connection option is also available to be set on the @NatsListener annotation.

7.1.1.2 Headers

Headers can be retrieved with the @MessageHeader annotation applied to the arguments of the method.

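import io.micronaut.core.annotation.Nullable;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.MessageHeader;
import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;
import io.nats.client.impl.Headers;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;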

@NatsListener
public class ProductListener {

    Set<String> messageProperties = Collections.synchronizedSet(new HashSet<>());

    @Subject("product")
    public void receive(byte[] data,
            @MessageHeader("x-product-sealed") Boolean sealed, // (1)
            @MessageHeader("x-product-count") Long count, // (2)
            @Nullable @MessageHeader String productSize) { // (3)
        messageProperties.add(sealed + "|" + count + "|" + productSize);
    }

    @Subject("products")
    public void receive(@MessageBody byte[] data, @MessageHeader("x-product-sealed") Boolean sealed,
            @MessageHeader("x-product-count") Long count, @MessageHeader List<String> productSizes) { // (4)
        for (String productSize : productSizes) {
            messageProperties.add(sealed + "|" + count + "|" + productSize);
        }
    }

    @Subject("productHeader")
    public void receive(@MessageBody byte[] data, Headers headers) { // (5)
        String productSize = headers.get("productSize").get(0);
        messageProperties.add(
                headers.get("x-product-sealed").get(0) + "|" +
                        headers.get("x-product-count").get(0) + "|" +
                        productSize);
    }
}
import io.micronaut.core.annotation.Nullable
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import io.nats.client.impl.Headers

import java.util.concurrent.CopyOnWriteArrayList

@NatsListener
class ProductListener {

    CopyOnWriteArrayList<String> messageProperties = []

    @Subject("product")
    void receive(byte[] data,
                 @MessageHeader("x-product-sealed") Boolean sealed, // (1)
                 @MessageHeader("x-product-count") Long count, // (2)
                 @Nullable @MessageHeader String productSize) { // (3)
        messageProperties << sealed.toString() + "|" + count + "|" + productSize
    }

    @Subject("products")
    void receive(@MessageBody byte[] data, @MessageHeader("x-product-sealed") Boolean sealed,
                 @MessageHeader("x-product-count") Long count, @MessageHeader List<String> productSizes) { // (4)
        productSizes.forEach {
            messageProperties << sealed.toString() + "|" + count + "|" + it
        }
    }

    @Subject("productHeader")
    void receive(@MessageBody byte[] data, Headers headers) { // (5)
        String productSize = headers.get("productSize").get(0)
        messageProperties << headers.get("x-product-sealed").get(0) + "|" +
                        headers.get("x-product-count").get(0) + "|" +
                        productSize
    }
}
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import io.nats.client.impl.Headers
import java.util.Collections

@NatsListener
class ProductListener {

    var messageProperties: MutableList<String> = Collections.synchronizedList(ArrayList())
    private var datas: MutableList<ByteArray> = Collections.synchronizedList(ArrayList())


    @Subject("product")
    fun receive(data: ByteArray,
                @MessageHeader("x-product-sealed") sealed: Boolean, // (1)
                @MessageHeader("x-product-count") count: Long, // (2)
                @MessageHeader productSize: String?) { // (3)
        messageProperties.add(sealed.toString() + "|" + count + "|" + productSize)
        datas.add(data)
    }

    @Subject("products")
    fun receive(@MessageBody data: ByteArray,
                @MessageHeader("x-product-sealed") sealed: Boolean,
                @MessageHeader("x-product-count") count: Long,
                @MessageHeader productSizes: List<String>?) { // (4)
        productSizes?.forEach {
            messageProperties.add("${sealed}|${count}|${it}")
        }
        datas.add(data)
    }

    @Subject("productHeader")
    fun receive(@MessageBody data: ByteArray, headers: Headers) { // (5)
        messageProperties.add("${headers["x-product-sealed"][0]}|${headers["x-product-count"][0]}|${headers["productSize"][0]}")
        datas.add(data)
    }
}
1 The header name comes from the annotation and the value is retrieved and converted to a Boolean.
2 The header name comes from the annotation and the value is retrieved and converted to a Long.
3 The header name comes from the argument name. This argument allows null values.
4 The header can also be a list representing multiple values.
5 All headers can be bound to a Headers argument.

7.1.1.3 Nats Types

Arguments can also be bound based on their type. Several types are supported by default and each type has a corresponding NatsTypeArgumentBinder. The argument binders are covered in detail in the section on Custom Parameter Binding.

Several types can be bound this way to retrieve data about the Message; the example below binds the Message itself, the Connection, the Subscription, and the Headers.

import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;
import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Subscription;
import io.nats.client.impl.Headers;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@NatsListener
public class ProductListener {

    List<String> messages = Collections.synchronizedList(new ArrayList<>());

    @Subject("product")
    public void receive(byte[] data,
            Message message,
            Connection connection,
            Subscription subscription,
            Headers headers) { // (1)
        messages.add(String.format("subject: [%s], maxPayload: [%s], pendingMessageCount: [%s], x-productCount: [%s]",
                message.getSubject(),
                connection.getMaxPayload(), subscription.getPendingMessageCount(),
                headers.get("x-product-count").get(0)));
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject

import io.nats.client.Connection
import io.nats.client.Message
import io.nats.client.Subscription
import io.nats.client.impl.Headers

import java.util.concurrent.CopyOnWriteArrayList

@NatsListener
class ProductListener {

    CopyOnWriteArrayList<String> messages = []

    @Subject("product")
    void receive(byte[] data,
                 Message message,
                 Connection connection,
                 Subscription subscription,
                 Headers headers) { // (1)
        def count = headers.get("x-product-count").get(0)
        messages << "subject: [$message.subject], maxPayload: [$connection.maxPayload], pendingMessageCount: [$subscription.pendingMessageCount], x-productCount: [$count]".toString()

    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import io.nats.client.Connection
import io.nats.client.Message
import io.nats.client.Subscription
import io.nats.client.impl.Headers
import java.util.Collections


@NatsListener
class ProductListener {

    var messages: MutableList<String> = Collections.synchronizedList(ArrayList())
    private var datas: MutableList<ByteArray> = Collections.synchronizedList(ArrayList())

    @Subject(value = "product")
    fun receive(message: Message,
                connection: Connection,
                subscription: Subscription,
                headers: Headers) { // (1)
        messages.add("subject: [${message.subject}], maxPayload: [${connection.maxPayload}], pendingMessageCount: [${subscription.pendingMessageCount}], x-productCount: [${headers["x-product-count"][0]}]")
    }

}
1 The arguments are bound from the Message.

7.1.1.4 Message Body

Most examples up to this point have been using a byte[] as the body type for simplicity. This library supports most standard Java types and JSON deserialization (using Jackson) by default. The functionality is extensible and it is possible to add support for additional types and deserialization strategies. See the section on Message Serialization/Deserialization for more information.

7.1.1.5 Custom Parameter Binding

Default Binding Functionality

Consumer argument binding is achieved through an ArgumentBinderRegistry that is specific for binding consumers from Nats messages. The class responsible for this is the NatsBinderRegistry.

The registry supports argument binders that are used based on an annotation applied to the argument or the argument type. All argument binders must implement either NatsAnnotatedArgumentBinder or NatsTypeArgumentBinder. The exception to that rule is the NatsDefaultBinder which is used when no other binders support a given argument.

When an argument needs to be bound, the Message is used as the source of all available data. The binder registry follows a short sequence of steps to find a binder that supports the argument.

  1. Search the annotation based binders for one that matches any annotation on the argument that is annotated with @Bindable.

  2. Search the type based binders for one that matches or is a subclass of the argument type.

  3. Return the default binder.

The default binder binds the body of the message to the argument.
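The lookup order above can be pictured with a small, self-contained sketch. It does not use the Micronaut API: BinderLookupSketch, its register methods, and the use of plain strings as binder names are hypothetical stand-ins, and the type check simply follows the wording of step 2.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the three lookup steps; not the NatsBinderRegistry implementation.
class BinderLookupSketch {

    static final String DEFAULT_BINDER = "NatsDefaultBinder";

    // Binder names keyed by the simple name of a @Bindable annotation (assumed representation).
    private final Map<String, String> byAnnotation = new LinkedHashMap<>();
    // Binder names keyed by the type the binder produces.
    private final Map<Class<?>, String> byType = new LinkedHashMap<>();

    void registerAnnotationBinder(String annotationName, String binderName) {
        byAnnotation.put(annotationName, binderName);
    }

    void registerTypeBinder(Class<?> type, String binderName) {
        byType.put(type, binderName);
    }

    String findBinder(Class<?> argumentType, List<String> bindableAnnotations) {
        // Step 1: an annotation on the argument that has a registered binder wins.
        for (String annotation : bindableAnnotations) {
            String binder = byAnnotation.get(annotation);
            if (binder != null) {
                return binder;
            }
        }
        // Step 2: a binder whose type matches, or is a subclass of, the argument type.
        for (Map.Entry<Class<?>, String> entry : byType.entrySet()) {
            if (argumentType.isAssignableFrom(entry.getKey())) {
                return entry.getValue();
            }
        }
        // Step 3: fall back to the default binder, which binds the message body.
        return DEFAULT_BINDER;
    }

    public static void main(String[] args) {
        BinderLookupSketch registry = new BinderLookupSketch();
        registry.registerAnnotationBinder("SID", "SIDAnnotationBinder");
        registry.registerTypeBinder(Integer.class, "IntegerBinder");

        System.out.println(registry.findBinder(String.class, List.of("SID")));
        System.out.println(registry.findBinder(Number.class, List.of()));
        System.out.println(registry.findBinder(byte[].class, List.of()));
    }
}
```

An annotated argument is resolved in step 1 regardless of its type, which is why an annotation binder can supply a String while the default binder would otherwise have bound the body.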

Custom Binding

To add your own argument binding behavior, register a binder as a bean. The existing binder registry will pick it up and include it in the normal processing.

Annotation Binding

A custom annotation can be created to bind consumer arguments. A custom binder can then be created to use that annotation and the Message to supply a value for the argument. The value may in fact come from anywhere; for the purposes of this documentation, the SID (subscription ID) of the message is used.

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import io.micronaut.core.bind.annotation.Bindable;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
@Bindable // (1)
public @interface SID {
}
import io.micronaut.core.bind.annotation.Bindable

import java.lang.annotation.Documented
import java.lang.annotation.ElementType
import java.lang.annotation.Retention
import java.lang.annotation.RetentionPolicy
import java.lang.annotation.Target

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target([ElementType.PARAMETER])
@Bindable // (1)
@interface SID {
}
import io.micronaut.core.bind.annotation.Bindable

@MustBeDocumented
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.VALUE_PARAMETER)
@Bindable // (1)
annotation class SID
1 The @Bindable annotation is required for the annotation to be considered for binding.
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.nats.bind.NatsAnnotatedArgumentBinder;
import io.nats.client.Message;
import jakarta.inject.Singleton;

@Singleton // (1)
public class SIDAnnotationBinder implements NatsAnnotatedArgumentBinder<SID> { // (2)

    private final ConversionService conversionService;

    public SIDAnnotationBinder(ConversionService conversionService) { // (3)
        this.conversionService = conversionService;
    }

    @Override
    public Class<SID> getAnnotationType() {
        return SID.class;
    }

    @Override
    public BindingResult<Object> bind(ArgumentConversionContext<Object> context, Message source) {
        String sid = source.getSID(); // (4)
        return () -> conversionService.convert(sid, context); // (5)
    }
}
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import io.micronaut.nats.bind.NatsAnnotatedArgumentBinder
import io.nats.client.Message

import jakarta.inject.Singleton

@Singleton // (1)
class SIDAnnotationBinder implements NatsAnnotatedArgumentBinder<SID> { // (2)

    private final ConversionService conversionService

    SIDAnnotationBinder(ConversionService conversionService) { // (3)
        this.conversionService = conversionService
    }

    @Override
    Class<SID> getAnnotationType() {
        SID
    }

    @Override
    BindingResult<Object> bind(ArgumentConversionContext<Object> context, Message source) {
        String sid = source.getSID() // (4)
        return { -> conversionService.convert(sid, context) } // (5)
    }
}
import io.micronaut.core.bind.ArgumentBinder
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import io.micronaut.nats.bind.NatsAnnotatedArgumentBinder
import io.nats.client.Message
import jakarta.inject.Singleton

@Singleton // (1)
class SIDAnnotationBinder(private val conversionService: ConversionService) // (3)
    : NatsAnnotatedArgumentBinder<SID> { // (2)

    override fun getAnnotationType(): Class<SID> {
        return SID::class.java
    }

    override fun bind(context: ArgumentConversionContext<Any>, source: Message): ArgumentBinder.BindingResult<Any> {
        val sid = source.sid // (4)
        return ArgumentBinder.BindingResult { conversionService.convert(sid, context) } // (5)
    }
}
1 The class is made a bean by annotating with @Singleton.
2 The custom annotation is used as the generic type for the interface.
3 The conversion service is injected into the instance.
4 The SID is retrieved from the message.
5 The SID is converted to the argument type.

The annotation can now be used on the argument in a consumer method.

import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@NatsListener
public class ProductListener {

    List<String> messages = Collections.synchronizedList(new ArrayList<>());

    @Subject("product")
    public void receive(byte[] data, @SID String sid) { // (1)
        messages.add(sid);
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject

import java.util.concurrent.CopyOnWriteArrayList

@NatsListener
class ProductListener {

    CopyOnWriteArrayList<String> messages = [] // holds the bound SID values

    @Subject("product")
    void receive(byte[] data, @SID String sid) {
        messages << sid
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject
import java.util.Collections

@NatsListener
class ProductListener {

    var messages: MutableList<String> = Collections.synchronizedList(ArrayList())
    private var datas: MutableList<ByteArray> = Collections.synchronizedList(ArrayList())

    @Subject("product")
    fun receive(data: ByteArray, @SID sid: String)  {// (1)
        messages.add(sid)
        datas.add(data)
    }

}

Type Binding

A custom binder can be created to support any argument type. For example, the following class could be created to bind values from the headers. This allows the work of retrieving and converting the headers to occur in a single place instead of being repeated throughout your code.

import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;

public class ProductInfo {

    private String size;
    private Long count;
    private Boolean sealed;

    public ProductInfo(@Nullable String size, // (1)
                       @NonNull Long count, // (2)
                       @NonNull Boolean sealed) { // (3)
        this.size = size;
        this.count = count;
        this.sealed = sealed;
    }

    public String getSize() {
        return size;
    }

    public Long getCount() {
        return count;
    }

    public Boolean getSealed() {
        return sealed;
    }
}
import io.micronaut.core.annotation.NonNull
import io.micronaut.core.annotation.Nullable

class ProductInfo {

    private String size
    private Long count
    private Boolean sealed

    ProductInfo(@Nullable String size, // (1)
                @NonNull Long count, // (2)
                @NonNull Boolean sealed) { // (3)
        this.size = size
        this.count = count
        this.sealed = sealed
    }

    String getSize() {
        size
    }

    Long getCount() {
        count
    }

    Boolean getSealed() {
        sealed
    }
}
class ProductInfo(val size: String?, // (1)
                  val count: Long, // (2)
                  val sealed: Boolean)// (3)
1 The size argument is not required
2 The count argument is required
3 The sealed argument is required

A type argument binder can then be created to create the ProductInfo instance to bind to your consumer method argument.

import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionError;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.nats.bind.NatsHeaderConvertibleValues;
import io.micronaut.nats.bind.NatsTypeArgumentBinder;
import io.nats.client.Message;
import io.nats.client.impl.Headers;
import jakarta.inject.Singleton;

import java.util.List;
import java.util.Optional;

@Singleton // (1)
public class ProductInfoTypeBinder implements NatsTypeArgumentBinder<ProductInfo> { //(2)

    private final ConversionService conversionService;

    ProductInfoTypeBinder(ConversionService conversionService) { //(3)
        this.conversionService = conversionService;
    }

    @Override
    public Argument<ProductInfo> argumentType() {
        return Argument.of(ProductInfo.class);
    }

    @Override
    public BindingResult<ProductInfo> bind(ArgumentConversionContext<ProductInfo> context, Message source) {
        Headers rawHeaders = source.getHeaders(); //(4)

        if (rawHeaders == null) {
            return BindingResult.EMPTY;
        }

        NatsHeaderConvertibleValues headers = new NatsHeaderConvertibleValues(rawHeaders, conversionService);

        String size = headers.get("productSize", String.class).orElse(null);  //(5)
        Optional<Long> count = headers.get("x-product-count", Long.class); //(6)
        Optional<Boolean> sealed = headers.get("x-product-sealed", Boolean.class); // (7)

        if (headers.getConversionErrors().isEmpty() && count.isPresent() && sealed.isPresent()) {
            return () -> Optional.of(new ProductInfo(size, count.get(), sealed.get())); //(8)
        } else {
            return new BindingResult<ProductInfo>() {
                @Override
                public Optional<ProductInfo> getValue() {
                    return Optional.empty();
                }

                @Override
                public List<ConversionError> getConversionErrors() {
                    return headers.getConversionErrors(); //(9)
                }
            };
        }
    }
}
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionError
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.nats.bind.NatsHeaderConvertibleValues
import io.micronaut.nats.bind.NatsTypeArgumentBinder
import io.nats.client.Message
import io.nats.client.impl.Headers

import jakarta.inject.Singleton

@Singleton // (1)
class ProductInfoTypeBinder implements NatsTypeArgumentBinder<ProductInfo> { //(2)

    private final ConversionService conversionService

    ProductInfoTypeBinder(ConversionService conversionService) { //(3)
        this.conversionService = conversionService
    }

    @Override
    Argument<ProductInfo> argumentType() {
        return Argument.of(ProductInfo)
    }

    @Override
    BindingResult<ProductInfo> bind(ArgumentConversionContext<ProductInfo> context, Message source) {
        Headers rawHeaders = source.headers //(4)

        if (rawHeaders == null) {
            return BindingResult.EMPTY
        }

        def headers = new NatsHeaderConvertibleValues(rawHeaders, conversionService)

        String size = headers.get("productSize", String).orElse(null)  //(5)
        Optional<Long> count = headers.get("x-product-count", Long) //(6)
        Optional<Boolean> sealed = headers.get("x-product-sealed", Boolean) // (7)

        if (headers.conversionErrors.isEmpty() && count.isPresent() && sealed.isPresent()) {
            { -> Optional.of(new ProductInfo(size, count.get(), sealed.get())) } //(8)
        } else {
            new BindingResult<ProductInfo>() {
                @Override
                Optional<ProductInfo> getValue() {
                    Optional.empty()
                }

                @Override
                List<ConversionError> getConversionErrors() {
                    headers.conversionErrors //(9)
                }
            }
        }
    }
}
import io.micronaut.core.bind.ArgumentBinder.BindingResult
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionError
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.nats.bind.NatsHeaderConvertibleValues
import io.micronaut.nats.bind.NatsTypeArgumentBinder
import io.micronaut.nats.docs.consumer.custom.type.ProductInfo
import io.nats.client.Message
import jakarta.inject.Singleton
import java.util.Optional

@Singleton // (1)
class ProductInfoTypeBinder constructor(private val conversionService: ConversionService) //(3)
    : NatsTypeArgumentBinder<ProductInfo> { // (2)

    override fun argumentType(): Argument<ProductInfo> {
        return Argument.of(ProductInfo::class.java)
    }

    override fun bind(context: ArgumentConversionContext<ProductInfo>, source: Message): BindingResult<ProductInfo> {
        val rawHeaders = source.headers ?: return BindingResult { Optional.empty<ProductInfo>() } //(4)

        val headers = NatsHeaderConvertibleValues(rawHeaders, conversionService)

        val size = headers.get("productSize", String::class.java).orElse(null)  //(5)
        val count = headers.get("x-product-count", Long::class.java) //(6)
        val sealed = headers.get("x-product-sealed", Boolean::class.java) // (7)

        if (headers.conversionErrors.isEmpty() && count.isPresent && sealed.isPresent) {
            return BindingResult<ProductInfo> { Optional.of(ProductInfo(size, count.get(), sealed.get())) } //(8)
        } else {
            return object : BindingResult<ProductInfo> {
                override fun getValue(): Optional<ProductInfo> {
                    return Optional.empty()
                }

                override fun getConversionErrors(): List<ConversionError> {
                    return headers.conversionErrors //(9)
                }
            }
        }
    }
}
1 The class is made a bean by annotating with @Singleton.
2 The custom type is used as the generic type for the interface.
3 The conversion service is injected into the instance.
4 The headers are retrieved from the message state.
5 The productSize header is retrieved, defaulting to null if the value was not found or could not be converted.
6 The x-product-count header is retrieved and converted with a new argument context that is used to retrieve conversion errors later.
7 The x-product-sealed header is retrieved and converted with a new argument context that is used to retrieve conversion errors later.
8 There are no conversion errors and the two required arguments are present, so the instance can be constructed.
9 There are conversion errors or one of the required arguments is not present so a custom BindingResult is returned that allows the conversion errors to be handled appropriately.

8 Request-Reply (RPC)

This library supports RPC through the usage of Request-Reply. Both blocking and non-blocking variations are supported.

The following example uses a direct reply: the consumer converts the body to upper case and replies with the converted string.

Client Side

The "client side" in this case starts by publishing a message. A consumer somewhere will then receive the message and reply with a new value.

import io.micronaut.nats.annotation.NatsClient;
import io.micronaut.nats.annotation.Subject;
import org.reactivestreams.Publisher;

@NatsClient
public interface ProductClient {

    @Subject("product")
    String send(String data); // (1)

    @Subject("product")
    Publisher<String> sendReactive(String data); // (2)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject
import org.reactivestreams.Publisher

@NatsClient
interface ProductClient {

    @Subject("product")
    String send(String data) // (1)

    @Subject("product")
    Publisher<String> sendReactive(String data) // (2)
}
import io.micronaut.nats.annotation.NatsClient
import io.micronaut.nats.annotation.Subject
import org.reactivestreams.Publisher

@NatsClient
interface ProductClient {

    @Subject("product")
    fun send(data: String): String // (1)

    @Subject("product")
    fun sendReactive(data: String): Publisher<String> // (2)
}
1 The send method is blocking and will return when the response is received.
2 The sendReactive method returns a reactive type that will complete when the response is received. Reactive methods will be executed on the IO thread pool.
In order for the publisher to assume RPC should be used instead of just completing when the publish is confirmed, the data type must not be Void. In both cases above, the data type is String.

Server Side

The "server side" in this case starts with the consumption of a message; a new message is then published by returning the result.

import io.micronaut.nats.annotation.NatsListener;
import io.micronaut.nats.annotation.Subject;

@NatsListener
public class ProductListener {

    @Subject("product")
    public String toUpperCase(String data) { // (1)
        return data.toUpperCase(); // (2)
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject

@NatsListener
class ProductListener {

    @Subject("product")
    String toUpperCase(String data) { // (1)
        data.toUpperCase() // (2)
    }
}
import io.micronaut.nats.annotation.NatsListener
import io.micronaut.nats.annotation.Subject

@NatsListener
class ProductListener {

    @Subject("product")
    fun receive(data: String): String { // (1)
        return data.uppercase() // (2)
    }
}
1 The data from the message is injected.
2 The converted message is returned.
If the reply publish fails for any reason, the original message will be rejected.
RPC consumer methods must never return a reactive type. Because the resulting publish needs to occur on the same thread and only a single item can be emitted, there is no value in doing so.
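The request-reply flow can be pictured without a running server. The sketch below is an in-memory simulation only, not the Micronaut or NATS client API: RequestReplySketch and its methods are hypothetical, and the "_INBOX." prefix merely mimics the reply subjects NATS clients typically generate. The requester registers a one-shot reply subject, publishes the message with that reply subject attached, and the responder's return value is published back to it.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical in-memory stand-in for a broker; it only illustrates the message flow.
class RequestReplySketch {

    private final Map<String, Function<String, String>> responders = new ConcurrentHashMap<>();
    private final Map<String, CompletableFuture<String>> inboxes = new ConcurrentHashMap<>();

    // "Server side": a handler whose return value becomes the reply.
    void respond(String subject, Function<String, String> handler) {
        responders.put(subject, handler);
    }

    // "Client side": publish with a unique reply subject and wait for a message on it.
    CompletableFuture<String> request(String subject, String data) {
        String replyTo = "_INBOX." + UUID.randomUUID();
        CompletableFuture<String> reply = new CompletableFuture<>();
        inboxes.put(replyTo, reply);
        publish(subject, replyTo, data);
        return reply;
    }

    private void publish(String subject, String replyTo, String data) {
        // A message addressed to a pending inbox completes the waiting request.
        CompletableFuture<String> inbox = inboxes.remove(subject);
        if (inbox != null) {
            inbox.complete(data);
            return;
        }
        // Otherwise hand it to the responder and publish its result to the reply subject.
        Function<String, String> handler = responders.get(subject);
        if (handler != null && replyTo != null) {
            publish(replyTo, null, handler.apply(data));
        }
    }

    public static void main(String[] args) {
        RequestReplySketch broker = new RequestReplySketch();
        broker.respond("product", String::toUpperCase);

        // Blocking style: wait for the reply before continuing.
        System.out.println(broker.request("product", "blocking").join());
        // Non-blocking style: react when the reply arrives.
        broker.request("product", "reactive").thenAccept(System.out::println);
    }
}
```

In this simulation a request to a subject without a responder never completes, which is why real clients pair a request with a timeout.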

9 Message Serialization/Deserialization (SerDes)

The serialization and deserialization of message bodies is handled through instances of NatsMessageSerDes. The ser-des (Serializer/Deserializer) is responsible for both serialization and deserialization of NATS message bodies into the message body types defined in your client and consumer methods.

The ser-des are managed by a NatsMessageSerDesRegistry. All ser-des beans are injected in order into the registry and then searched when serialization or deserialization is needed. The first ser-des whose supports method returns true is used.

By default, standard Java lang types and JSON format (with Jackson) are supported. You can supply your own ser-des by simply registering a bean of type NatsMessageSerDes. All ser-des implement the Ordered interface, so custom implementations can come before, after, or in between the default implementations.
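The ordered search can be sketched in plain Java. Everything here is a simplified assumption for illustration: the SerDes interface is a cut-down stand-in for NatsMessageSerDes, and the order values 100 and 200 for the built-in ser-des are invented for the example. The only behavior relied on is that a lower order value is checked first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical registry sketch; not the NatsMessageSerDesRegistry implementation.
class SerDesRegistrySketch {

    /** Cut-down ser-des contract: an order plus a supports check. */
    interface SerDes {
        int getOrder();
        boolean supports(Class<?> type);
        String name();
    }

    private final List<SerDes> serDes = new ArrayList<>();

    SerDesRegistrySketch(List<SerDes> beans) {
        serDes.addAll(beans);
        // Lower order value means higher precedence, so sort ascending.
        serDes.sort(Comparator.comparingInt(SerDes::getOrder));
    }

    /** Returns the first ser-des, in order, that supports the given body type. */
    Optional<SerDes> find(Class<?> type) {
        return serDes.stream().filter(s -> s.supports(type)).findFirst();
    }

    static SerDes of(String name, int order, Class<?> supportedType) {
        return new SerDes() {
            @Override public int getOrder() { return order; }
            @Override public boolean supports(Class<?> type) { return supportedType.isAssignableFrom(type); }
            @Override public String name() { return name; }
        };
    }

    public static void main(String[] args) {
        SerDesRegistrySketch registry = new SerDesRegistrySketch(List.of(
                of("json", 200, Object.class),          // assumed order, accepts anything
                of("custom", 0, Number.class),          // default order of 0
                of("javaLang", 100, CharSequence.class) // assumed order
        ));
        System.out.println(registry.find(Integer.class).map(SerDes::name).orElse("none"));
        System.out.println(registry.find(String.class).map(SerDes::name).orElse("none"));
    }
}
```

Because the catch-all JSON entry is sorted last, a more specific ser-des with a lower order value gets the first chance to claim a type.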

9.1 Custom SerDes

A custom serializer/deserializer is necessary to support custom data formats. The section on Custom Parameter Binding demonstrated an example that binds a ProductInfo type from the headers of the message. If that object should instead represent the body of the message in a custom data format, you can register your own serializer/deserializer to do so.

This example uses a simple data format in which the string representations of the fields are concatenated with a pipe character.

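import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.nats.serdes.NatsMessageSerDes;
import io.nats.client.Message;
import jakarta.inject.Singleton;

import java.nio.charset.StandardCharsets;
import java.util.Optional;

@Singleton // (1)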
public class ProductInfoSerDes implements NatsMessageSerDes<ProductInfo> { // (2)

    private final ConversionService conversionService;

    public ProductInfoSerDes(ConversionService conversionService) { // (3)
        this.conversionService = conversionService;
    }

    @Override
    public ProductInfo deserialize(Message message, Argument<ProductInfo> argument) { // (4)
        String body = new String(message.getData(), StandardCharsets.UTF_8);
        String[] parts = body.split("\\|");
        if (parts.length == 3) {
            String size = parts[0];
            if (size.equals("null")) {
                size = null;
            }

            Optional<Long> count = conversionService.convert(parts[1], Long.class);
            Optional<Boolean> sealed = conversionService.convert(parts[2], Boolean.class);

            if (count.isPresent() && sealed.isPresent()) {
                return new ProductInfo(size, count.get(), sealed.get());
            }
        }
        return null;
    }

    @Override
    public byte[] serialize(ProductInfo data) { // (5)
        if (data == null) {
            return null;
        }
        return (data.getSize() + "|" + data.getCount() + "|" + data.getSealed()).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public boolean supports(Argument<ProductInfo> argument) { // (6)
        return argument.getType().isAssignableFrom(ProductInfo.class);
    }
}
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.nats.serdes.NatsMessageSerDes
import io.nats.client.Message

import jakarta.inject.Singleton
import java.nio.charset.Charset

@Singleton // (1)
class ProductInfoSerDes implements NatsMessageSerDes<ProductInfo> { // (2)

    private static final Charset UTF8 = Charset.forName("UTF-8")

    private final ConversionService conversionService

    ProductInfoSerDes(ConversionService conversionService) { // (3)
        this.conversionService = conversionService
    }

    @Override
    ProductInfo deserialize(Message message, Argument<ProductInfo> argument) { // (4)
        String body = new String(message.data, UTF8)
        String[] parts = body.split("\\|")
        if (parts.length == 3) {
            String size = parts[0]
            if (size == "null") {
                size = null
            }

            Optional<Long> count = conversionService.convert(parts[1], Long)
            Optional<Boolean> sealed = conversionService.convert(parts[2], Boolean)

            if (count.isPresent() && sealed.isPresent()) {
                return new ProductInfo(size, count.get(), sealed.get())
            }
        }
        null
    }

    @Override
    byte[] serialize(ProductInfo data) { // (5)
        if (data == null) {
            return null
        }
        (data.size + "|" + data.count + "|" + data.sealed).getBytes(UTF8)
    }

    @Override
    boolean supports(Argument<ProductInfo> argument) { // (6)
        argument.type.isAssignableFrom(ProductInfo)
    }
}
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.nats.serdes.NatsMessageSerDes
import io.nats.client.Message
import jakarta.inject.Singleton
import java.nio.charset.Charset

@Singleton // (1)
class ProductInfoSerDes(private val conversionService: ConversionService)// (3)
    : NatsMessageSerDes<ProductInfo> { // (2)

    override fun deserialize(message: Message, argument: Argument<ProductInfo>): ProductInfo? { // (4)
        val body = String(message.data, CHARSET)
        val parts = body.split("\\|".toRegex())
        if (parts.size == 3) {
            var size: String? = parts[0]
            if (size == "null") {
                size = null
            }

            val count = conversionService.convert(parts[1], Long::class.java)
            val sealed = conversionService.convert(parts[2], Boolean::class.java)

            if (count.isPresent && sealed.isPresent) {
                return ProductInfo(size, count.get(), sealed.get())
            }
        }
        return null
    }

    override fun serialize(data: ProductInfo?): ByteArray { // (5)
        return (data?.size + "|" + data?.count + "|" + data?.sealed).toByteArray(CHARSET)
    }

    override fun supports(argument: Argument<ProductInfo>): Boolean { // (6)
        return argument.type.isAssignableFrom(ProductInfo::class.java)
    }

    companion object {
        private val CHARSET = Charset.forName("UTF-8")
    }
}
1 The class is declared as a singleton so it will be registered with the context
2 The generic specifies what type we want to accept and return
3 The conversion service is injected to convert the parts of the message to the required types
4 The deserialize method takes the bytes from the message and constructs a ProductInfo
5 The serialize method takes the ProductInfo and returns the bytes to publish.
6 The supports method ensures only the correct body types are processed by this ser-des
Because the getOrder method was not overridden, the default order of 0 is used. All default ser-des have a lower precedence than the default order, which means this ser-des will be checked before the others.
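The wire format itself can be exercised in isolation. The following sketch reproduces only the pipe-delimited encoding used by the ser-des above, in plain Java with no Micronaut types; PipeFormatSketch and its method names are hypothetical. It also shows a limitation of such a simple format: a size value that itself contains a pipe character no longer splits into three parts and is rejected on decoding.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that mirrors the pipe-delimited body format; for illustration only.
class PipeFormatSketch {

    /** Joins the three fields with '|'; a null size is written as the text "null". */
    static byte[] encode(String size, long count, boolean sealed) {
        return (size + "|" + count + "|" + sealed).getBytes(StandardCharsets.UTF_8);
    }

    /** Returns {size, count, sealed} as strings, or null when the body does not have three parts. */
    static String[] decode(byte[] body) {
        String[] parts = new String(body, StandardCharsets.UTF_8).split("\\|");
        if (parts.length != 3) {
            return null;
        }
        if (parts[0].equals("null")) {
            parts[0] = null; // the text "null" is read back as a missing size
        }
        return parts;
    }

    public static void main(String[] args) {
        byte[] body = encode("small", 10L, true);
        System.out.println(new String(body, StandardCharsets.UTF_8));

        String[] parts = decode(body);
        System.out.println(parts[0] + ", " + parts[1] + ", " + parts[2]);

        // A size containing '|' produces four parts and is rejected.
        System.out.println(decode(encode("a|b", 1L, false)) == null);
    }
}
```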

10 NATS Health Indicator

This library comes with a health indicator for applications that are using the management module in Micronaut. See the Health Endpoint documentation for more information about the endpoint itself.

The information reported from the health indicator is under the nats key.

nats.status=UP
nats.details.servers[0]=nats://localhost:4222
"nats": {
  "status": "UP",
  "details": {
    "servers": ["nats://localhost:4222"]
  }
}
[nats]
  status="UP"
  [nats.details]
    servers=[
      "nats://localhost:4222"
    ]
nats {
  status = "UP"
  details {
    servers = ["nats://localhost:4222"]
  }
}
{
  nats {
    status = "UP"
    details {
      servers = ["nats://localhost:4222"]
    }
  }
}
{
  "nats": {
    "status": "UP",
    "details": {
      "servers": ["nats://localhost:4222"]
    }
  }
}
To disable the NATS health indicator entirely, add endpoints.health.nats.enabled: false.

11 Jetstream

JetStream is a distributed persistence system built into NATS that enables new functionality such as:

  • fault-tolerance

  • replication

  • exactly-once semantics

  • replay policies

  • retention policy and limits

  • streaming

11.1 Streams

Streams are 'message stores'. Each stream defines how messages are stored and what the limits (duration, size, interest) of the retention are. Streams consume normal NATS subjects; any message published on those subjects will be captured in the defined storage system. You can do a normal publish to the subject for unacknowledged delivery, though it is better to use the JetStream publish calls instead, as the JetStream server will reply with an acknowledgement that the message was successfully stored.

For further information have a look at JetStream Model Deep Dive.

11.1.1 Configuration

All properties on the JetStreamOptions.Builder and StreamConfiguration.Builder are available to be modified, either through configuration or a BeanCreatedEventListener.

The following properties are available for a stream configuration:

Table 1. Configuration Properties for NatsConnectionFactoryConfig$JetStreamConfiguration$StreamConfiguration

Property | Type | Description
nats.*.jetstream.streams.*.description | java.lang.String |
nats.*.jetstream.streams.*.retention-policy | io.nats.client.api.RetentionPolicy |
nats.*.jetstream.streams.*.compression-option | io.nats.client.api.CompressionOption |
nats.*.jetstream.streams.*.max-consumers | long |
nats.*.jetstream.streams.*.max-messages | long |
nats.*.jetstream.streams.*.max-messages-per-subject | long |
nats.*.jetstream.streams.*.max-bytes | long |
nats.*.jetstream.streams.*.max-age | java.time.Duration |
nats.*.jetstream.streams.*.max-msg-size | long |
nats.*.jetstream.streams.*.storage-type | io.nats.client.api.StorageType |
nats.*.jetstream.streams.*.replicas | int |
nats.*.jetstream.streams.*.no-ack | boolean |
nats.*.jetstream.streams.*.template-owner | java.lang.String |
nats.*.jetstream.streams.*.discard-policy | io.nats.client.api.DiscardPolicy |
nats.*.jetstream.streams.*.duplicate-window | java.time.Duration |
nats.*.jetstream.streams.*.placement | io.nats.client.api.Placement |
nats.*.jetstream.streams.*.republish | io.nats.client.api.Republish |
nats.*.jetstream.streams.*.subject-transform | io.nats.client.api.SubjectTransform |
nats.*.jetstream.streams.*.consumer-limits | io.nats.client.api.ConsumerLimits |
nats.*.jetstream.streams.*.mirror | io.nats.client.api.Mirror |
nats.*.jetstream.streams.*.sources | io.nats.client.api.Source |
nats.*.jetstream.streams.*.allow-rollup | boolean |
nats.*.jetstream.streams.*.allow-direct | boolean |
nats.*.jetstream.streams.*.mirror-direct | boolean |
nats.*.jetstream.streams.*.deny-delete | boolean |
nats.*.jetstream.streams.*.deny-purge | boolean |
nats.*.jetstream.streams.*.discard-new-per-subject | boolean |
nats.*.jetstream.streams.*.metadata | java.util.Map |
nats.*.jetstream.streams.*.first-sequence | long |
nats.*.jetstream.streams.*.subjects | java.util.List | Get the subjects of the stream.

A simple configuration for JetStream with a single stream can look like:

nats.default.jetstream.streams.events.storage-type=Memory
nats.default.jetstream.streams.events.subjects[0]=events.>
nats:
  default:
    jetstream:
      streams:
        events:
          storage-type: Memory
          subjects:
            - events.>
[nats]
  [nats.default]
    [nats.default.jetstream]
      [nats.default.jetstream.streams]
        [nats.default.jetstream.streams.events]
          storage-type="Memory"
          subjects=[
            "events.>"
          ]
nats {
  'default' {
    jetstream {
      streams {
        events {
          storageType = "Memory"
          subjects = ["events.>"]
        }
      }
    }
  }
}
{
  nats {
    default {
      jetstream {
        streams {
          events {
            storage-type = "Memory"
            subjects = ["events.>"]
          }
        }
      }
    }
  }
}
{
  "nats": {
    "default": {
      "jetstream": {
        "streams": {
          "events": {
            "storage-type": "Memory",
            "subjects": ["events.>"]
          }
        }
      }
    }
  }
}
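The subjects entry events.> in the configuration above uses a NATS wildcard: * matches exactly one subject token and > matches one or more trailing tokens, so the stream captures events.created and events.order.paid but not the bare subject events. The sketch below is a plain-Java illustration of that matching rule, not code from the NATS client; SubjectMatchSketch is a hypothetical name and subjects are assumed to be well-formed, dot-separated tokens.

```java
// Hypothetical matcher that illustrates NATS subject wildcards; for explanation only.
class SubjectMatchSketch {

    static boolean matches(String pattern, String subject) {
        String[] p = pattern.split("\\.");
        String[] s = subject.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // '>' must be the last pattern token and needs at least one subject token to match.
                return i == p.length - 1 && s.length > i;
            }
            if (i >= s.length) {
                return false; // subject has fewer tokens than the pattern
            }
            if (!p[i].equals("*") && !p[i].equals(s[i])) {
                return false; // literal token mismatch
            }
        }
        // Without '>', pattern and subject must have the same number of tokens.
        return p.length == s.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("events.>", "events.created"));
        System.out.println(matches("events.>", "events.order.paid"));
        System.out.println(matches("events.>", "events"));
        System.out.println(matches("events.*", "events.order.paid"));
    }
}
```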

11.1.2 Producer

The example in the quick start presented a trivial definition of an interface that will be implemented automatically for you using the @JetStreamClient annotation.

The implementation that powers @JetStreamClient (defined by the JetStreamIntroductionAdvice class) is, however, very flexible and offers a range of options for defining JetStream clients.

@JetStreamClient extends the default @NatsClient and is based on the same methods, so you can still use all the header and subject functionality you already know.

In addition, @JetStreamClient methods accept a PublishOptions argument for the options you want to publish with.

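import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.nats.annotation.Subject;
import io.micronaut.nats.jetstream.annotation.JetStreamClient;
import io.nats.client.PublishOptions;
import io.nats.client.api.PublishAck;

@JetStreamClient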
public interface ProductClient {

    PublishAck send(@Subject String subject, @MessageBody byte[] data, PublishOptions publishOptions); // (1)
}
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.nats.annotation.Subject
import io.micronaut.nats.jetstream.annotation.JetStreamClient
import io.nats.client.PublishOptions
import io.nats.client.api.PublishAck

@JetStreamClient
interface ProductClient {

    PublishAck send(@Subject String subject, @MessageBody byte[] data, PublishOptions publishOptions) // (1)
}
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.nats.annotation.Subject
import io.micronaut.nats.jetstream.annotation.JetStreamClient
import io.nats.client.PublishOptions
import io.nats.client.api.PublishAck

@JetStreamClient
interface ProductClient {

    fun send(@Subject subject: String, @MessageBody data: ByteArray, publishOptions: PublishOptions): PublishAck // (1)
}
1 With PublishOptions you can define additional options for publishing a message with JetStream. The method can also return a PublishAck object representing the acknowledgment from a JetStream-enabled server.

11.1.3 Consumer

A consumer is a stateful view of a stream. It acts as an interface for clients to consume a subset of the messages stored in a stream, and it keeps track of which messages were delivered to and acknowledged by clients.

Unlike core NATS, which provides an at-most-once delivery guarantee, a consumer can provide an at-least-once delivery guarantee. This is achieved by persisting published messages to the stream and by having the consumer track the delivery and acknowledgement of each individual message as clients receive and process it. JetStream consumers support multiple kinds of acknowledgements and multiple acknowledgement policies. They automatically redeliver un-acked (or 'nacked') messages up to a user-specified maximum number of delivery attempts (an advisory is emitted when a message reaches this limit).
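
The redelivery behaviour described above can be illustrated with a small in-memory model. This is not the NATS client API: the class RedeliverySketch, its deliver method and the maxDeliver parameter are hypothetical names chosen for illustration. The sketch only shows how tracking acknowledgements per message leads to at-least-once delivery with a bounded number of attempts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustrative model only; none of these names belong to the NATS client.
class RedeliverySketch {

    /**
     * Offers each stored message to the handler until the handler acknowledges it
     * (returns true) or maxDeliver attempts have been made.
     * Returns the messages that were never acknowledged.
     */
    static List<String> deliver(List<String> stream, int maxDeliver, Predicate<String> handler) {
        List<String> unacknowledged = new ArrayList<>();
        for (String message : stream) {
            boolean acked = false;
            for (int attempt = 1; attempt <= maxDeliver && !acked; attempt++) {
                acked = handler.test(message); // true stands for an ack, false for a nack
            }
            if (!acked) {
                unacknowledged.add(message); // a real server would emit an advisory here
            }
        }
        return unacknowledged;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        // The handler rejects "b" the first time it sees it, so "b" is delivered twice.
        List<String> failed = deliver(List.of("a", "b"), 3, m -> {
            seen.add(m);
            return !(m.equals("b") && seen.stream().filter("b"::equals).count() == 1);
        });
        System.out.println(seen + " unacknowledged=" + failed);
    }
}
```

Because a message may be handed to the handler more than once, handlers in an at-least-once setup are usually written so that processing the same message twice is harmless.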

Consumers can be push-based, where messages are delivered to a specified subject, or pull-based, which allows clients to request batches of messages on demand. Which kind to use depends on the use case. A client application that needs its own individual replay of messages from a stream would typically use an 'ordered push consumer'. If you need to process messages and easily scale horizontally, you would use a 'pull consumer'.

In addition to the choice of being push or pull, a consumer can also be ephemeral or durable. A consumer is considered durable when an explicit name is set on the Durable field when creating the consumer; otherwise it is considered ephemeral. Durable and ephemeral consumers behave exactly the same, except that an ephemeral consumer is automatically cleaned up (deleted) after a period of inactivity, specifically when there are no subscriptions bound to the consumer. By default, durable consumers remain even when there are periods of inactivity (unless InactiveThreshold is set explicitly).

11.1.3.1 Push based

A push consumer is where the server is in control and sends messages to the client. It can be made durable or ephemeral based on your use case.

Push consumers are very similar to the already familiar @NatsListener. Let’s look at a quick example.

import io.micronaut.nats.jetstream.annotation.JetStreamListener;
import io.micronaut.nats.jetstream.annotation.PushConsumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@JetStreamListener // (1)
public class ProductListener {

    List<byte[]> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @PushConsumer(value = "events", subject = "events.>", durable = "test") // (2)
    public void receive(byte[] data) {
        messageLengths.add(data);
    }
}
import io.micronaut.nats.jetstream.annotation.JetStreamListener
import io.micronaut.nats.jetstream.annotation.PushConsumer

import java.util.concurrent.CopyOnWriteArrayList

@JetStreamListener // (1)
class ProductListener {

    CopyOnWriteArrayList<byte[]> messageLengths = []

    @PushConsumer(value = "events", subject = "events.>", durable = "test") // (2)
    void receive(byte[] data) {
        messageLengths << data
    }
}
import io.micronaut.nats.jetstream.annotation.JetStreamListener
import io.micronaut.nats.jetstream.annotation.PushConsumer
import java.util.Collections

@JetStreamListener // (1)
class ProductListener {

    val messageLengths: MutableList<ByteArray> = Collections.synchronizedList(ArrayList())

    @PushConsumer(value = "events", subject = "events.>", durable = "test") // (2)
    fun receive(data: ByteArray) {
        messageLengths.add(data)
    }
}
1 The class needs to be annotated with @JetStreamListener.
2 The @PushConsumer annotation configures the stream (value) and the subject to listen to. All known values from SubscribeOptions and PushSubscribeOptions, such as durable, can be used.
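
The subject events.> in the example uses the NATS > wildcard. As a rough illustration of which subjects such a pattern covers, the following sketch implements token-based matching in plain Java. The class SubjectMatchSketch and its matches method are hypothetical helpers, not part of Micronaut NATS or the NATS client; the server performs the real matching.

```java
// Illustrative helper only; the NATS server does the real subject matching.
class SubjectMatchSketch {

    /** Returns true if the dot-separated subject is covered by the pattern. */
    static boolean matches(String pattern, String subject) {
        String[] p = pattern.split("\\.");
        String[] s = subject.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // ">" must be the last token and needs at least one more subject token
                return i == p.length - 1 && s.length > i;
            }
            if (i >= s.length) {
                return false; // the subject has fewer tokens than the pattern
            }
            if (!p[i].equals("*") && !p[i].equals(s[i])) {
                return false; // "*" accepts any single token, otherwise tokens must be equal
            }
        }
        return p.length == s.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("events.>", "events.orders.created")); // true
        System.out.println(matches("events.>", "events"));                // false
        System.out.println(matches("events.*", "events.orders.created")); // false
    }
}
```

In other words, a listener on events.> receives everything published below events, at any depth, while events.* would only cover subjects with exactly one additional token.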

11.1.3.2 Pull based

A pull consumer allows you to control when the server sends the client messages.

To create a new pull subscription, you need to inject PullConsumerRegistry.

Let’s look at a quick example.

import io.micronaut.nats.jetstream.PullConsumerRegistry;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.time.Duration;
import java.util.List;

@Singleton
public class PullConsumerHelper {

    private final PullConsumerRegistry pullConsumerRegistry;

    public PullConsumerHelper(PullConsumerRegistry pullConsumerRegistry) { // (1)
        this.pullConsumerRegistry = pullConsumerRegistry;
    }

    public List<Message> pullMessages() throws JetStreamApiException, IOException {
        PullSubscribeOptions pullSubscribeOptions =
            PullSubscribeOptions.builder()
                                .stream("events")
                                .configuration(
                                    ConsumerConfiguration
                                        .builder()
                                        .ackWait(
                                            Duration.ofMillis(
                                                2500))
                                        .build())
                                .build();
        JetStreamSubscription jetStreamSubscription =
            pullConsumerRegistry.newPullConsumer("events.>", pullSubscribeOptions); // (2)

        List<Message> messages = jetStreamSubscription.fetch(2, Duration.ofSeconds(2L)); // (3)
        messages.forEach(Message::ack); // (4)
        return messages;
    }
}
import io.micronaut.nats.jetstream.PullConsumerRegistry
import io.nats.client.JetStreamApiException
import io.nats.client.JetStreamSubscription
import io.nats.client.Message
import io.nats.client.PullSubscribeOptions
import io.nats.client.api.ConsumerConfiguration
import jakarta.inject.Singleton

import java.time.Duration

@Singleton
class PullConsumerHelper {

    private final PullConsumerRegistry pullConsumerRegistry;

    PullConsumerHelper(PullConsumerRegistry pullConsumerRegistry) {  // (1)
        this.pullConsumerRegistry = pullConsumerRegistry
    }

    List<Message> pullMessages() throws JetStreamApiException, IOException {
        JetStreamSubscription jetStreamSubscription =
                pullConsumerRegistry.newPullConsumer("events.>",
                        PullSubscribeOptions.builder()
                                .stream("events")
                                .configuration(
                                        ConsumerConfiguration.builder().ackWait(Duration.ofMillis(2500)).build())
                                .build()) // (2)

        List<Message> messages = jetStreamSubscription.fetch(2, Duration.ofSeconds(2L)) // (3)
        messages.forEach(Message::ack) // (4)
        return messages
    }

}
import io.micronaut.nats.jetstream.PullConsumerRegistry
import io.nats.client.JetStreamSubscription
import io.nats.client.Message
import io.nats.client.PullSubscribeOptions
import io.nats.client.api.ConsumerConfiguration
import jakarta.inject.Singleton
import java.time.Duration

@Singleton
class PullConsumerHelper(private val pullConsumerRegistry: PullConsumerRegistry) { // (1)

    fun pullMessages(): List<Message> {
        val pullSubscribeOptions: PullSubscribeOptions =
            PullSubscribeOptions.builder()
                .stream("events")
                .configuration(
                    ConsumerConfiguration
                        .builder()
                        .ackWait(
                            Duration.ofMillis(
                                2500
                            )
                        )
                        .build()
                )
                .build()
        val jetStreamSubscription: JetStreamSubscription =
            pullConsumerRegistry.newPullConsumer("events.>", pullSubscribeOptions) // (2)

        val messages: List<Message> = jetStreamSubscription.fetch(2, Duration.ofSeconds(2L)) // (3)
        messages.forEach(Message::ack) // (4)
        return messages
    }

}
1 For creating a new pull subscription you need to inject PullConsumerRegistry.
2 Create a new JetStreamSubscription with the subjects you want to subscribe to and the necessary PullSubscribeOptions.
3 Fetch the messages from the subscription.
4 Acknowledge the fetched messages.

11.2 Key/Value Store

JetStream, the persistence layer of NATS, not only allows for higher qualities of service and features associated with 'streaming', but also enables some functionality not found in messaging systems.

One such feature is the Key/Value store functionality, which allows client applications to create 'buckets' and use them as immediately consistent, persistent associative arrays.

You can use KV buckets to perform the typical operations you would expect from an immediately consistent key/value store:

  • put: associate a value with a key

  • get: retrieve the value associated with a key

  • delete: clear any value associated with a key

  • purge: clear all the values associated with all keys

  • create: associate the value with a key only if there is currently no value associated with that key (i.e. compare to null and set)

  • update: compare and set (aka compare and swap) the value for a key

  • keys: get a copy of all the keys (with a value or operation associated to it)

You can set limits for your buckets, such as:

  • the maximum size of the bucket

  • the maximum size for any single value

  • a TTL: how long the store will keep values for

Finally, you can even do things that typically cannot be done with a Key/Value Store:

  • watch: watch for changes happening for a key, which is similar to subscribing (in the publish/subscribe sense) to the key: the watcher receives updates due to put or delete operations on the key pushed to it in real-time as they happen

  • watch all: watch for all the changes happening on all the keys in the bucket

  • history: retrieve a history of the values (and delete operations) associated with each key over time (by default the history of buckets is set to 1, meaning that only the latest value/operation is stored)
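
The difference between put, create and update, and the effect of the history limit, can be sketched with a small in-memory model. This is not how Micronaut NATS or the NATS client implement buckets: KvBucketSketch and all of its members are hypothetical names, revisions are modelled as a simple counter, and delete, purge and watch are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative in-memory model only; not the NATS KeyValue implementation.
class KvBucketSketch {

    private final int maxHistoryPerKey;
    // For each key: the retained (revision, value) pairs, oldest first.
    private final Map<String, Deque<Map.Entry<Long, String>>> entries = new HashMap<>();
    private long revision = 0;

    KvBucketSketch(int maxHistoryPerKey) {
        this.maxHistoryPerKey = maxHistoryPerKey;
    }

    /** Unconditionally stores the value and returns the new revision. */
    long put(String key, String value) {
        Deque<Map.Entry<Long, String>> history = entries.computeIfAbsent(key, k -> new ArrayDeque<>());
        history.addLast(Map.entry(++revision, value));
        while (history.size() > maxHistoryPerKey) {
            history.removeFirst(); // drop the oldest entry beyond the history limit
        }
        return revision;
    }

    /** Returns the latest value, or null if the key has no value. */
    String get(String key) {
        Deque<Map.Entry<Long, String>> history = entries.get(key);
        return history == null ? null : history.getLast().getValue();
    }

    /** Stores the value only if the key has no value yet. */
    long create(String key, String value) {
        if (entries.containsKey(key)) {
            throw new IllegalStateException("key already exists: " + key);
        }
        return put(key, value);
    }

    /** Compare and set: stores the value only if expectedRevision is still the latest one. */
    long update(String key, String value, long expectedRevision) {
        Deque<Map.Entry<Long, String>> history = entries.get(key);
        if (history == null || history.getLast().getKey() != expectedRevision) {
            throw new IllegalStateException("revision mismatch for key: " + key);
        }
        return put(key, value);
    }

    /** Returns the retained values for the key, oldest first. */
    List<String> history(String key) {
        List<String> values = new ArrayList<>();
        for (Map.Entry<Long, String> entry : entries.getOrDefault(key, new ArrayDeque<>())) {
            values.add(entry.getValue());
        }
        return values;
    }

    public static void main(String[] args) {
        KvBucketSketch bucket = new KvBucketSketch(2);
        long first = bucket.create("color", "red");
        long second = bucket.update("color", "green", first);
        bucket.put("color", "blue");
        System.out.println(bucket.get("color") + " " + bucket.history("color"));
        try {
            bucket.update("color", "black", second); // stale revision, so this is rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a history limit of 1, which the text above names as the default, history would only ever return the latest value.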

11.2.1 Configuration

All properties on the JetStreamOptions.Builder and KeyValueConfiguration.Builder are available to be modified, either through configuration or a BeanCreatedEventListener.

The following properties are available for a key/value bucket configuration:

Table 1. Configuration Properties for NatsConnectionFactoryConfig$JetStreamConfiguration$KeyValueConfiguration

Property                                          Type
nats.*.jetstream.keyvalue.*.description           java.lang.String
nats.*.jetstream.keyvalue.*.max-history-per-key   int
nats.*.jetstream.keyvalue.*.max-bucket-size       long
nats.*.jetstream.keyvalue.*.max-value-size        long
nats.*.jetstream.keyvalue.*.ttl                   java.time.Duration
nats.*.jetstream.keyvalue.*.storage-type          io.nats.client.api.StorageType
nats.*.jetstream.keyvalue.*.replicas              int
nats.*.jetstream.keyvalue.*.placement             io.nats.client.api.Placement
nats.*.jetstream.keyvalue.*.compression           boolean
nats.*.jetstream.keyvalue.*.metadata              java.util.Map
nats.*.jetstream.keyvalue.*.republish             io.nats.client.api.Republish
nats.*.jetstream.keyvalue.*.mirror                io.nats.client.api.Mirror

A simple key/value configuration can look like this:

nats.default.jetstream.keyvalue.examplebucket.storage-type=Memory
nats.default.jetstream.keyvalue.examplebucket.max-history-per-key=5
nats:
  default:
    jetstream:
      keyvalue:
        examplebucket:
          storage-type: Memory
          max-history-per-key: 5
[nats]
  [nats.default]
    [nats.default.jetstream]
      [nats.default.jetstream.keyvalue]
        [nats.default.jetstream.keyvalue.examplebucket]
          storage-type="Memory"
          max-history-per-key=5
nats {
  'default' {
    jetstream {
      keyvalue {
        examplebucket {
          storageType = "Memory"
          maxHistoryPerKey = 5
        }
      }
    }
  }
}
{
  nats {
    default {
      jetstream {
        keyvalue {
          examplebucket {
            storage-type = "Memory"
            max-history-per-key = 5
          }
        }
      }
    }
  }
}
{
  "nats": {
    "default": {
      "jetstream": {
        "keyvalue": {
          "examplebucket": {
            "storage-type": "Memory",
            "max-history-per-key": 5
          }
        }
      }
    }
  }
}

11.2.2 Usage

Nats.io provides a KeyValue interface for working with Key/Value Stores.

To use it, just inject your Key/Value Store as follows:

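import io.micronaut.nats.jetstream.annotation.KeyValueStore;
import io.nats.client.JetStreamApiException;
import io.nats.client.KeyValue;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.io.IOException;

@Singleton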
public class KeyValueStoreHolder {

    @Inject
    @KeyValueStore("examplebucket")
    KeyValue store; // (1)

    public void put(String key, String value) throws JetStreamApiException, IOException {
        store.put(key, value);
    }

}
import io.micronaut.nats.jetstream.annotation.KeyValueStore
import io.nats.client.KeyValue
import jakarta.inject.Inject
import jakarta.inject.Singleton

@Singleton
class KeyValueStoreHolder {

    @Inject
    @KeyValueStore("examplebucket")
    KeyValue store // (1)

    void put(String key, String value) {
        store.put(key, value)
    }
}
import io.micronaut.nats.jetstream.annotation.KeyValueStore
import io.nats.client.KeyValue
import jakarta.inject.Inject
import jakarta.inject.Singleton

@Singleton
class KeyValueStoreHolder {

    @Inject
    @field:KeyValueStore("examplebucket")
    lateinit var store: KeyValue // (1)

    fun put(key: String, value: String) {
        store.put(key, value)
    }

}
1 Simply inject the key/value store via the @KeyValueStore annotation.

11.3 Object Store (Experimental)

Experimental Preview

The Object Store allows you to store data of any size, including large data, by implementing a chunking mechanism. For example, you can store and retrieve files (the objects) of any size by associating them with a path and a file name (the key). You obtain an ObjectStoreManager object from your JetStream context.
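
To make the chunking idea concrete, here is a minimal sketch in plain Java that splits a payload into fixed-size chunks and joins them again. It does not use the NATS ObjectStore API: ChunkingSketch, split, join and the chunk size are hypothetical, and the real chunk size and storage layout are decided by the NATS client and server.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only; the real object store handles chunking internally.
class ChunkingSketch {

    /** Splits the payload into chunks of at most chunkSize bytes. */
    static List<byte[]> split(byte[] payload, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < payload.length; offset += chunkSize) {
            int end = Math.min(offset + chunkSize, payload.length);
            chunks.add(Arrays.copyOfRange(payload, offset, end));
        }
        return chunks;
    }

    /** Reassembles the original payload from its chunks, in order. */
    static byte[] join(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] file = new byte[10];
        List<byte[]> chunks = split(file, 4);
        System.out.println(chunks.size() + " chunks, restored " + join(chunks).length + " bytes");
    }
}
```

Because every chunk stays below a fixed size, the total size of the object is not bounded by the size limit of a single message.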

11.3.1 Configuration

All properties on the JetStreamOptions.Builder and ObjectStoreConfiguration.Builder are available to be modified, either through configuration or a BeanCreatedEventListener.

Table 1. Configuration Properties for NatsConnectionFactoryConfig$JetStreamConfiguration$ObjectStoreConfiguration

Property                                         Type
nats.*.jetstream.objectstore.*.description       java.lang.String
nats.*.jetstream.objectstore.*.max-bucket-size   long
nats.*.jetstream.objectstore.*.ttl               java.time.Duration
nats.*.jetstream.objectstore.*.storage-type      io.nats.client.api.StorageType
nats.*.jetstream.objectstore.*.replicas          int
nats.*.jetstream.objectstore.*.placement         io.nats.client.api.Placement
nats.*.jetstream.objectstore.*.compression       boolean
nats.*.jetstream.objectstore.*.metadata          java.util.Map

A simple object store configuration can look like this:

nats.default.jetstream.objectstore.examplebucket.storage-type=Memory
nats:
  default:
    jetstream:
      objectstore:
        examplebucket:
          storage-type: Memory
[nats]
  [nats.default]
    [nats.default.jetstream]
      [nats.default.jetstream.objectstore]
        [nats.default.jetstream.objectstore.examplebucket]
          storage-type="Memory"
nats {
  'default' {
    jetstream {
      objectstore {
        examplebucket {
          storageType = "Memory"
        }
      }
    }
  }
}
{
  nats {
    default {
      jetstream {
        objectstore {
          examplebucket {
            storage-type = "Memory"
          }
        }
      }
    }
  }
}
{
  "nats": {
    "default": {
      "jetstream": {
        "objectstore": {
          "examplebucket": {
            "storage-type": "Memory"
          }
        }
      }
    }
  }
}

11.3.2 Usage

Nats.io provides an ObjectStore interface for working with Object Stores.

To use it, just inject your Object Store as follows:

import io.micronaut.nats.jetstream.annotation.ObjectStore;
import io.nats.client.JetStreamApiException;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.io.InputStream;
import java.security.NoSuchAlgorithmException;

@Singleton
public class ObjectStoreHolder {

    @Inject
    @ObjectStore("examplebucket")
    io.nats.client.ObjectStore store; // (1)

    public void put(String key, InputStream inputStream) throws JetStreamApiException, IOException, NoSuchAlgorithmException {
        store.put(key, inputStream);
    }

}
import io.micronaut.nats.jetstream.annotation.ObjectStore
import jakarta.inject.Inject
import jakarta.inject.Singleton

@Singleton
class ObjectStoreHolder {

    @Inject
    @ObjectStore("examplebucket")
    io.nats.client.ObjectStore store // (1)

    void put(String key, InputStream value) {
        store.put(key, value)
    }
}
import io.micronaut.nats.jetstream.annotation.ObjectStore
import jakarta.inject.Inject
import jakarta.inject.Singleton
import java.io.InputStream


@Singleton
class ObjectStoreHolder {

    @Inject
    @field:ObjectStore("examplebucket")
    lateinit var store: io.nats.client.ObjectStore // (1)

    fun put(key: String, value: InputStream) {
        store.put(key, value)
    }

}
1 Simply inject the object store via the @ObjectStore annotation.

12 Repository

You can find the source code of this project in this repository: