Class ReactorReactivePublisher

java.lang.Object
io.micronaut.rabbitmq.reactive.ReactorReactivePublisher
All Implemented Interfaces:
ReactivePublisher

@Internal @EachBean(ChannelPool.class) public class ReactorReactivePublisher extends Object implements ReactivePublisher
A reactive publisher implementation that uses a single channel per publish operation and returns Reactor types (Mono or Flux).

This is an internal API and may change at any time.

Since:
3.0.0
Author:
James Kleeh, Iván López
  • Constructor Details

    • ReactorReactivePublisher

      public ReactorReactivePublisher(@Parameter ChannelPool channelPool, @Parameter RabbitConnectionFactoryConfig config)
      Default constructor.
      Parameters:
      channelPool - The channel pool to retrieve channels
      config - Any configuration used in building the publishers
  • Method Details

    • publishAndConfirm

      public reactor.core.publisher.Mono<Void> publishAndConfirm(RabbitPublishState publishState)
      Description copied from interface: ReactivePublisher
      Publish the message with the provided arguments and return a reactive type that completes successfully when the broker acknowledged the message.
      Specified by:
      publishAndConfirm in interface ReactivePublisher
      Parameters:
      publishState - The RabbitMQ publishing data
      Returns:
      The publisher
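      The confirm flow described above (take a channel, register a confirm listener, publish, complete on ack or fail on nack, then clean up and return the channel) can be sketched roughly as follows. This is an illustration only, not the class's implementation: FakeChannel and FakePool are hypothetical stand-ins for the RabbitMQ Channel and the ChannelPool, and CompletableFuture<Void> stands in for Mono<Void> so the sketch runs on a bare JDK.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class ConfirmFlowSketch {

    /** Hypothetical stand-in for a broker channel; not the RabbitMQ client API. */
    static final class FakeChannel {
        private final boolean brokerAcks;
        private Consumer<Boolean> confirmListener; // receives true for ack, false for nack

        FakeChannel(boolean brokerAcks) { this.brokerAcks = brokerAcks; }

        void addConfirmListener(Consumer<Boolean> listener) { this.confirmListener = listener; }
        void clearConfirmListeners() { this.confirmListener = null; }
        boolean hasListener() { return confirmListener != null; }

        /** The fake broker confirms synchronously, inside the publish call. */
        void publish(byte[] body) {
            if (confirmListener != null) {
                confirmListener.accept(brokerAcks);
            }
        }
    }

    /** Hypothetical stand-in for a channel pool. */
    static final class FakePool {
        private final Deque<FakeChannel> idle = new ArrayDeque<>();

        FakePool(FakeChannel... channels) {
            for (FakeChannel c : channels) {
                idle.push(c);
            }
        }

        FakeChannel borrow() { return idle.pop(); }
        void giveBack(FakeChannel channel) { idle.push(channel); }
        int idleCount() { return idle.size(); }
    }

    /** Completes normally on ack, exceptionally on nack; always returns the channel. */
    static CompletableFuture<Void> publishAndConfirm(FakePool pool, byte[] body) {
        FakeChannel channel = pool.borrow();
        CompletableFuture<Void> confirmed = new CompletableFuture<>();

        // The listener is registered before publishing so no confirm can be missed.
        channel.addConfirmListener(acked -> {
            if (acked) {
                confirmed.complete(null);
            } else {
                confirmed.completeExceptionally(new IllegalStateException("Message was nacked"));
            }
        });

        // Cleanup runs on either outcome: remove the listener, return the channel.
        CompletableFuture<Void> result = confirmed.whenComplete((ignored, error) -> {
            channel.clearConfirmListeners();
            pool.giveBack(channel);
        });

        try {
            channel.publish(body);
        } catch (RuntimeException e) {
            confirmed.completeExceptionally(e);
        }
        return result;
    }

    public static void main(String[] args) {
        FakePool pool = new FakePool(new FakeChannel(true));
        CompletableFuture<Void> outcome = publishAndConfirm(pool, new byte[] {1, 2, 3});
        System.out.println("failed: " + outcome.isCompletedExceptionally());
        System.out.println("idle channels: " + pool.idleCount());
    }
}
```

      Registering the cleanup on the future rather than inside the listener means the channel goes back to the pool whether the broker acks, nacks, or the publish call itself throws.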
    • publish

      public reactor.core.publisher.Mono<Void> publish(RabbitPublishState publishState)
      Description copied from interface: ReactivePublisher
      Publish the message with the provided arguments and return a reactive type that completes successfully when the message is published.
      Specified by:
      publish in interface ReactivePublisher
      Parameters:
      publishState - The RabbitMQ publishing data
      Returns:
      The publisher
    • publishAndReply

      public reactor.core.publisher.Flux<RabbitConsumerState> publishAndReply(RabbitPublishState publishState)
      Description copied from interface: ReactivePublisher
      Publish the message with the provided arguments and return a reactive type that completes successfully when the reply is received from the reply-to queue.
      Specified by:
      publishAndReply in interface ReactivePublisher
      Parameters:
      publishState - The RabbitMQ publishing data
      Returns:
      The publisher of the received reply
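      Request/reply publishing of this kind is commonly built on a correlation id: the request carries an id, and a delivery on the reply-to queue is accepted only if its id matches. The sketch below shows that matching step in isolation. It is an assumption about the general pattern, not a copy of this class's logic; Reply and the method names are hypothetical, and CompletableFuture stands in for the Reactor sink so the code runs on a bare JDK.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class RpcReplySketch {

    /** Hypothetical reply envelope; not a Micronaut or RabbitMQ type. */
    static final class Reply {
        final String correlationId;
        final String body;

        Reply(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Requests that have been published but not yet answered, keyed by correlation id.
    private final Map<String, CompletableFuture<Reply>> pending = new ConcurrentHashMap<>();

    /** Registers a pending request and returns the correlation id to send with it. */
    String register(CompletableFuture<Reply> sink) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, sink);
        return correlationId;
    }

    /**
     * Called for each delivery on the reply-to queue.
     * Returns true if the delivery matched a pending request.
     */
    boolean onDelivery(Reply reply) {
        CompletableFuture<Reply> sink = pending.remove(reply.correlationId);
        if (sink == null) {
            return false; // unknown or already answered: ignore
        }
        sink.complete(reply);
        return true;
    }

    int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        RpcReplySketch rpc = new RpcReplySketch();
        CompletableFuture<Reply> sink = new CompletableFuture<>();
        String id = rpc.register(sink);
        rpc.onDelivery(new Reply(id, "pong"));
        System.out.println(sink.join().body);
    }
}
```

      Removing the entry before completing the sink ensures a duplicate delivery with the same correlation id is ignored rather than completing the request twice.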
    • getChannel

      protected reactor.core.publisher.Mono<com.rabbitmq.client.Channel> getChannel()
      Creates a Mono from a channel, emitting an error if the channel could not be retrieved.
      Returns:
      A Mono that emits the channel on success
    • publishInternal

      protected reactor.core.publisher.Mono<Object> publishInternal(com.rabbitmq.client.Channel channel, RabbitPublishState publishState)
      Publishes the message to the channel. The Mono returned from this method should ensure the ConfirmListener is added to the channel prior to publishing and the ConfirmListener is removed from the channel after the publish has been acknowledged.
      Parameters:
      channel - The channel to publish the message to
      publishState - The publishing state
      Returns:
      A Mono that terminates when the publish has been acknowledged
      See Also:
      • Channel.basicPublish(String, String, AMQP.BasicProperties, byte[])
    • publishRpcInternal

      protected reactor.core.publisher.Mono<RabbitConsumerState> publishRpcInternal(com.rabbitmq.client.Channel channel, RabbitPublishState publishState)
      Publishes the message to the channel and listens for the RPC reply. The Mono returned from this method should ensure the reply consumer is registered on the channel prior to publishing and is disposed of after the reply has been received.
      Parameters:
      channel - The channel to publish the message to
      publishState - The publishing state
      Returns:
      A Mono that emits the reply once it has been received
      See Also:
      • Channel.basicPublish(String, String, AMQP.BasicProperties, byte[])
    • publishInternalNoConfirm

      protected reactor.core.publisher.Mono<Object> publishInternalNoConfirm(com.rabbitmq.client.Channel channel, RabbitPublishState publishState)
      Publishes the message to the channel without waiting for a broker confirmation. No ConfirmListener is involved in this variant.
      Parameters:
      channel - The channel to publish the message to
      publishState - The publishing state
      Returns:
      A Mono that terminates when the message has been published
      See Also:
      • Channel.basicPublish(String, String, AMQP.BasicProperties, byte[])
    • initializePublish

      protected reactor.core.publisher.Mono<com.rabbitmq.client.Channel> initializePublish(com.rabbitmq.client.Channel channel)
      Initializes the channel to allow publisher acknowledgements.
      Parameters:
      channel - The channel to enable acknowledgements.
      Returns:
      A Mono that will complete according to the success of the operation.
    • returnChannel

      protected void returnChannel(com.rabbitmq.client.Channel channel)
      Removes confirm listeners from the channel and returns the channel to the pool.
      Parameters:
      channel - The channel to clean and return
    • createListener

      protected reactor.core.Disposable createListener(com.rabbitmq.client.Channel channel, reactor.core.publisher.MonoSink<Object> emitter, RabbitPublishState publishState)
      Listens for ack/nack from the broker. The listener automatically disposes of itself after receiving a response from the broker. If no response is received, the caller is responsible for disposing of the listener.
      Parameters:
      channel - The channel to listen for confirms
      emitter - The emitter to send the event
      publishState - The publishing state
      Returns:
      A disposable to allow cleanup of the listener
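      The disposal contract above (the listener cleans itself up after the first broker response, otherwise the caller must dispose of it) can be sketched with a one-shot guard. This is a minimal illustration under stated assumptions: the Disposable interface here is a local stand-in for reactor.core.Disposable, and OneShotListener, onConfirm, and the detach callback are hypothetical names, not part of this class.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class SelfDisposingListenerSketch {

    /** Local stand-in for reactor.core.Disposable so the sketch needs only the JDK. */
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    /** Hypothetical listener that forwards one ack/nack and then detaches itself. */
    static final class OneShotListener implements Disposable {
        private final AtomicBoolean disposed = new AtomicBoolean(false);
        private final Consumer<Boolean> emitter; // receives true for ack, false for nack
        private final Runnable detach;           // removes the listener from the channel

        OneShotListener(Consumer<Boolean> emitter, Runnable detach) {
            this.emitter = emitter;
            this.detach = detach;
        }

        /** Broker callback: only the first response is forwarded. */
        void onConfirm(boolean acked) {
            if (disposed.compareAndSet(false, true)) {
                emitter.accept(acked);
                detach.run(); // auto-dispose after the first response
            }
        }

        /** Caller-side cleanup for the case where no response ever arrives. */
        @Override
        public void dispose() {
            if (disposed.compareAndSet(false, true)) {
                detach.run();
            }
        }

        @Override
        public boolean isDisposed() {
            return disposed.get();
        }
    }

    public static void main(String[] args) {
        OneShotListener listener = new OneShotListener(
                acked -> System.out.println(acked ? "ack" : "nack"),
                () -> System.out.println("listener removed"));
        listener.onConfirm(true);
        listener.dispose(); // no effect: already disposed by the ack
    }
}
```

      The compareAndSet guard makes both paths idempotent, so a late broker response or a redundant dispose call cannot detach the listener twice.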
    • createConsumer

      protected reactor.core.Disposable createConsumer(com.rabbitmq.client.Channel channel, RabbitPublishState publishState, String correlationId, reactor.core.publisher.MonoSink<RabbitConsumerState> emitter) throws IOException
      Listens for the RPC reply on the reply-to queue. The consumer automatically disposes of itself after receiving the reply. If no reply is received, the caller is responsible for disposing of the consumer.
      Parameters:
      channel - The channel to consume the reply from
      publishState - The publish state
      correlationId - The correlation id
      emitter - The emitter to send the response
      Returns:
      A disposable to allow cleanup of the listener
      Throws:
      IOException - If an error occurred subscribing