implementation("io.micronaut.jms:micronaut-jms-activemq-classic")
Table of Contents
Micronaut JMS
Integration between Micronaut and Java Messaging Service (JMS)
Version: 4.2.1-SNAPSHOT
1 Introduction
This project includes integration between Micronaut and Java Messaging Service (JMS).
Supported implementations include Active MQ "Classic", Active MQ Artemis, and AWS SQS, and configuring other JMS providers is straightforward.
2 Release History
For this project, you can find a list of releases (with release notes) here:
3 Breaking Changes
4.0.0
Removal of javax.jms
namespace
Version 4 switches from the deprecated javax.jms
namespace to the new Jakarta EE jakarta.jms
namespace.
ActiveMQ Classic
The ActiveMQ Classic client has been updated to version 6 to support the new namespace.
ActiveMQ Artemis
The client dependency for ActiveMq Artemis has changed from org.apache.activemq:artemis-jms-client
to org.apache.activemq:artemis-jakarta-client
which uses the new namespace.
Deprecations
-
The annotation
io.micronaut.jms.annotations.Queue
removes the following attributes that were deprecated previously. This is to align the implementation with the JMS model and the messaging libraries' presumptions.-
String concurrency() default "1-1"
-
String executor() default ""
-
-
The annotation
io.micronaut.jms.annotations.Topic
removes the following attributes that were deprecated previously. This is to align the implementation with the JMS model and the messaging libraries' presumptions.-
String executor() default ""
-
-
The following classes in the
io.micronaut.jms.listener
package were deprecated previously are now removed.-
ConcurrentMessageHandler
-
JMSListenerContainer
-
JMSListenerContainerFactory
-
4 JMS Quick Start
To add support for JMS to a project, add a dependency to your build, depending on the JMS implementation(s) you require:
Gradle
Maven
Gradle
Maven
implementation("io.micronaut.jms:micronaut-jms-activemq-artemis")
Gradle
Maven
implementation("io.micronaut.jms:micronaut-jms-sqs")
Note that you can use multiple providers in a project.
Creating a JMS Producer with @JMSProducer
To create a JMS client that sends messages, define an interface annotated with JMSProducer
, for example:
Java
Groovy
Kotlin
import io.micronaut.jms.annotations.JMSProducer;
import io.micronaut.jms.annotations.Queue;
import io.micronaut.messaging.annotation.MessageBody;
import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME;
@JMSProducer(CONNECTION_FACTORY_BEAN_NAME) // (1)
public interface TextProducer {
@Queue("queue_text") // (2)
void send(@MessageBody String body); // (3)
}
1 | The JMSProducer annotation designates this interface as a client. |
2 | The @Queue annotation indicates which queue the message should be published to. |
3 | The send method accepts a single parameter which is the payload of the message. |
At compile time Micronaut will produce an implementation of the interface. You can retrieve an instance of MapProducer
either by looking up the bean from the ApplicationContext or with dependency injection:
Java
Groovy
Kotlin
TextProducer textProducer = applicationContext.getBean(TextProducer.class);
textProducer.send("quickstart");
Creating a JMS Consumer with @JMSListener
To listen to JMS messages you can use the @JMSListener annotation to define a message listener.
The following example will listen for messages published by the MapProducer
in the previous section:
Java
Groovy
Kotlin
import io.micronaut.jms.annotations.JMSListener;
import io.micronaut.jms.annotations.Queue;
import io.micronaut.messaging.annotation.MessageBody;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME;
@JMSListener(CONNECTION_FACTORY_BEAN_NAME) // (1)
public class TextConsumer {
List<String> messages = Collections.synchronizedList(new ArrayList<>());
@Queue(value = "queue_text") // (2)
public void receive(@MessageBody String body) { // (3)
messages.add(body);
}
}
1 | The @JMSListener is used to designate the bean as a message listener. |
2 | The @Queue annotation is used to indicate which queue to subscribe to. |
3 | The receive method accepts a single parameter which is the payload of the message. |
5 Configuration
5.1 Configuring ActiveMQ "Classic"
To use ActiveMQ "Classic", add the following properties to your application.yml
Properties
Yaml
Toml
Groovy
Hocon
JSON
micronaut:
jms:
activemq:
classic:
enabled: true
connection-string: 'tcp://host:port'
and the following to your build.gradle
or pom.xml
Gradle
Maven
implementation("io.micronaut.jms:micronaut-jms-activemq-classic")
The JMS ConnectionFactory will be an instance of ActiveMQConnectionFactory. Only the broker URL is configurable in application.yml
but further customization is possible with a BeanCreatedEventListener.
5.2 Configuring ActiveMQ Artemis
To use ActiveMQ Artemis, add the following properties to your application.yml
Properties
Yaml
Toml
Groovy
Hocon
JSON
micronaut:
jms:
activemq:
artemis:
enabled: true
connection-string: 'tcp://host:port'
and the following to your build.gradle
or pom.xml
Gradle
Maven
implementation("io.micronaut.jms:micronaut-jms-activemq-artemis")
The JMS ConnectionFactory will be an instance of ActiveMQJMSConnectionFactory. Only the broker URL is configurable in application.yml
but further customization is possible with a BeanCreatedEventListener.
5.3 Configuring SQS
To use Amazon SQS, add the following property to your application.yml
Properties
Yaml
Toml
Groovy
Hocon
JSON
micronaut:
jms:
sqs:
enabled: true
and the following to your build.gradle
or pom.xml
Gradle
Maven
implementation("io.micronaut.jms:micronaut-jms-sqs")
See the Micronaut AWS docs for more information.
Note that it’s not necessary to add a dependency on io.micronaut.aws:micronaut-aws-sdk-v2
as it’s already a transitive dependency of io.micronaut.jms:micronaut-jms-sqs
.
The JMS ConnectionFactory will be an instance of com.amazon.sqs.javamessaging.SQSConnectionFactory
; you can customize that with a BeanCreatedEventListener.
See the guide for Connect a Micronaut JMS Application to an AWS SQS Queue to learn more. |
5.4 Configuring Unsupported Brokers
Using an unsupported JMS broker is simple; you simply need to instantiate a ConnectionFactory
like the following:
import io.micronaut.context.annotation.Factory;
import io.micronaut.jms.annotations.JMSConnectionFactory;
import jakarta.jms.ConnectionFactory;
@Factory
public class MyJMSConfig {
@JMSConnectionFactory("myConnectionFactory")
public ConnectionFactory connectionFactory() {
return new ...;
}
}
With this in place you can configure consumers and producers that use the connection factory by referencing the bean name ("myConnectionFactory"
in the example) in @JMSProducer
and @JMSListener
annotations on your producer interfaces and consumer classes.
5.5 Customizing Brokers
Although only the broker URL is configurable in application.yml
, there are potentially many configurable properties in the instantiated ConnectionFactory
. Also, you may wish to use a different ConnectionFactory
implementation class. Any customization you need can be done by intercepting the creation of the ConnectionFactory
with a BeanCreatedEventListener.
Customizing the ConnectionFactory
Here we intercept the creation of the ActiveMQ "Classic" ConnectionFactory
by overriding the default to enable asynchronous message sending:
Java
Groovy
Kotlin
import io.micronaut.context.event.BeanCreatedEvent;
import io.micronaut.context.event.BeanCreatedEventListener;
import jakarta.inject.Singleton;
import org.apache.activemq.ActiveMQConnectionFactory;
import jakarta.jms.ConnectionFactory;
@Singleton
public class CustomizeBrokerJMSConnectionPoolListener implements BeanCreatedEventListener<ConnectionFactory> {
@Override
public ConnectionFactory onCreated(BeanCreatedEvent<ConnectionFactory> event) {
ConnectionFactory connectionFactory = event.getBean();
if (connectionFactory instanceof ActiveMQConnectionFactory) {
ActiveMQConnectionFactory amqcf = (ActiveMQConnectionFactory) connectionFactory;
amqcf.setUseAsyncSend(true);
}
return connectionFactory;
}
}
Replacing the ConnectionFactory
Here we discard the default instance and replace it with a custom implementation, in this case an instance of XAConnectionFactory for distributed transaction support.
Java
Groovy
Kotlin
import io.micronaut.context.event.BeanCreatedEvent;
import io.micronaut.context.event.BeanCreatedEventListener;
import io.micronaut.jms.activemq.classic.configuration.properties.ActiveMqClassicConfigurationProperties;
import jakarta.inject.Singleton;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import jakarta.jms.ConnectionFactory;
@Singleton
public class CustomBrokerJMSConnectionPoolListener implements BeanCreatedEventListener<ConnectionFactory> {
private final ActiveMqClassicConfigurationProperties amqConfig;
public CustomBrokerJMSConnectionPoolListener(ActiveMqClassicConfigurationProperties amqConfig) {
this.amqConfig = amqConfig;
}
@Override
public ConnectionFactory onCreated(BeanCreatedEvent<ConnectionFactory> event) {
return new ActiveMQXAConnectionFactory(amqConfig.getConnectionString());
}
}
6 Parameter Binding
Methods in classes/interfaces annotated with @JMSProducer
and @JMSListener
use parameter binding annotations to access the JMS Message
, message body, and message headers.
@MessageBody
The MessageBody annotation is required for the method argument that will be serialized/deserialized as the JMS Message
body.
@MessageHeader
The MessageHeader annotation is required for any method arguments that will be serialized/deserialized as JMS headers or non-JMS headers (message properties).
In this example, the message body is a Map
and the method supports sending the JMSCorrelationID
along with several custom header values:
Java
Groovy
Kotlin
import io.micronaut.core.annotation.Nullable;
import io.micronaut.jms.annotations.JMSProducer;
import io.micronaut.jms.annotations.Queue;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.MessageHeader;
import java.io.Serializable;
import java.util.Map;
import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME;
import static io.micronaut.jms.model.JMSHeaders.JMS_CORRELATION_ID;
@JMSProducer(CONNECTION_FACTORY_BEAN_NAME)
public interface MapProducer {
@Queue("queue_map")
void send(@MessageBody Map<String, Serializable> body,
@MessageHeader(JMS_CORRELATION_ID) @Nullable String correlationId,
@MessageHeader("CustomStringHeader") @Nullable String stringHeader,
@MessageHeader("CustomBooleanHeader") boolean booleanHeader,
@MessageHeader("CustomByteHeader") byte byteHeader,
@MessageHeader("CustomShortHeader") short shortHeader,
@MessageHeader("CustomIntegerHeader") int intHeader,
@MessageHeader("CustomLongHeader") long longHeader,
@MessageHeader("CustomFloatHeader") float floatHeader,
@MessageHeader("CustomDoubleHeader") double doubleHeader);
}
This consumer can access the user-supplied header values as well as several headers that can only be set by the JMS provider, e.g. JMSMessageID
, JMSPriority
, etc.:
Java
Groovy
Kotlin
import io.micronaut.core.annotation.Nullable;
import io.micronaut.jms.annotations.JMSListener;
import io.micronaut.jms.annotations.Message;
import io.micronaut.jms.annotations.Queue;
import io.micronaut.messaging.annotation.MessageBody;
import io.micronaut.messaging.annotation.MessageHeader;
import jakarta.jms.Destination;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME;
import static io.micronaut.jms.model.JMSHeaders.JMS_CORRELATION_ID;
import static io.micronaut.jms.model.JMSHeaders.JMS_DELIVERY_MODE;
import static io.micronaut.jms.model.JMSHeaders.JMS_DESTINATION;
import static io.micronaut.jms.model.JMSHeaders.JMS_EXPIRATION;
import static io.micronaut.jms.model.JMSHeaders.JMS_MESSAGE_ID;
import static io.micronaut.jms.model.JMSHeaders.JMS_PRIORITY;
import static io.micronaut.jms.model.JMSHeaders.JMS_REDELIVERED;
import static io.micronaut.jms.model.JMSHeaders.JMS_REPLY_TO;
import static io.micronaut.jms.model.JMSHeaders.JMS_TIMESTAMP;
import static io.micronaut.jms.model.JMSHeaders.JMS_TYPE;
@JMSListener(CONNECTION_FACTORY_BEAN_NAME)
public class MapConsumer {
List<Map<String, Serializable>> messageBodies = Collections.synchronizedList(new ArrayList<>());
List<Map<String, Object>> messageHeaders = Collections.synchronizedList(new ArrayList<>());
List<jakarta.jms.Message> messages = Collections.synchronizedList(new ArrayList<>());
@Queue(value = "queue_map")
public void receive(@MessageBody Map<String, Serializable> body,
@Message jakarta.jms.Message message,
@MessageHeader(JMS_CORRELATION_ID) @Nullable String correlationId,
@MessageHeader(JMS_DELIVERY_MODE) int deliveryMode,
@MessageHeader(JMS_DESTINATION) Destination destination,
@MessageHeader(JMS_EXPIRATION) long expiration,
@MessageHeader(JMS_MESSAGE_ID) String messageId,
@MessageHeader(JMS_PRIORITY) int priority,
@MessageHeader(JMS_REDELIVERED) boolean redelivered,
@MessageHeader(JMS_REPLY_TO) @Nullable Destination replyTo,
@MessageHeader(JMS_TIMESTAMP) long timestamp,
@MessageHeader(JMS_TYPE) @Nullable String type,
@MessageHeader("CustomStringHeader") @Nullable String stringHeader,
@MessageHeader("CustomBooleanHeader") boolean booleanHeader,
@MessageHeader("CustomByteHeader") byte byteHeader,
@MessageHeader("CustomShortHeader") short shortHeader,
@MessageHeader("CustomIntegerHeader") int intHeader,
@MessageHeader("CustomLongHeader") long longHeader,
@MessageHeader("CustomFloatHeader") float floatHeader,
@MessageHeader("CustomDoubleHeader") double doubleHeader) {
Map<String, Object> headerValues = new HashMap<>();
headerValues.put(JMS_CORRELATION_ID, correlationId);
headerValues.put(JMS_DELIVERY_MODE, deliveryMode);
headerValues.put(JMS_DESTINATION, destination);
headerValues.put(JMS_EXPIRATION, expiration);
headerValues.put(JMS_MESSAGE_ID, messageId);
headerValues.put(JMS_PRIORITY, priority);
headerValues.put(JMS_REDELIVERED, redelivered);
headerValues.put(JMS_REPLY_TO, replyTo);
headerValues.put(JMS_TIMESTAMP, timestamp);
headerValues.put(JMS_TYPE, type);
headerValues.put("CustomStringHeader", stringHeader);
headerValues.put("CustomBooleanHeader", booleanHeader);
headerValues.put("CustomByteHeader", byteHeader);
headerValues.put("CustomShortHeader", shortHeader);
headerValues.put("CustomIntegerHeader", intHeader);
headerValues.put("CustomLongHeader", longHeader);
headerValues.put("CustomFloatHeader", floatHeader);
headerValues.put("CustomDoubleHeader", doubleHeader);
messageHeaders.add(headerValues);
messageBodies.add(body);
messages.add(message);
}
}
7 Error Handlers
There are multiple ways to inject custom error handling into your JMS Listeners as shown here:
1 | You can add the errorHandlers arguments to your @JMSListener to add custom error handling logic to all @Queue or @Topic annotated methods within that class. |
2 | You can add the errorHandlers argument to your @Queue or @Topic annotated method to inject custom error handling logic only for messages received on that specific queue or topic. |
3 | By default you will always have an io.micronaut.jms.listener.LoggingJMSListenerErrorHandler which will log the thrown exception at ERROR level. |
8 Success Handlers
There are multiple ways to inject custom success handling into your JMS Listeners as shown here:
Java
Groovy
Kotlin
import io.micronaut.context.annotation.Requires;
import io.micronaut.jms.annotations.JMSListener;
import io.micronaut.jms.annotations.Queue;
import io.micronaut.jms.listener.JMSListenerSuccessHandler;
import io.micronaut.messaging.annotation.MessageBody;
import jakarta.inject.Singleton;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.Session;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
import static io.micronaut.jms.activemq.classic.configuration.ActiveMqClassicConfiguration.CONNECTION_FACTORY_BEAN_NAME;
@JMSListener(value = CONNECTION_FACTORY_BEAN_NAME, successHandlers = {AccumulatingSuccessHandler.class}) // (1)
class SuccessHandlingConsumer {
Collection<String> messages = Collections.synchronizedSet(new HashSet<>());
@Queue(value = "success-queue", successHandlers = {CountingSuccessHandler.class}) // (2)
void receive(@MessageBody String message) throws JMSException {
messages.add(message);
}
}
1 | You can add the successHandlers arguments to your @JMSListener to add custom success handling logic to all @Queue or @Topic annotated methods within that class. |
2 | You can add the successHandlers argument to your @Queue or @Topic annotated method to inject custom success handling logic only for messages received on that specific queue or topic |
9 Message Selectors
Message Selectors allow you to filter the messages a @JMSListener
method will receive. The selector can use all message header and properties for filtering except the message content.
@JMSListener(CONNECTION_FACTORY_BEAN_NAME)
public class SelectorConsumer {
List<String> messageBodiesTrue = Collections.synchronizedList(new ArrayList<>());
List<String> messageBodiesFalse = Collections.synchronizedList(new ArrayList<>());
List<String> messageBodiesTopic = Collections.synchronizedList(new ArrayList<>());
@Queue(value = "selector_queue", messageSelector = "CustomBooleanHeader=true")
public void receive(@MessageBody String body) {
messageBodiesTrue.add(body);
}
@Queue(value = "selector_queue", messageSelector = "CustomBooleanHeader=false")
public void receive2(@MessageBody String body) {
messageBodiesFalse.add(body);
}
@Topic(value = "selector_topic", messageSelector = "CustomBooleanHeader=true")
public void receiveTopic(@MessageBody String body) {
messageBodiesTopic.add(body);
}
}
More examples on how to use Message Selector can be found here.
10 GraalVM support
Micronaut JMS is compatible with GraalVM. Everything is handled automatically by the library so users don’t need any special configuration.
See the section on GraalVM in the user guide for more information. |
11 Repository
You can find the source code of this project in this repository: