Micronaut RabbitMQ

Integration between Micronaut and RabbitMQ

Version: 4.6.0

1 Introduction

This project includes integration between Micronaut and RabbitMQ. The standard Java Client is used to do the actual publishing and consuming.

2 Release History

For this project, you can find a list of releases (with release notes) here:

Upgrading to Micronaut RabbitMQ 4.0

Micronaut RabbitMQ 4.0 is a significant major version which includes a number of changes you will need to consider when upgrading.

Micronaut 4, AMQP Java Client 5, & Java 17 baseline

Micronaut RabbitMQ 4.0 requires the following minimum set of dependencies:

  • Java 17 or above

  • AMQP Java Client 5 or above

  • Micronaut 4 or above

@Queue annotation member numberOfConsumers is now a String

Previous versions of Micronaut RabbitMQ used an int for the numberOfConsumers setting of the @Queue annotation. In order to allow this value to be changed via external configuration using an expression such as @Queue(numberOfConsumers = "${configured-number-of-consumers}"), the type of numberOfConsumers has been changed to String.

3 Using the Micronaut CLI

To create a project with RabbitMQ support using the Micronaut CLI, supply the rabbitmq feature to the features flag.

$ mn create-messaging-app my-rabbitmq-app --features rabbitmq

This will create a project with the minimum necessary configuration for RabbitMQ.

As you’d expect, you can start the application with ./gradlew run (for Gradle) or ./mvnw compile exec:exec (Maven). The application will (with the default config) attempt to connect to RabbitMQ at http://localhost:5672, and will continue to run without starting up an HTTP server. All communication to/from the service will take place via RabbitMQ producers and/or consumers.

Within the new project, you can now run the RabbitMQ specific code generation commands:

$ mn create-rabbitmq-producer MessageProducer
| Rendered template Producer.java to destination src/main/java/my/rabbitmq/app/MessageProducer.java

$ mn create-rabbitmq-listener MessageListener
| Rendered template Listener.java to destination src/main/java/my/rabbitmq/app/MessageListener.java

4 RabbitMQ Quick Start

To add support for RabbitMQ to an existing project, you should first add the Micronaut RabbitMQ configuration to your build configuration. For example:

implementation("io.micronaut.rabbitmq:micronaut-rabbitmq")
<dependency>
    <groupId>io.micronaut.rabbitmq</groupId>
    <artifactId>micronaut-rabbitmq</artifactId>
</dependency>

Creating a RabbitMQ Producer with @RabbitClient

To create a RabbitMQ client that produces messages you can simply define an interface that is annotated with @RabbitClient.

For example the following is a trivial @RabbitClient interface:

import io.micronaut.rabbitmq.annotation.Binding;
import io.micronaut.rabbitmq.annotation.RabbitClient;

@RabbitClient // (1)
public interface ProductClient {

    @Binding("product") // (2)
    void send(byte[] data); // (3)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient

@RabbitClient // (1)
interface ProductClient {

    @Binding("product") // (2)
    void send(byte[] data) // (3)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient

@RabbitClient // (1)
interface ProductClient {

    @Binding("product") // (2)
    fun send(data: ByteArray)  // (3)
}
1 The @RabbitClient annotation is used to designate this interface as a client
2 The @Binding annotation indicates which binding or routing key the message should be routed to.
3 The send method accepts single parameter which is the body of the message.

At compile time Micronaut will produce an implementation of the above interface. You can retrieve an instance of ProductClient either by looking up the bean from the ApplicationContext or by injecting the bean with @Inject:

productClient.send("quickstart".getBytes());
productClient.send("quickstart".bytes)
productClient.send("quickstart".toByteArray())
Because the send method returns void this means the method will publish the message and return immediately without any acknowledgement from the broker.

Creating a RabbitMQ Consumer with @RabbitListener

To listen to RabbitMQ messages you can use the @RabbitListener annotation to define a message listener.

The following example will listen for messages published by the ProductClient in the previous section:

import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitListener;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@RabbitListener // (1)
public class ProductListener {

    List<String> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @Queue("product") // (2)
    public void receive(byte[] data) { // (3)
        messageLengths.add(new String(data));
        System.out.println("Java received " + data.length + " bytes from RabbitMQ");
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener

import java.util.concurrent.CopyOnWriteArrayList

@RabbitListener // (1)
class ProductListener {

    CopyOnWriteArrayList<String> messageLengths = []

    @Queue("product") // (2)
    void receive(byte[] data) { // (3)
        messageLengths << new String(data)
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener
import java.util.Collections

@RabbitListener // (1)
class ProductListener {

    val messageLengths: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Queue("product") // (2)
    fun receive(data: ByteArray) { // (3)
        val string = String(data)
        messageLengths.add(string)
        println("Kotlin received ${data.size} bytes from RabbitMQ: ${string}")
    }
}
1 The @RabbitListener is used to designate the bean as a message listener.
2 The @Queue annotation is used to indicate which queue to subscribe to.
3 The receive method accepts a single parameter which is the body of the message.

5 Configuring The Connection

All properties on the ConnectionFactory are available to be modified, either through configuration or a BeanCreatedEventListener.

The properties that can be converted from the string values in a configuration file can be configured directly.

🔗
Table 1. Configuration Properties for SingleRabbitConnectionFactoryConfig
Property Type Description

rabbitmq.host

java.lang.String

rabbitmq.port

int

rabbitmq.username

java.lang.String

rabbitmq.password

java.lang.String

rabbitmq.credentials-provider

com.rabbitmq.client.impl.CredentialsProvider

rabbitmq.virtual-host

java.lang.String

rabbitmq.uri

java.net.URI

rabbitmq.requested-channel-max

int

rabbitmq.requested-frame-max

int

rabbitmq.requested-heartbeat

int

rabbitmq.connection-timeout

int

rabbitmq.handshake-timeout

int

rabbitmq.shutdown-timeout

int

rabbitmq.client-properties

java.util.Map

rabbitmq.sasl-config

com.rabbitmq.client.SaslConfig

rabbitmq.socket-factory

javax.net.SocketFactory

rabbitmq.socket-configurator

com.rabbitmq.client.SocketConfigurator

rabbitmq.shared-executor

java.util.concurrent.ExecutorService

rabbitmq.shutdown-executor

java.util.concurrent.ExecutorService

rabbitmq.heartbeat-executor

java.util.concurrent.ScheduledExecutorService

rabbitmq.thread-factory

java.util.concurrent.ThreadFactory

rabbitmq.exception-handler

com.rabbitmq.client.ExceptionHandler

rabbitmq.automatic-recovery-enabled

boolean

rabbitmq.topology-recovery-enabled

boolean

rabbitmq.topology-recovery-executor

java.util.concurrent.ExecutorService

rabbitmq.metrics-collector

com.rabbitmq.client.MetricsCollector

rabbitmq.observation-collector

com.rabbitmq.client.observation.ObservationCollector

rabbitmq.credentials-refresh-service

com.rabbitmq.client.impl.CredentialsRefreshService

rabbitmq.network-recovery-interval

int

rabbitmq.recovery-delay-handler

com.rabbitmq.client.RecoveryDelayHandler

rabbitmq.nio-params

com.rabbitmq.client.impl.nio.NioParams

rabbitmq.channel-rpc-timeout

int

rabbitmq.max-inbound-message-body-size

int

rabbitmq.ssl-context-factory

com.rabbitmq.client.SslContextFactory

rabbitmq.channel-should-check-rpc-response-type

boolean

rabbitmq.work-pool-timeout

int

rabbitmq.error-on-write-listener

com.rabbitmq.client.impl.ErrorOnWriteListener

rabbitmq.topology-recovery-filter

com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter

rabbitmq.connection-recovery-triggering-condition

java.util.function.Predicate

rabbitmq.topology-recovery-retry-handler

com.rabbitmq.client.impl.recovery.RetryHandler

rabbitmq.recovered-queue-name-supplier

com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier

rabbitmq.traffic-listener

com.rabbitmq.client.TrafficListener

rabbitmq.addresses

java.util.List

rabbitmq.consumer-executor

java.lang.String

rabbitmq.confirm-timeout

java.time.Duration

🔗
Table 2. Configuration Properties for ClusterRabbitConnectionFactoryConfig
Property Type Description

rabbitmq.servers.*.host

java.lang.String

rabbitmq.servers.*.port

int

rabbitmq.servers.*.username

java.lang.String

rabbitmq.servers.*.password

java.lang.String

rabbitmq.servers.*.credentials-provider

com.rabbitmq.client.impl.CredentialsProvider

rabbitmq.servers.*.virtual-host

java.lang.String

rabbitmq.servers.*.uri

java.net.URI

rabbitmq.servers.*.requested-channel-max

int

rabbitmq.servers.*.requested-frame-max

int

rabbitmq.servers.*.requested-heartbeat

int

rabbitmq.servers.*.connection-timeout

int

rabbitmq.servers.*.handshake-timeout

int

rabbitmq.servers.*.shutdown-timeout

int

rabbitmq.servers.*.client-properties

java.util.Map

rabbitmq.servers.*.sasl-config

com.rabbitmq.client.SaslConfig

rabbitmq.servers.*.socket-factory

javax.net.SocketFactory

rabbitmq.servers.*.socket-configurator

com.rabbitmq.client.SocketConfigurator

rabbitmq.servers.*.shared-executor

java.util.concurrent.ExecutorService

rabbitmq.servers.*.shutdown-executor

java.util.concurrent.ExecutorService

rabbitmq.servers.*.heartbeat-executor

java.util.concurrent.ScheduledExecutorService

rabbitmq.servers.*.thread-factory

java.util.concurrent.ThreadFactory

rabbitmq.servers.*.exception-handler

com.rabbitmq.client.ExceptionHandler

rabbitmq.servers.*.automatic-recovery-enabled

boolean

rabbitmq.servers.*.topology-recovery-enabled

boolean

rabbitmq.servers.*.topology-recovery-executor

java.util.concurrent.ExecutorService

rabbitmq.servers.*.metrics-collector

com.rabbitmq.client.MetricsCollector

rabbitmq.servers.*.observation-collector

com.rabbitmq.client.observation.ObservationCollector

rabbitmq.servers.*.credentials-refresh-service

com.rabbitmq.client.impl.CredentialsRefreshService

rabbitmq.servers.*.network-recovery-interval

int

rabbitmq.servers.*.recovery-delay-handler

com.rabbitmq.client.RecoveryDelayHandler

rabbitmq.servers.*.nio-params

com.rabbitmq.client.impl.nio.NioParams

rabbitmq.servers.*.channel-rpc-timeout

int

rabbitmq.servers.*.max-inbound-message-body-size

int

rabbitmq.servers.*.ssl-context-factory

com.rabbitmq.client.SslContextFactory

rabbitmq.servers.*.channel-should-check-rpc-response-type

boolean

rabbitmq.servers.*.work-pool-timeout

int

rabbitmq.servers.*.error-on-write-listener

com.rabbitmq.client.impl.ErrorOnWriteListener

rabbitmq.servers.*.topology-recovery-filter

com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter

rabbitmq.servers.*.connection-recovery-triggering-condition

java.util.function.Predicate

rabbitmq.servers.*.topology-recovery-retry-handler

com.rabbitmq.client.impl.recovery.RetryHandler

rabbitmq.servers.*.recovered-queue-name-supplier

com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier

rabbitmq.servers.*.traffic-listener

com.rabbitmq.client.TrafficListener

rabbitmq.servers.*.addresses

java.util.List

rabbitmq.servers.*.consumer-executor

java.lang.String

rabbitmq.servers.*.confirm-timeout

java.time.Duration

Without any configuration the defaults in the ConnectionFactory will be used.

To configure things like the CredentialsProvider a bean created event listener can be registered to intercept the creation of the connection factory.

package io.micronaut.rabbitmq.docs.config;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.DefaultCredentialsProvider;
import io.micronaut.context.event.BeanCreatedEvent;
import io.micronaut.context.event.BeanCreatedEventListener;
import jakarta.inject.Singleton;

@Singleton
public class ConnectionFactoryInterceptor implements BeanCreatedEventListener<ConnectionFactory> {

    @Override
    public ConnectionFactory onCreated(BeanCreatedEvent<ConnectionFactory> event) {
        ConnectionFactory connectionFactory = event.getBean();
        connectionFactory.setCredentialsProvider(new DefaultCredentialsProvider("guest", "guest"));
        return connectionFactory;
    }
}
package io.micronaut.rabbitmq.docs.config

import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.impl.DefaultCredentialsProvider
import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import jakarta.inject.Singleton

@Singleton
class ConnectionFactoryInterceptor implements BeanCreatedEventListener<ConnectionFactory> {

    @Override
    ConnectionFactory onCreated(BeanCreatedEvent<ConnectionFactory> event) {
        def connectionFactory = event.bean
        connectionFactory.credentialsProvider = new DefaultCredentialsProvider("guest", "guest")
        connectionFactory
    }
}
package io.micronaut.rabbitmq.docs.config

import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.impl.DefaultCredentialsProvider
import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import jakarta.inject.Singleton

@Singleton
class ConnectionFactoryInterceptor: BeanCreatedEventListener<ConnectionFactory> {

    override fun onCreated(event: BeanCreatedEvent<ConnectionFactory>?): ConnectionFactory {
        val connectionFactory = event!!.bean
        connectionFactory.setCredentialsProvider(DefaultCredentialsProvider("guest", "guest"))
        return connectionFactory
    }
}
It is also possible to disable the integration entirely with rabbitmq.enabled: false

Connections

It is possible to configure multiple connections to the same server, different servers, or a single connection to one of a list of servers.

One may want to configure multiple connections to the same server in order to have one or more sets of consumers to be executed on a different thread pool. Additionally, the below config could be used to connect to different servers with the same consumer executor by simply omitting the consumer-executor configuration option or supplying the same value.

For example:

rabbitmq:
    servers:
        server-a:
            host: localhost
            port: 5672
            consumer-executor: "a-pool"
        server-b:
            host: localhost
            port: 5672
            consumer-executor: "b-pool"

When the connection is specified in the @Queue annotation to be "server-b" for example, the "b-pool" executor service will be used to execute the consumers.

When the configuration option rabbitmq.servers is used, no other options underneath rabbitmq are read; for example rabbitmq.uri.

RabbitMQ also supports a fail over connection strategy where the first server that connects successfully will be used among a list of servers. To use this option in Micronaut, simply supply a list of host:port addresses.

rabbitmq:
    addresses:
      - localhost:12345
      - localhost:12346
    username: guest
    password: guest
The addresses option can also be used with the multiple server configuration.

6 RabbitMQ Producers

The example in the quick start presented a trivial definition of an interface that be implemented automatically for you using the @RabbitClient annotation.

The implementation that powers @RabbitClient (defined by the RabbitMQIntroductionAdvice class) is, however, very flexible and offers a range of options for defining RabbitMQ producers.

Exchange

The exchange to publish messages to can be provided through the @RabbitClient annotation. In this example, the client is publishing messages to a custom header exchange called animals.

import io.micronaut.messaging.annotation.MessageHeader;
import io.micronaut.rabbitmq.annotation.RabbitClient;

@RabbitClient("animals") // (1)
public interface AnimalClient {

    void send(@MessageHeader String animalType, Animal animal); // (2)

    default void send(Animal animal) { // (3)
        send(animal.getClass().getSimpleName(), animal);
    }
}
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.rabbitmq.annotation.RabbitClient

@RabbitClient("animals") // (1)
abstract class AnimalClient {

    abstract void send(@MessageHeader String animalType, Animal animal) // (2)

    void send(Animal animal) { // (3)
        send(animal.getClass().simpleName, animal)
    }
}
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.rabbitmq.annotation.RabbitClient

@RabbitClient("animals") // (1)
abstract class AnimalClient {

    abstract fun send(@MessageHeader animalType: String, animal: Animal)  // (2)

    fun send(animal: Animal) { //(3)
        send(animal.javaClass.simpleName, animal)
    }
}
1 The exchange name is provided through the @RabbitClient annotation.
2 The header value is used to route the message to a queue.
3 A helper method was created to provide the header value automatically.
Exchanges must already exist before you can publish messages to them.

6.1 Defining @RabbitClient Methods

All methods that publish messages to RabbitMQ must meet the following conditions:

  • The method must reside in a class annotated with @RabbitClient.

  • The method must contain an argument representing the body of the message.

If a body argument cannot be found, an exception will be thrown.
In order for all of the functionality to work as designed in this guide your classes must be compiled with the parameters flag set to true. If your application was created with the Micronaut CLI, then that has already been configured for you.
Unless a reactive type is returned from the publishing method, the action is blocking.

6.1.1 Publishing Parameters

All options are available to be set for publishing messages. The basicPublish method is used by the RabbitMQIntroductionAdvice to publish messages and all arguments can be set through annotations or method arguments.

6.1.1.1 Binding (Routing Key)

If you need to specify the routing key of the message, apply the @Binding annotation to the method or an argument of the method. Apply the annotation to the method itself if the value is static for every execution. Apply the annotation to an argument of the method if the value should be set per execution.

import io.micronaut.rabbitmq.annotation.Binding;
import io.micronaut.rabbitmq.annotation.RabbitClient;

@RabbitClient
public interface ProductClient {

    @Binding("product") // (1)
    void send(byte[] data);

    void send(@Binding String binding, byte[] data); // (2)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient

@RabbitClient
interface ProductClient {

    @Binding("product") // (1)
    void send(byte[] data)

    void send(@Binding String binding, byte[] data) // (2)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient

@RabbitClient
interface ProductClient {

    @Binding("product") // (1)
    fun send(data: ByteArray)

    fun send(@Binding binding: String, data: ByteArray)  // (2)
}
1 The binding is static
2 The binding must be set per execution

Producer Connection

If multiple RabbitMQ servers have been configured, the name of the server can be set in the @Binding annotation to designate which connection should be used to publish messages.

import io.micronaut.rabbitmq.annotation.Binding;
import io.micronaut.rabbitmq.annotation.RabbitClient;

@RabbitClient // (1)
public interface ProductClient {

    @Binding(value = "product", connection = "product-cluster") // (2)
    void send(byte[] data); // (3)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient

@RabbitClient // (1)
interface ProductClient {

    @Binding(value = "product", connection = "product-cluster") // (2)
    void send(byte[] data) // (3)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient

@RabbitClient // (1)
interface ProductClient {

    @Binding(value = "product", connection = "product-cluster") // (2)
    fun send(data: ByteArray)  // (3)
}
1 The connection is set on the binding annotation.
The connection option is also available to be set on the @RabbitClient annotation.

6.1.1.2 Mandatory Flag

You can set the mandatory flag by applying the @Mandatory annotation to a method, to an argument of a method, or to the producer itself.

Apply the annotation to the producer or to a producer method if the value is static for every execution. Apply the annotation to an argument of the method if the value should be set per execution.

import io.micronaut.rabbitmq.annotation.Binding;
import io.micronaut.rabbitmq.annotation.Mandatory;
import io.micronaut.rabbitmq.annotation.RabbitClient;

@RabbitClient
public interface MandatoryProductClient {

    @Binding("product")
    @Mandatory // (1)
    void send(byte[] data);

    @Binding("product")
    void send(@Mandatory boolean mandatory, byte[] data); // (2)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.Mandatory
import io.micronaut.rabbitmq.annotation.RabbitClient

@RabbitClient
interface MandatoryProductClient {

    @Binding("product")
    @Mandatory // (1)
    void send(byte[] data)

    @Binding("product")
    void send(@Mandatory boolean mandatory, byte[] data) // (2)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.Mandatory
import io.micronaut.rabbitmq.annotation.RabbitClient

@RabbitClient
interface MandatoryProductClient {

    @Binding("product")
    @Mandatory // (1)
    fun send(data: ByteArray)

    @Binding("product")
    fun send(@Mandatory mandatory: Boolean, data: ByteArray) // (2)
}
1 The binding is static
2 The binding must be set per execution!

The mandatory flag tells the server how to react if the message cannot be routed to a queue. If this flag is true, the server will return an unroutable message with a Return method. If this flag is false, the server silently drops the message (this is the default behavior).

Unroutable messages can be handled by adding a ReturnListener to the channel.

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ReturnListener;
import io.micronaut.rabbitmq.connect.ChannelInitializer;
import jakarta.inject.Singleton;

import java.io.IOException;

@Singleton
class MyReturnListener extends ChannelInitializer implements ReturnListener {

    @Override
    public void initialize(Channel channel, String name) throws IOException {
        channel.addReturnListener(this); // (1)
    }

    @Override
    public void handleReturn(
        int replyCode,
        String replyText,
        String exchange,
        String routingKey,
        AMQP.BasicProperties properties,
        byte[] body
    ) throws IOException {
        // (2)
    }
}
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.Channel
import com.rabbitmq.client.ReturnListener
import io.micronaut.rabbitmq.connect.ChannelInitializer
import jakarta.inject.Singleton

@Singleton
class MyReturnListener extends ChannelInitializer implements ReturnListener {

    @Override
    void initialize(Channel channel, String name) throws IOException {
        channel.addReturnListener(this) // (1)
    }

    @Override
    void handleReturn(
            int replyCode,
            String replyText,
            String exchange,
            String routingKey,
            AMQP.BasicProperties properties,
            byte[] body
    ) throws IOException {
        // (2)
    }
}
import com.rabbitmq.client.AMQP
import com.rabbitmq.client.Channel
import com.rabbitmq.client.ReturnListener
import io.micronaut.rabbitmq.connect.ChannelInitializer
import jakarta.inject.Singleton
import java.io.IOException

@Singleton
internal class MyReturnListener : ChannelInitializer(), ReturnListener {

    @Throws(IOException::class)
    override fun initialize(channel: Channel, name: String) {
        channel.addReturnListener(this) // (1)
    }

    @Throws(IOException::class)
    override fun handleReturn(
        replyCode: Int,
        replyText: String,
        exchange: String,
        routingKey: String,
        properties: AMQP.BasicProperties,
        body: ByteArray
    ) {
        // (2)
    }
}
1 Add the return listener to the channel
2 Implement custom logic here
You can find more details about this mechanism here.

6.1.1.3 RabbitMQ Properties

It is also supported to supply properties when publishing messages. Any of the BasicProperties can be set dynamically per execution or statically for all executions. Properties can be set using the @RabbitProperty annotation.

import io.micronaut.rabbitmq.annotation.Binding;
import io.micronaut.rabbitmq.annotation.RabbitClient;
import io.micronaut.rabbitmq.annotation.RabbitProperty;

@RabbitClient
@RabbitProperty(name = "appId", value = "myApp") // (1)
@RabbitProperty(name = "userId", value = "admin")
public interface ProductClient {

    @Binding("product")
    @RabbitProperty(name = "contentType", value = "application/json") // (2)
    @RabbitProperty(name = "userId", value = "guest")
    void send(byte[] data);

    @Binding("product")
    void send(@RabbitProperty("userId") String user, @RabbitProperty String contentType, byte[] data); // (3)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient
import io.micronaut.rabbitmq.annotation.RabbitProperty

@RabbitClient
@RabbitProperty(name = "appId", value = "myApp") // (1)
interface ProductClient {

    @Binding("product")
    @RabbitProperty(name = "contentType", value = "application/json") // (2)
    @RabbitProperty(name = "userId", value = "guest")
    void send(byte[] data)

    @Binding("product")
    void send(@RabbitProperty("userId") String user, @RabbitProperty String contentType, byte[] data) // (3)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient
import io.micronaut.rabbitmq.annotation.RabbitProperties
import io.micronaut.rabbitmq.annotation.RabbitProperty

@RabbitClient
@RabbitProperty(name = "appId", value = "myApp") // (1)
interface ProductClient {

    @Binding("product")
    @RabbitProperties( // (2)
            RabbitProperty(name = "contentType", value = "application/json"),
            RabbitProperty(name = "userId", value = "guest")
    )
    fun send(data: ByteArray)

    @Binding("product")
    fun send(@RabbitProperty("userId") user: String, @RabbitProperty contentType: String?, data: ByteArray)  // (3)
}
1 Properties can be defined at the class level and will apply to all methods. If a property is defined on the method with the same name as one on the class, the value on the method will be used.
2 Multiple annotations can be used to set multiple properties on the method or class level.
3 Properties can be set per execution. The name is inferred from the argument if the annotation value is not set. The value passed to the method will always be used, even if null.

For method arguments, if the value is not supplied to the annotation, the argument name will be used as the property name. For example, @RabbitProperty String userId would result in the property userId being set on the properties object before publishing.

If the annotation or argument name cannot be matched to a property name, an exception will be thrown. If the supplied value cannot be converted to the type defined in BasicProperties, an exception will be thrown.

6.1.1.4 Headers

Headers can be set on the message with the @MessageHeader annotation applied to the method or an argument of the method. Apply the annotation to the method itself if the value is static for every execution. Apply the annotation to an argument of the method if the value should be set per execution.

import io.micronaut.messaging.annotation.MessageHeader;
import io.micronaut.rabbitmq.annotation.Binding;
import io.micronaut.rabbitmq.annotation.RabbitClient;
import io.micronaut.rabbitmq.annotation.RabbitHeaders;

import java.util.Map;

@RabbitClient
@MessageHeader(name = "x-product-sealed", value = "true") // (1)
@MessageHeader(name = "productSize", value = "large")
public interface ProductClient {

    @Binding("product")
    @MessageHeader(name = "x-product-count", value = "10") // (2)
    @MessageHeader(name = "productSize", value = "small")
    void send(byte[] data);

    @Binding("product")
    void send(@MessageHeader String productSize, // (3)
              @MessageHeader("x-product-count") Long count,
              byte[] data);

    @Binding("product")
    void send(@RabbitHeaders Map<String, Object> headers, // (4)
              byte[] data);
}
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient
import io.micronaut.rabbitmq.annotation.RabbitHeaders

@RabbitClient
@MessageHeader(name = "x-product-sealed", value = "true") // (1)
@MessageHeader(name = "productSize", value = "large")
interface ProductClient {

    @Binding("product")
    @MessageHeader(name = "x-product-count", value = "10") // (2)
    @MessageHeader(name = "productSize", value = "small")
    void send(byte[] data)

    @Binding("product")
    void send(@MessageHeader String productSize, // (3)
              @MessageHeader("x-product-count") Long count,
              byte[] data)

    @Binding("product")
    void send(@RabbitHeaders Map<String, Object> headers, // (4)
              byte[] data)
}
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.messaging.annotation.MessageHeaders
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient
import io.micronaut.rabbitmq.annotation.RabbitHeaders

@RabbitClient
@MessageHeaders(
    MessageHeader(name = "x-product-sealed", value = "true"), // (1)
    MessageHeader(name = "productSize", value = "large")
)
interface ProductClient {

    @Binding("product")
    @MessageHeaders(
        MessageHeader(name = "x-product-count", value = "10"), // (2)
        MessageHeader(name = "productSize", value = "small")
    )
    fun send(data: ByteArray)

    @Binding("product")
    fun send(@MessageHeader productSize: String?, // (3)
             @MessageHeader("x-product-count") count: Long,
             data: ByteArray)

    @Binding("product")
    fun send(@RabbitHeaders headers: Map<String, Any>, // (4)
             data: ByteArray)
}
1 Headers can be defined at the class level and will apply to all methods. If a header is defined on the method with the same name as one on the class, the value on the method will be used.
2 Multiple annotations can be used to set multiple headers on the method or class level.
3 Headers can be set per execution. The name is inferred from the argument if the annotation value is not set. The value passed to the method will always be used, even if null.
4 A Map<String, Object> argument annotated with @RabbitHeaders can be used to pass a map of headers.

6.1.1.5 Message Body

Most examples up to this point have been using a byte[] as the body type for simplicity. This library supports most standard Java types and JSON serialization (using Jackson) by default. The functionality is extensible and it is possible to add support for additional types and serialization strategies. See the section on Message Serialization/Deserialization for more information.

6.1.2 Broker Acknowledgement

Client methods support two return types, void and a reactive type. If the method returns void, the message will be published and the method will return without acknowledgement. If a reactive type is the return type, a "cold" publisher will be returned that can be subscribed to.

Since the publisher is cold, the message will not actually be published until the stream is subscribed to.

For example:

import io.micronaut.rabbitmq.annotation.Binding;
import io.micronaut.rabbitmq.annotation.RabbitClient;
import org.reactivestreams.Publisher;

import java.util.concurrent.CompletableFuture;

@RabbitClient
public interface ProductClient {

    @Binding("product")
    Publisher<Void> sendPublisher(byte[] data); // (1)

    @Binding("product")
    CompletableFuture<Void> sendFuture(byte[] data); // (2)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient
import org.reactivestreams.Publisher

import java.util.concurrent.CompletableFuture

@RabbitClient
interface ProductClient {

    @Binding("product")
    Publisher<Void> sendPublisher(byte[] data) // (1)

    @Binding("product")
    CompletableFuture<Void> sendFuture(byte[] data) // (2)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient
import org.reactivestreams.Publisher
import java.util.concurrent.CompletableFuture


@RabbitClient
interface ProductClient {

    @Binding("product")
    fun sendPublisher(data: ByteArray): Publisher<Void>  // (1)

    @Binding("product")
    fun sendFuture(data: ByteArray): CompletableFuture<Void>  // (2)

    @Binding("product")
    suspend fun sendSuspend(data: ByteArray) //suspend methods work too!
}
1 A Publisher can be returned. Any other reactive streams implementation type can also be returned given the required relevant dependency is in place
2 Java futures can also be returned
RxJava 1 is not supported. Publisher acknowledgements will be executed on the IO thread pool.

7 RabbitMQ Consumers

The example in the quick start presented a trivial definition of a class that listens for messages using the @RabbitListener annotation.

The implementation that powers @RabbitListener (defined by the RabbitMQConsumerAdvice class) is, however, very flexible and offers a range of options for defining RabbitMQ consumers.

7.1 Defining @RabbitListener Methods

All methods that consume messages from RabbitMQ must meet the following conditions:

  • The method must reside in a class annotated with @RabbitListener.

  • The method must be annotated with @Queue.

In order for all of the functionality to work as designed in this guide your classes must be compiled with the parameters flag set to true. If your application was created with the Micronaut CLI, then that has already been configured for you.

7.1.1 Consumer Parameters

The basicConsume method is used by the RabbitMQConsumerAdvice to consume messages. Some of the options can be directly configured through annotations.

In order for the consumer method to be invoked, all arguments must be satisfied. To allow execution of the method with a null value, the argument must be declared as nullable. If the arguments cannot be satisfied, the message will be rejected.

7.1.1.1 Queue

A @Queue annotation is required for a method to be a consumer of messages from RabbitMQ. Simply apply the annotation to the method and supply the name of the queue you would like to listen to.

Queues must already exist before you can listen to messages from them.
import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitListener;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@RabbitListener
public class ProductListener {

    List<Integer> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @Queue("product") // (1)
    public void receive(byte[] data) {
        messageLengths.add(data.length);
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener

import java.util.concurrent.CopyOnWriteArrayList

@RabbitListener
class ProductListener {

    CopyOnWriteArrayList<Integer> messageLengths = []

    @Queue("product") // (1)
    void receive(byte[] data) {
        messageLengths << data.length
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener
import java.util.Collections

@RabbitListener
class ProductListener {

    val messageLengths: MutableList<Int> = Collections.synchronizedList(ArrayList())

    @Queue("product") // (1)
    fun receive(data: ByteArray) {
        messageLengths.add(data.size)
    }
}
1 The queue annotation is set per method. Multiple methods may be defined with different queues in the same class.

Other Options

If multiple RabbitMQ servers have been configured, the name of the server can be set in the @Queue annotation to designate which connection should be used to listen for messages.

import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitListener;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@RabbitListener
public class ProductListener {

    List<String> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @Queue(value = "product", connection = "product-cluster") // (1)
    public void receive(byte[] data) {
        messageLengths.add(new String(data));
        System.out.println("Java received " + data.length + " bytes from RabbitMQ");
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener

@RabbitListener
class ProductListener {

    List<String> messageLengths = Collections.synchronizedList([])

    @Queue(value = "product", connection = "product-cluster") // (1)
    void receive(byte[] data) {
        messageLengths << new String(data)
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener
import java.util.Collections

@RabbitListener
class ProductListener {

    internal var messageLengths: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Queue(value = "product", connection = "product-cluster") // (1)
    fun receive(data: ByteArray) {
        messageLengths.add(String(data))
        println("Java received " + data.size + " bytes from RabbitMQ")
    }
}
1 The connection is set on the queue annotation.
The connection option is also available to be set on the @RabbitListener annotation.

By default all consumers are executed on the same "consumer" thread pool. If for some reason one or more consumers should be executed on a different thread pool, it can be specified on the @Queue annotation.

import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitListener;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@RabbitListener
public class ProductListener {

    List<String> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @Queue(value = "product", executor = "product-listener") // (1)
    public void receive(byte[] data) {
        messageLengths.add(new String(data));
        System.out.println("Java received " + data.length + " bytes from RabbitMQ");
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener

import java.util.concurrent.CopyOnWriteArrayList

@RabbitListener
class ProductListener {

    CopyOnWriteArrayList<String> messageLengths = []

    @Queue(value = "product", executor = "product-listener") // (1)
    void receive(byte[] data) {
        messageLengths << new String(data)
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener
import java.util.Collections

@RabbitListener
class ProductListener {

    internal var messageLengths: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Queue(value = "product", executor = "product-listener") // (1)
    fun receive(data: ByteArray) {
        messageLengths.add(String(data))
        println("Kotlin received " + data.size + " bytes from RabbitMQ")
    }
}
1 The executor is set on the queue annotation.

Micronaut will look for an ExecutorService bean with a named qualifier that matches the name set in the annotation. The bean can be provided manually or created automatically through configuration of an ExecutorConfiguration.

For example:

Configuring the product-listener thread pool
micronaut:
    executors:
        product-listener:
            type: fixed
            nThreads: 25
Due to the way the RabbitMQ Java client works, the initial callback for all consumers is still the thread pool configured in the connection (by default "consumer"), however the work is immediately shifted to the requested thread pool. The executor option is also available to be set on the @RabbitListener annotation.
The @Queue annotation supports additional options for consuming messages including declaring the consumer as exclusive, whether to re-queue rejected messages, or set an unacknowledged message limit.

7.1.1.2 Properties

The arguments passed to basicConsume can be configured through the @RabbitProperty annotation.

In addition, any method parameters can be annotated to bind to properties from the BasicProperties received with the message.

import io.micronaut.core.annotation.Nullable;
import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitListener;
import io.micronaut.rabbitmq.annotation.RabbitProperty;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@RabbitListener
public class ProductListener {

    List<String> messageProperties = Collections.synchronizedList(new ArrayList<>());

    @Queue("product")
    @RabbitProperty(name = "x-priority", value = "10", type = Integer.class) // (1)
    public void receive(byte[] data,
                        @RabbitProperty("userId") String user,  // (2)
                        @Nullable @RabbitProperty String contentType,  // (3)
                        String appId) {  // (4)
        messageProperties.add(user + "|" + contentType + "|" + appId);
    }
}
import io.micronaut.core.annotation.Nullable
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener
import io.micronaut.rabbitmq.annotation.RabbitProperty

import java.util.concurrent.CopyOnWriteArrayList

@RabbitListener
class ProductListener {

    CopyOnWriteArrayList<String> messageProperties = []

    @Queue("product")
    @RabbitProperty(name = "x-priority", value = "10", type = Integer) // (1)
    void receive(byte[] data,
                 @RabbitProperty("userId") String user, // (2)
                 @Nullable @RabbitProperty String contentType, // (3)
                 String appId) { // (4)
        messageProperties << user + "|" + contentType + "|" + appId
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener
import io.micronaut.rabbitmq.annotation.RabbitProperty
import java.util.Collections

@RabbitListener
class ProductListener {

    val messageProperties: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Queue("product")
    @RabbitProperty(name = "x-priority", value = "10", type = Integer::class) // (1)
    fun receive(data: ByteArray,
                @RabbitProperty("userId") user: String, // (2)
                @RabbitProperty contentType: String?, // (3)
                appId: String) { // (4)
        messageProperties.add("$user|$contentType|$appId")
    }
}
1 The property is sent as an argument to the Java client consume method. Properties could also be defined on the class level to apply to all consumers in the class. Note the type is required here if RabbitMQ expects something other than a String.
2 The argument is bound from the userId property.
3 The property name to bind from is inferred from the argument name. This argument allows null values.
4 If the argument name matches one of the defined property names, it will be bound from that property.
If the annotation or argument name cannot be matched to a property name, an exception will be thrown. If the supplied type cannot be converted from the type defined in BasicProperties, an exception will be thrown.

7.1.1.3 Headers

Headers can be retrieved with the @MessageHeader annotation applied to the arguments of the method.

import io.micronaut.core.annotation.Nullable;
import io.micronaut.messaging.annotation.MessageHeader;
import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitHeaders;
import io.micronaut.rabbitmq.annotation.RabbitListener;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@RabbitListener
public class ProductListener {

    List<String> messageProperties = Collections.synchronizedList(new ArrayList<>());

    @Queue("product")
    public void receive(byte[] data,
                        @MessageHeader("x-product-sealed") Boolean sealed, // (1)
                        @MessageHeader("x-product-count") Long count, // (2)
                        @Nullable @MessageHeader String productSize) { // (3)
        messageProperties.add(sealed + "|" + count + "|" + productSize);
    }

    @Queue("product")
    public void receive(byte[] data,
                        @RabbitHeaders Map<String, Object> headers) { // (4)
        Object productSize = headers.get("productSize");
        messageProperties.add(
                headers.get("x-product-sealed").toString() + "|" +
                headers.get("x-product-count").toString() + "|" +
                (productSize != null ? productSize.toString() : null));
    }
}
import io.micronaut.core.annotation.Nullable
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitHeaders
import io.micronaut.rabbitmq.annotation.RabbitListener

import java.util.concurrent.CopyOnWriteArrayList

@RabbitListener
class ProductListener {

    CopyOnWriteArrayList<String> messageProperties = []

    @Queue("product")
    void receive(byte[] data,
                 @MessageHeader("x-product-sealed") Boolean productSealed, // (1)
                 @MessageHeader("x-product-count") Long count, // (2)
                 @Nullable @MessageHeader String productSize) { // (3)
        messageProperties << productSealed.toString() + "|" + count + "|" + productSize
    }

    @Queue("product")
    void receive(byte[] data,
                 @RabbitHeaders Map<String, Object> headers) { // (4)
        messageProperties <<
                headers["x-product-sealed"].toString() + "|" +
                headers["x-product-count"] + "|" +
                headers["productSize"]?.toString()
    }
}
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitHeaders
import io.micronaut.rabbitmq.annotation.RabbitListener
import java.util.Collections

@RabbitListener
class ProductListener {

    var messageProperties: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Queue("product")
    fun receive(data: ByteArray,
                @MessageHeader("x-product-sealed") sealed: Boolean, // (1)
                @MessageHeader("x-product-count") count: Long, // (2)
                @MessageHeader productSize: String?) { // (3)
        messageProperties.add(sealed.toString() + "|" + count + "|" + productSize)
    }

    @Queue("product")
    fun receive(data: ByteArray,
                @RabbitHeaders headers: Map<String, Any>) { // (4)
        messageProperties.add(
            headers["x-product-sealed"].toString() + "|" +
            headers["x-product-count"].toString() + "|" +
            headers["productSize"]?.toString()
        )
    }
}
1 The header name comes from the annotation and the value is retrieved and converted to a Boolean.
2 The header name comes from the annotation and the value is retrieved and converted to a Long.
3 The header name comes from the argument name. This argument allows null values.
4 All headers can be bound to a single Map argument with @RabbitHeaders.

7.1.1.4 RabbitMQ Types

Arguments can also be bound based on their type. Several types are supported by default and each type has a corresponding RabbitTypeArgumentBinder. The argument binders are covered in detail in the section on Custom Parameter Binding.

There are only two types that are supported for retrieving data about the message. They are the Envelope and BasicProperties.

import com.rabbitmq.client.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitListener;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@RabbitListener
public class ProductListener {

    List<String> messages = Collections.synchronizedList(new ArrayList<>());

    @Queue("product")
    public void receive(byte[] data,
                        Envelope envelope, // (1)
                        BasicProperties basicProperties, // (2)
                        Channel channel) { // (3)
        messages.add(String.format("exchange: [%s], routingKey: [%s], contentType: [%s]",
                envelope.getExchange(), envelope.getRoutingKey(), basicProperties.getContentType()));
    }
}
import com.rabbitmq.client.BasicProperties
import com.rabbitmq.client.Channel
import com.rabbitmq.client.Envelope
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener

import java.util.concurrent.CopyOnWriteArrayList

@RabbitListener
class ProductListener {

    CopyOnWriteArrayList<String> messages = []

    @Queue("product")
    void receive(byte[] data,
                 Envelope envelope, // (1)
                 BasicProperties basicProperties, // (2)
                 Channel channel) { // (3)
        messages << "exchange: [$envelope.exchange], routingKey: [$envelope.routingKey], contentType: [$basicProperties.contentType]".toString()
    }
}
import com.rabbitmq.client.BasicProperties
import com.rabbitmq.client.Channel
import com.rabbitmq.client.Envelope
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener
import java.util.Collections

@RabbitListener
class ProductListener {

    val messages: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Queue("product")
    fun receive(data: ByteArray,
                envelope: Envelope, // (1)
                basicProperties: BasicProperties, // (2)
                channel: Channel) { // (3)
        messages.add("exchange: [${envelope.exchange}], routingKey: [${envelope.routingKey}], contentType: [${basicProperties.contentType}]")
    }
}
1 The argument is bound from the Envelope
2 The argument is bound from the BasicProperties

7.1.1.5 Message Body

Most examples up to this point have been using a byte[] as the body type for simplicity. This library supports most standard Java types and JSON deserialization (using Jackson) by default. The functionality is extensible and it is possible to add support for additional types and deserialization strategies. See the section on Message Serialization/Deserialization for more information.

7.1.1.6 Custom Parameter Binding

Default Binding Functionality

Consumer argument binding is achieved through an ArgumentBinderRegistry that is specific for binding consumers from RabbitMQ messages. The class responsible for this is the RabbitBinderRegistry.

The registry supports argument binders that are used based on an annotation applied to the argument or the argument type. All argument binders must implement either RabbitAnnotatedArgumentBinder or RabbitTypeArgumentBinder. The exception to that rule is the RabbitDefaultBinder which is used when no other binders support a given argument.

When an argument needs bound, the RabbitConsumerState is used as the source of all of the available data. The binder registry follows a small sequence of steps to attempt to find a binder that supports the argument.

  1. Search the annotation based binders for one that matches any annotation on the argument that is annotated with @Bindable.

  2. Search the type based binders for one that matches or is a subclass of the argument type.

  3. Return the default binder.

The default binder checks if the argument name matches one of the BasicProperties. If the name does not match, the body of the message is bound to the argument.

Custom Binding

To inject your own argument binding behavior, it is as simple as registering a bean. The existing binder registry will inject it and include it in the normal processing.

Annotation Binding

A custom annotation can be created to bind consumer arguments. A custom binder can then be created to use that annotation and the RabbitConsumerState to supply a value for the argument. The value may in fact come from anywhere, however for the purposes of this documentation, the delivery tag in the envelope is used.

import io.micronaut.core.bind.annotation.Bindable;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
@Bindable // (1)
public @interface DeliveryTag {
}
import io.micronaut.core.bind.annotation.Bindable

import java.lang.annotation.Documented
import java.lang.annotation.ElementType
import java.lang.annotation.Retention
import java.lang.annotation.RetentionPolicy
import java.lang.annotation.Target

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target([ElementType.PARAMETER])
@Bindable // (1)
@interface DeliveryTag {
}
import io.micronaut.core.bind.annotation.Bindable

@MustBeDocumented
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.VALUE_PARAMETER)
@Bindable // (1)
annotation class DeliveryTag
1 The @Bindable annotation is required for the annotation to be considered for binding.
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.rabbitmq.bind.RabbitAnnotatedArgumentBinder;
import io.micronaut.rabbitmq.bind.RabbitConsumerState;
import jakarta.inject.Singleton;

@Singleton // (1)
public class DeliveryTagAnnotationBinder implements RabbitAnnotatedArgumentBinder<DeliveryTag> { // (2)

    private final ConversionService conversionService;

    public DeliveryTagAnnotationBinder(ConversionService conversionService) { // (3)
        this.conversionService = conversionService;
    }

    @Override
    public Class<DeliveryTag> getAnnotationType() {
        return DeliveryTag.class;
    }

    @Override
    public BindingResult<Object> bind(ArgumentConversionContext<Object> context, RabbitConsumerState source) {
        Long deliveryTag = source.getEnvelope().getDeliveryTag(); // (4)
        return () -> conversionService.convert(deliveryTag, context); // (5)
    }
}
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import io.micronaut.rabbitmq.bind.RabbitAnnotatedArgumentBinder
import io.micronaut.rabbitmq.bind.RabbitConsumerState
import jakarta.inject.Singleton

@Singleton // (1)
class DeliveryTagAnnotationBinder implements RabbitAnnotatedArgumentBinder<DeliveryTag> { // (2)

    private final ConversionService conversionService

    DeliveryTagAnnotationBinder(ConversionService conversionService) { // (3)
        this.conversionService = conversionService
    }

    @Override
    Class<DeliveryTag> getAnnotationType() {
        DeliveryTag
    }

    @Override
    BindingResult<Object> bind(ArgumentConversionContext<Object> context, RabbitConsumerState source) {
        long deliveryTag = source.envelope.deliveryTag // (4)
        return { -> conversionService.convert(deliveryTag, context) } // (5)
    }
}
import io.micronaut.core.bind.ArgumentBinder
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import io.micronaut.rabbitmq.bind.RabbitAnnotatedArgumentBinder
import io.micronaut.rabbitmq.bind.RabbitConsumerState
import jakarta.inject.Singleton

@Singleton // (1)
class DeliveryTagAnnotationBinder(private val conversionService: ConversionService)// (3)
    : RabbitAnnotatedArgumentBinder<DeliveryTag> { // (2)

    override fun getAnnotationType(): Class<DeliveryTag> {
        return DeliveryTag::class.java
    }

    override fun bind(context: ArgumentConversionContext<Any>, source: RabbitConsumerState): ArgumentBinder.BindingResult<Any> {
        val deliveryTag = source.envelope.deliveryTag // (4)
        return ArgumentBinder.BindingResult { conversionService.convert(deliveryTag, context) } // (5)
    }
}
1 The class is made a bean by annotating with @Singleton.
2 The custom annotation is used as the generic type for the interface.
3 The conversion service is injected into the instance.
4 The delivery tag is retrieved from the message state.
5 The tag is converted to the argument type. For example this allows the argument to be a String even though the delivery tag is a Long.

The annotation can now be used on the argument in a consumer method.

import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitListener;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

@RabbitListener
public class ProductListener {

    Set<Long> messages = Collections.synchronizedSet(new HashSet<>());

    @Queue("product")
    public void receive(byte[] data, @DeliveryTag Long tag) { // (1)
        messages.add(tag);
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener

import java.util.concurrent.CopyOnWriteArraySet

@RabbitListener
class ProductListener {

    CopyOnWriteArraySet<Long> messages = []

    @Queue("product")
    void receive(byte[] data, @DeliveryTag Long tag) { // (1)
        messages << tag
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener
import java.util.Collections

@RabbitListener
class ProductListener {

    val messages: MutableSet<Long> = Collections.synchronizedSet(HashSet())

    @Queue("product")
    fun receive(data: ByteArray, @DeliveryTag tag: Long) { // (1)
        messages.add(tag)
    }
}

Type Binding

A custom binder can be created to support any argument type. For example the following class could be created to bind values from the headers. This functionality could allow the work of retrieving and converting the headers to occur in a single place instead of multiple times in your code.

import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;

public class ProductInfo {

    private String size;
    private Long count;
    private Boolean sealed;

    public ProductInfo(@Nullable String size, // (1)
                       @NonNull Long count, // (2)
                       @NonNull Boolean sealed) { // (3)
        this.size = size;
        this.count = count;
        this.sealed = sealed;
    }

    public String getSize() {
        return size;
    }

    public Long getCount() {
        return count;
    }

    public Boolean getSealed() {
        return sealed;
    }
}
import io.micronaut.core.annotation.NonNull
import io.micronaut.core.annotation.Nullable

class ProductInfo {

    private String size
    private Long count
    private Boolean productSealed

    ProductInfo(@Nullable String size, // (1)
                @NonNull Long count, // (2)
                @NonNull Boolean productSealed) { // (3)
        this.size = size
        this.count = count
        this.productSealed = productSealed
    }

    String getSize() {
        size
    }

    Long getCount() {
        count
    }

    Boolean getSealed() {
        productSealed
    }
}
class ProductInfo(val size: String?, // (1)
                  val count: Long, // (2)
                  val sealed: Boolean)// (3)
1 The size argument is not required
2 The count argument is required
3 The sealed argument is required

A type argument binder can then be created to create the ProductInfo instance to bind to your consumer method argument.

import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionError;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.rabbitmq.bind.RabbitConsumerState;
import io.micronaut.rabbitmq.bind.RabbitHeaderConvertibleValues;
import io.micronaut.rabbitmq.bind.RabbitTypeArgumentBinder;
import jakarta.inject.Singleton;

import java.util.List;
import java.util.Map;
import java.util.Optional;

@Singleton // (1)
public class ProductInfoTypeBinder implements RabbitTypeArgumentBinder<ProductInfo> { //(2)

    private final ConversionService conversionService;

    ProductInfoTypeBinder(ConversionService conversionService) { //(3)
        this.conversionService = conversionService;
    }

    @Override
    public Argument<ProductInfo> argumentType() {
        return Argument.of(ProductInfo.class);
    }

    @Override
    public BindingResult<ProductInfo> bind(ArgumentConversionContext<ProductInfo> context, RabbitConsumerState source) {
        Map<String, Object> rawHeaders = source.getProperties().getHeaders(); //(4)

        if (rawHeaders == null) {
            return BindingResult.EMPTY;
        }

        RabbitHeaderConvertibleValues headers = new RabbitHeaderConvertibleValues(rawHeaders, conversionService);

        String size = headers.get("productSize", String.class).orElse(null);  //(5)
        Optional<Long> count = headers.get("x-product-count", Long.class); //(6)
        Optional<Boolean> sealed = headers.get("x-product-sealed", Boolean.class); // (7)

        if (headers.getConversionErrors().isEmpty() && count.isPresent() && sealed.isPresent()) {
            return () -> Optional.of(new ProductInfo(size, count.get(), sealed.get())); //(8)
        } else {
            return new BindingResult<ProductInfo>() {
                @Override
                public Optional<ProductInfo> getValue() {
                    return Optional.empty();
                }

                @Override
                public List<ConversionError> getConversionErrors() {
                    return headers.getConversionErrors(); //(9)
                }
            };
        }
    }
}
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionError
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.rabbitmq.bind.RabbitConsumerState
import io.micronaut.rabbitmq.bind.RabbitHeaderConvertibleValues
import io.micronaut.rabbitmq.bind.RabbitTypeArgumentBinder
import jakarta.inject.Singleton

@Singleton // (1)
class ProductInfoTypeBinder implements RabbitTypeArgumentBinder<ProductInfo> { //(2)

    private final ConversionService conversionService

    ProductInfoTypeBinder(ConversionService conversionService) { //(3)
        this.conversionService = conversionService
    }

    @Override
    Argument<ProductInfo> argumentType() {
        return Argument.of(ProductInfo)
    }

    @Override
    BindingResult<ProductInfo> bind(ArgumentConversionContext<ProductInfo> context, RabbitConsumerState source) {
        Map<String, Object> rawHeaders = source.properties.headers //(4)

        if (rawHeaders == null) {
            return BindingResult.EMPTY
        }

        def headers = new RabbitHeaderConvertibleValues(rawHeaders, conversionService)

        String size = headers.get("productSize", String).orElse(null)  //(5)
        Optional<Long> count = headers.get("x-product-count", Long) //(6)
        Optional<Boolean> productSealed = headers.get("x-product-sealed", Boolean) // (7)

        if (headers.conversionErrors.isEmpty() && count.isPresent() && productSealed.isPresent()) {
            { -> Optional.of(new ProductInfo(size, count.get(), productSealed.get())) } //(8)
        } else {
            new BindingResult<ProductInfo>() {
                @Override
                Optional<ProductInfo> getValue() {
                    Optional.empty()
                }

                @Override
                List<ConversionError> getConversionErrors() {
                    headers.conversionErrors //(9)
                }
            }
        }
    }
}
import io.micronaut.core.bind.ArgumentBinder.BindingResult
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionError
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.rabbitmq.bind.RabbitConsumerState
import io.micronaut.rabbitmq.bind.RabbitHeaderConvertibleValues
import io.micronaut.rabbitmq.bind.RabbitTypeArgumentBinder
import jakarta.inject.Singleton
import java.util.Optional

@Singleton // (1)
class ProductInfoTypeBinder constructor(private val conversionService: ConversionService) //(3)
    : RabbitTypeArgumentBinder<ProductInfo> { // (2)

    override fun argumentType(): Argument<ProductInfo> {
        return Argument.of(ProductInfo::class.java)
    }

    override fun bind(context: ArgumentConversionContext<ProductInfo>, source: RabbitConsumerState): BindingResult<ProductInfo> {
        val rawHeaders = source.properties.headers ?: return BindingResult { Optional.empty<ProductInfo>() } //(4)

        val headers = RabbitHeaderConvertibleValues(rawHeaders, conversionService)

        val size = headers.get("productSize", String::class.java).orElse(null)  //(5)
        val count = headers.get("x-product-count", Long::class.java) //(6)
        val sealed = headers.get("x-product-sealed", Boolean::class.java) // (7)

        if (headers.conversionErrors.isEmpty() && count.isPresent && sealed.isPresent) {
            return BindingResult<ProductInfo> { Optional.of(ProductInfo(size, count.get(), sealed.get())) } //(8)
        } else {
            return object : BindingResult<ProductInfo> {
                override fun getValue(): Optional<ProductInfo> {
                    return Optional.empty()
                }

                override fun getConversionErrors(): List<ConversionError> {
                    return headers.conversionErrors //(9)
                }
            }
        }
    }
}
1 The class is made a bean by annotating with @Singleton.
2 The custom type is used as the generic type for the interface.
3 The conversion service is injected into the instance.
4 The headers are retrieved from the message state.
5 The productSize header is retrieved, defaulting to null if the value was not found or could not be converted.
6 The x-product-count header is retrieved and converted with a new argument context that is used to retrieve conversion errors later.
7 The x-product-sealed header is retrieved and converted with a new argument context that is used to retrieve conversion errors later.
8 There are no conversion errors and the two required arguments are present, so the instance can be constructed.
9 There are conversion errors or one of the required arguments is not present so a custom BindingResult is returned that allows the conversion errors to be handled appropriately.

7.1.2 Acknowledging Messages

There are three ways a message can be acknowledged, rejected, or not acknowledged.

  1. For methods that accept an argument of type RabbitAcknowledgement, the message will only be acknowledged when the respective methods on that class are executed.

  2. For methods that return any other type, including void, the message will be acknowledged if the method does not throw an exception. If an exception is thrown, the message will be rejected.

  3. In addition, for methods that have @Queue autoAcknowledgment option enabled, the message will be acknowledged once delivered.

Acknowledgement Type

import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitListener;
import io.micronaut.rabbitmq.bind.RabbitAcknowledgement;

import java.util.concurrent.atomic.AtomicInteger;

@RabbitListener
public class ProductListener {

    AtomicInteger messageCount = new AtomicInteger();

    @Queue(value = "product") // (1)
    public void receive(byte[] data, RabbitAcknowledgement acknowledgement) { // (2)
        int count = messageCount.getAndUpdate((intValue) -> ++intValue);
        if (count  == 0) {
            acknowledgement.nack(false, true); // (3)
        } else if (count > 3) {
            acknowledgement.ack(true); // (4)
        }
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener
import io.micronaut.rabbitmq.bind.RabbitAcknowledgement

import java.util.concurrent.atomic.AtomicInteger

@RabbitListener
class ProductListener {

    AtomicInteger messageCount = new AtomicInteger()

    @Queue(value = "product") // (1)
    void receive(byte[] data, RabbitAcknowledgement acknowledgement) { // (2)
        int count = messageCount.getAndUpdate({ intValue -> ++intValue })
        println new String(data)
        if (count  == 0) {
            acknowledgement.nack(false, true) // (3)
        } else if (count > 3) {
            acknowledgement.ack(true) // (4)
        }
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener
import io.micronaut.rabbitmq.bind.RabbitAcknowledgement
import java.util.concurrent.atomic.AtomicInteger

@RabbitListener
class ProductListener {

    val messageCount = AtomicInteger()

    @Queue(value = "product") // (1)
    fun receive(data: ByteArray, acknowledgement: RabbitAcknowledgement) { // (2)
        val count = messageCount.getAndUpdate { intValue -> intValue + 1 }
        if (count == 0) {
            acknowledgement.nack(false, true) // (3)
        } else if (count > 3) {
            acknowledgement.ack(true) // (4)
        }
    }
}
1 The reQueue and autoAcknowledgment options are no longer considered when the method has a RabbitAcknowledgement argument.
2 The acknowledgement argument is injected into the method. That signifies that this library is no longer in control of acknowledgement in any way for this consumer.
3 The first message is rejected and re-queued.
4 The second and third messages are not acknowledged. The fourth message that is received is acknowledged along with the second and third messages because the multiple argument is true.

7.2 Handling Consumer Exceptions

Exceptions can occur in a number of different ways. Possible problem areas include:

  • Binding the message to the method arguments

  • Exceptions thrown from the consumer methods

  • Exceptions as a result of message acknowledgement

  • Exceptions thrown attempting to add the consumer to the channel

If the consumer bean implements RabbitListenerExceptionHandler, then exceptions will be sent to the method implementation.

If the consumer bean does not implement RabbitListenerExceptionHandler, then the exceptions will be routed to the primary exception handler bean. To override the default exception handler, replace the DefaultRabbitListenerExceptionHandler with your own implementation that is designated as @Primary.

7.3 Consumer Execution

RabbitMQ allows an ExecutorService to be supplied for new connections. The service is used to execute consumers. A single connection is used for the entire application and it is configured to use the consumer named executor service. The executor can be configured through application configuration. See ExecutorConfiguration for the full list of options.

For example:

Configuring the consumer thread pool
micronaut:
    executors:
        consumer:
            type: fixed
            nThreads: 25

If no configuration is supplied, a fixed thread pool with 2 times the amount of available processors is used.

Concurrent Consumers

By default a single consumer may not process multiple messages simultaneously. RabbitMQ waits to provide the consumer with a message until the previous message has been acknowledged. Starting in version 3.0.0, a new option has been added to the @Queue annotation to set the number of consumers that should be registered to RabbitMQ for a single consumer method. This will allow for concurrent execution of the consumers.

import io.micronaut.context.annotation.Requires;
import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitListener;

import java.util.concurrent.CopyOnWriteArraySet;

@RabbitListener
public class ProductListener {

    CopyOnWriteArraySet<String> threads = new CopyOnWriteArraySet<>();

    @Queue(value = "product", numberOfConsumers = "5") // (1)
    public void receive(byte[] data) {
        threads.add(Thread.currentThread().getName()); // (2)
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) { }
    }
}
import io.micronaut.context.annotation.Requires
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener

import java.util.concurrent.CopyOnWriteArraySet

@RabbitListener
class ProductListener {

    CopyOnWriteArraySet<String> threads = new CopyOnWriteArraySet<>()

    @Queue(value = "product", numberOfConsumers = "5") // (1)
    void receive(byte[] data) {
        threads << Thread.currentThread().name // (2)
        sleep 500
    }
}
import io.micronaut.context.annotation.Requires
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener

import java.util.concurrent.CopyOnWriteArraySet

@RabbitListener
class ProductListener {
    var threads = CopyOnWriteArraySet<String>()

    @Queue(value = "product", numberOfConsumers = "5") // (1)
    fun receive(data: ByteArray?) {
        threads.add(Thread.currentThread().name) // (2)
        Thread.sleep(500)
    }
}
1 The numberOfConsumers is set on the annotation
2 Multiple messages received in a short time window will result in the threads collection containing something like [pool-2-thread-9, pool-2-thread-7, pool-2-thread-10, pool-2-thread-8, pool-2-thread-6]
As with any other case of concurrent execution, operations on data in the instance of the consumer must be thread safe.

7.4 Consumer Events

Micronaut sends application events whenever a RabbitMQ consumer subscribes to a queue.

These events contain the following information:

  • source: The bean annotated as @RabbitListener.

  • method: The consumer method.

  • queue: The name of the queue the consumer subscribes to.

Handling RabbitConsumerStarting events

To take any actions right before consumers try to subscribe to a queue, listen to application event RabbitConsumerStarting:

import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.rabbitmq.event.RabbitConsumerStarting;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class MyStartingEventListener implements ApplicationEventListener<RabbitConsumerStarting> {
  private static final Logger LOG = LoggerFactory.getLogger(MyStartingEventListener.class);
  @Override
  public void onApplicationEvent(RabbitConsumerStarting event) {
    LOG.info("RabbitMQ consumer: {} (method: {}) is subscribing to: {}",
      event.getSource(), event.getMethod(), event.getQueue());
  }
}
import io.micronaut.context.event.ApplicationEventListener
import io.micronaut.rabbitmq.event.RabbitConsumerStarting
import jakarta.inject.Singleton
import org.slf4j.Logger
import org.slf4j.LoggerFactory

@Singleton
class MyStartingEventListener implements ApplicationEventListener<RabbitConsumerStarting> {
  private static final Logger LOG = LoggerFactory.getLogger(MyStartingEventListener.class)
  @Override
  void onApplicationEvent(RabbitConsumerStarting event) {
    LOG.info("RabbitMQ consumer: {} (method: {}) is subscribing to: {}",
      event.source, event.method, event.queue);
  }
}
import io.micronaut.context.event.ApplicationEventListener
import io.micronaut.rabbitmq.event.RabbitConsumerStarting
import jakarta.inject.Singleton
import org.slf4j.LoggerFactory

@Singleton
class MyStartingEventListener : ApplicationEventListener<RabbitConsumerStarting> {
  private val LOG = LoggerFactory.getLogger(javaClass)
  override fun onApplicationEvent(event: RabbitConsumerStarting) {
    LOG.info("RabbitMQ consumer: {} (method: {}) is subscribing to: {}",
      event.source, event.method, event.queue)
  }
}
Handling RabbitConsumerStarted events

To take any actions right after consumers are subscribed to a queue, listen to application event RabbitConsumerStarted:

import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.rabbitmq.event.RabbitConsumerStarted;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class MyStartedEventListener implements ApplicationEventListener<RabbitConsumerStarted> {
  private static final Logger LOG = LoggerFactory.getLogger(MyStartingEventListener.class);
  @Override
  public void onApplicationEvent(RabbitConsumerStarted event) {
    LOG.info("RabbitMQ consumer: {} (method: {}) just subscribed to: {}",
      event.getSource(), event.getMethod(), event.getQueue());
  }
}
import io.micronaut.context.event.ApplicationEventListener
import io.micronaut.rabbitmq.event.RabbitConsumerStarted
import jakarta.inject.Singleton
import org.slf4j.Logger
import org.slf4j.LoggerFactory

@Singleton
class MyStartedEventListener implements ApplicationEventListener<RabbitConsumerStarted> {
  private static final Logger LOG = LoggerFactory.getLogger(MyStartingEventListener.class)
  @Override
  void onApplicationEvent(RabbitConsumerStarted event) {
    LOG.info("RabbitMQ consumer: {} (method: {}) just subscribed to: {}",
      event.source, event.method, event.queue);
  }
}
import io.micronaut.context.event.ApplicationEventListener
import io.micronaut.rabbitmq.event.RabbitConsumerStarted
import jakarta.inject.Singleton
import org.slf4j.LoggerFactory

@Singleton
class MyStartedEventListener : ApplicationEventListener<RabbitConsumerStarted> {
  private val LOG = LoggerFactory.getLogger(javaClass)
  override fun onApplicationEvent(event: RabbitConsumerStarted) {
    LOG.info("RabbitMQ consumer: {} (method: {}) just subscribed to: {}",
      event.source, event.method, event.queue)
  }
}

8 Directly Reply-To (RPC)

This library supports RPC through the usage of direct reply-to. Both blocking and non blocking variations are supported. To get started using this feature, publishing methods must have the replyTo property set to "amq.rabbitmq.reply-to". The "amq.rabbitmq.reply-to" queue always exists and does not need to be created.

The following is an example direct reply to where the consumer is converting the body to upper case and replying with the converted string.

8.1 Client Side

The "client side" in this case starts by publishing a message. A consumer somewhere will then receive the message and reply with a new value.

import io.micronaut.rabbitmq.annotation.Binding;
import io.micronaut.rabbitmq.annotation.RabbitClient;
import io.micronaut.rabbitmq.annotation.RabbitProperty;
import org.reactivestreams.Publisher;

@RabbitClient
@RabbitProperty(name = "replyTo", value = "amq.rabbitmq.reply-to") // (1)
public interface ProductClient {

    @Binding("product")
    String send(String data); // (2)

    @Binding("product")
    Publisher<String> sendReactive(String data); // (3)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient
import io.micronaut.rabbitmq.annotation.RabbitProperty
import org.reactivestreams.Publisher

@RabbitClient
@RabbitProperty(name = "replyTo", value = "amq.rabbitmq.reply-to") // (1)
interface ProductClient {

    @Binding("product")
    String send(String data) // (2)

    @Binding("product")
    Publisher<String> sendReactive(String data) // (3)
}
import io.micronaut.rabbitmq.annotation.Binding
import io.micronaut.rabbitmq.annotation.RabbitClient
import io.micronaut.rabbitmq.annotation.RabbitProperty
import org.reactivestreams.Publisher

@RabbitClient
@RabbitProperty(name = "replyTo", value = "amq.rabbitmq.reply-to") // (1)
interface ProductClient {

    @Binding("product")
    fun send(data: String): String  // (2)

    @Binding("product")
    fun sendReactive(data: String): Publisher<String>  // (3)
}
1 The reply to property is set. This could be placed on individual methods.
2 The send method is blocking and will return when the response is received.
3 The sendReactive method returns a reactive type that will complete when the response is received. Reactive methods will be executed on the IO thread pool.
In order for the publisher to assume RPC should be used instead of just completing when the publish is confirmed, the data type must not be Void. In both cases above, the data type is String. In addition, the replyTo property must be set. Queues will not be auto created by specifying a value with replyTo. The "amq.rabbitmq.reply-to" queue is special and does not need creating.

8.2 Server Side

The "server side" in this case starts with the consumption of a message, and then a new message is published to the reply to queue.

import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitListener;

@RabbitListener
public class ProductListener {

    @Queue("product")
    public String toUpperCase(String data) { // (1)
        return data.toUpperCase(); // (2)
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener

@RabbitListener
class ProductListener {

    @Queue("product")
    String toUpperCase(String data) { // (1)
        data.toUpperCase() // (2)
    }
}
import io.micronaut.rabbitmq.annotation.Queue
import io.micronaut.rabbitmq.annotation.RabbitListener

@RabbitListener
class ProductListener {

    @Queue("product")
    fun toUpperCase(data: String): String { // (1)
        return data.uppercase() // (2)
    }
}
1 The reply to property is injected. If the consumer will not always be participating in RPC this could be annotated with @Nullable to allow both use cases.
2 The channel is injected so it can be used. This could be replaced with a method call of another @RabbitClient.
3 The converted message is published to the replyTo binding.
If the reply publish fails for any reason, the original message will be rejected.
RPC consumer methods must never return a reactive type. Because the resulting publish needs to occur on the same thread and only a single item can be emitted, there is no value in doing so.

8.3 Configuration

By default, if an RPC call does not have a response within a given time frame, a TimeoutException will be thrown or emitted.

See the guide for RabbitMQ RPC and the Micronaut Framework to learn more.

9 Creating Queues/Exchanges

The purpose of this library is to consume and publish messages with RabbitMQ. Any setup of queues, exchanges, or the binding between them is not done automatically. If your requirements dictate that your application should be creating those entities, then a BeanCreatedEventListener can be registered to intercept the ChannelPool to perform operations with the Java API directly. A class has been provided that you can simply extend to receive a channel to do this work.

For all of the examples in this documentation, an event listener has been registered to create the required queues, exchanges, and bindings necessary for the code to be tested.

import com.rabbitmq.client.Channel;
import io.micronaut.rabbitmq.connect.ChannelInitializer;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Singleton // (1)
public class ChannelPoolListener extends ChannelInitializer { // (2)

    @Override
    public void initialize(Channel channel, String name) throws IOException { // (3)
        //docs/quickstart
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", 100);
        channel.queueDeclare("product", false, false, false, args); // (4)

        //docs/exchange
        channel.exchangeDeclare("animals", "headers", false);
        channel.queueDeclare("snakes", false, false, false, null);
        channel.queueDeclare("cats", false, false, false, null);
        Map<String, Object> catArgs = new HashMap<>();
        catArgs.put("x-match", "all");
        catArgs.put("animalType", "Cat");
        channel.queueBind("cats", "animals", "", catArgs);

        Map<String, Object> snakeArgs = new HashMap<>();
        snakeArgs.put("x-match", "all");
        snakeArgs.put("animalType", "Snake");
        channel.queueBind("snakes", "animals", "", snakeArgs);
    }

}
import com.rabbitmq.client.Channel
import io.micronaut.rabbitmq.connect.ChannelInitializer
import jakarta.inject.Singleton

@Singleton // (1)
class ChannelPoolListener extends ChannelInitializer { // (2)

    @Override
    void initialize(Channel channel, String name) throws IOException { // (3)
        channel.queueDeclare("product", false, false, false, ["x-max-priority": 100]) // (4)

        //docs/exchange
        channel.exchangeDeclare("animals", "headers", false)
        channel.queueDeclare("snakes", false, false, false, null)
        channel.queueDeclare("cats", false, false, false, null)
        Map<String, Object> catArgs = [:]
        catArgs.put("x-match", "all")
        catArgs.put("animalType", "Cat")
        channel.queueBind("cats", "animals", "", catArgs)

        Map<String, Object> snakeArgs = [:]
        snakeArgs.put("x-match", "all")
        snakeArgs.put("animalType", "Snake")
        channel.queueBind("snakes", "animals", "", snakeArgs)
    }
}
import com.rabbitmq.client.Channel
import io.micronaut.rabbitmq.connect.ChannelInitializer
import jakarta.inject.Singleton
import java.io.IOException

@Singleton // (1)
class ChannelPoolListener : ChannelInitializer() { // (2)

    @Throws(IOException::class)
    override fun initialize(channel: Channel, name: String) { // (3)
        channel.queueDeclare("product", false, false, false, mapOf("x-max-priority" to 100)) // (4)

        //docs/exchange
        channel.exchangeDeclare("animals", "headers", false)
        channel.queueDeclare("snakes", false, false, false, null)
        channel.queueDeclare("cats", false, false, false, null)
        val catArgs = HashMap<String, Any>()
        catArgs["x-match"] = "all"
        catArgs["animalType"] = "Cat"
        channel.queueBind("cats", "animals", "", catArgs)

        val snakeArgs = HashMap<String, Any>()
        snakeArgs["x-match"] = "all"
        snakeArgs["animalType"] = "Snake"
        channel.queueBind("snakes", "animals", "", snakeArgs)
    }

}
1 The class is declared as a singleton so it will be registered with the context
2 The class extends an abstract class provided by this library
3 The method is implemented that receives a channel and the connection name to do the initialization
4 Declare a queue or perform any operations desired

10 Message Serialization/Deserialization (SerDes)

The serialization and deserialization of message bodies is handled through instances of RabbitMessageSerDes. The ser-des (Serializer/Deserializer) is responsible for both serialization and deserialization of RabbitMQ message bodies into the message body types defined in your clients and consumers methods.

The ser-des are managed by a RabbitMessageSerDesRegistry. All ser-des beans are injected in order into the registry and then searched for when serialization or deserialization is needed. The first ser-des that returns true for supports-java.lang.Class- is returned and used.

By default, standard Java lang types and JSON format (with Jackson) are supported. You can supply your own ser-des by simply registering a bean of type RabbitMessageSerDes. All ser-des implement the Ordered interface, so custom implementations can come before, after, or in between the default implementations.

10.1 Custom SerDes

A custom serializer/deserializer would be necessary to support custom data formats. In the section on Custom Consumer Binding an example was demonstrated that allowed binding a ProductInfo type from the headers of the message. If instead that object should represent the body of the message with a custom data format, you could register your own serializer/deserializer to do so.

In this example a simple data format of the string representation of the fields are concatenated together with a pipe character.

import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.rabbitmq.bind.RabbitConsumerState;
import io.micronaut.rabbitmq.intercept.MutableBasicProperties;
import io.micronaut.rabbitmq.serdes.RabbitMessageSerDes;
import jakarta.inject.Singleton;

import java.nio.charset.Charset;
import java.util.Optional;

@Singleton // (1)
public class ProductInfoSerDes implements RabbitMessageSerDes<ProductInfo> { // (2)

    private static final Charset CHARSET = Charset.forName("UTF-8");

    private final ConversionService conversionService;

    public ProductInfoSerDes(ConversionService conversionService) { // (3)
        this.conversionService = conversionService;
    }

    @Override
    public ProductInfo deserialize(RabbitConsumerState consumerState, Argument<ProductInfo> argument) { // (4)
        String body = new String(consumerState.getBody(), CHARSET);
        String[] parts = body.split("\\|");
        if (parts.length == 3) {
            String size = parts[0];
            if (size.equals("null")) {
                size = null;
            }

            Optional<Long> count = conversionService.convert(parts[1], Long.class);
            Optional<Boolean> sealed = conversionService.convert(parts[2], Boolean.class);

            if (count.isPresent() && sealed.isPresent()) {
                return new ProductInfo(size, count.get(), sealed.get());
            }
        }
        return null;
    }

    @Override
    public byte[] serialize(ProductInfo data, MutableBasicProperties properties) { // (5)
        if (data == null) {
            return null;
        }
        return (data.getSize() + "|" + data.getCount() + "|" + data.getSealed()).getBytes(CHARSET);
    }

    @Override
    public boolean supports(Argument<ProductInfo> argument) { // (6)
        return argument.getType().isAssignableFrom(ProductInfo.class);
    }
}
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.rabbitmq.bind.RabbitConsumerState
import io.micronaut.rabbitmq.intercept.MutableBasicProperties
import io.micronaut.rabbitmq.serdes.RabbitMessageSerDes
import jakarta.inject.Singleton
import java.nio.charset.Charset

@Singleton // (1)
class ProductInfoSerDes implements RabbitMessageSerDes<ProductInfo> { // (2)

    private static final Charset UTF8 = Charset.forName("UTF-8")

    private final ConversionService conversionService

    ProductInfoSerDes(ConversionService conversionService) { // (3)
        this.conversionService = conversionService
    }

    @Override
    ProductInfo deserialize(RabbitConsumerState consumerState, Argument<ProductInfo> argument) { // (4)
        String body = new String(consumerState.body, UTF8)
        String[] parts = body.split("\\|")
        if (parts.length == 3) {
            String size = parts[0]
            if (size == "null") {
                size = null
            }

            Optional<Long> count = conversionService.convert(parts[1], Long)
            Optional<Boolean> productSealed = conversionService.convert(parts[2], Boolean)

            if (count.isPresent() && productSealed.isPresent()) {
                return new ProductInfo(size, count.get(), productSealed.get())
            }
        }
        null
    }

    @Override
    byte[] serialize(ProductInfo data, MutableBasicProperties properties) { // (5)
        if (data == null) {
            return null
        }
        (data.size + "|" + data.count + "|" + data.sealed).getBytes(UTF8)
    }

    @Override
    boolean supports(Argument<ProductInfo> argument) { // (6)
        argument.type.isAssignableFrom(ProductInfo)
    }
}
import io.micronaut.core.convert.ConversionService
import io.micronaut.core.type.Argument
import io.micronaut.rabbitmq.bind.RabbitConsumerState
import io.micronaut.rabbitmq.intercept.MutableBasicProperties
import io.micronaut.rabbitmq.serdes.RabbitMessageSerDes
import jakarta.inject.Singleton
import java.nio.charset.Charset

@Singleton // (1)
class ProductInfoSerDes(private val conversionService: ConversionService)// (3)
    : RabbitMessageSerDes<ProductInfo> { // (2)

    override fun deserialize(consumerState: RabbitConsumerState, argument: Argument<ProductInfo>): ProductInfo? { // (4)
        val body = String(consumerState.body, CHARSET)
        val parts = body.split("\\|".toRegex())
        if (parts.size == 3) {
            var size: String? = parts[0]
            if (size == "null") {
                size = null
            }

            val count = conversionService.convert(parts[1], Long::class.java)
            val sealed = conversionService.convert(parts[2], Boolean::class.java)

            if (count.isPresent && sealed.isPresent) {
                return ProductInfo(size, count.get(), sealed.get())
            }
        }
        return null
    }

    override fun serialize(data: ProductInfo?, properties: MutableBasicProperties): ByteArray { // (5)
        return (data?.size + "|" + data?.count + "|" + data?.sealed).toByteArray(CHARSET)
    }

    override fun supports(argument: Argument<ProductInfo>): Boolean { // (6)
        return argument.type.isAssignableFrom(ProductInfo::class.java)
    }

    companion object {
        private val CHARSET = Charset.forName("UTF-8")
    }
}
1 The class is declared as a singleton so it will be registered with the context
2 The generic specifies what type we want to accept and return
3 The conversion service is injected to convert the parts of the message to the required types
4 The deserialize method takes the bytes from the message and constructs a ProductInfo
5 The serialize method takes the ProductInfo and returns the bytes to publish. A mutable version of the properties is also provided so properties such as the content type can be set before publishing.
6 The supports method ensures only the correct body types are processed by this ser-des
Because the getOrder method was not overridden, the default order of 0 is used. All default ser-des have a lower precedent than the default order which means this ser-des will be checked before the others.

11 RabbitMQ Health Indicator

This library comes with a health indicator for applications that are using the management module in Micronaut. See the Health Endpoint documentation for more information about the endpoint itself.

The information reported from the health indicator is under the rabbitmq key. The details will include everything that is reported from Connection#getServerProperties. For example:

"rabbitmq": {
  "status": "UP",
  "details": {
    "cluster_name": "rabbit@a0378bc51148",
    "product": "RabbitMQ",
    "copyright": "Copyright (C) 2007-2018 Pivotal Software, Inc.",
    "capabilities": {
      "consumer_priorities": true,
      "exchange_exchange_bindings": true,
      "connection.blocked": true,
      "authentication_failure_close": true,
      "per_consumer_qos": true,
      "basic.nack": true,
      "direct_reply_to": true,
      "publisher_confirms": true,
      "consumer_cancel_notify": true
    },
    "information": "Licensed under the MPL.  See http:\/\/www.rabbitmq.com\/",
    "version": "3.7.8",
    "platform": "Erlang\/OTP 20.3.8.5"
  }
}
To disable the RabbitMQ health indicator entirely, add endpoints.health.rabbitmq.enabled: false.

12 RabbitMQ Metrics

The Java RabbitMQ client has built in support for Micrometer. If Micrometer is enabled in your application, metrics for RabbitMQ will be collected by default. For more details on how to integrate Micronaut with Micrometer, see the documentation.

The RabbitMQ metrics binder is configurable. For example:

micronaut:
    metrics:
        binders:
            rabbitmq:
                enabled: Boolean
                tags: String[]
                prefix: String

13 Guides

See the following list of guides to learn more about working with RabbitMQ in the Micronaut Framework:

14 Repository

You can find the source code of this project in this repository: