Micronaut Pulsar

Integration between Micronaut and Apache Pulsar

Version: 2.5.0

1 Introduction

This project integrates the Micronaut framework with Apache Pulsar. It aims to simplify configuration and to replace manual construction of consumers, producers, and readers with an annotation-based approach.

2 Release History

For this project, you can find a list of releases (with release notes) here:

2.0.0

  • Java 17 baseline

  • Micronaut 4.0.0 minimum version

  • Remove shutdown on subscribe error

  • Bug fixes

  • Apache Pulsar Java Client dependency 3.0.0

1.3.0

  • Dependency updates

  • Micronaut 3.8.0

1.2.0

  • Added multitenancy support module

1.1.0

  • Micronaut 3.2.3 minimum version

  • Using new JSON mapper

  • Key-Value messages support

  • Message Headers support

  • Protobuf native messages support using micronaut protobuf dependency

  • Apache Pulsar Java Client dependency 2.9.1

1.0.0

  • Micronaut 2.1.3 minimum version

  • Authorization via JWT

  • Service URL Provider resolver

  • Basic support for consumers

  • Basic support for producers

  • Basic support for reader

3 Quick Start

To configure the Apache Pulsar client, first add the micronaut-pulsar module dependency:

implementation("io.micronaut.pulsar:micronaut-pulsar")
<dependency>
    <groupId>io.micronaut.pulsar</groupId>
    <artifactId>micronaut-pulsar</artifactId>
</dependency>
NOTE: The default serializers and deserializers are JSON and use the Micronaut Serialization (micronaut-serde) module, so include it as well unless you manually switch to BYTES or other parsers.
implementation("io.micronaut.serde:micronaut-serde-jackson")
<dependency>
    <groupId>io.micronaut.serde</groupId>
    <artifactId>micronaut-serde-jackson</artifactId>
</dependency>

Then configure the URI of the Pulsar cluster or standalone server to communicate with:

Configuring pulsar.service-url
pulsar.service-url=pulsar://localhost:6650
pulsar:
  service-url: pulsar://localhost:6650
[pulsar]
  service-url="pulsar://localhost:6650"
pulsar {
  serviceUrl = "pulsar://localhost:6650"
}
{
  pulsar {
    service-url = "pulsar://localhost:6650"
  }
}
{
  "pulsar": {
    "service-url": "pulsar://localhost:6650"
  }
}
As this module is based on the official Java client from Apache Pulsar, see the official documentation for detailed information on service URL format.
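
As a rough illustration of the URL shape used in this guide, the following sketch inspects a service URL with the JDK's java.net.URI. The class ServiceUrlInfo and its methods are hypothetical helpers written for this example, not part of Micronaut Pulsar or the Pulsar client, and the client's own parsing rules remain authoritative.

```java
import java.net.URI;

// Hypothetical helper for illustration only; not part of Micronaut Pulsar or the Pulsar client.
final class ServiceUrlInfo {

    private ServiceUrlInfo() {
    }

    // True when the URL uses the pulsar+ssl scheme (the TLS variant of the service URL).
    static boolean usesTls(String serviceUrl) {
        return "pulsar+ssl".equals(URI.create(serviceUrl).getScheme());
    }

    // Host part of the URL, for example "localhost".
    static String host(String serviceUrl) {
        return URI.create(serviceUrl).getHost();
    }

    // Port part of the URL, or -1 when the URL does not specify one.
    static int port(String serviceUrl) {
        return URI.create(serviceUrl).getPort();
    }
}
```

A check like this can catch a mistyped scheme or port in configuration before the client tries to connect.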

Alternatively, a service URL provider can be supplied either by annotating a bean with @PulsarServiceUrlProvider or by setting the provider name in pulsar.service-url-provider:

pulsar.service-url-provider=BeanName
pulsar:
  service-url-provider: BeanName
[pulsar]
  service-url-provider="BeanName"
pulsar {
  serviceUrlProvider = "BeanName"
}
{
  pulsar {
    service-url-provider = "BeanName"
  }
}
{
  "pulsar": {
    "service-url-provider": "BeanName"
  }
}

in which case the implementing class must be annotated with @Named with a value equal to the configured name. In both cases, the bean must implement the org.apache.pulsar.client.api.ServiceUrlProvider interface.

After configuring the Pulsar cluster URL, the Micronaut Pulsar module can produce a bean of type org.apache.pulsar.client.api.PulsarClient. This bean is a singleton from which all producers and consumers are created. Since PulsarClient supports a URL provider that can switch cluster URLs on demand, there is no need for multiple clients.

4 Pulsar consumers

Creating consumers

To create a consumer, annotate any bean method with @PulsarConsumer and define a topic. Methods in beans annotated with @PulsarSubscription take the subscription name from that annotation. Otherwise, set the subscriptionName property in the @PulsarConsumer annotation. Subscription beans are singletons.

package example;

import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
public class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-java-docs", consumerName = "shared-consumer-jtester") // (3)
    public void messagePrinter(String message) { // (4)
        try {
            String changed = report(message).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //...
    }


    @PulsarProducer(topic = "persistent://public/default/reports-java-docs", producerName = "report-producer-java") // (5)
    public CompletableFuture<String> report(String message) { // (6)
        return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
    }
}
package example;

import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-groovy-docs", consumerName = "shared-consumer-gtester") // (3)
    void messagePrinter(String message) { // (4)
        try {
            String changed = report(message).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //...
    }


    @PulsarProducer(topic = "persistent://public/default/reports-groovy-docs", producerName = "report-producer-groovy") // (5)
    CompletableFuture<String> report(String message) { // (6)
        return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
    }
}
package example

import io.micronaut.pulsar.annotation.PulsarConsumer
import io.micronaut.pulsar.annotation.PulsarProducer
import io.micronaut.pulsar.annotation.PulsarSubscription
import org.apache.pulsar.client.api.SubscriptionType

@PulsarSubscription(subscriptionName = "pulsar-ktest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
open class ConsumerProducer { // (2)

    @PulsarConsumer(topic = "persistent://public/default/messages-kotlin-docs", consumerName = "shared-consumer-ktester") // (3)
    suspend fun messagePrinter(message: String) { // (4)
        val changed = report(message)
        //...
    }


    @PulsarProducer(topic = "persistent://public/default/reports-kotlin-docs", producerName = "report-producer-kotlin") // (5)
    open suspend fun report(message: String): String { // (6)
        return "Reporting message '$message'" // (7)
    }
}
1 The class holding consumers can be annotated with @PulsarSubscription. The subscription can also be set from @PulsarConsumer, which allows consumers to live in other beans such as a @Singleton.
2 -
3 Methods that will process the message, in other words consumers, must be annotated with @PulsarConsumer and one of the topic options must be specified.
4 Using CompletableFuture, reactor-core, or RxJava return types, or Kotlin suspend, allows the method body to execute asynchronously. Method invocations are always async because the underlying Pulsar library uses CompletableFuture. An async return type mainly gives developers control over execution flow from the method body or from external callers.
5 -
6 -
7 -
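
The topic names in these examples follow Pulsar's domain://tenant/namespace/topic layout. The sketch below splits such a name into its parts. TopicParts is a hypothetical class written for this illustration and is unrelated to whatever the Pulsar client uses internally.

```java
// Hypothetical illustration class; the Pulsar client has its own topic name handling.
final class TopicParts {
    final String domain;     // "persistent" or "non-persistent"
    final String tenant;
    final String namespace;
    final String localName;

    private TopicParts(String domain, String tenant, String namespace, String localName) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.localName = localName;
    }

    // Splits a fully qualified name such as persistent://public/default/messages-java-docs.
    static TopicParts parse(String topic) {
        int schemeEnd = topic.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Not a fully qualified topic name: " + topic);
        }
        String[] path = topic.substring(schemeEnd + 3).split("/", 3);
        if (path.length != 3 || path[0].isEmpty() || path[1].isEmpty() || path[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic in: " + topic);
        }
        return new TopicParts(topic.substring(0, schemeEnd), path[0], path[1], path[2]);
    }
}
```

For persistent://public/default/messages-java-docs this yields the tenant public, the namespace default, and the local topic name messages-java-docs.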

Consumer method

The simplest approach is to declare a single method parameter, which is then treated as the message body (payload). The parameter may also be of Pulsar’s Message<> type, in which case the whole Pulsar message is injected instead of just the parsed body. With multiple parameters, annotate the one that receives the payload with @MessageBody, and annotate the others with @MessageKey (message key) or @MessageProperties, a Map<String,String> that receives all message properties/headers. For individual headers, use @MessageHeader("headerName") on each parameter that represents a message header. Because of the underlying Pulsar library, header keys and values must be strings.

Don’t confuse @MessageHeaders with @MessageProperties. The former is used by Micronaut messaging and is not applicable to method arguments, so the latter was created to map all headers/properties onto a single method argument. @MessageHeader from the same messaging package is still used on method arguments to map a single header value, so that existing annotations are reused as much as possible.

An extra parameter of type org.apache.pulsar.client.api.Consumer does not need any annotation. It is the consumer passed by the underlying Pulsar library. If the method annotated with @PulsarConsumer does not declare it, it is simply omitted.

This is especially important for messages that should be of KeyValue type.

Using @PulsarSubscription annotation

This annotation marks a class as a singleton bean that contains one or more methods serving as Pulsar consumers under the same subscription.

Non-required properties:

  • name, which sets the Pulsar subscription name to a custom string; recommended to always set manually

  • type, one of Pulsar’s subscription types. Subscription types can be read in Apache Pulsar official documentation.

  • ackGroupTimeout, the acknowledgment group timeout for shared subscriptions

Type and acknowledgment group timeout default to Pulsar Java client library values if not set. Name will be generated by this module in a "counter manner" (pulsar-subscription-1,2,3,4…​).

Property | Type | Default | Required | Description
name | java.lang.String | - | No | User-specified name, or leave blank to let the Pulsar module generate the name
type | org.apache.pulsar.client.api.SubscriptionType | Exclusive | No | Default as in Pulsar official Java library
ackGroupTimeout | java.lang.String | - | No | Must be a Duration-parsable string.
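
To illustrate what a Duration-parsable value can look like, the sketch below parses ISO-8601 duration strings with java.time.Duration. Whether the module also accepts other notations is not stated here, so ISO-8601 is only the assumed form. AckTimeoutCheck is a hypothetical helper.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

// Hypothetical helper; assumes ISO-8601 duration text such as "PT0.1S".
final class AckTimeoutCheck {

    private AckTimeoutCheck() {
    }

    // Returns the timeout in milliseconds, or -1 when the text is not an ISO-8601 duration.
    static long toMillisOrMinusOne(String text) {
        try {
            return Duration.parse(text).toMillis();
        } catch (DateTimeParseException e) {
            return -1L;
        }
    }
}
```

Validating the value this way in a unit test can surface a malformed ackGroupTimeout before the application starts.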

Using @PulsarConsumer annotation

This is a method annotation. To use it, you must specify one of value, topic, topics, or topicsPattern. Properties are processed in the order topic (alias value), topics, topicsPattern, so when more than one is set, only the first in that order is used and the rest are ignored. Other properties may be omitted; however, it is good practice to always specify the consumer name manually, since it can be used later for debugging or as an injection point for the Pulsar Consumer<T> generated for each method.
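
The processing order described above can be modelled in a few lines. This is only a sketch of the documented precedence, not the module's implementation; TopicSelection and effectiveSource are hypothetical names.

```java
// Hypothetical model of the documented order: topic (alias value), topics, topicsPattern.
final class TopicSelection {

    private TopicSelection() {
    }

    // Returns the name of the property that takes effect; later properties are ignored.
    static String effectiveSource(String topic, String[] topics, String topicsPattern) {
        if (topic != null && !topic.isEmpty()) {
            return "topic";
        }
        if (topics != null && topics.length > 0) {
            return "topics";
        }
        if (topicsPattern != null && !topicsPattern.isEmpty()) {
            return "topicsPattern";
        }
        throw new IllegalArgumentException("One of topic, topics, or topicsPattern must be set");
    }
}
```

In this model, setting both topic and topicsPattern means the pattern is silently ignored, which is why setting only one of them is the safer habit.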

Property | Type | Default | Required | Description
topics | java.lang.String[] | - | Yes* | Required unless topicsPattern is specified. Has priority over topicsPattern.
topicsPattern | java.lang.String | - | Yes* | Required unless topics is specified. Regex for listening to multiple topics.
schema | MessageSchema | MessageSchema.BYTES or MessageSchema.JSON* | No | If the body is different from byte[] but MessageSchema is BYTES, will default to JSON.
keySchema | MessageSchema | MessageSchema.BYTES or MessageSchema.JSON* | No | If the message is of key-value type this must be set, or the default is used with the same resolution style as the schema.
keyEncoding | KeyValueEncodingType | KeyValueEncodingType.INLINE | No | If the message is of key-value type, the key is frequently sent separately from the payload, in which case this value should be set to KeyValueEncodingType.SEPARATED; otherwise Pulsar will try to extract the key from the message payload (body).
consumerName | java.lang.String | - | No | Consumer name, not required; will be generated automatically if missing.
subscriptionTopicsMode | org.apache.pulsar.client.api.RegexSubscriptionMode | - | No | If topicsPattern is set, this can be one of PersistentOnly, NonPersistentOnly, or AllTopics. Not required, set by the Pulsar library if not defined.
subscribeAsync | boolean | true | No | Whether to use async when reading Pulsar messages.
patternAutoDiscoveryPeriod | int | - | No | Time delay in seconds after which regex subscriptions should seek new topics.
maxRetriesBeforeDlq | int | 16 (Pulsar library default) | No | Maximum attempts before sending a failed message to the DLQ.
deadLetterTopic | String | topic name + '-DLQ' (Pulsar library default) | No | DLQ topic name; if not set, Pulsar decides.

When using topicsPattern, create the topics before the consumer starts, since by default Pulsar takes a long time to discover new topics.
When using @PulsarConsumer with a schema other than byte[], make sure the topics exist and have the schema the consumer expects, especially for a pattern consumer listening to multiple topics. Otherwise, the consumer might not connect and could throw org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException.
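
Before deploying a pattern consumer it can help to preview which existing topics a regular expression would cover. The sketch below does that with java.util.regex and assumes whole-name matching against fully qualified topic names; how the Pulsar client applies the pattern may differ in detail. PatternPreview is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper; assumes the pattern must match the whole topic name.
final class PatternPreview {

    private PatternPreview() {
    }

    // Returns the candidate topics that the given regular expression matches in full.
    static List<String> matching(String topicsPattern, List<String> candidates) {
        Pattern pattern = Pattern.compile(topicsPattern);
        List<String> matched = new ArrayList<>();
        for (String candidate : candidates) {
            if (pattern.matcher(candidate).matches()) {
                matched.add(candidate);
            }
        }
        return matched;
    }
}
```

The resulting list is the set of topics that should exist, with a compatible schema, before the consumer starts.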

Consumer names

Consumer names are important but can be left out, in which case names are assigned sequentially in the pattern pulsar-consumer-#. Default name assignment starts from 10 and increases with each consumer.

Apache Pulsar expects a unique name for each consumer within a subscription. To avoid issues in the shared subscription model, where multiple instances of the same Micronaut messaging application are deployed, use expressions in the consumer annotation, for example ${pulsarapp.myTestSubscription.consumerXyz.name}, and define that property in your application.yml (or another property source). Both auto-generated names and names hardcoded in annotations will eventually clash in such deployments, so prefer expressions and assign names dynamically via properties, which also allows value injection through CI/CD or environment variables.

Dead Letter Queue

The Pulsar library uses a DLQ for Shared and Key_Shared subscriptions. It also configures the DLQ topic name and the maximum number of redeliveries before a message is put on the DLQ. To override the default behavior, use the configuration properties use-dead-letter-queue and default-max-retry-dlq. These properties define the DLQ settings applied by default to all Pulsar consumers on creation unless explicitly set through the annotation.

5 Pulsar readers

Readers

Pulsar supports both "Consumers" and "Readers". More can be read in the Pulsar documentation.

Creating readers

To initialize a reader, declare a field annotated with @PulsarReader inside any bean or as a constructor argument.

package example;

import io.micronaut.pulsar.annotation.PulsarReader;
import jakarta.inject.Singleton;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Reader;

import java.util.concurrent.CompletableFuture;


@Singleton
public class ReaderExample {

    @PulsarReader(value = "persistent://public/default/messages", readerName = "simple-j-reader") // (1)
    private Reader<String> reader; // (2)

    public CompletableFuture<Message<String>> readNext() { // (3)
        return reader.readNextAsync(); // (4)
    }
}
package example;

import io.micronaut.pulsar.annotation.PulsarReader;
import jakarta.inject.Singleton;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Reader;

import java.util.concurrent.CompletableFuture;


@Singleton
class ReaderExample {

    @PulsarReader(value = "persistent://public/default/messages", readerName = "simple-g-reader") // (1)
    private Reader<String> reader // (2)

    CompletableFuture<Message<String>> readNext() { // (3)
        return reader.readNextAsync() // (4)
    }
}
package example

import io.micronaut.pulsar.annotation.PulsarReader
import jakarta.inject.Singleton
import kotlinx.coroutines.future.await
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Reader

@Singleton
class ReaderExample {
    @PulsarReader(value = "persistent://public/default/messages", readerName = "simple-k-reader") // (1)
    private lateinit var reader: Reader<String> // (2)

    suspend fun readNext(): Message<String> { // (3)
        return reader.readNextAsync().await() // (4)
    }
}
1 Reader annotation with the topic and the reader name
2 Reader must be of type org.apache.pulsar.client.api.Reader
3 Using readNextAsync requires CompletableFuture; in Kotlin, awaiting is possible
4 Calling the read will move the cursor to the next message or give null in case there are no more messages

Reader name can be autogenerated, but the topic argument must be set. Reader injection is blocking: creation of a Reader instance starts and waits until Pulsar replies with a successful status. The reader name defaults to the field or argument name for injection points, or to the method name for a method-annotated reader.
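
The future returned by readNextAsync can be handled like any other CompletableFuture. The sketch below shows one way to wait for a result without blocking indefinitely; it uses a plain CompletableFuture as a stand-in for the reader call, and BoundedWait is a hypothetical helper, not part of this module.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; the future passed in stands in for reader.readNextAsync().
final class BoundedWait {

    private BoundedWait() {
    }

    // Waits up to the given timeout; returns empty when nothing arrived in time.
    static <T> Optional<T> await(CompletableFuture<T> pending, Duration timeout) {
        try {
            return Optional.ofNullable(pending.get(timeout.toMillis(), TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException("Read failed", e.getCause());
        }
    }
}
```

A caller could pass the result of the readNext() method from the example above and treat an empty Optional as "no message yet".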

KeyValue readers

To use KeyValue with a reader, the reader’s type argument must be org.apache.pulsar.common.schema.KeyValue. It is then possible to set attributes on @PulsarReader such as keyType, which defines which data serializer is used to extract the key, and keyEncoding, which defines how the key is extracted: INLINE (the key is part of the message payload) or SEPARATED (the key is stored as the message key).

6 Pulsar producers

Creating producers

To initialize a producer it’s sufficient to annotate an interface with @PulsarProducerClient and annotate methods with @PulsarProducer.

package example;

import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarProducerClient;
import org.apache.pulsar.client.api.MessageId;

import java.util.concurrent.CompletableFuture;

@PulsarProducerClient // (1)
public interface Producer {
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "kotlin-test-producer") // (2)
    CompletableFuture<MessageId> send(String message); // (3)

    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "b-kotlin-test-producer")
    void sendBlocking(String message); // (4)
}
package example;

import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarProducerClient;
import org.apache.pulsar.client.api.MessageId;

import java.util.concurrent.CompletableFuture;

@PulsarProducerClient // (1)
interface Producer {
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "kotlin-test-producer") // (2)
    CompletableFuture<MessageId> send(String message); // (3)

    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "b-kotlin-test-producer")
    void sendBlocking(String message); // (4)
}
package example

import io.micronaut.pulsar.annotation.PulsarProducer
import io.micronaut.pulsar.annotation.PulsarProducerClient
import org.apache.pulsar.client.api.MessageId

@PulsarProducerClient // (1)
interface Producer {
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "kotlin-test-producer") // (2)
    suspend fun send(message: String): MessageId // (3)
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "b-kotlin-test-producer")
    fun sendBlocking(message: String) // (4)
}
1 Annotate the interface with @PulsarProducerClient so that Micronaut processes it without an implementation
2 Methods are the actual producers, so annotate them with @PulsarProducer
3 The return type decides whether the message is sent in a blocking or non-blocking manner.
4 A blocking send that waits until message is successfully sent or throws an exception on failure.

Producers can be used in other types of beans as well (Singleton or such).

Producer method

It’s important to note that if the method has more than one argument, @MessageBody must be specified on the argument intended as the message body, and @MessageKey, @MessageProperties, or @MessageHeader on the others, depending on the desired mapping.

As with consumers, note that @MessageProperties maps a collection of header values while @MessageHeader maps a single header onto a method argument. See the warning in the consumer section for details.

Producer return values

Not counting wrappers for async behaviour (CompletableFuture, RxJava, Reactor Core), abstract methods can have only two return types: void or MessageId, since nothing more than sending is done.

Non-abstract methods can have any return type, but a MessageId return value will not be filled with the value resulting from Pulsar. Because the return value is defined within the method body, there is no reliable way to know which value was intended: the MessageId from Pulsar or some other MessageId generated within the body.

Whether the message is sent before or after the method body executes is controlled by the sendBefore property: true sends the message first, false executes the method body first. The default is false, so the method body executes before the message is sent. With async return types the order is unpredictable: calling the method and sending the message do not block, so the message may be sent before the method finishes, or even before it properly starts, because the two can run on separate threads. The property is mainly useful in the blocking approach, where the method body can execute before sending and, if it throws an exception, sending is skipped. The first parameter of the method is still used as the message body.
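
The ordering described for the blocking case can be modelled as follows. This is a simplified sketch of the described behaviour, not the module's implementation; SendOrderSketch and the recorded event names are hypothetical.

```java
import java.util.List;

// Hypothetical model of the blocking case: records which step ran first.
final class SendOrderSketch {

    private SendOrderSketch() {
    }

    // Appends "send" and "body" to events in the order implied by sendBefore.
    static void invoke(boolean sendBefore, Runnable methodBody, List<String> events) {
        if (sendBefore) {
            events.add("send");
            methodBody.run();
            events.add("body");
        } else {
            methodBody.run(); // an exception here propagates, so the send below is skipped
            events.add("body");
            events.add("send");
        }
    }
}
```

With sendBefore left at false, a method body that throws leaves the event list empty in this model, which mirrors the statement that sending is skipped on failure.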

package example;

import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
public class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-java-docs", consumerName = "shared-consumer-jtester") // (3)
    public void messagePrinter(String message) { // (4)
        try {
            String changed = report(message).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //...
    }


    @PulsarProducer(topic = "persistent://public/default/reports-java-docs", producerName = "report-producer-java") // (5)
    public CompletableFuture<String> report(String message) { // (6)
        return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
    }
}
package example;

import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-groovy-docs", consumerName = "shared-consumer-gtester") // (3)
    void messagePrinter(String message) { // (4)
        try {
            String changed = report(message).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //...
    }


    @PulsarProducer(topic = "persistent://public/default/reports-groovy-docs", producerName = "report-producer-groovy") // (5)
    CompletableFuture<String> report(String message) { // (6)
        return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
    }
}
package example

import io.micronaut.pulsar.annotation.PulsarConsumer
import io.micronaut.pulsar.annotation.PulsarProducer
import io.micronaut.pulsar.annotation.PulsarSubscription
import org.apache.pulsar.client.api.SubscriptionType

@PulsarSubscription(subscriptionName = "pulsar-ktest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
open class ConsumerProducer { // (2)

    @PulsarConsumer(topic = "persistent://public/default/messages-kotlin-docs", consumerName = "shared-consumer-ktester") // (3)
    suspend fun messagePrinter(message: String) { // (4)
        val changed = report(message)
        //...
    }


    @PulsarProducer(topic = "persistent://public/default/reports-kotlin-docs", producerName = "report-producer-kotlin") // (5)
    open suspend fun report(message: String): String { // (6)
        return "Reporting message '$message'" // (7)
    }
}
1 For Kotlin, open is required in non-interface (abstract) classes because of AOP
2 -
3 -
4 -
5 Annotating method as a producer
6 A non-abstract async method. In the Kotlin case, open is needed to allow Micronaut AOP interception.
7 CompletableFuture or suspend triggers immediate execution, whereas Reactor or RxJava types need a subscription and rely on the caller or upper layers to provide it.

7 Authentication

JWT

To configure JWT authentication it’s sufficient to specify a JWT in application.yml under pulsar.authentication-jwt, e.g.:

pulsar.authentication.jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX...
pulsar:
  authentication:
    jwt: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX...
[pulsar]
  [pulsar.authentication]
    jwt="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
pulsar {
  authentication {
    jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
  }
}
{
  pulsar {
    authentication {
      jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
    }
  }
}
{
  "pulsar": {
    "authentication": {
      "jwt": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
    }
  }
}

io.micronaut.configuration.pulsar.config.AbstractPulsarConfiguration is left public in case you want to implement more options yourself.

It’s planned to support specifying a JWT path, either an HTTP endpoint or URL to retrieve a fresh JWT.

OAuth2

To use OAuth2 authentication, configure three parameters like in the example below to connect to a Pulsar service that uses OAuth2:

pulsar.oauth-issuer-url=https://some-sso.io/...
pulsar.oauth-credentials-url=file:///path/to/file.json
pulsar.audience=localhost:6650
pulsar:
    oauth-issuer-url: https://some-sso.io/...
    oauth-credentials-url: file:///path/to/file.json
    audience: localhost:6650
[pulsar]
  oauth-issuer-url="https://some-sso.io/..."
  oauth-credentials-url="file:///path/to/file.json"
  audience="localhost:6650"
pulsar {
  oauthIssuerUrl = "https://some-sso.io/..."
  oauthCredentialsUrl = "file:///path/to/file.json"
  audience = "localhost:6650"
}
{
  pulsar {
    oauth-issuer-url = "https://some-sso.io/..."
    oauth-credentials-url = "file:///path/to/file.json"
    audience = "localhost:6650"
  }
}
{
  "pulsar": {
    "oauth-issuer-url": "https://some-sso.io/...",
    "oauth-credentials-url": "file:///path/to/file.json",
    "audience": "localhost:6650"
  }
}

The issuer URL is the URL of the OAuth2 server that generates the tokens.

Parameter oauth-credentials-url is used for reading a file containing necessary configuration for authenticating to the OAuth2 provider as a client app. The file should contain everything defined in Pulsar documentation, and type is limited to client_credentials. Below is an example of what the file should look like.

{
    "type": "client_credentials",
    "client_id": "pulsar",
    "client_secret": "1234-abcd-5678-9011-ab56ac4564sa56",
    "issuer_url": "https://my-oauth2-server.com/auth/realms/pulsar-realm"
}

The example resembles what could be set when using Keycloak or a similar SSO. The audience must be set, but the Pulsar service can be configured to ignore this value, in which case any text of one or more characters will do. For more details, see the Apache Pulsar documentation. Consult the Java client documentation to understand the limitations of client_credentials.

The Pulsar library takes care of refreshing the OAuth2 JWT. The Pulsar server must be configured to use OAuth2, and the role attribute must be specified in its configuration files so that Pulsar can detect which kind of "user" is requesting data.

8 Transport encryption

Transport encryption

To use transport encryption, configure the proper connection string and provide the certificate path, ciphers, and protocols, as shown in the example below.

pulsar.service-url=pulsar+ssl://localhost:6651
pulsar.tls-cert-file-path=path/to/cert.pem
pulsar.tls-ciphers[0]=TLS_RSA_WITH_AES_256_GCM_SHA384
pulsar.tls-ciphers[1]=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
pulsar.tls-protocols[0]=TLSv1.3
pulsar.tls-protocols[1]=TLSv1.2
pulsar:
    service-url: 'pulsar+ssl://localhost:6651'
    tls-cert-file-path: 'path/to/cert.pem'
    tls-ciphers:
        - TLS_RSA_WITH_AES_256_GCM_SHA384
        - TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
    tls-protocols:
        - TLSv1.3
        - TLSv1.2
[pulsar]
  service-url="pulsar+ssl://localhost:6651"
  tls-cert-file-path="path/to/cert.pem"
  tls-ciphers=[
    "TLS_RSA_WITH_AES_256_GCM_SHA384",
    "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
  ]
  tls-protocols=[
    "TLSv1.3",
    "TLSv1.2"
  ]
pulsar {
  serviceUrl = "pulsar+ssl://localhost:6651"
  tlsCertFilePath = "path/to/cert.pem"
  tlsCiphers = ["TLS_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"]
  tlsProtocols = ["TLSv1.3", "TLSv1.2"]
}
{
  pulsar {
    service-url = "pulsar+ssl://localhost:6651"
    tls-cert-file-path = "path/to/cert.pem"
    tls-ciphers = ["TLS_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"]
    tls-protocols = ["TLSv1.3", "TLSv1.2"]
  }
}
{
  "pulsar": {
    "service-url": "pulsar+ssl://localhost:6651",
    "tls-cert-file-path": "path/to/cert.pem",
    "tls-ciphers": ["TLS_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"],
    "tls-protocols": ["TLSv1.3", "TLSv1.2"]
  }
}

Make sure that the server and client certificates match the supported ciphers and protocols. By default, the hostname is not verified; verification can be enabled with tls-verify-hostname: true.
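
One way to sanity-check the configured protocols and ciphers is to compare them with what the local JVM reports as supported. The sketch below does this through the default SSLContext; it only inspects the JVM's default TLS provider, which is an assumption about what the client ends up using, and TlsSupportCheck is a hypothetical helper.

```java
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.net.ssl.SSLContext;

// Hypothetical helper; inspects only the JVM's default TLS provider.
final class TlsSupportCheck {

    private TlsSupportCheck() {
    }

    // Returns the requested names that are missing from the supported list.
    static List<String> unsupported(List<String> requested, List<String> supported) {
        List<String> missing = new ArrayList<>(requested);
        missing.removeAll(supported);
        return missing;
    }

    // Protocols that the JVM's default SSLContext reports as supported.
    static List<String> jvmProtocols() throws NoSuchAlgorithmException {
        return Arrays.asList(SSLContext.getDefault().getSupportedSSLParameters().getProtocols());
    }

    // Cipher suites that the JVM's default SSLContext reports as supported.
    static List<String> jvmCipherSuites() throws NoSuchAlgorithmException {
        return Arrays.asList(SSLContext.getDefault().getSupportedSSLParameters().getCipherSuites());
    }
}
```

Calling unsupported(configuredProtocols, TlsSupportCheck.jvmProtocols()) returns an empty list when every configured protocol is available locally; the same comparison works for cipher suites.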

TLS as authentication method is not yet supported.

9 Protocol Buffers

Protocol buffers support

To enable native Protocol Buffers messaging, include the Micronaut Protocol Buffers support dependency:

implementation("io.micronaut:micronaut-protobuff-support")
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-protobuff-support</artifactId>
</dependency>

10 Multitenancy

Multitenancy

Micronaut has support for multitenancy and so does Apache Pulsar. A bridge between the two lets you switch the Pulsar tenant based on the Micronaut tenant in a given context. It uses the Micronaut tenant resolver to instantiate new consumers, producers, or readers for the multitenant approach.

In order to use it both dependencies must be added: Micronaut Multitenancy

implementation("io.micronaut:micronaut-multitenancy")
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-multitenancy</artifactId>
</dependency>
And Micronaut Pulsar Multitenant module
implementation("io.micronaut:pulsar-multitenant")
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>pulsar-multitenant</artifactId>
</dependency>

Configuration for multitenant listeners / producers

To make a reader, consumer, or producer multitenant, simply use ${tenant} in the topic value, for example "persistent://${tenant}/public/default". This indicates to the Pulsar module that the client needs to resolve the tenant name prior to instantiation.
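
Conceptually, resolving the tenant amounts to substituting the placeholder before the client is created. The sketch below models that substitution with plain string handling; it is not the module's resolution code, and TenantTopic is a hypothetical helper.

```java
// Hypothetical model of placeholder substitution; not the module's own resolver.
final class TenantTopic {
    private static final String PLACEHOLDER = "${tenant}";

    private TenantTopic() {
    }

    // True when the topic value asks for tenant resolution.
    static boolean isMultitenant(String template) {
        return template.contains(PLACEHOLDER);
    }

    // Replaces the placeholder with a concrete tenant name.
    static String resolve(String template, String tenant) {
        if (tenant == null || tenant.isEmpty()) {
            throw new IllegalArgumentException("Tenant name is required");
        }
        return template.replace(PLACEHOLDER, tenant);
    }
}
```

For the tenant name acme, the example value above becomes persistent://acme/public/default.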

Dynamic vs fixed tenant names

A good use case for this module is orchestration of messaging clients that share the same workflow, and thus the same namespaces and topics, but use different tenants. In such a case it is good to set up SystemPropertyTenantResolver or FixedTenantResolver in the Micronaut application, so that each application instance always uses a single tenant. This makes it possible to run the same code for different tenants by simply specifying the tenant-id before deploying the application. It also makes it possible to scale messaging clients individually per tenant, or to automate deployment of messaging clients along with the creation of tenants in the system.

Tenant names are resolved by the Micronaut Multitenancy module. This module has a couple of out-of-the-box resolvers for tenant names, which can be divided into two groups: fixed/static and dynamic.

Static / Fixed tenant name resolution

The fixed resolvers are FixedTenantResolver and SystemPropertyTenantResolver. The latter is usually used for testing (it resolves the tenant name only from the system property named tenantId), while the former reads the value from the properties file. Since the properties file can be used to inject environment variables and the like, it covers both purposes. This approach is the most reasonable one, given that each instance of the application then reads only data sent to a single Apache Pulsar tenant, which provides further data isolation and independent scaling per tenant.
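
The system property lookup mentioned above can be pictured with a few lines of plain Java. This is a sketch of the idea only, not the resolver's source; TenantIdLookup is a hypothetical helper.

```java
import java.util.Optional;

// Hypothetical helper mirroring the idea of resolving a tenant from a system property.
final class TenantIdLookup {

    private TenantIdLookup() {
    }

    // Reads the tenant name from the system property named tenantId, if present and non-empty.
    static Optional<String> fromSystemProperty() {
        return Optional.ofNullable(System.getProperty("tenantId")).filter(value -> !value.isEmpty());
    }
}
```

Starting the JVM with -DtenantId=acme would make the lookup return acme, which is how one build can be deployed once per tenant.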

Dynamic tenant name resolution

The dynamic group resolves the tenant name through cookies, HTTP headers, or similar means. This means there is no reasonable way for a Micronaut messaging application that uses the Pulsar module to figure out which tenants it should listen or produce to. For producers it is much easier, assuming that each request to produce a message is linked to some kind of request that contains the tenant description. The same could be said for readers, since sometimes we want to read data to fulfill a request by enriching information or simply reading values from streams. Both cases are easy to implement given that there is some kind of context.

However, consumers usually need to start beforehand, on application boot, before any request is made. We also sometimes want producers and readers to work independently of outside requests or any context that would later bring in the needed name. For this reason the dynamic approach is less likely but not uncommon.

Sometimes, however, the tenant name should be provided in a "reactive" style, where a new tenant is created across the whole system and a message is sent, or where a tenant is reported on demand via HTTP, such as a user adding a tenant via a web UI and expecting instant consumer activation. For such cases this module provides PulsarTenantDiscoveredEvent, which can be fired at any time. It forces creation of the consumers, producers, or readers specified as beans and instantly activates them for each new tenant detected in the event. The implementation tries to ensure that no "double" tenant listeners are made within a single application instance: if you fire two such events concurrently, only one consumer/producer/reader per specified bean should be created.
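
A guard against duplicate tenant handling can be built on a concurrent set, where only the first report of a tenant succeeds. The sketch below shows one such guard; it is an illustration of the idea and makes no claim about how the module implements it. TenantRegistry is a hypothetical class.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical guard; one possible way to avoid creating listeners twice for a tenant.
final class TenantRegistry {
    private final Set<String> known = ConcurrentHashMap.newKeySet();

    // Returns true only for the first caller that reports a given tenant, even under concurrency.
    boolean markDiscovered(String tenant) {
        return known.add(tenant);
    }
}
```

An event listener could create consumers, producers, or readers only when markDiscovered returns true, so that two concurrent events for the same tenant result in a single set of clients.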

It is also possible to create a bean that reads tenant names from the database and fires the mentioned event for each tenant name. This makes it somewhat of a "static via dynamic" approach: you fire events as soon as the DB query is ready, and those are the only tenants created throughout the application lifecycle.

Combined approach?

In some cases it might be useful to separate readers, consumers, and producers and let each one resolve tenant names individually, for example resolving tenants for consumers on boot via a DB or properties file while letting producers and readers be generated via an HTTP header. Currently, this is not supported, and you would need to override multiple infrastructure classes to achieve it. Later it might be supported via reserved keywords and/or configuration through application properties, allowing each type or even each specific instance to depend on a specific way of resolving the tenant name.

11 Repository

You can find the source code of this project in this repository: