import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;
@MqttPublisher // (1)
public interface ProductClient {
    @Topic("product") // (2)
    void send(byte[] data); // (3)
}Table of Contents
Micronaut MQTT
Integration between Micronaut and MQTT
Version: 3.7.0
1 Introduction
This project includes integration between Micronaut and MQTT. The Eclipse Paho Client is used to do the actual publishing and subscribing.
There are two variants to support both MQTT V3 and V5.
2 Release History
3.0.x
Since Micronaut MQTT 3.0.0, you can use an implementation based on the HiveMQ MQTT Client
3 Release Notes
For this project, you can find a list of releases (with release notes) here:
4 MQTT Quick Start
To add support for MQTT to an existing project, you must first add either the Micronaut MQTT HiveMQ dependency or the Micronaut MQTT Eclipse Paho v3 or v5 (depending on the version of MQTT you require) to your build.
| If you are running into the exception org.eclipse.paho.client.mqttv3.MqttException: Timed out waiting for a response from the server, please make sure to run your program on a machine with more than one core. The Eclipse Paho MQTT client will run into connection timeouts if it has only one core available. | 
| See the guide for Publishing and Subscribing to MQTT Topics from a Micronaut Application to learn more. | 
4.1 Creating an MQTT Publisher with MqttPublisher annotation
To create an MQTT client that produces messages you can simply define an interface that is annotated with MqttPublisher.
For example the following is a trivial MqttPublisher interface:
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher
@MqttPublisher // (1)
interface ProductClient {
    @Topic("product") // (2)
    void send(byte[] data) // (3)
}import io.micronaut.mqtt.annotation.v5.MqttPublisher
import io.micronaut.mqtt.annotation.Topic
@MqttPublisher // (1)
interface ProductClient {
    @Topic("product") // (2)
    fun send(data: ByteArray)  // (3)
}| 1 | The MqttPublisherannotation is used to designate this interface as a client. | 
| 2 | The @Topic annotation indicates which topic the message should be published to. | 
| 3 | The sendmethod accepts single parameter which is the payload of the message. | 
At compile time Micronaut will produce an implementation of the above interface. You can retrieve an instance of ProductClient either by looking up the bean from the ApplicationContext or by injecting the bean with @Inject:
ProductClient productClient = applicationContext.getBean(ProductClient.class);
productClient.send("quickstart".getBytes());def productClient = applicationContext.getBean(ProductClient)
productClient.send("quickstart".getBytes())val productClient = ctx.getBean(ProductClient::class.java)
productClient.send("quickstart".toByteArray())| Because the sendmethod returnsvoidthis means the method will publish the message and block until the broker acknowledges the message. | 
4.2 Creating an MQTT Subscriber with MqttSubscriber annotation
To listen to MQTT messages you can use the @MqttSubscriber annotation to define a message listener.
The following example will listen for messages published by the ProductClient in the previous section:
import io.micronaut.mqtt.annotation.MqttSubscriber;
import io.micronaut.mqtt.annotation.Topic;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@MqttSubscriber // (1)
public class ProductListener {
    List<String> messageLengths = Collections.synchronizedList(new ArrayList<>());
    @Topic("product") // (2)
    public void receive(byte[] data) { // (3)
        messageLengths.add(new String(data));
        System.out.println("Java received " + data.length + " bytes from MQTT");
    }
}import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic
@MqttSubscriber // (1)
class ProductListener {
    List<String> messageLengths = Collections.synchronizedList([])
    @Topic("product") // (2)
    void receive(byte[] data) { // (3)
        messageLengths.add(new String(data))
        println("Groovy received ${data.length} bytes from MQTT")
    }
}import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic
import java.util.*
@MqttSubscriber // (1)
class ProductListener {
    val messageLengths: MutableList<String> = Collections.synchronizedList(ArrayList())
    @Topic("product") // (2)
    fun receive(data: ByteArray) { // (3)
        val string = String(data)
        messageLengths.add(string)
        println("Kotlin received ${data.size} bytes from MQTT: ${string}")
    }
}| 1 | The @MqttSubscriber is used to designate the bean as a message listener. | 
| 2 | The @Topic annotation is used to indicate which topic to subscribe to. | 
| 3 | The receivemethod accepts a single parameter which is the payload of the message. | 
4.3 MQTT Health Checks
When either the mqttv3 or mqttv5 module is activated an MqttHealthIndicator or MqttHealthIndicator is activated resulting in the /health endpoint and CurrentHealthStatus interface resolving the health of the MQTT connection.
The only configuration option supported is to enable or disable the indicator by the endpoints.health.mqtt.client.enabled key.
See the section on the Health Endpoint for more information.
5 MQTT HiveMQ
This section outlines details specific to the HiveMQ client implementation.
5.1 MQTT Dependency with HiveMQ
The following dependency uses the HiveMQ MQTT client under the hood.
implementation("io.micronaut.mqtt:micronaut-mqtt-hivemq")<dependency>
    <groupId>io.micronaut.mqtt</groupId>
    <artifactId>micronaut-mqtt-hivemq</artifactId>
</dependency>5.2 Configuration
Both v3 and v5 clients are configured via the same object with mqtt.client.mqtt-version set to either 3 or 5. (default is 5)
| Property | Type | Description | 
|---|---|---|
| 
 | boolean | Set true if a new sessions should be started for connection (v5 only). | 
| 
 | java.lang.Long | The session expiry interval in seconds (v5 only). | 
| 
 | java.lang.Integer | The maximum amount of not acknowledged publishes with QoS 1 or 2 the client accepts from the server concurrently (v5 only). | 
| 
 | java.lang.Integer | The maximum packet size the client sends to the server (v5 only). | 
| 
 | java.lang.Integer | The maximum amount of topic aliases the client accepts from the server (v5 only). | 
| 
 | boolean | Whether the client requests response information from the server (v5 only). | 
| 
 | boolean | Whether the client requests problem information from the server (v5 only). | 
| 
 | java.util.Map | The user defined properties that should be sent for every message (v5 only). | 
| 
 | boolean | Set true if a new session should be started for connection (v3 only). | 
| 
 | java.net.URI | The URI of server to connect to as [schema]://[serverHost]:[serverPort]. | 
| 
 | java.lang.String | The client identifier to use. | 
| 
 | int | The version of the MQTT protocol to use (one of 3 or 5). | 
| 
 | java.time.Duration | How long to wait for a connection to be established. | 
| 
 | boolean | True if you wish to manually acknowledge messages. | 
| 
 | byte | The password to use for MQTT connections. | 
| 
 | java.lang.String | The username to use for MQTT connections. | 
| 
 | long | The maximum delay for reconnecting in seconds. | 
| 
 | int | The keep alive interval in seconds. | 
| 
 | boolean | True if the client should attempt to reconnect to the server if the connection is lost. | 
| 
 | java.util.Map | Custom headers that should be sent with web socket connections. | 
| 
 | boolean | True if hostname verification should be used for https connections. | 
| 
 | javax.net.ssl.HostnameVerifier | The hostname verifier to use for hostname verification. | 
| It is also possible to disable the integration entirely with mqtt.enabled: false | 
6 MQTT V3 Eclipse Paho
This section outlines details specific to the v3 Eclipse Paho implementation.
6.1 MQTT v3 Dependency with Eclipse Paho
The following dependency uses the Eclipse Paho client under the hood.
For MQTT v3:
implementation("io.micronaut.mqtt:micronaut-mqttv3")<dependency>
    <groupId>io.micronaut.mqtt</groupId>
    <artifactId>micronaut-mqttv3</artifactId>
</dependency>6.2 Configuration
All properties on the MqttConnectOptions are available to be modified, either through configuration or a BeanCreatedEventListener.
The properties that can be converted from the string values in a configuration file can be configured directly.
| Property | Type | Description | 
|---|---|---|
| 
 | char | |
| 
 | java.lang.String | |
| 
 | int | |
| 
 | int | |
| 
 | int | |
| 
 | int | |
| 
 | int | |
| 
 | boolean | |
| 
 | java.lang.String | |
| 
 | boolean | |
| 
 | int | |
| 
 | java.util.Properties | |
| 
 | java.lang.String | |
| 
 | java.lang.String | |
| 
 | java.lang.Boolean | |
| 
 | javax.net.SocketFactory | |
| 
 | java.util.Properties | |
| 
 | boolean | |
| 
 | javax.net.ssl.HostnameVerifier | 
| Without any configuration the defaults in the MqttConnectOptions will be used. | 
| It is also possible to disable the integration entirely with mqtt.enabled: false | 
7 MQTT V5 Eclipse Paho
This section outlines details specific to the v5 Eclipse Paho implementation.
7.1 MQTT v5 Dependency with Eclipse Paho
The following dependency uses the Eclipse Paho client under the hood.
For MQTT v5:
implementation("io.micronaut.mqtt:micronaut-mqttv5")<dependency>
    <groupId>io.micronaut.mqtt</groupId>
    <artifactId>micronaut-mqttv5</artifactId>
</dependency>7.2 Configuration
All properties on the MqttConnectionOptions are available to be modified, either through configuration or a BeanCreatedEventListener.
The properties that can be converted from the string values in a configuration file can be configured directly.
| Property | Type | Description | 
|---|---|---|
| 
 | java.lang.String | |
| 
 | byte | |
| 
 | boolean | |
| 
 | int | |
| 
 | int | |
| 
 | int | |
| 
 | java.lang.String | |
| 
 | boolean | |
| 
 | java.lang.Long | |
| 
 | java.lang.Integer | |
| 
 | java.lang.Long | |
| 
 | java.lang.Integer | |
| 
 | boolean | |
| 
 | boolean | |
| 
 | java.util.List | |
| 
 | java.lang.String | |
| 
 | byte | |
| 
 | boolean | |
| 
 | java.util.Map | |
| 
 | boolean | |
| 
 | int | |
| 
 | java.lang.String | |
| 
 | java.lang.String | |
| 
 | javax.net.SocketFactory | |
| 
 | java.util.Properties | |
| 
 | boolean | |
| 
 | javax.net.ssl.HostnameVerifier | 
| Without any configuration the defaults in the MqttConnectionOptionswill be used. | 
| It is also possible to disable the integration entirely with mqtt.enabled: false | 
7.3 Properties
Publishers
It is also supported to supply properties when publishing messages. Any of the org.eclipse.paho.mqttv5.common.packet.MqttProperties can be set dynamically per execution or statically for all executions. Properties can be set using the @MqttProperty annotation.
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttProperty;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;
@MqttPublisher
@MqttProperty(name = "appId", value = "myApp") // (1)
public interface ProductClient {
    @Topic("product")
    @MqttProperty(name = "contentType", value = "application/json") // (2)
    @MqttProperty(name = "userId", value = "guest")
    void send(byte[] data);
    @Topic("product")
    void send(@MqttProperty("userId") String user, @MqttProperty String contentType, byte[] data); // (3)
}import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttProperty
import io.micronaut.mqtt.annotation.v5.MqttPublisher
@MqttPublisher
@MqttProperty(name = "appId", value = "myApp") // (1)
interface ProductClient {
    @Topic("product")
    @MqttProperty(name = "contentType", value = "application/json") // (2)
    @MqttProperty(name = "userId", value = "guest")
    void send(byte[] data)
    @Topic("product")
    void send(@MqttProperty("userId") String user, @MqttProperty String contentType, byte[] data) // (3)
}import io.micronaut.mqtt.annotation.v5.MqttPublisher
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttProperties
import io.micronaut.mqtt.annotation.v5.MqttProperty
@MqttPublisher
@MqttProperty(name = "appId", value = "myApp") // (1)
interface ProductClient {
    @Topic("product")
    @MqttProperties( // (2)
            MqttProperty(name = "contentType", value = "application/json"),
            MqttProperty(name = "userId", value = "guest")
    )
    fun send(data: ByteArray)
    @Topic("product")
    fun send(@MqttProperty("userId") user: String, @MqttProperty contentType: String?, data: ByteArray)  // (3)
}| 1 | Properties can be defined at the class level and will apply to all methods. If a property is defined on the method with the same name as one on the class, both values will be added. | 
| 2 | Multiple annotations can be used to set multiple properties on the method or class level. | 
| 3 | Properties can be set per execution. The name is inferred from the argument if the annotation value is not set. Parameter values override any property value set from method or class level annotations. | 
For method arguments, if the value is not supplied to the annotation, the argument name will be used as the property name. For example, @MqttProperty String userId would result in the property userId being set on the properties object before publishing.
| If the annotation or argument name cannot be matched to a property name in the MqttPropertiesclass, a user property will be added instead. | 
Subscribers
Any properties found in messages received can be bound to subscriber arguments through the @v5,MqttProperty annotation.
import io.micronaut.core.annotation.Nullable;
import io.micronaut.mqtt.annotation.MqttSubscriber;
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttProperty;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@MqttSubscriber
public class ProductListener {
    List<String> messageProperties = Collections.synchronizedList(new ArrayList<>());
    @Topic("product")
    public void receive(byte[] data,
                        @MqttProperty("userId") String user,  // (1)
                        @Nullable @MqttProperty String contentType,  // (2)
                        @MqttProperty String appId) {  // (3)
        messageProperties.add(user + "|" + contentType + "|" + appId);
    }
}import io.micronaut.core.annotation.Nullable
import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttProperty
@MqttSubscriber
class ProductListener {
    List<String> messageProperties = Collections.synchronizedList([])
    @Topic("product")
    void receive(byte[] data,
                 @MqttProperty("userId") String user,  // (1)
                 @Nullable @MqttProperty String contentType,  // (2)
                 @MqttProperty String appId) {  // (3)
        messageProperties.add(user + "|" + contentType + "|" + appId)
    }
}import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.v5.MqttProperty
import java.util.*
@MqttSubscriber
class ProductListener {
    val messageProperties: MutableList<String> = Collections.synchronizedList(ArrayList())
    @Topic("product")
    fun receive(data: ByteArray,
                @MqttProperty("userId") user: String, // (1)
                @MqttProperty contentType: String?, // (2)
                @MqttProperty appId: String) { // (3)
        messageProperties.add("$user|$contentType|$appId")
    }
}| 1 | The property will be bound from the user property userId. | 
| 2 | The property will be bound from the content type property and does not require the property to be present. | 
| 3 | The property will be bound from the user property appId. | 
| Arguments are required by default and an exception will be thrown if they cannot be found or converted to the requested type. Making the argument nullable allows null values to be accepted. | 
8 SSL Connections
This library supports connecting to MQTT brokers over SSL. Because that feature requires a third party dependency (Bouncy Castle), the functionality exists in a separate module that you must express a dependency on.
implementation("io.micronaut.mqtt:micronaut-mqtt-ssl")<dependency>
    <groupId>io.micronaut.mqtt</groupId>
    <artifactId>micronaut-mqtt-ssl</artifactId>
</dependency>After adding the dependency, you must configure the client with the appropriate certificate authority, certificate, key, and password.
| Property | Type | Description | 
|---|---|---|
| 
 | ||
| 
 | ||
| 
 | ||
| 
 | char | 
| The files can be configured with the file:prefix to reference absolute paths on the file system orclasspath:to reference files on the classpath. | 
Once the configuration is in place the client will connect over SSL.
| The server URI must start with ssl:. | 
9 MQTT Publishers
The example in the quick start presented a trivial definition of an interface that be implemented automatically for you using the @MqttPublisher annotation.
The implementation that powers @MqttPublisher is, however, very flexible and offers a range of options for defining MQTT producers.
9.1 Defining Publisher Methods
All methods that publish messages to MQTT must meet the following conditions:
- 
The method must reside in a class annotated with @MqttPublisher.
- 
The method must provide the topic to publish to, either through an argument, or the topic annotation on the method or class. 
| If the topic cannot be found, an exception will be thrown. Unless a reactive type or future is returned from the publishing method, the action is blocking. | 
9.1.1 Publisher Parameters
All options are available to be set for publishing messages, either through static values in annotations on the class or method, or through annotations on the method arguments.
9.1.1.1 Topic
To set the topic to publish to, apply the @Topic annotation to the method or an argument of the method. Apply the annotation to the class or method if the value is static for every execution. Apply the annotation to an argument of the method if the value should be set per execution.
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;
@MqttPublisher
public interface ProductClient {
    @Topic("product") // (1)
    void send(byte[] data);
    void send(@Topic String binding, byte[] data); // (2)
}import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher
@MqttPublisher
interface ProductClient {
    @Topic("product") // (1)
    void send(byte[] data)
    void send(@Topic String binding, byte[] data) // (2)
}import io.micronaut.mqtt.annotation.v5.MqttPublisher
import io.micronaut.mqtt.annotation.Topic
@MqttPublisher
interface ProductClient {
    @Topic("product") // (1)
    fun send(data: ByteArray)
    fun send(@Topic binding: String, data: ByteArray)  // (2)
}| 1 | The topic is static | 
| 2 | The topic must be set per execution | 
9.1.1.2 Qos
Quality of service can be set for publishing messages either through the @Topic or @Qos annotations. The @Qos annotation can be applied to a method argument to set the Qos per execution.
import io.micronaut.mqtt.annotation.Qos;
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;
@MqttPublisher
public interface ProductClient {
    @Topic(value = "product", qos = 2) // (1)
    void send(byte[] data);
    @Topic("product")
    void send(byte[] data, @Qos int qos); // (2)
}import io.micronaut.mqtt.annotation.Qos
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher
@MqttPublisher
interface ProductClient {
    @Topic(value = "product", qos = 2) // (1)
    void send(byte[] data)
    @Topic("product")
    void send(byte[] data, @Qos int qos) // (2)
}import io.micronaut.mqtt.annotation.Qos
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher
@MqttPublisher
interface ProductClient {
    @Topic(value = "product", qos = 2) // (1)
    fun send(data: ByteArray?)
    @Topic("product")
    fun send(data: ByteArray?, @Qos qos: Int) // (2)
}
// end:clazz[]| 1 | Qos is set via the topic annotation. It can be set through @Qosinstead if that is preferred. | 
| 2 | The Qos must be set per execution | 
9.1.1.3 Retained
The retained flagg can be set for publishing messages through the @Retained annotation. The annotation can also be applied to a method argument to set the retained flag per execution.
import io.micronaut.mqtt.annotation.Retained;
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;
@MqttPublisher
public interface ProductClient {
    @Topic("product")
    @Retained(true)// (1)
    void send(byte[] data);
    @Topic("product")
    void send(byte[] data, @Retained boolean retained); // (2)
}import io.micronaut.mqtt.annotation.Retained
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher
@MqttPublisher
interface ProductClient {
    @Topic("product")
    @Retained(true)// (1)
    void send(byte[] data)
    @Topic("product")
    void send(byte[] data, @Retained boolean retained) // (2)
}import io.micronaut.mqtt.annotation.Retained
import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher
@MqttPublisher
interface ProductClient {
    @Topic("product")
    @Retained(true) // (1)
    fun send(data: ByteArray?)
    @Topic("product")
    fun send(data: ByteArray?, @Retained retained: Boolean) // (2)
}| 1 | The retained flag is set via the Retainedannotation. | 
| 2 | The retained flag must be set per execution | 
9.1.1.4 Payload
Most examples up to this point have been using a byte[] as the body type for simplicity. This library supports most standard Java types and JSON serialization (using Jackson) by default. The functionality is extensible and it is possible to add support for additional types and serialization strategies. See the section on Message Serialization/Deserialization for more information.
9.1.2 Publisher Acknowledgements
MQTT supports publisher acknowledgements that behave different based on the qos of the message being sent. The broker will acknowledge the message and then that will cause the publishing method to complete. For publishers that return void, that means the method will block the current thread until the acknowledgement is received. For futures and reactive types, they will complete after the acknowledgement is received.
Since the publisher is cold, the message will not actually be published until the stream is subscribed to.
For example:
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;
import org.reactivestreams.Publisher;
@MqttPublisher
public interface ProductClient {
    @Topic("product")
    Publisher<Void> sendPublisher(byte[] data); // (1)
    @Topic("product")
    CompletableFuture<Void> sendFuture(byte[] data); // (2)
}import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher
import org.reactivestreams.Publisher
@MqttPublisher
interface ProductClient {
    @Topic("product")
    Publisher<Void> sendPublisher(byte[] data) // (1)
    @Topic("product")
    CompletableFuture<Void> sendFuture(byte[] data) // (2)
}import io.micronaut.mqtt.annotation.v5.MqttPublisher
import io.micronaut.mqtt.annotation.Topic
import org.reactivestreams.Publisher
@MqttPublisher
interface ProductClient {
    @Topic("product")
    fun sendPublisher(data: ByteArray): Publisher<Void>  // (1)
    @Topic("product")
    fun sendFuture(data: ByteArray): CompletableFuture<Void>  // (2)
    @Topic("product")
    suspend fun sendSuspend(data: ByteArray): Unit //suspend methods work too!
}| 1 | A Publishercan be returned to be able to know when the message was acknowledged. Any other reactive type can be used given the appropriate dependencies are in place. | 
| 2 | Java futures can also be used | 
10 MQTT Subscribers
The example in the quick start presented a trivial definition of a class that listens for messages using the @MqttSubscriber annotation.
The implementation that powers @MqttSubscriber (defined by the AbstractMqttSubscriberAdvice class) is, however, very flexible and offers a range of options for defining MQTT subscribers.
10.1 Defining @MqttSubscriber Methods
All methods that consume messages from MQTT must meet the following conditions:
- 
The method must reside in a class annotated with @MqttSubscriber. 
- 
The method must be annotated with one or more @Topic annotations. 
| In order for all of the functionality to work as designed in this guide your classes must be compiled with the parameters flag set to true. If your application was created with the Micronaut CLI, then that has already been configured for you. | 
10.1.1 Subscriber Parameters
In order for the subscriber method to be invoked, all arguments must be satisfied. To allow execution of the method with a null value, the argument must be declared as nullable. If the arguments cannot be satisfied, an exception will be thrown. See the section on consumer exceptions for details on how to handle exceptions thrown during the subscription process.
10.1.1.1 Topic
A @Topic annotation is required for a method to be a subscriber of messages from MQTT. Simply apply the annotation to the method and supply the name of the topic you would like to listen to. Multiple topics can be subscribed to by supplying multiple annotations. Each topic can have its own qos value set in the annotation.
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.MqttSubscriber;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@MqttSubscriber
public class ProductListener {
    List<Integer> messageLengths = Collections.synchronizedList(new ArrayList<>());
    List<String> topics = Collections.synchronizedList(new ArrayList<>());
    @Topic("product") // (1)
    public void receive(byte[] data) {
        messageLengths.add(data.length);
    }
    @Topic("product/a")
    @Topic("product/b") // (2)
    public void receive(byte[] data, @Topic String topic) {
        topics.add(topic);
    }
}import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic
@MqttSubscriber
class ProductListener {
    List<Integer> messageLengths = Collections.synchronizedList([])
    @Topic("product") // (1)
    void receive(byte[] data) {
        messageLengths.add(data.length)
    }
}import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.MqttSubscriber
import java.util.*
import kotlin.collections.ArrayList
@MqttSubscriber
class ProductListener {
    val messageLengths: MutableList<Int> = Collections.synchronizedList(ArrayList())
    @Topic("product") // (1)
    fun receive(data: ByteArray) {
        messageLengths.add(data.size)
    }
}| 1 | The topic annotation is set per method. Multiple methods may be defined with different topics in the same class. | 
| 2 | Multiple topics are subscribed to and the topic received is an argument to the method. The value will change depending on which topic the message was sent to. | 
10.1.1.2 Payload
Most examples up to this point have been using a byte[] as the body type for simplicity. This library supports most standard Java types and JSON deserialization (using Jackson) by default. The functionality is extensible and it is possible to add support for additional types and deserialization strategies. See the section on Message Serialization/Deserialization for more information.
10.1.2 Acknowledging Messages
If manual acknowledge is enabled through the mqtt.client.manual-acks configuration property, messages can be acknowledged manually.
For methods that accept an argument of type Acknowledgement, the message will only be acknowledged when the ack method is called. The nack method is not relevant for MQTT and should not be called.
Acknowledgement Type
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.MqttSubscriber;
import io.micronaut.messaging.Acknowledgement;
import java.util.concurrent.atomic.AtomicInteger;
@MqttSubscriber
public class ProductListener {
    AtomicInteger messageCount = new AtomicInteger();
    @Topic("product")
    public void receive(byte[] data, Acknowledgement acknowledgement) { // (1)
        messageCount.getAndUpdate((intValue) -> ++intValue);
        acknowledgement.ack(); // (2)
    }
}import io.micronaut.messaging.Acknowledgement
import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic
import java.util.concurrent.atomic.AtomicInteger
@MqttSubscriber
class ProductListener {
    AtomicInteger messageCount = new AtomicInteger()
    @Topic("product")
    void receive(byte[] data, Acknowledgement acknowledgement) { // (1)
        messageCount.getAndUpdate({ intValue -> ++intValue })
        acknowledgement.ack() // (2)
    }
}import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.messaging.Acknowledgement
import java.util.concurrent.atomic.AtomicInteger
@MqttSubscriber
class ProductListener {
    val messageCount = AtomicInteger()
    @Topic("product")
    fun receive(data: ByteArray, acknowledgement: Acknowledgement) { // (1)
        messageCount.getAndUpdate { intValue -> intValue + 1 }
        acknowledgement.ack() // (2)
    }
}| 1 | The acknowledgement argument is injected into the method | 
| 2 | The message is acknowledged | 
10.2 Handling Subscriber Exceptions
Exceptions can occur in a number of different ways. Possible problem areas include:
- 
Binding the message to the method arguments 
- 
Exceptions thrown from the subscriber methods 
- 
Exceptions as a result of message acknowledgement 
- 
Exceptions thrown attempting to subscribe to a topic 
If the subscriber bean implements MqttSubscriberExceptionHandler, then exceptions will be sent to the method implementation.
If the subscriber bean does not implement MqttSubscriberExceptionHandler, then the exceptions will be routed to the primary exception handler bean. To override the default exception handler, replace the DefaultMqttSubscriberExceptionHandler with your own implementation that is designated as @Primary.
10.3 Subscriber Execution
MQTT allows an ExecutorService to be supplied for new connections. The service is used to execute subscribers. A single connection is used for the entire application and it is configured to use the consumer named executor service. The executor can be configured through application configuration. See ExecutorConfiguration for the full list of options.
For example:
consumer thread poolmicronaut.executors.consumer.type=scheduled
micronaut.executors.consumer.corePoolSize=25micronaut:
    executors:
        consumer:
            type: scheduled
            corePoolSize: 25[micronaut]
  [micronaut.executors]
    [micronaut.executors.consumer]
      type="scheduled"
      corePoolSize=25micronaut {
  executors {
    consumer {
      type = "scheduled"
      corePoolSize = 25
    }
  }
}{
  micronaut {
    executors {
      consumer {
        type = "scheduled"
        corePoolSize = 25
      }
    }
  }
}{
  "micronaut": {
    "executors": {
      "consumer": {
        "type": "scheduled",
        "corePoolSize": 25
      }
    }
  }
}If no configuration is supplied, a scheduled thread pool with 2 times the amount of available processors is used.
| The executor type must be scheduled. | 
11 Customizing Parameter Binding
Default Binding Functionality
Argument binding is achieved through an MqttBinderRegistry. The registry is responsible for choosing which @MqttBinder should be responsible for binding an argument. There are two methods, bindFrom and bindTo, that control binding to subscribers and publishers respectively.
The registry supports argument binders that are used based on an annotation applied to the argument or the argument type. All argument binders must implement either AnnotatedMqttBinder or TypedMqttBinder. The exception to that rule is the FallbackMqttBinder which is used when no other binders support a given argument. The binder registry follows a small sequence of steps to attempt to find a binder that supports the argument.
- 
Search the annotation based binders for one that matches any annotation on the argument that is annotated with @Bindable. 
- 
Search the type based binders for one that matches or is a subclass of the argument type. 
- 
Return the default binder. 
The default binder checks if the argument name matches one of the supported properties. If the name does not match, the body of the message is bound to the argument.
The MqttBindingContext is the context used to binding data from and to messages. Each implementation (v3 and v5) has their own implementation. If you know your binder will only be working with a specific implementation, binders can reference the implementation instead of the interface.
Custom Binding
To inject your own argument binding behavior, it is as simple as registering a bean. The existing binder registry will inject it and include it in the normal processing.
Annotation Binding
A custom annotation can be created to bind subscriber arguments. A custom binder can then be created to use that annotation. In this example an annotation is used to bind to the correlation data (only available in v5).
import io.micronaut.core.bind.annotation.Bindable;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
@Bindable // (1)
public @interface Correlation {
}import io.micronaut.core.bind.annotation.Bindable
import java.lang.annotation.*
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target([ElementType.PARAMETER])
@Bindable // (1)
@interface Correlation {
}import io.micronaut.core.bind.annotation.Bindable
@MustBeDocumented
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.VALUE_PARAMETER)
@Bindable // (1)
annotation class Correlation| 1 | The @Bindable annotation is required for the annotation to be considered for binding. | 
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.mqtt.bind.AnnotatedMqttBinder;
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext;
import jakarta.inject.Singleton;
import java.util.Optional;
@Singleton // (1)
public class CorrelationAnnotationBinder implements AnnotatedMqttBinder<MqttV5BindingContext, Correlation> { // (2)
    private final ConversionService conversionService;
    public CorrelationAnnotationBinder(ConversionService conversionService) { // (3)
        this.conversionService = conversionService;
    }
    @Override
    public Class<Correlation> getAnnotationType() {
        return Correlation.class;
    }
    @Override
    public void bindTo(MqttV5BindingContext context, Object value, Argument<Object> argument) {
        context.getProperties().setCorrelationData((byte[]) value); // (4)
    }
    @Override
    public Optional<Object> bindFrom(MqttV5BindingContext context, ArgumentConversionContext<Object> conversionContext) {
        return conversionService.convert(context.getProperties().getCorrelationData(), conversionContext); // (5)
    }
}import io.micronaut.core.type.Argument
import io.micronaut.mqtt.bind.AnnotatedMqttBinder
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import jakarta.inject.Singleton
@Singleton // (1)
class CorrelationAnnotationBinder implements AnnotatedMqttBinder<MqttV5BindingContext, Correlation> { // (2)
    private final ConversionService conversionService
    CorrelationAnnotationBinder(ConversionService conversionService) { // (3)
        this.conversionService = conversionService
    }
    @Override
    Class<Correlation> getAnnotationType() {
        Correlation
    }
    @Override
    void bindTo(MqttV5BindingContext context, Object value, Argument<Object> argument) {
        context.properties.correlationData = (byte[]) value // (4)
    }
    @Override
    Optional<Object> bindFrom(MqttV5BindingContext context, ArgumentConversionContext<Object> conversionContext) {
        conversionService.convert(context.properties.correlationData, conversionContext) // (5)
    }
}import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.mqtt.bind.AnnotatedMqttBinder
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext
import java.util.*
import jakarta.inject.Singleton
@Singleton // (1)
class CorrelationAnnotationBinder(private val conversionService: ConversionService)// (3)
    : AnnotatedMqttBinder<MqttV5BindingContext, Correlation> { // (2)
    override fun getAnnotationType(): Class<Correlation> {
        return Correlation::class.java
    }
    override fun bindTo(context: MqttV5BindingContext, value: Any, argument: Argument<Any>) {
       context.properties.correlationData = value as? ByteArray // (4)
    }
    override fun bindFrom(context: MqttV5BindingContext, conversionContext: ArgumentConversionContext<Any>): Optional<Any> {
        return conversionService.convert(context.properties.correlationData, conversionContext) // (5)
    }
}| 1 | The class is made a bean by annotating with @Singleton. | 
| 2 | The custom annotation is used as the generic type for the interface. | 
| 3 | The conversion service is injected into the instance. | 
| 4 | The correlation data is set from the publisher method value. | 
| 5 | The correlation data is returned to be bound to a subscriber method value. | 
The annotation can now be used on the argument in a subscriber method.
import io.micronaut.mqtt.annotation.MqttSubscriber;
import io.micronaut.mqtt.annotation.Topic;
import java.util.*;
@MqttSubscriber
public class ProductListener {
    Set<String> messages = Collections.synchronizedSet(new HashSet<>());
    @Topic("product")
    public void receive(byte[] data, @Correlation byte[] correlation) { // (1)
        messages.add(new String(correlation));
    }
}import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic
@MqttSubscriber
class ProductListener {
    Set<String> messages = Collections.synchronizedSet(new HashSet<>())
    @Topic("product")
    void receive(@Correlation byte[] correlation) { // (1)
        messages.add(new String(correlation))
    }
}import io.micronaut.mqtt.annotation.MqttSubscriber
import io.micronaut.mqtt.annotation.Topic
import java.nio.charset.StandardCharsets
import java.util.Collections
import java.util.HashSet
@MqttSubscriber
class ProductListener {
    val messages: MutableSet<String> = Collections.synchronizedSet(HashSet())
    @Topic("product")
    fun receive(@Correlation data: ByteArray) { // (1)
        messages.add(String(data, StandardCharsets.UTF_8))
    }
}And in a publisher method.
import io.micronaut.mqtt.annotation.Topic;
import io.micronaut.mqtt.annotation.v5.MqttPublisher;
@MqttPublisher
public interface ProductClient {
    @Topic("product")
    void send(@Correlation byte[] correlation);
}import io.micronaut.mqtt.annotation.Topic
import io.micronaut.mqtt.annotation.v5.MqttPublisher
@MqttPublisher
interface ProductClient {
    @Topic("product")
    void send(@Correlation byte[] correlation)
}import io.micronaut.mqtt.annotation.v5.MqttPublisher
import io.micronaut.mqtt.annotation.Topic
@MqttPublisher
interface ProductClient {
    @Topic("product")
    fun send(@Correlation data: ByteArray)
}Type Binding
A custom binder can be created to support any argument type. For example the following class could be created to bind values from user properties (V5 only). This functionality could allow the work of retrieving and converting the properties to occur in a single place instead of multiple times in your code.
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
public class ProductInfo {
    private String size;
    private Long count;
    private Boolean sealed;
    public ProductInfo(@Nullable String size, // (1)
                       @NonNull Long count, // (2)
                       @NonNull Boolean sealed) { // (3)
        this.size = size;
        this.count = count;
        this.sealed = sealed;
    }
    public String getSize() {
        return size;
    }
    public Long getCount() {
        return count;
    }
    public Boolean getSealed() {
        return sealed;
    }
}import io.micronaut.core.annotation.NonNull
import io.micronaut.core.annotation.Nullable
class ProductInfo {
    private String size
    private Long count
    private Boolean seal
    ProductInfo(@Nullable String size, // (1)
                @NonNull Long count, // (2)
                @NonNull Boolean seal) { // (3)
        this.size = size
        this.count = count
        this.seal = seal
    }
    String getSize() {
        size
    }
    Long getCount() {
        count
    }
    Boolean getSealed() {
        seal
    }
}class ProductInfo(val size: String?, // (1)
                  val count: Long, // (2)
                  val sealed: Boolean)// (3)| 1 | The sizeargument is not required | 
| 2 | The countargument is required | 
| 3 | The sealedargument is required | 
A type argument binder can then be created to create the ProductInfo instance to bind to and from method arguments.
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.mqtt.bind.TypedMqttBinder;
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext;
import jakarta.inject.Singleton;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@Singleton // (1)
public class ProductInfoTypeBinder implements TypedMqttBinder<MqttV5BindingContext, ProductInfo> { //(2)
    private final ConversionService conversionService;
    ProductInfoTypeBinder(ConversionService conversionService) { //(3)
        this.conversionService = conversionService;
    }
    @Override
    public Argument<ProductInfo> getArgumentType() {
        return Argument.of(ProductInfo.class);
    }
    @Override
    public void bindTo(MqttV5BindingContext context, ProductInfo value, Argument<ProductInfo> argument) {
        List<UserProperty> userPropertiesList = context.getProperties().getUserProperties();
        if (value.getSize() != null) {
            userPropertiesList.add(new UserProperty("productSize", value.getSize()));
        }
        userPropertiesList.add(new UserProperty("productCount", value.getCount().toString())); // (4)
        userPropertiesList.add(new UserProperty("productSealed", value.getSealed().toString()));
    }
    @Override
    public Optional<ProductInfo> bindFrom(MqttV5BindingContext context, ArgumentConversionContext<ProductInfo> conversionContext) {
        List<UserProperty> userPropertiesList = context.getProperties().getUserProperties();
        Map<String, String> userProperties = userPropertiesList.stream()
                .collect(Collectors.toMap(UserProperty::getKey, UserProperty::getValue));
        String size = userProperties.get("productSize");
        Optional<Long> count = Optional.ofNullable(userProperties.get("productCount"))
                .flatMap(value -> conversionService.convert(value, Long.class));
        Optional<Boolean> sealed = Optional.ofNullable(userProperties.get("productSealed"))
                .flatMap(value -> conversionService.convert(value, Boolean.class));
        if (count.isPresent() && sealed.isPresent()) {
            return Optional.of(new ProductInfo(size, count.get(), sealed.get())); // (5)
        } else {
            return Optional.empty();
        }
    }
}import io.micronaut.mqtt.bind.TypedMqttBinder
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import jakarta.inject.Singleton
import org.eclipse.paho.mqttv5.common.packet.UserProperty
import java.util.stream.Collectors
@Singleton // (1)
class ProductInfoTypeBinder implements TypedMqttBinder<MqttV5BindingContext, ProductInfo> { //(2)
    private final ConversionService conversionService
    ProductInfoTypeBinder(ConversionService conversionService) { //(3)
        this.conversionService = conversionService
    }
    @Override
    Argument<ProductInfo> getArgumentType() {
        return Argument.of(ProductInfo)
    }
    @Override
    void bindTo(MqttV5BindingContext context, ProductInfo value, Argument<ProductInfo> argument) {
        List<UserProperty> userPropertiesList = context.properties.userProperties
        if (value.size != null) {
            userPropertiesList.add(new UserProperty("productSize", value.size))
        }
        userPropertiesList.add(new UserProperty("productCount", value.count.toString())) // (4)
        userPropertiesList.add(new UserProperty("productSealed", value.sealed.toString()))
    }
    @Override
    Optional<ProductInfo> bindFrom(MqttV5BindingContext context, ArgumentConversionContext<ProductInfo> conversionContext) {
        List<UserProperty> userPropertiesList = context.properties.userProperties
        Map<String, String> userProperties = userPropertiesList.stream()
                .collect(Collectors.toMap(UserProperty::getKey, UserProperty::getValue))
        String size = userProperties.get("productSize")
        Optional<Long> count = Optional.ofNullable(userProperties.get("productCount"))
                .flatMap(value -> conversionService.convert(value, Long.class))
        Optional<Boolean> sealed = Optional.ofNullable(userProperties.get("productSealed"))
                .flatMap(value -> conversionService.convert(value, Boolean.class))
        if (count.isPresent() && sealed.isPresent()) {
            return Optional.of(new ProductInfo(size, count.get(), sealed.get())) // (5)
        } else {
            return Optional.empty()
        }
    }
}import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.mqtt.bind.*
import io.micronaut.mqtt.v5.bind.MqttV5BindingContext
import jakarta.inject.Singleton
import org.eclipse.paho.mqttv5.common.packet.UserProperty
import java.util.Optional
import java.util.stream.Collectors
@Singleton // (1)
class ProductInfoTypeBinder constructor(private val conversionService: ConversionService) //(3)
    : TypedMqttBinder<MqttV5BindingContext, ProductInfo> { // (2)
    override fun bindTo(context: MqttV5BindingContext, value: ProductInfo, argument: Argument<ProductInfo>) {
        val userPropertiesList = context.properties.userProperties
        if (value.size != null) {
            userPropertiesList.add(UserProperty("productSize", value.size)) // (4)
        }
        userPropertiesList.add(UserProperty("productCount", value.count.toString()))
        userPropertiesList.add(UserProperty("productSealed", value.sealed.toString()))
    }
    override fun bindFrom(context: MqttV5BindingContext, conversionContext: ArgumentConversionContext<ProductInfo>): Optional<ProductInfo> {
        val userPropertiesList = context.properties.userProperties
        val userProperties = userPropertiesList.stream()
                .collect(Collectors.toMap(UserProperty::getKey, UserProperty::getValue))
        val size = userProperties.get("productSize")
        val count = Optional.ofNullable(userProperties.get("productCount"))
                .flatMap { value -> conversionService.convert(value, Long::class.java) }
        val sealed = Optional.ofNullable(userProperties.get("productSealed"))
                .flatMap { value -> conversionService.convert(value, Boolean::class.java) }
        if (count.isPresent && sealed.isPresent) {
            return Optional.of(ProductInfo(size, count.get(), sealed.get())) // (5)
        } else {
            return Optional.empty()
        }
    }
    override fun getArgumentType(): Argument<ProductInfo> {
        return Argument.of(ProductInfo::class.java)
    }
}| 1 | The class is made a bean by annotating with @Singleton. | 
| 2 | The custom type is used as the generic type for the interface. | 
| 3 | The conversion service is injected into the instance. | 
| 4 | The properties are populated into user properties when publishing | 
| 5 | The properties are read and converted into a ProductInfoto be bound to a subscriber argument | 
12 Message Serialization/Deserialization (SerDes)
The serialization and deserialization of message payloads is handled through instances of MqttPayloadSerDes. The ser-des (Serializer/Deserializer) is responsible for both serialization and deserialization of MQTT message payloads into the types defined in your publisher and subscriber methods.
The ser-des are managed by a MqttPayloadSerDesRegistry. All ser-des beans are injected in order into the registry and then searched for when serialization or deserialization is needed. The first ser-des that returns true for supports-java.lang.Class- is returned and used.
By default, standard Java lang types and JSON format (with Jackson) are supported. You can supply your own ser-des by simply registering a bean of type MqttPayloadSerDes. All ser-des implement the Ordered interface, so custom implementations can come before, after, or in between the default implementations.
12.1 Custom SerDes
A custom serializer/deserializer would be necessary to support custom data formats. In the section on Custom Binding an example was demonstrated that allowed binding a ProductInfo type from the properties of the message. If instead that object should represent the payload of the message with a custom data format, you could register your own serializer/deserializer to do so.
In this example a simple data format of the string representation of the fields are concatenated together with a pipe character.
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.mqtt.serdes.MqttPayloadSerDes;
import jakarta.inject.Singleton;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
@Singleton // (1)
public class ProductInfoSerDes implements MqttPayloadSerDes<ProductInfo> { // (2)
    private static final Charset CHARSET = StandardCharsets.UTF_8;
    private final ConversionService conversionService;
    public ProductInfoSerDes(ConversionService conversionService) { // (3)
        this.conversionService = conversionService;
    }
    @Nullable
    @Override
    public ProductInfo deserialize(byte[] payload, Argument<ProductInfo> argument) {
        String body = new String(payload, CHARSET);
        String[] parts = body.split("\\|");
        if (parts.length == 3) {
            String size = parts[0];
            if (size.equals("null")) {
                size = null;
            }
            Optional<Long> count = conversionService.convert(parts[1], Long.class);
            Optional<Boolean> sealed = conversionService.convert(parts[2], Boolean.class);
            if (count.isPresent() && sealed.isPresent()) {
                return new ProductInfo(size, count.get(), sealed.get()); // (4)
            }
        }
        return null;
    }
    @Nullable
    @Override
    public byte[] serialize(@Nullable ProductInfo data) {
        if (data == null) {
            return null;
        }
        return (data.getSize() + "|" + data.getCount() + "|" + data.getSealed()).getBytes(CHARSET); // (5)
    }
    @Override
    public boolean supports(Argument<ProductInfo> argument) { // (6)
        return argument.getType().isAssignableFrom(ProductInfo.class);
    }
}import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import jakarta.inject.Singleton
import java.nio.charset.Charset
@Singleton // (1)
class ProductInfoSerDes implements MqttPayloadSerDes<ProductInfo> { // (2)
    private static final Charset CHARSET = Charset.forName("UTF-8")
    private final ConversionService conversionService
    ProductInfoSerDes(ConversionService conversionService) { // (3)
        this.conversionService = conversionService
    }
    @Nullable
    @Override
    ProductInfo deserialize(byte[] payload, Argument<ProductInfo> argument) {
        String body = new String(payload, CHARSET)
        String[] parts = body.split("\\|")
        if (parts.length == 3) {
            String size = parts[0]
            if (size == "null") {
                size = null
            }
            Optional<Long> count = conversionService.convert(parts[1], Long.class)
            Optional<Boolean> sealed = conversionService.convert(parts[2], Boolean.class)
            if (count.isPresent() && sealed.isPresent()) {
                return new ProductInfo(size, count.get(), sealed.get()) // (4)
            }
        }
        return null
    }
    @Nullable
    @Override
    byte[] serialize(@Nullable ProductInfo data) {
        if (data == null) {
            return null
        }
        return "${data.size}|${data.count}|${data.sealed}".getBytes(CHARSET) // (5)
    }
    @Override
    boolean supports(Argument<ProductInfo> argument) { // (6)
        argument.type.isAssignableFrom(ProductInfo)
    }
}import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.mqtt.serdes.MqttPayloadSerDes
import jakarta.inject.Singleton
import java.nio.charset.Charset
@Singleton // (1)
class ProductInfoSerDes(private val conversionService: ConversionService)// (3)
    : MqttPayloadSerDes<ProductInfo> { // (2)
    override fun deserialize(payload: ByteArray, argument: Argument<ProductInfo>): ProductInfo? {
        val body = String(payload, CHARSET)
        val parts = body.split("\\|".toRegex())
        if (parts.size == 3) {
            var size: String? = parts[0]
            if (size == "null") {
                size = null
            }
            val count = conversionService.convert(parts[1], Long::class.java)
            val sealed = conversionService.convert(parts[2], Boolean::class.java)
            if (count.isPresent && sealed.isPresent) {
                return ProductInfo(size, count.get(), sealed.get()) // (4)
            }
        }
        return null
    }
    override fun serialize(data: ProductInfo?): ByteArray {
        return (data?.size + "|" + data?.count + "|" + data?.sealed).toByteArray(CHARSET) // (5)
    }
    override fun supports(argument: Argument<ProductInfo>): Boolean { // (6)
        return argument.type.isAssignableFrom(ProductInfo::class.java)
    }
    companion object {
        private val CHARSET = Charset.forName("UTF-8")
    }
}| 1 | The class is declared as a singleton so it will be registered with the context | 
| 2 | The generic specifies what type we want to accept and return | 
| 3 | The conversion service is injected to convert the parts of the message to the required types | 
| 4 | The deserialize method takes the bytes from the message and constructs a ProductInfo | 
| 5 | The serialize method takes the ProductInfoand returns the bytes to publish. | 
| 6 | The supports method ensures only the correct body types are processed by this ser-des | 
| Because the getOrdermethod was not overridden, the default order of 0 is used. All default ser-des have a lower precedent than the default order which means this ser-des will be checked before the others. | 
13 GraalVM support
Micronaut MQTT is compatible with GraalVM. Everything is handled automatically by the library so users don’t need any special configuration.
The only additional GraalVM configuration you need if you are using mqtt-ssl module is adding the option
--report-unsupported-elements-at-runtime.
| See the section on GraalVM in the user guide for more information. | 
14 Breaking Changes
This section documents breaking changes for Mqtt:
3.0.0
The annotations for Mqtt have moved into the core module, and therefore have changed package names. The new package names are:
Mqtt3
| Old package name | New package name | 
| 
 | 
 | 
Mqtt5
| Old package name | New package name | 
| 
 | 
 | 
| 
 | 
 | 
| 
 | 
 | 
15 Repository
You can find the source code of this project in this repository: