Micronaut MQTT

Integration between Micronaut and MQTT

Version: 3.4.0

1 Introduction

This project includes integration between Micronaut and MQTT. The Eclipse Paho Client is used to do the actual publishing and subscribing.

There are two variants to support both MQTT V3 and V5.

2 Release History

3.0.x

Since Micronaut MQTT 3.0.0, you can use an implementation based on the HiveMQ MQTT Client.

3 Release Notes

For this project, you can find a list of releases (with release notes) here:

4 MQTT Quick Start

To add support for MQTT to an existing project, you must first add either the Micronaut MQTT HiveMQ dependency or the Micronaut MQTT Eclipse Paho v3 or v5 (depending on the version of MQTT you require) to your build.

If you are running into the exception org.eclipse.paho.client.mqttv3.MqttException: Timed out waiting for a response from the server, please make sure to run your program on a machine with more than one core. The Eclipse Paho MQTT client will run into connection timeouts if it has only one core available.
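This situation can arise, for example, when the JVM runs in an environment restricted to a single CPU. As a minimal sketch (the class and method names are hypothetical and not part of Micronaut MQTT or Paho), an application could log a warning at startup when the JVM reports only one available processor:

```java
/** Hypothetical startup check; not part of Micronaut MQTT or Eclipse Paho. */
final class CoreCountCheck {

    /** Returns true when more than one processor is reported as available. */
    static boolean hasMultipleCores(int availableProcessors) {
        return availableProcessors > 1;
    }

    public static void main(String[] args) {
        // Number of processors the JVM believes it may use.
        int cores = Runtime.getRuntime().availableProcessors();
        if (hasMultipleCores(cores)) {
            System.out.println(cores + " processors available.");
        } else {
            System.err.println("Only " + cores
                    + " processor available; the Paho client may time out while connecting.");
        }
    }
}
```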

4.1 Creating an MQTT Publisher with MqttPublisher annotation

To create an MQTT client that produces messages you can simply define an interface that is annotated with MqttPublisher.

For example the following is a trivial MqttPublisher interface:

import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;

@MqttPublisher // (1)
public interface ProductClient {

    @Topic("product") // (2)
    void send(byte[] data); // (3)
}
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher

@MqttPublisher // (1)
interface ProductClient {

    @Topic("product") // (2)
    void send(byte[] data) // (3)
}
import io.micronaut.mqtt.annotation.v5.MqttPublisher
import io.micronaut.mqtt.annotation.Topic

@MqttPublisher // (1)
interface ProductClient {

    @Topic("product") // (2)
    fun send(data: ByteArray)  // (3)
}
1 The MqttPublisher annotation is used to designate this interface as a client.
2 The @Topic annotation indicates which topic the message should be published to.
3 The send method accepts a single parameter, which is the payload of the message.

At compile time Micronaut will produce an implementation of the above interface. You can retrieve an instance of ProductClient either by looking up the bean from the ApplicationContext or by injecting the bean with @Inject:

ProductClient productClient = applicationContext.getBean(ProductClient.class);
productClient.send("quickstart".getBytes());
def productClient = applicationContext.getBean(ProductClient)
productClient.send("quickstart".getBytes())
val productClient = ctx.getBean(ProductClient::class.java)
productClient.send("quickstart".toByteArray())
Because the send method returns void, it publishes the message and blocks until the broker acknowledges it.

4.2 Creating an MQTT Subscriber with MqttSubscriber annotation

To listen to MQTT messages you can use the @MqttSubscriber annotation to define a message listener.

The following example will listen for messages published by the ProductClient in the previous section:

import io.micronaut.mqtt.annotation.MqttSubscriber;
import io.micronaut.mqtt.annotation.Topic;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@MqttSubscriber // (1)
public class ProductListener {

    List<String> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @Topic("product") // (2)
    public void receive(byte[] data) { // (3)
        messageLengths.add(new String(data));
        System.out.println("Java received " + data.length + " bytes from MQTT");
    }
}
import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic

@MqttSubscriber // (1)
class ProductListener {

    List<String> messageLengths = Collections.synchronizedList([])

    @Topic("product") // (2)
    void receive(byte[] data) { // (3)
        messageLengths.add(new String(data))
        println("Groovy received ${data.length} bytes from MQTT")
    }
}
import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic
import java.util.*

@MqttSubscriber // (1)
class ProductListener {

    val messageLengths: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Topic("product") // (2)
    fun receive(data: ByteArray) { // (3)
        val string = String(data)
        messageLengths.add(string)
        println("Kotlin received ${data.size} bytes from MQTT: ${string}")
    }
}
1 The @MqttSubscriber annotation is used to designate the bean as a message listener.
2 The @Topic annotation is used to indicate which topic to subscribe to.
3 The receive method accepts a single parameter which is the payload of the message.

4.3 MQTT Health Checks

When either the mqttv3 or mqttv5 module is activated, the corresponding MqttHealthIndicator is activated as well. As a result, the /health endpoint and the CurrentHealthStatus interface reflect the health of the MQTT connection.

The only supported configuration option is the endpoints.health.mqtt.client.enabled key, which enables or disables the indicator.

See the section on the Health Endpoint for more information.

5 MQTT HiveMQ

This section outlines details specific to the HiveMQ client implementation.

5.1 MQTT Dependency with HiveMQ

The following dependency uses the HiveMQ MQTT client under the hood.

implementation("io.micronaut.mqtt:micronaut-mqtt-hivemq")
<dependency>
    <groupId>io.micronaut.mqtt</groupId>
    <artifactId>micronaut-mqtt-hivemq</artifactId>
</dependency>

5.2 Configuration

Both v3 and v5 clients are configured through the same object, with mqtt.client.mqtt-version set to either 3 or 5 (the default is 5).

Table 1. Configuration Properties for MqttClientConfigurationProperties
Property                                         Type                            Description
mqtt.client.clean-start                          boolean                         Set to true if a new session should be started for the connection (v5 only).
mqtt.client.session-expiry-interval              java.lang.Long                  The session expiry interval in seconds (v5 only).
mqtt.client.receive-maximum                      java.lang.Integer               The maximum number of unacknowledged QoS 1 or 2 publishes the client accepts from the server concurrently (v5 only).
mqtt.client.maximum-packet-size                  java.lang.Integer               The maximum packet size the client sends to the server (v5 only).
mqtt.client.topic-alias-maximum                  java.lang.Integer               The maximum number of topic aliases the client accepts from the server (v5 only).
mqtt.client.request-response-info                boolean                         Whether the client requests response information from the server (v5 only).
mqtt.client.request-problem-info                 boolean                         Whether the client requests problem information from the server (v5 only).
mqtt.client.user-properties                      java.util.Map                   The user-defined properties that should be sent with every message (v5 only).
mqtt.client.clean-session                        boolean                         Set to true if a new session should be started for the connection (v3 only).
mqtt.client.server-uri                           java.net.URI                    The URI of the server to connect to, as [schema]://[serverHost]:[serverPort].
mqtt.client.client-id                            java.lang.String                The client identifier to use.
mqtt.client.mqtt-version                         int                             The version of the MQTT protocol to use (one of 3 or 5).
mqtt.client.connection-timeout                   java.time.Duration              How long to wait for a connection to be established.
mqtt.client.manual-acks                          boolean                         True if you wish to manually acknowledge messages.
mqtt.client.password                             byte[]                          The password to use for MQTT connections.
mqtt.client.user-name                            java.lang.String                The username to use for MQTT connections.
mqtt.client.max-reconnect-delay                  long                            The maximum delay for reconnecting, in seconds.
mqtt.client.keep-alive-interval                  int                             The keep-alive interval in seconds.
mqtt.client.automatic-reconnect                  boolean                         True if the client should attempt to reconnect to the server if the connection is lost.
mqtt.client.custom-web-socket-headers            java.util.Map                   Custom headers that should be sent with WebSocket connections.
mqtt.client.https-hostname-verification-enabled  boolean                         True if hostname verification should be used for HTTPS connections.
mqtt.client.sslhostname-verifier                 javax.net.ssl.HostnameVerifier  The hostname verifier to use for hostname verification.

The integration can also be disabled entirely by setting mqtt.enabled to false.

6 MQTT V3 Eclipse Paho

This section outlines details specific to the v3 Eclipse Paho implementation.

6.1 MQTT v3 Dependency with Eclipse Paho

The following dependency uses the Eclipse Paho client under the hood.

For MQTT v3:

implementation("io.micronaut.mqtt:micronaut-mqttv3")
<dependency>
    <groupId>io.micronaut.mqtt</groupId>
    <artifactId>micronaut-mqttv3</artifactId>
</dependency>

6.2 Configuration

All properties on the MqttConnectOptions are available to be modified, either through configuration or a BeanCreatedEventListener.

The properties that can be converted from the string values in a configuration file can be configured directly.

Table 1. Configuration Properties for MqttClientConfigurationProperties
Property                                         Type
mqtt.client.password                             char[]
mqtt.client.user-name                            java.lang.String
mqtt.client.max-reconnect-delay                  int
mqtt.client.keep-alive-interval                  int
mqtt.client.mqtt-version                         int
mqtt.client.max-inflight                         int
mqtt.client.connection-timeout                   int
mqtt.client.clean-session                        boolean
mqtt.client.server-uris                          java.lang.String[]
mqtt.client.automatic-reconnect                  boolean
mqtt.client.executor-service-timeout             int
mqtt.client.custom-web-socket-headers            java.util.Properties
mqtt.client.server-uri                           java.lang.String
mqtt.client.client-id                            java.lang.String
mqtt.client.manual-acks                          java.lang.Boolean
mqtt.client.socket-factory                       javax.net.SocketFactory
mqtt.client.sslproperties                        java.util.Properties
mqtt.client.https-hostname-verification-enabled  boolean
mqtt.client.sslhostname-verifier                 javax.net.ssl.HostnameVerifier

Without any configuration, the defaults in MqttConnectOptions are used.

The integration can also be disabled entirely by setting mqtt.enabled to false.

7 MQTT V5 Eclipse Paho

This section outlines details specific to the v5 Eclipse Paho implementation.

7.1 MQTT v5 Dependency with Eclipse Paho

The following dependency uses the Eclipse Paho client under the hood.

For MQTT v5:

implementation("io.micronaut.mqtt:micronaut-mqttv5")
<dependency>
    <groupId>io.micronaut.mqtt</groupId>
    <artifactId>micronaut-mqttv5</artifactId>
</dependency>

7.2 Configuration

All properties on the MqttConnectionOptions are available to be modified, either through configuration or a BeanCreatedEventListener.

The properties that can be converted from the string values in a configuration file can be configured directly.

Table 1. Configuration Properties for MqttClientConfigurationProperties
Property                                         Type
mqtt.client.user-name                            java.lang.String
mqtt.client.password                             byte[]
mqtt.client.clean-start                          boolean
mqtt.client.keep-alive-interval                  int
mqtt.client.connection-timeout                   int
mqtt.client.max-reconnect-delay                  int
mqtt.client.server-uris                          java.lang.String[]
mqtt.client.automatic-reconnect                  boolean
mqtt.client.session-expiry-interval              java.lang.Long
mqtt.client.receive-maximum                      java.lang.Integer
mqtt.client.maximum-packet-size                  java.lang.Long
mqtt.client.topic-alias-maximum                  java.lang.Integer
mqtt.client.request-response-info                boolean
mqtt.client.request-problem-info                 boolean
mqtt.client.user-properties                      java.util.List
mqtt.client.auth-method                          java.lang.String
mqtt.client.auth-data                            byte[]
mqtt.client.use-subscription-identifiers         boolean
mqtt.client.custom-web-socket-headers            java.util.Map
mqtt.client.send-reason-messages                 boolean
mqtt.client.executor-service-timeout             int
mqtt.client.server-uri                           java.lang.String
mqtt.client.client-id                            java.lang.String
mqtt.client.socket-factory                       javax.net.SocketFactory
mqtt.client.sslproperties                        java.util.Properties
mqtt.client.https-hostname-verification-enabled  boolean
mqtt.client.sslhostname-verifier                 javax.net.ssl.HostnameVerifier

Without any configuration, the defaults in MqttConnectionOptions are used.

The integration can also be disabled entirely by setting mqtt.enabled to false.

7.3 Properties

Publishers

Properties can also be supplied when publishing messages. Any of the org.eclipse.paho.mqttv5.common.packet.MqttProperties can be set dynamically per execution or statically for all executions, using the @MqttProperty annotation.

import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttProperty;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;

@MqttPublisher
@MqttProperty(name = "appId", value = "myApp") // (1)
public interface ProductClient {

    @Topic("product")
    @MqttProperty(name = "contentType", value = "application/json") // (2)
    @MqttProperty(name = "userId", value = "guest")
    void send(byte[] data);

    @Topic("product")
    void send(@MqttProperty("userId") String user, @MqttProperty String contentType, byte[] data); // (3)
}
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttProperty
import io.micronaut.mqtt.annotation.v5.MqttPublisher

@MqttPublisher
@MqttProperty(name = "appId", value = "myApp") // (1)
interface ProductClient {

    @Topic("product")
    @MqttProperty(name = "contentType", value = "application/json") // (2)
    @MqttProperty(name = "userId", value = "guest")
    void send(byte[] data)

    @Topic("product")
    void send(@MqttProperty("userId") String user, @MqttProperty String contentType, byte[] data) // (3)
}
import io.micronaut.mqtt.annotation.v5.MqttPublisher
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttProperties
import io.micronaut.mqtt.annotation.v5.MqttProperty

@MqttPublisher
@MqttProperty(name = "appId", value = "myApp") // (1)
interface ProductClient {

    @Topic("product")
    @MqttProperties( // (2)
            MqttProperty(name = "contentType", value = "application/json"),
            MqttProperty(name = "userId", value = "guest")
    )
    fun send(data: ByteArray)

    @Topic("product")
    fun send(@MqttProperty("userId") user: String, @MqttProperty contentType: String?, data: ByteArray)  // (3)
}
1 Properties can be defined at the class level and will apply to all methods. If a property is defined on the method with the same name as one on the class, both values will be added.
2 Multiple annotations can be used to set multiple properties on the method or class level.
3 Properties can be set per execution. The name is inferred from the argument if the annotation value is not set. Parameter values override any property value set from method or class level annotations.

For method arguments, if the value is not supplied to the annotation, the argument name will be used as the property name. For example, @MqttProperty String userId would result in the property userId being set on the properties object before publishing.

If the annotation or argument name cannot be matched to a property name in the MqttProperties class, a user property will be added instead.

Subscribers

Any properties found in received messages can be bound to subscriber arguments through the v5 @MqttProperty annotation.

import io.micronaut.core.annotation.Nullable;
import io.micronaut.mqtt.annotation.MqttSubscriber;
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttProperty;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;


@MqttSubscriber
public class ProductListener {

    List<String> messageProperties = Collections.synchronizedList(new ArrayList<>());

    @Topic("product")
    public void receive(byte[] data,
                        @MqttProperty("userId") String user,  // (1)
                        @Nullable @MqttProperty String contentType,  // (2)
                        @MqttProperty String appId) {  // (3)
        messageProperties.add(user + "|" + contentType + "|" + appId);
    }
}
import io.micronaut.core.annotation.Nullable

import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttProperty

@MqttSubscriber
class ProductListener {

    List<String> messageProperties = Collections.synchronizedList([])

    @Topic("product")
    void receive(byte[] data,
                 @MqttProperty("userId") String user,  // (1)
                 @Nullable @MqttProperty String contentType,  // (2)
                 @MqttProperty String appId) {  // (3)
        messageProperties.add(user + "|" + contentType + "|" + appId)
    }
}
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.v5.MqttProperty
import java.util.*

@MqttSubscriber
class ProductListener {

    val messageProperties: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Topic("product")
    fun receive(data: ByteArray,
                @MqttProperty("userId") user: String, // (1)
                @MqttProperty contentType: String?, // (2)
                @MqttProperty appId: String) { // (3)
        messageProperties.add("$user|$contentType|$appId")
    }
}
1 The property will be bound from the user property userId.
2 The property will be bound from the content type property and does not require the property to be present.
3 The property will be bound from the user property appId.
Arguments are required by default and an exception will be thrown if they cannot be found or converted to the requested type. Making the argument nullable allows null values to be accepted.

8 SSL Connections

This library supports connecting to MQTT brokers over SSL. Because that feature requires a third-party dependency (Bouncy Castle), the functionality lives in a separate module that you must add as a dependency.

implementation("io.micronaut.mqtt:micronaut-mqtt-ssl")
<dependency>
    <groupId>io.micronaut.mqtt</groupId>
    <artifactId>micronaut-mqtt-ssl</artifactId>
</dependency>

After adding the dependency, you must configure the client with the appropriate certificate authority, certificate, key, and password.

Table 1. Configuration Properties for MqttCertificateConfiguration
Property                                         Type
mqtt.client.ssl.certificate-authority            Readable
mqtt.client.ssl.certificate                      Readable
mqtt.client.ssl.private-key                      Readable
mqtt.client.ssl.password                         char[]

The files can be configured with the file: prefix to reference absolute paths on the file system or classpath: to reference files on the classpath.

Once the configuration is in place the client will connect over SSL.

The server URI must start with ssl:.
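Because of that requirement, it can be useful to validate the configured URI before the client attempts to connect. The following is a minimal sketch using only java.net.URI; the class name, the method name, and the broker addresses are hypothetical placeholders, not part of this library:

```java
import java.net.URI;

/** Hypothetical helper that checks whether a server URI uses the ssl scheme. */
final class ServerUriCheck {

    /** Returns true when the URI's scheme is "ssl" (compared case-insensitively). */
    static boolean usesSslScheme(String serverUri) {
        return "ssl".equalsIgnoreCase(URI.create(serverUri).getScheme());
    }

    public static void main(String[] args) {
        // Placeholder addresses, for illustration only.
        System.out.println(usesSslScheme("ssl://broker.example.com:8883")); // true
        System.out.println(usesSslScheme("tcp://localhost:1883"));          // false
    }
}
```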

9 MQTT Publishers

The example in the quick start presented a trivial definition of an interface that is implemented automatically for you using the @MqttPublisher annotation.

The implementation that powers @MqttPublisher is, however, very flexible and offers a range of options for defining MQTT producers.

9.1 Defining Publisher Methods

All methods that publish messages to MQTT must meet the following conditions:

  • The method must reside in a class annotated with @MqttPublisher.

  • The method must provide the topic to publish to, either through an argument, or the topic annotation on the method or class.

If the topic cannot be found, an exception will be thrown. Unless a reactive type or future is returned from the publishing method, the action is blocking.

9.1.1 Publisher Parameters

All options are available to be set for publishing messages, either through static values in annotations on the class or method, or through annotations on the method arguments.

9.1.1.1 Topic

To set the topic to publish to, apply the @Topic annotation to the method or an argument of the method. Apply the annotation to the class or method if the value is static for every execution. Apply the annotation to an argument of the method if the value should be set per execution.

import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;

@MqttPublisher
public interface ProductClient {

    @Topic("product") // (1)
    void send(byte[] data);

    void send(@Topic String binding, byte[] data); // (2)
}
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher

@MqttPublisher
interface ProductClient {

    @Topic("product") // (1)
    void send(byte[] data)

    void send(@Topic String binding, byte[] data) // (2)
}
import io.micronaut.mqtt.annotation.v5.MqttPublisher
import io.micronaut.mqtt.annotation.Topic

@MqttPublisher
interface ProductClient {

    @Topic("product") // (1)
    fun send(data: ByteArray)

    fun send(@Topic binding: String, data: ByteArray)  // (2)
}
1 The topic is static
2 The topic must be set per execution

9.1.1.2 Qos

Quality of service can be set for publishing messages either through the @Topic or @Qos annotations. The @Qos annotation can be applied to a method argument to set the Qos per execution.

import io.micronaut.mqtt.annotation.Qos;
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;

@MqttPublisher
public interface ProductClient {

    @Topic(value = "product", qos = 2) // (1)
    void send(byte[] data);

    @Topic("product")
    void send(byte[] data, @Qos int qos); // (2)
}
import io.micronaut.mqtt.annotation.Qos
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher

@MqttPublisher
interface ProductClient {

    @Topic(value = "product", qos = 2) // (1)
    void send(byte[] data)

    @Topic("product")
    void send(byte[] data, @Qos int qos) // (2)
}
import io.micronaut.mqtt.annotation.Qos
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher

@MqttPublisher
interface ProductClient {

    @Topic(value = "product", qos = 2) // (1)
    fun send(data: ByteArray?)

    @Topic("product")
    fun send(data: ByteArray?, @Qos qos: Int) // (2)
}
1 Qos is set via the topic annotation. It can be set through @Qos instead if that is preferred.
2 The Qos must be set per execution
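MQTT defines three QoS levels: 0 (at most once), 1 (at least once), and 2 (exactly once). To avoid passing bare integers to a method such as send(byte[] data, @Qos int qos), an application could give the levels names. The following is only a sketch; QosLevel is a hypothetical helper and is not part of Micronaut MQTT:

```java
/** Hypothetical helper: names for the three QoS levels defined by MQTT. */
enum QosLevel {
    AT_MOST_ONCE(0),
    AT_LEAST_ONCE(1),
    EXACTLY_ONCE(2);

    private final int value;

    QosLevel(int value) {
        this.value = value;
    }

    /** The integer to pass to an int parameter annotated with @Qos. */
    int value() {
        return value;
    }

    /** Looks up a level by its integer value, rejecting anything outside 0..2. */
    static QosLevel fromValue(int value) {
        for (QosLevel level : values()) {
            if (level.value == value) {
                return level;
            }
        }
        throw new IllegalArgumentException("Invalid QoS: " + value);
    }
}
```

With such a helper, a call could read productClient.send(data, QosLevel.AT_LEAST_ONCE.value()).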

9.1.1.3 Retained

The retained flag can be set for publishing messages through the @Retained annotation. The annotation can also be applied to a method argument to set the retained flag per execution.

import io.micronaut.mqtt.annotation.Retained;
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;

@MqttPublisher
public interface ProductClient {

    @Topic("product")
    @Retained(true) // (1)
    void send(byte[] data);

    @Topic("product")
    void send(byte[] data, @Retained boolean retained); // (2)
}
import io.micronaut.mqtt.annotation.Retained
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher

@MqttPublisher
interface ProductClient {

    @Topic("product")
    @Retained(true) // (1)
    void send(byte[] data)

    @Topic("product")
    void send(byte[] data, @Retained boolean retained) // (2)
}
import io.micronaut.mqtt.annotation.Retained
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher

@MqttPublisher
interface ProductClient {

    @Topic("product")
    @Retained(true) // (1)
    fun send(data: ByteArray?)

    @Topic("product")
    fun send(data: ByteArray?, @Retained retained: Boolean) // (2)
}
1 The retained flag is set via the Retained annotation.
2 The retained flag must be set per execution

9.1.1.4 Payload

Most examples up to this point have been using a byte[] as the body type for simplicity. This library supports most standard Java types and JSON serialization (using Jackson) by default. The functionality is extensible and it is possible to add support for additional types and serialization strategies. See the section on Message Serialization/Deserialization for more information.

9.1.2 Publisher Acknowledgements

MQTT supports publisher acknowledgements, which behave differently depending on the QoS of the message being sent. Once the broker acknowledges the message, the publishing method completes. For publishers that return void, the method blocks the current thread until the acknowledgement is received. Futures and reactive types complete after the acknowledgement is received.

When a reactive type is returned, the publisher is cold: the message is not actually published until the stream is subscribed to.

For example:

import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;
import org.reactivestreams.Publisher;

import java.util.concurrent.CompletableFuture;

@MqttPublisher
public interface ProductClient {

    @Topic("product")
    Publisher<Void> sendPublisher(byte[] data); // (1)

    @Topic("product")
    CompletableFuture<Void> sendFuture(byte[] data); // (2)
}
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher
import org.reactivestreams.Publisher

import java.util.concurrent.CompletableFuture

@MqttPublisher
interface ProductClient {

    @Topic("product")
    Publisher<Void> sendPublisher(byte[] data) // (1)

    @Topic("product")
    CompletableFuture<Void> sendFuture(byte[] data) // (2)
}
import io.micronaut.mqtt.annotation.v5.MqttPublisher
import io.micronaut.mqtt.annotation.Topic
import org.reactivestreams.Publisher
import java.util.concurrent.CompletableFuture

@MqttPublisher
interface ProductClient {

    @Topic("product")
    fun sendPublisher(data: ByteArray): Publisher<Void>  // (1)

    @Topic("product")
    fun sendFuture(data: ByteArray): CompletableFuture<Void>  // (2)

    @Topic("product")
    suspend fun sendSuspend(data: ByteArray): Unit //suspend methods work too!
}
1 A Publisher can be returned to be able to know when the message was acknowledged. Any other reactive type can be used given the appropriate dependencies are in place.
2 Java futures can also be used

10 MQTT Subscribers

The example in the quick start presented a trivial definition of a class that listens for messages using the @MqttSubscriber annotation.

The implementation that powers @MqttSubscriber (defined by the AbstractMqttSubscriberAdvice class) is, however, very flexible and offers a range of options for defining MQTT subscribers.

10.1 Defining @MqttSubscriber Methods

All methods that consume messages from MQTT must meet the following conditions:

  • The method must reside in a class annotated with @MqttSubscriber.

  • The method must be annotated with one or more @Topic annotations.

For all of the functionality described in this guide to work as designed, your classes must be compiled with the -parameters compiler flag enabled. If your application was created with the Micronaut CLI, that has already been configured for you.

10.1.1 Subscriber Parameters

In order for the subscriber method to be invoked, all arguments must be satisfied. To allow execution of the method with a null value, the argument must be declared as nullable. If the arguments cannot be satisfied, an exception will be thrown. See the section on consumer exceptions for details on how to handle exceptions thrown during the subscription process.

10.1.1.1 Topic

A @Topic annotation is required for a method to be a subscriber of messages from MQTT. Simply apply the annotation to the method and supply the name of the topic you would like to listen to. Multiple topics can be subscribed to by supplying multiple annotations. Each topic can have its own qos value set in the annotation.

import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.MqttSubscriber;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@MqttSubscriber
public class ProductListener {

    List<Integer> messageLengths = Collections.synchronizedList(new ArrayList<>());
    List<String> topics = Collections.synchronizedList(new ArrayList<>());

    @Topic("product") // (1)
    public void receive(byte[] data) {
        messageLengths.add(data.length);
    }

    @Topic("product/a")
    @Topic("product/b") // (2)
    public void receive(byte[] data, @Topic String topic) {
        topics.add(topic);
    }
}
import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic

@MqttSubscriber
class ProductListener {

    List<Integer> messageLengths = Collections.synchronizedList([])

    @Topic("product") // (1)
    void receive(byte[] data) {
        messageLengths.add(data.length)
    }
}
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.MqttSubscriber
import java.util.*
import kotlin.collections.ArrayList

@MqttSubscriber
class ProductListener {

    val messageLengths: MutableList<Int> = Collections.synchronizedList(ArrayList())

    @Topic("product") // (1)
    fun receive(data: ByteArray) {
        messageLengths.add(data.size)
    }
}
1 The topic annotation is set per method. Multiple methods may be defined with different topics in the same class.
2 Multiple topics are subscribed to and the topic received is an argument to the method. The value will change depending on which topic the message was sent to.

10.1.1.2 Payload

Most examples up to this point have been using a byte[] as the body type for simplicity. This library supports most standard Java types and JSON deserialization (using Jackson) by default. The functionality is extensible and it is possible to add support for additional types and deserialization strategies. See the section on Message Serialization/Deserialization for more information.

10.1.2 Acknowledging Messages

If manual acknowledgement is enabled through the mqtt.client.manual-acks configuration property, messages can be acknowledged manually.

For methods that accept an argument of type Acknowledgement, the message will only be acknowledged when the ack method is called. The nack method is not relevant for MQTT and should not be called.

Acknowledgement Type

import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.MqttSubscriber;
import io.micronaut.messaging.Acknowledgement;

import java.util.concurrent.atomic.AtomicInteger;

@MqttSubscriber
public class ProductListener {

    AtomicInteger messageCount = new AtomicInteger();

    @Topic("product")
    public void receive(byte[] data, Acknowledgement acknowledgement) { // (1)
        messageCount.getAndUpdate((intValue) -> ++intValue);
        acknowledgement.ack(); // (2)
    }
}
import io.micronaut.messaging.Acknowledgement
import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic

import java.util.concurrent.atomic.AtomicInteger

@MqttSubscriber
class ProductListener {

    AtomicInteger messageCount = new AtomicInteger()

    @Topic("product")
    void receive(byte[] data, Acknowledgement acknowledgement) { // (1)
        messageCount.getAndUpdate({ intValue -> ++intValue })
        acknowledgement.ack() // (2)
    }
}
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.messaging.Acknowledgement

import java.util.concurrent.atomic.AtomicInteger

@MqttSubscriber
class ProductListener {

    val messageCount = AtomicInteger()

    @Topic("product")
    fun receive(data: ByteArray, acknowledgement: Acknowledgement) { // (1)
        messageCount.getAndUpdate { intValue -> intValue + 1 }
        acknowledgement.ack() // (2)
    }
}
1 The acknowledgement argument is injected into the method
2 The message is acknowledged

10.2 Handling Subscriber Exceptions

Exceptions can occur in a number of different ways. Possible problem areas include:

  • Binding the message to the method arguments

  • Exceptions thrown from the subscriber methods

  • Exceptions as a result of message acknowledgement

  • Exceptions thrown attempting to subscribe to a topic

If the subscriber bean implements MqttSubscriberExceptionHandler, then exceptions will be sent to the method implementation.

If the subscriber bean does not implement MqttSubscriberExceptionHandler, then the exceptions will be routed to the primary exception handler bean. To override the default exception handler, replace the DefaultMqttSubscriberExceptionHandler with your own implementation that is designated as @Primary.

10.3 Subscriber Execution

The MQTT client allows an ExecutorService to be supplied for new connections. The service is used to execute subscribers. A single connection is used for the entire application, and it is configured to use the executor service named consumer. The executor can be configured through application configuration. See ExecutorConfiguration for the full list of options.

For example:

Configuring the consumer thread pool
micronaut.executors.consumer.type=scheduled
micronaut.executors.consumer.corePoolSize=25
micronaut:
    executors:
        consumer:
            type: scheduled
            corePoolSize: 25
[micronaut]
  [micronaut.executors]
    [micronaut.executors.consumer]
      type="scheduled"
      corePoolSize=25
micronaut {
  executors {
    consumer {
      type = "scheduled"
      corePoolSize = 25
    }
  }
}
{
  micronaut {
    executors {
      consumer {
        type = "scheduled"
        corePoolSize = 25
      }
    }
  }
}
{
  "micronaut": {
    "executors": {
      "consumer": {
        "type": "scheduled",
        "corePoolSize": 25
      }
    }
  }
}

If no configuration is supplied, a scheduled thread pool sized at twice the number of available processors is used.

The executor type must be scheduled.
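For orientation, the default described above corresponds roughly to the following plain JDK code. This is a sketch of the equivalent sizing only; the class and method names are hypothetical, and how the framework actually constructs the consumer executor is not shown here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class ConsumerExecutorSketch {

    // Pool size described for the unconfigured case: twice the available processors.
    static int defaultCorePoolSize() {
        return 2 * Runtime.getRuntime().availableProcessors();
    }

    // Builds a scheduled pool of the given size, matching the required
    // "scheduled" executor type. Callers should shut the pool down when done.
    static ScheduledExecutorService newConsumerExecutor(int corePoolSize) {
        return Executors.newScheduledThreadPool(corePoolSize);
    }
}
```

Calling newConsumerExecutor(25) mirrors the corePoolSize of 25 from the configuration example, and newConsumerExecutor(defaultCorePoolSize()) mirrors the unconfigured case.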

11 Customizing Parameter Binding

Default Binding Functionality

Argument binding is achieved through an MqttBinderRegistry. The registry is responsible for choosing which MqttBinder is used to bind an argument. Each binder has two methods, bindFrom and bindTo, that control binding to subscribers and publishers respectively.

The registry supports argument binders that are used based on an annotation applied to the argument or the argument type. All argument binders must implement either AnnotatedMqttBinder or TypedMqttBinder. The exception to that rule is the FallbackMqttBinder which is used when no other binders support a given argument. The binder registry follows a small sequence of steps to attempt to find a binder that supports the argument.

  1. Search the annotation-based binders for one that matches any annotation on the argument that is annotated with @Bindable.

  2. Search the type-based binders for one that matches or is a subclass of the argument type.

  3. Return the default binder.

The default binder checks if the argument name matches one of the supported properties. If the name does not match, the body of the message is bound to the argument.
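The lookup sequence described above can be modelled with a few lines of plain Java. Everything in this sketch is hypothetical: ArgumentSketch and BinderLookupSketch are not library types, binders are represented only by a label, and the direction of the subclass check in step 2 is an assumption of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a method argument: its type plus the names
// of the @Bindable-annotated annotations declared on it.
class ArgumentSketch {
    final Class<?> type;
    final List<String> bindableAnnotations;

    ArgumentSketch(Class<?> type, List<String> bindableAnnotations) {
        this.type = type;
        this.bindableAnnotations = bindableAnnotations;
    }
}

class BinderLookupSketch {
    // Binders are represented by a label only; real binders carry bindTo/bindFrom logic.
    final Map<String, String> annotationBinders = new LinkedHashMap<>();
    final Map<Class<?>, String> typeBinders = new LinkedHashMap<>();
    final String fallbackBinder = "fallback";

    String findBinder(ArgumentSketch argument) {
        // Step 1: annotation-based binders.
        for (String annotation : argument.bindableAnnotations) {
            String binder = annotationBinders.get(annotation);
            if (binder != null) {
                return binder;
            }
        }
        // Step 2: type-based binders. Assumption: a binder registered for a type
        // also applies to arguments whose type is a subclass of it.
        for (Map.Entry<Class<?>, String> entry : typeBinders.entrySet()) {
            if (entry.getKey().isAssignableFrom(argument.type)) {
                return entry.getValue();
            }
        }
        // Step 3: no other binder supports the argument, so use the default one.
        return fallbackBinder;
    }
}
```

With a binder registered under the annotation name "Correlation" and another for Number, an argument annotated with Correlation resolves to the first, a Long argument to the second, and a plain String argument to the fallback.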

The MqttBindingContext is the context used to bind data from and to messages. Each client version (v3 and v5) has its own implementation. If you know your binder will only work with a specific implementation, the binder can reference that implementation instead of the interface.

Custom Binding

To add your own argument binding behavior, register a binder as a bean. The existing binder registry will inject it and include it in the normal processing.

Annotation Binding

A custom annotation can be created to bind subscriber arguments. A custom binder can then be created to use that annotation. In this example an annotation is used to bind to the correlation data (only available in v5).

import io.micronaut.core.bind.annotation.Bindable;

import java.lang.annotation.*;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
@Bindable // (1)
public @interface Correlation {
}
import io.micronaut.core.bind.annotation.Bindable

import java.lang.annotation.*

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target([ElementType.PARAMETER])
@Bindable // (1)
@interface Correlation {
}
import io.micronaut.core.bind.annotation.Bindable


@MustBeDocumented
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.VALUE_PARAMETER)
@Bindable // (1)
annotation class Correlation
1 The @Bindable annotation is required for the annotation to be considered for binding.
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.mqtt.bind.AnnotatedMqttBinder;
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext;
import jakarta.inject.Singleton;

import java.util.Optional;

@Singleton // (1)
public class CorrelationAnnotationBinder implements AnnotatedMqttBinder<MqttV5BindingContext, Correlation> { // (2)

    private final ConversionService conversionService;

    public CorrelationAnnotationBinder(ConversionService conversionService) { // (3)
        this.conversionService = conversionService;
    }

    @Override
    public Class<Correlation> getAnnotationType() {
        return Correlation.class;
    }

    @Override
    public void bindTo(MqttV5BindingContext context, Object value, Argument<Object> argument) {
        context.getProperties().setCorrelationData((byte[]) value); // (4)
    }

    @Override
    public Optional<Object> bindFrom(MqttV5BindingContext context, ArgumentConversionContext<Object> conversionContext) {
        return conversionService.convert(context.getProperties().getCorrelationData(), conversionContext); // (5)
    }
}
import io.micronaut.core.type.Argument
import io.micronaut.mqtt.bind.AnnotatedMqttBinder
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService

import jakarta.inject.Singleton

@Singleton // (1)
class CorrelationAnnotationBinder implements AnnotatedMqttBinder<MqttV5BindingContext, Correlation> { // (2)

    private final ConversionService conversionService

    CorrelationAnnotationBinder(ConversionService conversionService) { // (3)
        this.conversionService = conversionService
    }

    @Override
    Class<Correlation> getAnnotationType() {
        Correlation
    }

    @Override
    void bindTo(MqttV5BindingContext context, Object value, Argument<Object> argument) {
        context.properties.correlationData = (byte[]) value // (4)
    }

    @Override
    Optional<Object> bindFrom(MqttV5BindingContext context, ArgumentConversionContext<Object> conversionContext) {
        conversionService.convert(context.properties.correlationData, conversionContext) // (5)
    }
}
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.mqtt.bind.AnnotatedMqttBinder
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext
import java.util.*

import jakarta.inject.Singleton

@Singleton // (1)
class CorrelationAnnotationBinder(private val conversionService: ConversionService)// (3)
    : AnnotatedMqttBinder<MqttV5BindingContext, Correlation> { // (2)

    override fun getAnnotationType(): Class<Correlation> {
        return Correlation::class.java
    }

    override fun bindTo(context: MqttV5BindingContext, value: Any, argument: Argument<Any>) {
        context.properties.correlationData = value as? ByteArray // (4)
    }

    override fun bindFrom(context: MqttV5BindingContext, conversionContext: ArgumentConversionContext<Any>): Optional<Any> {
        return conversionService.convert(context.properties.correlationData, conversionContext) // (5)
    }
}
1 The class is made a bean by annotating with @Singleton.
2 The custom annotation is used as the generic type for the interface.
3 The conversion service is injected into the instance.
4 The correlation data is set from the publisher method value.
5 The correlation data is returned to be bound to a subscriber method value.

The annotation can now be used on the argument in a subscriber method.

import io.micronaut.mqtt.annotation.MqttSubscriber;
import io.micronaut.mqtt.annotation.Topic;

import java.util.*;

@MqttSubscriber
public class ProductListener {

    Set<String> messages = Collections.synchronizedSet(new HashSet<>());

    @Topic("product")
    public void receive(byte[] data, @Correlation byte[] correlation) { // (1)
        messages.add(new String(correlation));
    }
}
import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic

@MqttSubscriber
class ProductListener {

    Set<String> messages = Collections.synchronizedSet(new HashSet<>())

    @Topic("product")
    void receive(@Correlation byte[] correlation) { // (1)
        messages.add(new String(correlation))
    }
}
import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic
import java.nio.charset.StandardCharsets

import java.util.Collections
import java.util.HashSet

@MqttSubscriber
class ProductListener {

    val messages: MutableSet<String> = Collections.synchronizedSet(HashSet())

    @Topic("product")
    fun receive(@Correlation data: ByteArray) { // (1)
        messages.add(String(data, StandardCharsets.UTF_8))
    }
}

And in a publisher method.

import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;

@MqttPublisher
public interface ProductClient {

    @Topic("product")
    void send(@Correlation byte[] correlation);
}
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher

@MqttPublisher
interface ProductClient {

    @Topic("product")
    void send(@Correlation byte[] correlation)
}
import io.micronaut.mqtt.annotation.v5.MqttPublisher
import io.micronaut.mqtt.annotation.Topic

@MqttPublisher
interface ProductClient {

    @Topic("product")
    fun send(@Correlation data: ByteArray)
}

Type Binding

A custom binder can be created to support any argument type. For example the following class could be created to bind values from user properties (V5 only). This functionality could allow the work of retrieving and converting the properties to occur in a single place instead of multiple times in your code.

import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;

public class ProductInfo {

    private String size;
    private Long count;
    private Boolean sealed;

    public ProductInfo(@Nullable String size, // (1)
                       @NonNull Long count, // (2)
                       @NonNull Boolean sealed) { // (3)
        this.size = size;
        this.count = count;
        this.sealed = sealed;
    }

    public String getSize() {
        return size;
    }

    public Long getCount() {
        return count;
    }

    public Boolean getSealed() {
        return sealed;
    }
}
import io.micronaut.core.annotation.NonNull
import io.micronaut.core.annotation.Nullable

class ProductInfo {

    private String size
    private Long count
    private Boolean seal

    ProductInfo(@Nullable String size, // (1)
                @NonNull Long count, // (2)
                @NonNull Boolean seal) { // (3)
        this.size = size
        this.count = count
        this.seal = seal
    }

    String getSize() {
        size
    }

    Long getCount() {
        count
    }

    Boolean getSealed() {
        seal
    }
}
class ProductInfo(val size: String?, // (1)
                  val count: Long, // (2)
                  val sealed: Boolean)// (3)
1 The size argument is not required
2 The count argument is required
3 The sealed argument is required

A type argument binder can then be created to create the ProductInfo instance to bind to and from method arguments.

import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.mqtt.bind.TypedMqttBinder;
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext;
import jakarta.inject.Singleton;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

@Singleton // (1)
public class ProductInfoTypeBinder implements TypedMqttBinder<MqttV5BindingContext, ProductInfo> { //(2)

    private final ConversionService conversionService;

    ProductInfoTypeBinder(ConversionService conversionService) { //(3)
        this.conversionService = conversionService;
    }

    @Override
    public Argument<ProductInfo> getArgumentType() {
        return Argument.of(ProductInfo.class);
    }

    @Override
    public void bindTo(MqttV5BindingContext context, ProductInfo value, Argument<ProductInfo> argument) {
        List<UserProperty> userPropertiesList = context.getProperties().getUserProperties();
        if (value.getSize() != null) {
            userPropertiesList.add(new UserProperty("productSize", value.getSize()));
        }
        userPropertiesList.add(new UserProperty("productCount", value.getCount().toString())); // (4)
        userPropertiesList.add(new UserProperty("productSealed", value.getSealed().toString()));
    }

    @Override
    public Optional<ProductInfo> bindFrom(MqttV5BindingContext context, ArgumentConversionContext<ProductInfo> conversionContext) {
        List<UserProperty> userPropertiesList = context.getProperties().getUserProperties();
        Map<String, String> userProperties = userPropertiesList.stream()
                .collect(Collectors.toMap(UserProperty::getKey, UserProperty::getValue));
        String size = userProperties.get("productSize");
        Optional<Long> count = Optional.ofNullable(userProperties.get("productCount"))
                .flatMap(value -> conversionService.convert(value, Long.class));
        Optional<Boolean> sealed = Optional.ofNullable(userProperties.get("productSealed"))
                .flatMap(value -> conversionService.convert(value, Boolean.class));

        if (count.isPresent() && sealed.isPresent()) {
            return Optional.of(new ProductInfo(size, count.get(), sealed.get())); // (5)
        } else {
            return Optional.empty();
        }
    }
}
import io.micronaut.mqtt.bind.TypedMqttBinder
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import jakarta.inject.Singleton
import org.eclipse.paho.mqttv5.common.packet.UserProperty

import java.util.stream.Collectors

@Singleton // (1)
class ProductInfoTypeBinder implements TypedMqttBinder<MqttV5BindingContext, ProductInfo> { //(2)

    private final ConversionService conversionService

    ProductInfoTypeBinder(ConversionService conversionService) { //(3)
        this.conversionService = conversionService
    }

    @Override
    Argument<ProductInfo> getArgumentType() {
        return Argument.of(ProductInfo)
    }

    @Override
    void bindTo(MqttV5BindingContext context, ProductInfo value, Argument<ProductInfo> argument) {
        List<UserProperty> userPropertiesList = context.properties.userProperties
        if (value.size != null) {
            userPropertiesList.add(new UserProperty("productSize", value.size))
        }
        userPropertiesList.add(new UserProperty("productCount", value.count.toString())) // (4)
        userPropertiesList.add(new UserProperty("productSealed", value.sealed.toString()))
    }

    @Override
    Optional<ProductInfo> bindFrom(MqttV5BindingContext context, ArgumentConversionContext<ProductInfo> conversionContext) {
        List<UserProperty> userPropertiesList = context.properties.userProperties
        Map<String, String> userProperties = userPropertiesList.stream()
                .collect(Collectors.toMap(UserProperty::getKey, UserProperty::getValue))
        String size = userProperties.get("productSize")
        Optional<Long> count = Optional.ofNullable(userProperties.get("productCount"))
                .flatMap(value -> conversionService.convert(value, Long.class))
        Optional<Boolean> sealed = Optional.ofNullable(userProperties.get("productSealed"))
                .flatMap(value -> conversionService.convert(value, Boolean.class))

        if (count.isPresent() && sealed.isPresent()) {
            return Optional.of(new ProductInfo(size, count.get(), sealed.get())) // (5)
        } else {
            return Optional.empty()
        }
    }
}
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.mqtt.bind.*
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext
import jakarta.inject.Singleton
import org.eclipse.paho.mqttv5.common.packet.UserProperty

import java.util.Optional
import java.util.stream.Collectors

@Singleton // (1)
class ProductInfoTypeBinder constructor(private val conversionService: ConversionService) //(3)
    : TypedMqttBinder<MqttV5BindingContext, ProductInfo> { // (2)

    override fun bindTo(context: MqttV5BindingContext, value: ProductInfo, argument: Argument<ProductInfo>) {
        val userPropertiesList = context.properties.userProperties
        if (value.size != null) {
            userPropertiesList.add(UserProperty("productSize", value.size)) // (4)
        }
        userPropertiesList.add(UserProperty("productCount", value.count.toString()))
        userPropertiesList.add(UserProperty("productSealed", value.sealed.toString()))
    }

    override fun bindFrom(context: MqttV5BindingContext, conversionContext: ArgumentConversionContext<ProductInfo>): Optional<ProductInfo> {
        val userPropertiesList = context.properties.userProperties
        val userProperties = userPropertiesList.stream()
                .collect(Collectors.toMap(UserProperty::getKey, UserProperty::getValue))
        val size = userProperties.get("productSize")
        val count = Optional.ofNullable(userProperties.get("productCount"))
                .flatMap { value -> conversionService.convert(value, Long::class.java) }
        val sealed = Optional.ofNullable(userProperties.get("productSealed"))
                .flatMap { value -> conversionService.convert(value, Boolean::class.java) }

        if (count.isPresent && sealed.isPresent) {
            return Optional.of(ProductInfo(size, count.get(), sealed.get())) // (5)
        } else {
            return Optional.empty()
        }
    }

    override fun getArgumentType(): Argument<ProductInfo> {
        return Argument.of(ProductInfo::class.java)
    }

}
1 The class is made a bean by annotating with @Singleton.
2 The custom type is used as the generic type for the interface.
3 The conversion service is injected into the instance.
4 The properties are populated into user properties when publishing
5 The properties are read and converted into a ProductInfo to be bound to a subscriber argument

12 Message Serialization/Deserialization (SerDes)

The serialization and deserialization of message payloads is handled through instances of MqttPayloadSerDes. A ser-des (serializer/deserializer) converts MQTT message payloads to and from the types defined in your publisher and subscriber methods.

The ser-des are managed by a MqttPayloadSerDesRegistry. All ser-des beans are injected in order into the registry and then searched when serialization or deserialization is needed. The first ser-des whose supports method returns true for the requested type is returned and used.

By default, standard Java lang types and JSON format (with Jackson) are supported. You can supply your own ser-des by simply registering a bean of type MqttPayloadSerDes. All ser-des implement the Ordered interface, so custom implementations can come before, after, or in between the default implementations.
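The ordered, first-match lookup described above can be sketched in plain Java. The types here are hypothetical stand-ins rather than the library's classes, and the order value given to the catch-all entry in the usage note is made up for illustration. The sketch relies on the usual Ordered convention that lower values sort first.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a ser-des: only the parts needed for lookup are modelled.
class SerDesSketch {
    final String name;
    final int order;
    final Class<?> supportedType;

    SerDesSketch(String name, int order, Class<?> supportedType) {
        this.name = name;
        this.order = order;
        this.supportedType = supportedType;
    }

    boolean supports(Class<?> type) {
        return supportedType.isAssignableFrom(type);
    }
}

class SerDesRegistrySketch {
    private final List<SerDesSketch> serDes;

    SerDesRegistrySketch(List<SerDesSketch> beans) {
        List<SerDesSketch> sorted = new ArrayList<>(beans);
        // Lower order values sort first and are therefore checked first.
        sorted.sort(Comparator.comparingInt((SerDesSketch s) -> s.order));
        this.serDes = sorted;
    }

    // Returns the first ser-des, in order, that supports the requested type.
    Optional<SerDesSketch> findSerDes(Class<?> type) {
        for (SerDesSketch candidate : serDes) {
            if (candidate.supports(type)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }
}
```

If a custom entry with order 0 supports CharSequence and a catch-all entry with order 100 supports Object, a lookup for String returns the custom entry, while a lookup for Integer falls through to the catch-all.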

12.1 Custom SerDes

A custom serializer/deserializer would be necessary to support custom data formats. In the section on Custom Binding an example was demonstrated that allowed binding a ProductInfo type from the properties of the message. If instead that object should represent the payload of the message with a custom data format, you could register your own serializer/deserializer to do so.

In this example, the data format is simple: the string representations of the fields are concatenated, separated by a pipe character.

import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.mqtt.serdes.MqttPayloadSerDes;
import jakarta.inject.Singleton;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Optional;


@Singleton // (1)
public class ProductInfoSerDes implements MqttPayloadSerDes<ProductInfo> { // (2)

    private static final Charset CHARSET = StandardCharsets.UTF_8;

    private final ConversionService conversionService;

    public ProductInfoSerDes(ConversionService conversionService) { // (3)
        this.conversionService = conversionService;
    }

    @Nullable
    @Override
    public ProductInfo deserialize(byte[] payload, Argument<ProductInfo> argument) {
        String body = new String(payload, CHARSET);
        String[] parts = body.split("\\|");
        if (parts.length == 3) {
            String size = parts[0];
            if (size.equals("null")) {
                size = null;
            }

            Optional<Long> count = conversionService.convert(parts[1], Long.class);
            Optional<Boolean> sealed = conversionService.convert(parts[2], Boolean.class);

            if (count.isPresent() && sealed.isPresent()) {
                return new ProductInfo(size, count.get(), sealed.get()); // (4)
            }
        }
        return null;
    }

    @Nullable
    @Override
    public byte[] serialize(@Nullable ProductInfo data) {
        if (data == null) {
            return null;
        }
        return (data.getSize() + "|" + data.getCount() + "|" + data.getSealed()).getBytes(CHARSET); // (5)
    }

    @Override
    public boolean supports(Argument<ProductInfo> argument) { // (6)
        return argument.getType().isAssignableFrom(ProductInfo.class);
    }
}
import io.micronaut.core.annotation.Nullable
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.mqtt.serdes.MqttPayloadSerDes
import jakarta.inject.Singleton

import java.nio.charset.Charset

@Singleton // (1)
class ProductInfoSerDes implements MqttPayloadSerDes<ProductInfo> { // (2)

    private static final Charset CHARSET = Charset.forName("UTF-8")

    private final ConversionService conversionService

    ProductInfoSerDes(ConversionService conversionService) { // (3)
        this.conversionService = conversionService
    }

    @Nullable
    @Override
    ProductInfo deserialize(byte[] payload, Argument<ProductInfo> argument) {
        String body = new String(payload, CHARSET)
        String[] parts = body.split("\\|")
        if (parts.length == 3) {
            String size = parts[0]
            if (size == "null") {
                size = null
            }

            Optional<Long> count = conversionService.convert(parts[1], Long.class)
            Optional<Boolean> sealed = conversionService.convert(parts[2], Boolean.class)

            if (count.isPresent() && sealed.isPresent()) {
                return new ProductInfo(size, count.get(), sealed.get()) // (4)
            }
        }
        return null
    }

    @Nullable
    @Override
    byte[] serialize(@Nullable ProductInfo data) {
        if (data == null) {
            return null
        }
        return "${data.size}|${data.count}|${data.sealed}".getBytes(CHARSET) // (5)
    }

    @Override
    boolean supports(Argument<ProductInfo> argument) { // (6)
        argument.type.isAssignableFrom(ProductInfo)
    }
}
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.mqtt.serdes.MqttPayloadSerDes
import jakarta.inject.Singleton

import java.nio.charset.Charset

@Singleton // (1)
class ProductInfoSerDes(private val conversionService: ConversionService)// (3)
    : MqttPayloadSerDes<ProductInfo> { // (2)

    override fun deserialize(payload: ByteArray, argument: Argument<ProductInfo>): ProductInfo? {
        val body = String(payload, CHARSET)
        val parts = body.split("\\|".toRegex())
        if (parts.size == 3) {
            var size: String? = parts[0]
            if (size == "null") {
                size = null
            }

            val count = conversionService.convert(parts[1], Long::class.java)
            val sealed = conversionService.convert(parts[2], Boolean::class.java)

            if (count.isPresent && sealed.isPresent) {
                return ProductInfo(size, count.get(), sealed.get()) // (4)
            }
        }
        return null
    }

    override fun serialize(data: ProductInfo?): ByteArray? {
        if (data == null) {
            return null
        }
        return (data.size + "|" + data.count + "|" + data.sealed).toByteArray(CHARSET) // (5)
    }

    override fun supports(argument: Argument<ProductInfo>): Boolean { // (6)
        return argument.type.isAssignableFrom(ProductInfo::class.java)
    }

    companion object {
        private val CHARSET = Charset.forName("UTF-8")
    }

}
1 The class is declared as a singleton so it will be registered with the context
2 The generic specifies what type we want to accept and return
3 The conversion service is injected to convert the parts of the message to the required types
4 The deserialize method takes the bytes from the message and constructs a ProductInfo
5 The serialize method takes the ProductInfo and returns the bytes to publish.
6 The supports method ensures only the correct body types are processed by this ser-des
Because the getOrder method was not overridden, the default order of 0 is used. All default ser-des have a lower precedence than the default order, which means this ser-des will be checked before the others.
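
To make the wire format concrete, the following plain Java sketch round-trips the same pipe-separated layout without any framework types. The class and method names are hypothetical, and the decoded fields are returned as raw strings rather than converted with a ConversionService.

```java
import java.nio.charset.StandardCharsets;

class PipeFormatSketch {

    // Mirrors the serialize step: size, count and sealed joined by '|'.
    // A null size is written as the literal text "null".
    static byte[] encode(String size, long count, boolean sealed) {
        return (size + "|" + count + "|" + sealed).getBytes(StandardCharsets.UTF_8);
    }

    // Mirrors the deserialize step. Returns the three raw fields, or null
    // when the payload does not split into exactly three parts.
    static String[] decode(byte[] payload) {
        String[] parts = new String(payload, StandardCharsets.UTF_8).split("\\|");
        if (parts.length != 3) {
            return null;
        }
        if (parts[0].equals("null")) {
            parts[0] = null;
        }
        return parts;
    }
}
```

One consequence of this format, visible in the sketch, is that a size value containing a pipe character yields more than three parts and is rejected on decode, so the format only suits values that cannot contain the separator.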

13 GraalVM support

Micronaut MQTT is compatible with GraalVM. The library handles the required configuration automatically, so in most cases no special configuration is needed.

The one exception is the mqtt-ssl module: if you use it, add the option --report-unsupported-elements-at-runtime to the native image build.

See the section on GraalVM in the user guide for more information.

14 Breaking Changes

This section documents breaking changes for Mqtt:

3.0.0

The annotations for Mqtt have moved into the core module, and therefore have changed package names. The new package names are:

Mqtt3

Old package name                                 New package name
io.micronaut.mqtt.v3.annotation.MqttPublisher    io.micronaut.mqtt.annotation.v3.MqttPublisher

Mqtt5

Old package name                                 New package name
io.micronaut.mqtt.v5.annotation.MqttPublisher    io.micronaut.mqtt.annotation.v5.MqttPublisher
io.micronaut.mqtt.v5.annotation.MqttProperty     io.micronaut.mqtt.annotation.v5.MqttProperty
io.micronaut.mqtt.v5.annotation.MqttProperties   io.micronaut.mqtt.annotation.v5.MqttProperties
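
When migrating, the renames listed above amount to a fixed set of import replacements. The helper below is a hypothetical illustration of that mapping in plain Java, not a tool shipped with the project; it only rewrites the four fully qualified names listed in this section.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MqttImportMigrationSketch {

    // Old fully qualified name -> new fully qualified name, as listed for 3.0.0.
    static final Map<String, String> RENAMES = new LinkedHashMap<>();

    static {
        RENAMES.put("io.micronaut.mqtt.v3.annotation.MqttPublisher",
                "io.micronaut.mqtt.annotation.v3.MqttPublisher");
        RENAMES.put("io.micronaut.mqtt.v5.annotation.MqttPublisher",
                "io.micronaut.mqtt.annotation.v5.MqttPublisher");
        RENAMES.put("io.micronaut.mqtt.v5.annotation.MqttProperty",
                "io.micronaut.mqtt.annotation.v5.MqttProperty");
        RENAMES.put("io.micronaut.mqtt.v5.annotation.MqttProperties",
                "io.micronaut.mqtt.annotation.v5.MqttProperties");
    }

    // Rewrites any old annotation names found in a single line of source code.
    static String migrateLine(String sourceLine) {
        String result = sourceLine;
        for (Map.Entry<String, String> rename : RENAMES.entrySet()) {
            result = result.replace(rename.getKey(), rename.getValue());
        }
        return result;
    }
}
```

For example, passing the line "import io.micronaut.mqtt.v5.annotation.MqttProperty;" to migrateLine yields the same import with the annotation.v5 package, and lines that mention none of the old names are returned unchanged.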

15 Repository

You can find the source code of this project in this repository: