Package io.micronaut.rabbitmq.reactive
Class ReactorReactivePublisher
java.lang.Object
io.micronaut.rabbitmq.reactive.ReactorReactivePublisher
- All Implemented Interfaces:
ReactivePublisher
@Internal
@EachBean(ChannelPool.class)
public class ReactorReactivePublisher
extends Object
implements ReactivePublisher
A reactive publisher implementation that uses a single channel per publish
operation and returns a Reactor
Mono
.
This is an internal API and may change at any time
- Since:
- 3.0.0
- Author:
- James Kleeh, Iván López
-
Constructor Summary
ConstructorDescriptionReactorReactivePublisher
(ChannelPool channelPool, RabbitConnectionFactoryConfig config) Default constructor. -
Method Summary
Modifier and TypeMethodDescriptionprotected reactor.core.Disposable
createConsumer
(com.rabbitmq.client.Channel channel, RabbitPublishState publishState, String correlationId, reactor.core.publisher.MonoSink<RabbitConsumerState> emitter) Listens for ack/nack from the broker.protected reactor.core.Disposable
createListener
(com.rabbitmq.client.Channel channel, reactor.core.publisher.MonoSink<Object> emitter, RabbitPublishState publishState) Listens for ack/nack from the broker.protected reactor.core.publisher.Mono<com.rabbitmq.client.Channel>
Creates aMono
from a channel, emitting an error if the channel could not be retrieved.protected reactor.core.publisher.Mono<com.rabbitmq.client.Channel>
initializePublish
(com.rabbitmq.client.Channel channel) Initializes the channel to allow publisher acknowledgements.reactor.core.publisher.Mono<Void>
publish
(RabbitPublishState publishState) Publish the message with the provided arguments and return a reactive type that completes successfully when the message is published.reactor.core.publisher.Mono<Void>
publishAndConfirm
(RabbitPublishState publishState) Publish the message with the provided arguments and return a reactive type that completes successfully when the broker acknowledged the message.reactor.core.publisher.Flux<RabbitConsumerState>
publishAndReply
(RabbitPublishState publishState) Publish the message with the provided arguments and return a reactive type that completes successfully when the reply is received from the reply to queue.protected reactor.core.publisher.Mono<Object>
publishInternal
(com.rabbitmq.client.Channel channel, RabbitPublishState publishState) Publishes the message to the channel.protected reactor.core.publisher.Mono<Object>
publishInternalNoConfirm
(com.rabbitmq.client.Channel channel, RabbitPublishState publishState) Publishes the message to the channel.protected reactor.core.publisher.Mono<RabbitConsumerState>
publishRpcInternal
(com.rabbitmq.client.Channel channel, RabbitPublishState publishState) Publishes the message to the channel.protected void
returnChannel
(com.rabbitmq.client.Channel channel) Removes confirm listeners from the channel and returns the channel to the pool.
-
Constructor Details
-
ReactorReactivePublisher
public ReactorReactivePublisher(@Parameter ChannelPool channelPool, @Parameter RabbitConnectionFactoryConfig config) Default constructor.- Parameters:
channelPool
- The channel pool to retrieve channelsconfig
- Any configuration used in building the publishers
-
-
Method Details
-
publishAndConfirm
Description copied from interface:ReactivePublisher
Publish the message with the provided arguments and return a reactive type that completes successfully when the broker acknowledged the message.- Specified by:
publishAndConfirm
in interfaceReactivePublisher
- Parameters:
publishState
- The RabbitMQ publishing data- Returns:
- The publisher
-
publish
Description copied from interface:ReactivePublisher
Publish the message with the provided arguments and return a reactive type that completes successfully when the message is published.- Specified by:
publish
in interfaceReactivePublisher
- Parameters:
publishState
- The RabbitMQ publishing data- Returns:
- The publisher
-
publishAndReply
public reactor.core.publisher.Flux<RabbitConsumerState> publishAndReply(RabbitPublishState publishState) Description copied from interface:ReactivePublisher
Publish the message with the provided arguments and return a reactive type that completes successfully when the reply is received from the reply to queue.- Specified by:
publishAndReply
in interfaceReactivePublisher
- Parameters:
publishState
- The RabbitMQ publishing data- Returns:
- The publisher of the received reply
-
getChannel
protected reactor.core.publisher.Mono<com.rabbitmq.client.Channel> getChannel()Creates aMono
from a channel, emitting an error if the channel could not be retrieved.- Returns:
- A
Mono
that emits the channel on success
-
publishInternal
protected reactor.core.publisher.Mono<Object> publishInternal(com.rabbitmq.client.Channel channel, RabbitPublishState publishState) Publishes the message to the channel. TheMono
returned from this method should ensure theConfirmListener
is added to the channel prior to publishing and theConfirmListener
is removed from the channel after the publish has been acknowledged.- Parameters:
channel
- The channel to publish the message topublishState
- The publishing state- Returns:
- A completable that terminates when the publish has been acknowledged
- See Also:
-
Channel.basicPublish(String, String, AMQP.BasicProperties, byte[])
-
publishRpcInternal
protected reactor.core.publisher.Mono<RabbitConsumerState> publishRpcInternal(com.rabbitmq.client.Channel channel, RabbitPublishState publishState) Publishes the message to the channel. TheMono
returned from this method should ensure theConfirmListener
is added to the channel prior to publishing and theConfirmListener
is removed from the channel after the publish has been acknowledged.- Parameters:
channel
- The channel to publish the message topublishState
- The publishing state- Returns:
- A completable that terminates when the publish has been acknowledged
- See Also:
-
Channel.basicPublish(String, String, AMQP.BasicProperties, byte[])
-
publishInternalNoConfirm
protected reactor.core.publisher.Mono<Object> publishInternalNoConfirm(com.rabbitmq.client.Channel channel, RabbitPublishState publishState) Publishes the message to the channel. TheMono
returned from this method should ensure theConfirmListener
is added to the channel prior to publishing and theConfirmListener
is removed from the channel after the publish has been acknowledged.- Parameters:
channel
- The channel to publish the message topublishState
- The publishing state- Returns:
- A completable that terminates when the publish has been acknowledged
- See Also:
-
Channel.basicPublish(String, String, AMQP.BasicProperties, byte[])
-
initializePublish
protected reactor.core.publisher.Mono<com.rabbitmq.client.Channel> initializePublish(com.rabbitmq.client.Channel channel) Initializes the channel to allow publisher acknowledgements.- Parameters:
channel
- The channel to enable acknowledgements.- Returns:
- A
Mono
that will complete according to the success of the operation.
-
returnChannel
protected void returnChannel(com.rabbitmq.client.Channel channel) Removes confirm listeners from the channel and returns the channel to the pool.- Parameters:
channel
- The channel to clean and return
-
createListener
protected reactor.core.Disposable createListener(com.rabbitmq.client.Channel channel, reactor.core.publisher.MonoSink<Object> emitter, RabbitPublishState publishState) Listens for ack/nack from the broker. The listener auto disposes itself after receiving a response from the broker. If no response is received, the caller is responsible for disposing the listener.- Parameters:
channel
- The channel to listen for confirmsemitter
- The emitter to send the eventpublishState
- The publishing state- Returns:
- A disposable to allow cleanup of the listener
-
createConsumer
protected reactor.core.Disposable createConsumer(com.rabbitmq.client.Channel channel, RabbitPublishState publishState, String correlationId, reactor.core.publisher.MonoSink<RabbitConsumerState> emitter) throws IOException Listens for ack/nack from the broker. The listener auto disposes itself after receiving a response from the broker. If no response is received, the caller is responsible for disposing the listener.- Parameters:
channel
- The channel to listen for confirmspublishState
- The publish statecorrelationId
- The correlation idemitter
- The emitter to send the response- Returns:
- A disposable to allow cleanup of the listener
- Throws:
IOException
- If an error occurred subscribing
-