Micronaut JMS

Integration between Micronaut and Java Message Service (JMS)

Version:

1 Introduction

This project integrates Micronaut with the Java Message Service (JMS).

Supported implementations include ActiveMQ "Classic", ActiveMQ Artemis, and Amazon SQS. Other JMS providers can be configured by supplying your own ConnectionFactory bean.

2 Release History

For this project, you can find a list of releases (with release notes) here:

3 JMS Quick Start

To add support for JMS to a project, add a dependency to your build, depending on the JMS implementation(s) you require:

implementation("io.micronaut.jms:micronaut-jms-activemq-classic")
<dependency>
    <groupId>io.micronaut.jms</groupId>
    <artifactId>micronaut-jms-activemq-classic</artifactId>
</dependency>

implementation("io.micronaut.jms:micronaut-jms-activemq-artemis")
<dependency>
    <groupId>io.micronaut.jms</groupId>
    <artifactId>micronaut-jms-activemq-artemis</artifactId>
</dependency>

implementation("io.micronaut.jms:micronaut-jms-sqs")
<dependency>
    <groupId>io.micronaut.jms</groupId>
    <artifactId>micronaut-jms-sqs</artifactId>
</dependency>

Note that you can use multiple providers in a project.

Creating a JMS Producer with @JMSProducer

To create a JMS client that sends messages, define an interface annotated with @JMSProducer, for example:

import io.micronaut.jms.annotations.JMSProducer;
import io.micronaut.jms.annotations.Queue;
import io.micronaut.messaging.annotation.MessageBody;

import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME;

@JMSProducer(CONNECTION_FACTORY_BEAN_NAME) // (1)
public interface TextProducer {

    @Queue("queue_text") // (2)
    void send(@MessageBody String body); // (3)
}
import io.micronaut.jms.annotations.JMSProducer
import io.micronaut.jms.annotations.Queue
import io.micronaut.messaging.annotation.MessageBody

import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME

@JMSProducer(CONNECTION_FACTORY_BEAN_NAME) // (1)
interface TextProducer {

    @Queue('queue_text') // (2)
    void send(@MessageBody String body) // (3)
}
import io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME
import io.micronaut.jms.annotations.JMSProducer
import io.micronaut.jms.annotations.Queue
import io.micronaut.messaging.annotation.MessageBody

@JMSProducer(CONNECTION_FACTORY_BEAN_NAME) // (1)
interface TextProducer {

    @Queue("queue_text") // (2)
    fun send(@MessageBody body: String) // (3)
}
1 The JMSProducer annotation designates this interface as a client.
2 The @Queue annotation indicates which queue the message should be published to.
3 The send method accepts a single parameter which is the payload of the message.

At compile time, Micronaut will produce an implementation of the interface. You can retrieve an instance of TextProducer either by looking up the bean from the ApplicationContext or with dependency injection:

TextProducer textProducer = applicationContext.getBean(TextProducer.class);
textProducer.send("quickstart");
def textProducer = applicationContext.getBean(TextProducer)
textProducer.send 'quickstart'
val textProducer = applicationContext.getBean(TextProducer::class.java)
textProducer.send("quickstart")

Creating a JMS Consumer with @JMSListener

To listen to JMS messages you can use the @JMSListener annotation to define a message listener.

The following example will listen for messages published by the TextProducer in the previous section:

import io.micronaut.jms.annotations.JMSListener;
import io.micronaut.jms.annotations.Queue;
import io.micronaut.messaging.annotation.MessageBody;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME;

@JMSListener(CONNECTION_FACTORY_BEAN_NAME) // (1)
public class TextConsumer {

    List<String> messages = Collections.synchronizedList(new ArrayList<>());

    @Queue(value = "queue_text", concurrency = "1-5") // (2)
    public void receive(@MessageBody String body) { // (3)
        messages.add(body);
    }
}
import io.micronaut.jms.annotations.JMSListener
import io.micronaut.jms.annotations.Queue
import io.micronaut.messaging.annotation.MessageBody

import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME

@JMSListener(CONNECTION_FACTORY_BEAN_NAME) // (1)
class TextConsumer {

    List<String> messages = [].asSynchronized()

    @Queue(value = 'queue_text', concurrency = '1-5') // (2)
    void receive(@MessageBody String body) { // (3)
        messages << body
    }
}
import io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME
import io.micronaut.jms.annotations.JMSListener
import io.micronaut.jms.annotations.Queue
import io.micronaut.messaging.annotation.MessageBody
import java.util.ArrayList
import java.util.Collections

@JMSListener(CONNECTION_FACTORY_BEAN_NAME) // (1)
class TextConsumer {

    val messages: MutableList<String> = Collections.synchronizedList(ArrayList())

    @Queue(value = "queue_text", concurrency = "1-5") // (2)
    fun receive(@MessageBody body: String) { // (3)
        messages.add(body)
    }
}
1 The @JMSListener is used to designate the bean as a message listener.
2 The @Queue annotation is used to indicate which queue to subscribe to.
3 The receive method accepts a single parameter which is the payload of the message.
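
The concurrency = "1-5" setting suggests that receive may be called from more than one thread at a time, which is presumably why the examples above keep messages in a synchronized list. The following is a minimal sketch of that situation using only the JDK. No JMS classes or broker are involved, and the class name ConcurrentReceiveSketch, the deliver helper, and the thread pool standing in for listener threads are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TextConsumer; it does not use any JMS API.
class ConcurrentReceiveSketch {

    // Same collection choice as the TextConsumer examples.
    final List<String> messages = Collections.synchronizedList(new ArrayList<>());

    void receive(String body) {
        messages.add(body);
    }

    // Calls receive() from several threads at once and returns how many bodies were stored.
    static int deliver(int threads, int messagesPerThread) {
        ConcurrentReceiveSketch consumer = new ConcurrentReceiveSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    consumer.receive("message-" + id + "-" + i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("deliveries did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for deliveries", e);
        }
        return consumer.messages.size();
    }

    public static void main(String[] args) {
        // With a synchronized list no add is lost, so this prints 5000.
        System.out.println(deliver(5, 1000));
    }
}
```

Replacing the synchronized list with a plain ArrayList in this sketch can lose elements or throw under contention, so any state a listener method mutates should be thread-safe when concurrency allows more than one consumer.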

4 Configuration

4.1 Configuring ActiveMQ "Classic"

To use ActiveMQ "Classic", add the following properties to your application.yml:

micronaut:
  jms:
    activemq:
      classic:
        enabled: true
        connection-string: 'tcp://host:port'

and the following to your build.gradle or pom.xml:

implementation("io.micronaut.jms:micronaut-jms-activemq-classic")
<dependency>
    <groupId>io.micronaut.jms</groupId>
    <artifactId>micronaut-jms-activemq-classic</artifactId>
</dependency>

The JMS ConnectionFactory will be an instance of ActiveMQConnectionFactory. Only the broker URL is configurable in application.yml, but further customization is possible with a BeanCreatedEventListener.
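
The connection-string value is a broker URL of the form tcp://host:port, and the placeholder shown above must be replaced with real values. As a rough sanity check, such a value can be parsed with the JDK's java.net.URI before the application starts. This is only a sketch: the class name BrokerUrlCheck and the sample value tcp://localhost:61616 (61616 being the port ActiveMQ conventionally listens on) are assumptions, and composite ActiveMQ URLs such as failover:(...) are deliberately not handled.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper; not part of Micronaut JMS.
class BrokerUrlCheck {

    // Returns the parsed URL, or throws if scheme, host, or numeric port is missing.
    static URI parse(String connectionString) {
        try {
            URI uri = new URI(connectionString);
            if (uri.getScheme() == null || uri.getHost() == null || uri.getPort() == -1) {
                throw new IllegalArgumentException(
                    "Expected scheme://host:port but got: " + connectionString);
            }
            return uri;
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Not a valid URL: " + connectionString, e);
        }
    }

    public static void main(String[] args) {
        URI uri = parse("tcp://localhost:61616");
        // prints "tcp localhost 61616"
        System.out.println(uri.getScheme() + " " + uri.getHost() + " " + uri.getPort());
    }
}
```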

4.2 Configuring ActiveMQ Artemis

To use ActiveMQ Artemis, add the following properties to your application.yml:

micronaut:
  jms:
    activemq:
      artemis:
        enabled: true
        connection-string: 'tcp://host:port'

and the following to your build.gradle or pom.xml:

implementation("io.micronaut.jms:micronaut-jms-activemq-artemis")
<dependency>
    <groupId>io.micronaut.jms</groupId>
    <artifactId>micronaut-jms-activemq-artemis</artifactId>
</dependency>

The JMS ConnectionFactory will be an instance of ActiveMQJMSConnectionFactory. Only the broker URL is configurable in application.yml, but further customization is possible with a BeanCreatedEventListener.

4.3 Configuring SQS

To use Amazon SQS, add the following property to your application.yml:

micronaut:
  jms:
    sqs:
      enabled: true

and the following to your build.gradle or pom.xml:

implementation("io.micronaut.jms:micronaut-jms-sqs")
<dependency>
    <groupId>io.micronaut.jms</groupId>
    <artifactId>micronaut-jms-sqs</artifactId>
</dependency>

Additionally you’ll need to configure an instance of com.amazonaws.services.sqs.AmazonSQS as a bean for AWS authentication, for example:

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import io.micronaut.aws.sdk.v1.EnvironmentAWSCredentialsProvider;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.env.Environment;

import javax.inject.Singleton;

import static com.amazonaws.regions.Regions.US_EAST_2;

@Factory
public class SqsClientFactory {

    @Singleton
    AmazonSQS sqsClient(Environment environment) {
        return AmazonSQSClientBuilder
            .standard()
            .withRegion(US_EAST_2)
            .withCredentials(new EnvironmentAWSCredentialsProvider(environment))
            .build();
    }
}

See the Micronaut AWS docs for more information.

Note that it’s not necessary to add a dependency on io.micronaut.aws:micronaut-aws-sdk-v1 as it’s already a transitive dependency of io.micronaut.jms:micronaut-jms-sqs.

The JMS ConnectionFactory will be an instance of com.amazon.sqs.javamessaging.SQSConnectionFactory; you can customize that with a BeanCreatedEventListener.

4.4 Configuring Unsupported Brokers

Using an unsupported JMS broker only requires that you instantiate a ConnectionFactory in a factory method annotated with @JMSConnectionFactory, like the following:

import io.micronaut.context.annotation.Factory;
import io.micronaut.jms.annotations.JMSConnectionFactory;
import javax.jms.ConnectionFactory;

@Factory
public class MyJMSConfig {
    @JMSConnectionFactory("myConnectionFactory")
    public ConnectionFactory connectionFactory() {
        return new ...;
    }
}

With this in place you can configure consumers and producers that use the connection factory by referencing the bean name ("myConnectionFactory" in the example) in @JMSProducer and @JMSListener annotations on your producer interfaces and consumer classes.

4.5 Customizing Brokers

Although only the broker URL is configurable in application.yml, there are potentially many configurable properties in the instantiated ConnectionFactory. Also, you may wish to use a different ConnectionFactory implementation class. Any customization you need can be done by intercepting the creation of the ConnectionFactory with a BeanCreatedEventListener.

Customizing the ConnectionFactory

Here we intercept the creation of the ActiveMQ "Classic" ConnectionFactory and override its default configuration to enable asynchronous message sending:

import io.micronaut.context.event.BeanCreatedEvent;
import io.micronaut.context.event.BeanCreatedEventListener;
import jakarta.inject.Singleton;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.ConnectionFactory;

@Singleton
public class CustomizeBrokerJMSConnectionPoolListener implements BeanCreatedEventListener<ConnectionFactory> {

    @Override
    public ConnectionFactory onCreated(BeanCreatedEvent<ConnectionFactory> event) {

        ConnectionFactory connectionFactory = event.getBean();
        if (connectionFactory instanceof ActiveMQConnectionFactory) {
            ActiveMQConnectionFactory amqcf = (ActiveMQConnectionFactory) connectionFactory;
            amqcf.setUseAsyncSend(true);
        }

        return connectionFactory;
    }
}
import groovy.transform.CompileStatic
import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import jakarta.inject.Singleton
import org.apache.activemq.ActiveMQConnectionFactory

import javax.jms.ConnectionFactory

@CompileStatic
@Singleton
class CustomizeBrokerJMSConnectionPoolListener implements BeanCreatedEventListener<ConnectionFactory> {

    @Override
    ConnectionFactory onCreated(BeanCreatedEvent<ConnectionFactory> event) {

        ConnectionFactory connectionFactory = event.bean
        if (connectionFactory instanceof ActiveMQConnectionFactory) {
            connectionFactory.useAsyncSend = true
        }

        return connectionFactory
    }
}
import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import jakarta.inject.Singleton
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.ConnectionFactory

@Singleton
class CustomizeBrokerJMSConnectionPoolListener : BeanCreatedEventListener<ConnectionFactory?> {

    override fun onCreated(event: BeanCreatedEvent<ConnectionFactory?>): ConnectionFactory? {
        val connectionFactory = event.bean
        if (connectionFactory is ActiveMQConnectionFactory) {
            connectionFactory.isUseAsyncSend = true
        }
        return connectionFactory
    }
}

Replacing the ConnectionFactory

Here we discard the default instance and replace it with a custom implementation, in this case an ActiveMQXAConnectionFactory (an XAConnectionFactory implementation) for distributed transaction support:

import io.micronaut.context.event.BeanCreatedEvent;
import io.micronaut.context.event.BeanCreatedEventListener;
import io.micronaut.jms.activemq.classic.configuration.properties.ActiveMqClassicConfigurationProperties;
import jakarta.inject.Singleton;
import org.apache.activemq.ActiveMQXAConnectionFactory;

import javax.jms.ConnectionFactory;

@Singleton
public class CustomBrokerJMSConnectionPoolListener implements BeanCreatedEventListener<ConnectionFactory> {

    private final ActiveMqClassicConfigurationProperties amqConfig;

    public CustomBrokerJMSConnectionPoolListener(ActiveMqClassicConfigurationProperties amqConfig) {
        this.amqConfig = amqConfig;
    }

    @Override
    public ConnectionFactory onCreated(BeanCreatedEvent<ConnectionFactory> event) {
        return new ActiveMQXAConnectionFactory(amqConfig.getConnectionString());
    }
}
import groovy.transform.CompileStatic
import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import io.micronaut.jms.activemq.classic.configuration.properties.ActiveMqClassicConfigurationProperties
import jakarta.inject.Singleton
import org.apache.activemq.ActiveMQXAConnectionFactory

import javax.jms.ConnectionFactory

@CompileStatic
@Singleton
class CustomBrokerJMSConnectionPoolListener implements BeanCreatedEventListener<ConnectionFactory> {

    private final ActiveMqClassicConfigurationProperties amqConfig

    CustomBrokerJMSConnectionPoolListener(ActiveMqClassicConfigurationProperties amqConfig) {
        this.amqConfig = amqConfig
    }

    @Override
    ConnectionFactory onCreated(BeanCreatedEvent<ConnectionFactory> event) {
        new ActiveMQXAConnectionFactory(amqConfig.getConnectionString())
    }
}
import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import io.micronaut.jms.activemq.classic.configuration.properties.ActiveMqClassicConfigurationProperties
import jakarta.inject.Singleton
import org.apache.activemq.ActiveMQXAConnectionFactory
import javax.jms.ConnectionFactory

@Singleton
class CustomBrokerJMSConnectionPoolListener(
    private val amqConfig: ActiveMqClassicConfigurationProperties) :
    BeanCreatedEventListener<ConnectionFactory> {

    override fun onCreated(event: BeanCreatedEvent<ConnectionFactory>) =
        ActiveMQXAConnectionFactory(amqConfig.connectionString)
}

5 Parameter Binding

Methods in classes/interfaces annotated with @JMSProducer and @JMSListener use parameter binding annotations to access the JMS Message, message body, and message headers.

@MessageBody

The @MessageBody annotation is required on the method argument that will be serialized/deserialized as the JMS Message body.

@MessageHeader

The @MessageHeader annotation is required on any method argument that will be serialized/deserialized as a JMS header or a non-JMS header (message property).
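
For custom (non-JMS) headers it helps to remember that the JMS specification limits message property values to boolean, byte, short, int, long, float, double, and String. The sketch below expresses that rule as a plain Java check. It is not taken from Micronaut JMS: the class name HeaderTypeSketch and the method isSupportedPropertyType are hypothetical, and how the library itself reacts to other types is not covered here.

```java
import java.util.Set;

// Hypothetical helper that mirrors the property value types allowed by the JMS specification.
class HeaderTypeSketch {

    // Boxed forms of the primitive types, plus String.
    private static final Set<Class<?>> SUPPORTED = Set.of(
        Boolean.class, Byte.class, Short.class, Integer.class,
        Long.class, Float.class, Double.class, String.class);

    // Returns true if a value of the given (boxed) type can be stored as a message property.
    static boolean isSupportedPropertyType(Class<?> type) {
        return SUPPORTED.contains(type);
    }

    public static void main(String[] args) {
        System.out.println(isSupportedPropertyType(Integer.class));        // prints true
        System.out.println(isSupportedPropertyType(java.util.Date.class)); // prints false
    }
}
```

A value of any other type, such as a date, would need to be converted to one of these types (for example a long timestamp or a String) before being sent as a custom header.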

In this example, the message body is a Map and the method supports sending the JMSCorrelationID along with several custom header values:

import io.micronaut.core.annotation.Nullable;
import io.micronaut.jms.annotations.JMSProducer;
import io.micronaut.jms.annotations.Queue;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.MessageHeader;
import java.io.Serializable;
import java.util.Map;

import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME;
import static io.micronaut.jms.model.JMSHeaders.JMS_CORRELATION_ID;

@JMSProducer(CONNECTION_FACTORY_BEAN_NAME)
public interface MapProducer {

    @Queue("queue_map")
    void send(@MessageBody Map<String, Serializable> body,
              @MessageHeader(JMS_CORRELATION_ID) @Nullable String correlationId,
              @MessageHeader("CustomStringHeader") @Nullable String stringHeader,
              @MessageHeader("CustomBooleanHeader") boolean booleanHeader,
              @MessageHeader("CustomByteHeader") byte byteHeader,
              @MessageHeader("CustomShortHeader") short shortHeader,
              @MessageHeader("CustomIntegerHeader") int intHeader,
              @MessageHeader("CustomLongHeader") long longHeader,
              @MessageHeader("CustomFloatHeader") float floatHeader,
              @MessageHeader("CustomDoubleHeader") double doubleHeader);
}
import io.micronaut.core.annotation.Nullable
import io.micronaut.jms.annotations.JMSProducer
import io.micronaut.jms.annotations.Queue
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader

import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME
import static io.micronaut.jms.model.JMSHeaders.JMS_CORRELATION_ID

@JMSProducer(CONNECTION_FACTORY_BEAN_NAME)
interface MapProducer {

    @Queue('queue_map')
    void send(@MessageBody Map<String, Serializable> body,
              @MessageHeader(JMS_CORRELATION_ID) @Nullable String correlationId,
              @MessageHeader('CustomStringHeader') @Nullable String stringHeader,
              @MessageHeader('CustomBooleanHeader') boolean booleanHeader,
              @MessageHeader('CustomByteHeader') byte byteHeader,
              @MessageHeader('CustomShortHeader') short shortHeader,
              @MessageHeader('CustomIntegerHeader') int intHeader,
              @MessageHeader('CustomLongHeader') long longHeader,
              @MessageHeader('CustomFloatHeader') float floatHeader,
              @MessageHeader('CustomDoubleHeader') double doubleHeader)
}
import io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME
import io.micronaut.jms.annotations.JMSProducer
import io.micronaut.jms.annotations.Queue
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.jms.model.JMSHeaders.JMS_CORRELATION_ID
import java.io.Serializable

@JMSProducer(CONNECTION_FACTORY_BEAN_NAME)
interface MapProducer {
    @Queue("queue_map")
    fun send(
        @MessageBody body: Map<String, Serializable?>,
        @MessageHeader(JMS_CORRELATION_ID) correlationId: String?,
        @MessageHeader("CustomStringHeader") stringHeader: String?,
        @MessageHeader("CustomBooleanHeader") booleanHeader: Boolean,
        @MessageHeader("CustomByteHeader") byteHeader: Byte,
        @MessageHeader("CustomShortHeader") shortHeader: Short,
        @MessageHeader("CustomIntegerHeader") intHeader: Int,
        @MessageHeader("CustomLongHeader") longHeader: Long,
        @MessageHeader("CustomFloatHeader") floatHeader: Float,
        @MessageHeader("CustomDoubleHeader") doubleHeader: Double
    )
}

This consumer can access the user-supplied header values as well as several headers that can only be set by the JMS provider, e.g. JMSMessageID, JMSPriority, etc.:

import io.micronaut.core.annotation.Nullable;
import io.micronaut.jms.annotations.JMSListener;
import io.micronaut.jms.annotations.Message;
import io.micronaut.jms.annotations.Queue;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.MessageHeader;

import javax.jms.Destination;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME;
import static io.micronaut.jms.model.JMSHeaders.JMS_CORRELATION_ID;
import static io.micronaut.jms.model.JMSHeaders.JMS_DELIVERY_MODE;
import static io.micronaut.jms.model.JMSHeaders.JMS_DESTINATION;
import static io.micronaut.jms.model.JMSHeaders.JMS_EXPIRATION;
import static io.micronaut.jms.model.JMSHeaders.JMS_MESSAGE_ID;
import static io.micronaut.jms.model.JMSHeaders.JMS_PRIORITY;
import static io.micronaut.jms.model.JMSHeaders.JMS_REDELIVERED;
import static io.micronaut.jms.model.JMSHeaders.JMS_REPLY_TO;
import static io.micronaut.jms.model.JMSHeaders.JMS_TIMESTAMP;
import static io.micronaut.jms.model.JMSHeaders.JMS_TYPE;

@JMSListener(CONNECTION_FACTORY_BEAN_NAME)
public class MapConsumer {

    List<Map<String, Serializable>> messageBodies = Collections.synchronizedList(new ArrayList<>());
    List<Map<String, Object>> messageHeaders = Collections.synchronizedList(new ArrayList<>());
    List<javax.jms.Message> messages = Collections.synchronizedList(new ArrayList<>());

    @Queue(value = "queue_map", concurrency = "1-5")
    public void receive(@MessageBody Map<String, Serializable> body,
                        @Message javax.jms.Message message,
                        @MessageHeader(JMS_CORRELATION_ID) @Nullable String correlationId,
                        @MessageHeader(JMS_DELIVERY_MODE) int deliveryMode,
                        @MessageHeader(JMS_DESTINATION) Destination destination,
                        @MessageHeader(JMS_EXPIRATION) long expiration,
                        @MessageHeader(JMS_MESSAGE_ID) String messageId,
                        @MessageHeader(JMS_PRIORITY) int priority,
                        @MessageHeader(JMS_REDELIVERED) boolean redelivered,
                        @MessageHeader(JMS_REPLY_TO) @Nullable Destination replyTo,
                        @MessageHeader(JMS_TIMESTAMP) long timestamp,
                        @MessageHeader(JMS_TYPE) @Nullable String type,
                        @MessageHeader("CustomStringHeader") @Nullable String stringHeader,
                        @MessageHeader("CustomBooleanHeader") boolean booleanHeader,
                        @MessageHeader("CustomByteHeader") byte byteHeader,
                        @MessageHeader("CustomShortHeader") short shortHeader,
                        @MessageHeader("CustomIntegerHeader") int intHeader,
                        @MessageHeader("CustomLongHeader") long longHeader,
                        @MessageHeader("CustomFloatHeader") float floatHeader,
                        @MessageHeader("CustomDoubleHeader") double doubleHeader) {

        Map<String, Object> headerValues = new HashMap<>();
        headerValues.put(JMS_CORRELATION_ID, correlationId);
        headerValues.put(JMS_DELIVERY_MODE, deliveryMode);
        headerValues.put(JMS_DESTINATION, destination);
        headerValues.put(JMS_EXPIRATION, expiration);
        headerValues.put(JMS_MESSAGE_ID, messageId);
        headerValues.put(JMS_PRIORITY, priority);
        headerValues.put(JMS_REDELIVERED, redelivered);
        headerValues.put(JMS_REPLY_TO, replyTo);
        headerValues.put(JMS_TIMESTAMP, timestamp);
        headerValues.put(JMS_TYPE, type);
        headerValues.put("CustomStringHeader", stringHeader);
        headerValues.put("CustomBooleanHeader", booleanHeader);
        headerValues.put("CustomByteHeader", byteHeader);
        headerValues.put("CustomShortHeader", shortHeader);
        headerValues.put("CustomIntegerHeader", intHeader);
        headerValues.put("CustomLongHeader", longHeader);
        headerValues.put("CustomFloatHeader", floatHeader);
        headerValues.put("CustomDoubleHeader", doubleHeader);

        messageHeaders.add(headerValues);
        messageBodies.add(body);
        messages.add(message);
    }
}
import io.micronaut.core.annotation.Nullable
import io.micronaut.jms.annotations.JMSListener
import io.micronaut.jms.annotations.Message
import io.micronaut.jms.annotations.Queue
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader

import javax.jms.Destination

import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME
import static io.micronaut.jms.model.JMSHeaders.JMS_CORRELATION_ID
import static io.micronaut.jms.model.JMSHeaders.JMS_DELIVERY_MODE
import static io.micronaut.jms.model.JMSHeaders.JMS_DESTINATION
import static io.micronaut.jms.model.JMSHeaders.JMS_EXPIRATION
import static io.micronaut.jms.model.JMSHeaders.JMS_MESSAGE_ID
import static io.micronaut.jms.model.JMSHeaders.JMS_PRIORITY
import static io.micronaut.jms.model.JMSHeaders.JMS_REDELIVERED
import static io.micronaut.jms.model.JMSHeaders.JMS_REPLY_TO
import static io.micronaut.jms.model.JMSHeaders.JMS_TIMESTAMP
import static io.micronaut.jms.model.JMSHeaders.JMS_TYPE

@JMSListener(CONNECTION_FACTORY_BEAN_NAME)
class MapConsumer {

    List<Map<String, Serializable>> messageBodies = [].asSynchronized()
    List<Map<String, Object>> messageHeaders =  [].asSynchronized()
    List<javax.jms.Message> messages =  [].asSynchronized()

    @Queue(value = 'queue_map', concurrency = '1-5')
    void receive(@MessageBody Map<String, Serializable> body,
                 @Message javax.jms.Message message,
                 @MessageHeader(JMS_CORRELATION_ID) @Nullable String correlationId,
                 @MessageHeader(JMS_DELIVERY_MODE) int deliveryMode,
                 @MessageHeader(JMS_DESTINATION) Destination destination,
                 @MessageHeader(JMS_EXPIRATION) long expiration,
                 @MessageHeader(JMS_MESSAGE_ID) String messageId,
                 @MessageHeader(JMS_PRIORITY) int priority,
                 @MessageHeader(JMS_REDELIVERED) boolean redelivered,
                 @MessageHeader(JMS_REPLY_TO) @Nullable Destination replyTo,
                 @MessageHeader(JMS_TIMESTAMP) long timestamp,
                 @MessageHeader(JMS_TYPE) @Nullable String type,
                 @MessageHeader('CustomStringHeader') @Nullable String stringHeader,
                 @MessageHeader('CustomBooleanHeader') boolean booleanHeader,
                 @MessageHeader('CustomByteHeader') byte byteHeader,
                 @MessageHeader('CustomShortHeader') short shortHeader,
                 @MessageHeader('CustomIntegerHeader') int intHeader,
                 @MessageHeader('CustomLongHeader') long longHeader,
                 @MessageHeader('CustomFloatHeader') float floatHeader,
                 @MessageHeader('CustomDoubleHeader') double doubleHeader) {

        Map<String, Object> headerValues = [
                (JMS_CORRELATION_ID): correlationId,
                (JMS_DELIVERY_MODE): deliveryMode,
                (JMS_DESTINATION): destination,
                (JMS_EXPIRATION): expiration,
                (JMS_MESSAGE_ID): messageId,
                (JMS_PRIORITY): priority,
                (JMS_REDELIVERED): redelivered,
                (JMS_REPLY_TO): replyTo,
                (JMS_TIMESTAMP): timestamp,
                (JMS_TYPE): type,
                CustomStringHeader: stringHeader,
                CustomBooleanHeader: booleanHeader,
                CustomByteHeader: byteHeader,
                CustomShortHeader: shortHeader,
                CustomIntegerHeader: intHeader,
                CustomLongHeader: longHeader,
                CustomFloatHeader: floatHeader,
                CustomDoubleHeader: doubleHeader]

        messageHeaders << headerValues
        messageBodies << body
        messages << message
    }
}
import io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME
import io.micronaut.jms.annotations.JMSListener
import io.micronaut.jms.annotations.Message
import io.micronaut.jms.annotations.Queue
import io.micronaut.jms.model.JMSHeaders.JMS_CORRELATION_ID
import io.micronaut.jms.model.JMSHeaders.JMS_DELIVERY_MODE
import io.micronaut.jms.model.JMSHeaders.JMS_DESTINATION
import io.micronaut.jms.model.JMSHeaders.JMS_EXPIRATION
import io.micronaut.jms.model.JMSHeaders.JMS_MESSAGE_ID
import io.micronaut.jms.model.JMSHeaders.JMS_PRIORITY
import io.micronaut.jms.model.JMSHeaders.JMS_REDELIVERED
import io.micronaut.jms.model.JMSHeaders.JMS_REPLY_TO
import io.micronaut.jms.model.JMSHeaders.JMS_TIMESTAMP
import io.micronaut.jms.model.JMSHeaders.JMS_TYPE
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import java.io.Serializable
import javax.jms.Destination

@JMSListener(CONNECTION_FACTORY_BEAN_NAME)
class MapConsumer {

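    // Synchronized lists, because concurrency = "1-5" allows receive() to run on several threads
    val messageBodies: MutableList<Map<String, Serializable>> = java.util.Collections.synchronizedList(mutableListOf())
    val messageHeaders: MutableList<Map<String, Any?>> = java.util.Collections.synchronizedList(mutableListOf())
    val messages: MutableList<javax.jms.Message> = java.util.Collections.synchronizedList(mutableListOf())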

    @Queue(value = "queue_map", concurrency = "1-5")
    fun receive(
        @MessageBody body: Map<String, Serializable>,
        @Message message: javax.jms.Message,
        @MessageHeader(JMS_CORRELATION_ID) correlationId: String?,
        @MessageHeader(JMS_DELIVERY_MODE) deliveryMode: Int,
        @MessageHeader(JMS_DESTINATION) destination: Destination?,
        @MessageHeader(JMS_EXPIRATION) expiration: Long,
        @MessageHeader(JMS_MESSAGE_ID) messageId: String?,
        @MessageHeader(JMS_PRIORITY) priority: Int,
        @MessageHeader(JMS_REDELIVERED) redelivered: Boolean,
        @MessageHeader(JMS_REPLY_TO) replyTo: Destination?,
        @MessageHeader(JMS_TIMESTAMP) timestamp: Long,
        @MessageHeader(JMS_TYPE) type: String?,
        @MessageHeader("CustomStringHeader") stringHeader: String?,
        @MessageHeader("CustomBooleanHeader") booleanHeader: Boolean,
        @MessageHeader("CustomByteHeader") byteHeader: Byte,
        @MessageHeader("CustomShortHeader") shortHeader: Short,
        @MessageHeader("CustomIntegerHeader") intHeader: Int,
        @MessageHeader("CustomLongHeader") longHeader: Long,
        @MessageHeader("CustomFloatHeader") floatHeader: Float,
        @MessageHeader("CustomDoubleHeader") doubleHeader: Double
    ) {
        val headerValues = mapOf(
            JMS_CORRELATION_ID to correlationId,
            JMS_DELIVERY_MODE to deliveryMode,
            JMS_DESTINATION to destination,
            JMS_EXPIRATION to expiration,
            JMS_MESSAGE_ID to messageId,
            JMS_PRIORITY to priority,
            JMS_REDELIVERED to redelivered,
            JMS_REPLY_TO to replyTo,
            JMS_TIMESTAMP to timestamp,
            JMS_TYPE to type,
            "CustomStringHeader" to stringHeader,
            "CustomBooleanHeader" to booleanHeader,
            "CustomByteHeader" to byteHeader,
            "CustomShortHeader" to shortHeader,
            "CustomIntegerHeader" to intHeader,
            "CustomLongHeader" to longHeader,
            "CustomFloatHeader" to floatHeader,
            "CustomDoubleHeader" to doubleHeader)
        messageHeaders.add(headerValues)
        messageBodies.add(body)
        messages.add(message)
    }
}

6 Message Selectors

Message selectors let you filter the messages that a @JMSListener method receives. A selector can reference any message header or property, but not the message content.

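import io.micronaut.jms.annotations.JMSListener;
import io.micronaut.jms.annotations.Queue;
import io.micronaut.jms.annotations.Topic;
import io.micronaut.messaging.annotation.MessageBody;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Assumes the ActiveMQ "Classic" connection factory, as in the earlier examples
import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME;

@JMSListener(CONNECTION_FACTORY_BEAN_NAME)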
public class SelectorConsumer {

    List<String> messageBodiesTrue = Collections.synchronizedList(new ArrayList<>());

    List<String> messageBodiesFalse = Collections.synchronizedList(new ArrayList<>());

    List<String> messageBodiesTopic = Collections.synchronizedList(new ArrayList<>());

    @Queue(value = "selector_queue", concurrency = "1-5", messageSelector = "CustomBooleanHeader=true")
    public void receive(@MessageBody String body) {
        messageBodiesTrue.add(body);
    }

    @Queue(value = "selector_queue", concurrency = "1-5", messageSelector = "CustomBooleanHeader=false")
    public void receive2(@MessageBody String body) {
        messageBodiesFalse.add(body);
    }

    @Topic(value = "selector_topic", messageSelector = "CustomBooleanHeader=true")
    public void receiveTopic(@MessageBody String body) {
        messageBodiesTopic.add(body);
    }
}
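
To make the routing in SelectorConsumer concrete, the following sketch imitates in plain Java what the two queue selectors express. Real selectors are evaluated by the JMS provider using the selector syntax defined in the JMS specification; the Predicate-based matching, the class name SelectorSketch, and the use of a Map to hold message properties are assumptions made purely for illustration.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical illustration only; it does not use any JMS API.
class SelectorSketch {

    // Plain-Java counterparts of "CustomBooleanHeader=true" and "CustomBooleanHeader=false".
    static final Predicate<Map<String, Object>> IS_TRUE =
        properties -> Boolean.TRUE.equals(properties.get("CustomBooleanHeader"));
    static final Predicate<Map<String, Object>> IS_FALSE =
        properties -> Boolean.FALSE.equals(properties.get("CustomBooleanHeader"));

    // Returns the name of the SelectorConsumer method whose selector would match.
    static String route(Map<String, Object> properties) {
        if (IS_TRUE.test(properties)) {
            return "receive";
        }
        if (IS_FALSE.test(properties)) {
            return "receive2";
        }
        // A message without the property matches neither selector.
        return "unmatched";
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("CustomBooleanHeader", Boolean.TRUE)));  // prints receive
        System.out.println(route(Map.of("CustomBooleanHeader", Boolean.FALSE))); // prints receive2
        System.out.println(route(Map.of()));                                     // prints unmatched
    }
}
```

The last case is worth noting: a message that does not carry the CustomBooleanHeader property satisfies neither selector, so neither queue method in the example would receive it.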

More examples on how to use Message Selector can be found here.

7 GraalVM support

Micronaut JMS is compatible with GraalVM. Everything is handled automatically by the library so users don’t need any special configuration.

See the section on GraalVM in the user guide for more information.

8 Repository

You can find the source code of this project in this repository: