Micronaut Pulsar
Integration between Micronaut and Apache Pulsar
Version: 2.4.0
1 Introduction
This project provides integration between the Micronaut framework and Apache Pulsar. It aims to simplify configuration and to replace manual building of consumers, producers, and readers with an annotation-based approach.
2 Release History
For this project, you can find a list of releases (with release notes) here:
2.0.0
- Java 17 baseline
- Micronaut 4.0.0 minimum version
- Remove shutdown on subscribe error
- Bug fixes
- Apache Pulsar Java Client dependency 3.0.0
1.3.0
- Dependency updates
- Micronaut 3.8.0
1.2.0
- Added multitenancy support module
1.1.0
- Micronaut 3.2.3 minimum version
- Using new JSON mapper
- Key-Value messages support
- Message Headers support
- Protobuf native messages support using micronaut protobuf dependency
- Apache Pulsar Java Client dependency 2.9.1
1.0.0
- Micronaut 2.1.3 minimum version
- Authorization via JWT
- Service URL Provider resolver
- Basic support for consumers
- Basic support for producers
- Basic support for reader
3 Quick Start
To configure the Apache Pulsar client, first add the micronaut-pulsar module dependency:

implementation("io.micronaut.pulsar:micronaut-pulsar")

<dependency>
    <groupId>io.micronaut.pulsar</groupId>
    <artifactId>micronaut-pulsar</artifactId>
</dependency>

implementation("io.micronaut.serde:micronaut-serde-jackson")

<dependency>
    <groupId>io.micronaut.serde</groupId>
    <artifactId>micronaut-serde-jackson</artifactId>
</dependency>

Then configure the URI of the Pulsar cluster or standalone server to communicate with:
pulsar.service-url=pulsar://localhost:6650

pulsar:
  service-url: pulsar://localhost:6650

[pulsar]
  service-url="pulsar://localhost:6650"

pulsar {
  serviceUrl = "pulsar://localhost:6650"
}

{
  pulsar {
    service-url = "pulsar://localhost:6650"
  }
}

{
  "pulsar": {
    "service-url": "pulsar://localhost:6650"
  }
}

| As this module is based on the official Java client from Apache Pulsar, see the official documentation for detailed information on service URL format. |
Alternatively, pulsar.service-url-provider can be set either by annotating a bean with @PulsarServiceUrlProvider or by setting the provider name:
pulsar.service-url-provider=BeanName

pulsar:
  service-url-provider: BeanName

[pulsar]
  service-url-provider="BeanName"

pulsar {
  serviceUrlProvider = "BeanName"
}

{
  pulsar {
    service-url-provider = "BeanName"
  }
}

{
  "pulsar": {
    "service-url-provider": "BeanName"
  }
}

When the provider name is set, the implementing class must be annotated with @Named with a value equal to the configured name. In both cases, the bean must implement the org.apache.pulsar.client.api.ServiceUrlProvider interface.
After configuring the Pulsar cluster URL, the Micronaut Pulsar module will be able to produce a bean of type org.apache.pulsar.client.api.PulsarClient. This bean is a singleton from which all producers and consumers can be created. Since PulsarClient supports a URL provider that can switch URLs to clusters on demand, there’s no need to have multiple clients.
4 Pulsar consumers
Creating consumers
To create a consumer, annotate any bean method with @PulsarConsumer and define a topic. Methods that belong to
beans annotated with @PulsarSubscription will try to fetch the subscription name from it. Otherwise, make sure to set the
subscriptionName property in the @PulsarConsumer annotation. Subscription beans are singletons.
package example;
import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
public class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-java-docs", consumerName = "shared-consumer-jtester") // (3)
    public void messagePrinter(String message) { // (4)
        try {
            String changed = report(message).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //...
    }
    @PulsarProducer(topic = "persistent://public/default/reports-java-docs", producerName = "report-producer-java") // (5)
    public CompletableFuture<String> report(String message) { // (6)
        return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
    }
}

package example;
import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-groovy-docs", consumerName = "shared-consumer-gtester") // (3)
    void messagePrinter(String message) { // (4)
        try {
            String changed = report(message).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //...
    }
    @PulsarProducer(topic = "persistent://public/default/reports-groovy-docs", producerName = "report-producer-groovy") // (5)
    CompletableFuture<String> report(String message) { // (6)
        return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
    }
}

package example
import io.micronaut.pulsar.annotation.PulsarConsumer
import io.micronaut.pulsar.annotation.PulsarProducer
import io.micronaut.pulsar.annotation.PulsarSubscription
import org.apache.pulsar.client.api.SubscriptionType
@PulsarSubscription(subscriptionName = "pulsar-ktest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
open class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-kotlin-docs", consumerName = "shared-consumer-ktester") // (3)
    suspend fun messagePrinter(message: String) { // (4)
        val changed = report(message)
        //...
    }
    @PulsarProducer(topic = "persistent://public/default/reports-kotlin-docs", producerName = "report-producer-kotlin") // (5)
    open suspend fun report(message: String): String { // (6)
        return "Reporting message '$message'" // (7)
    }
}

| 1 | The class holding consumers can be annotated with @PulsarSubscription. The subscription can also be set from @PulsarConsumer, which allows consumers to be located in other beans such as a @Singleton. |
| 2 | - |
| 3 | Methods that process messages, in other words consumers, must be annotated with @PulsarConsumer, and one of the topic options must be specified. |
| 4 | Using CompletableFuture, reactor-core, or RxJava return types, or Kotlin suspend, allows the method body to be executed asynchronously. Method invocations are always asynchronous because the underlying Pulsar library uses CompletableFuture. An async return type mostly gives developers control over the execution flow from the method body or external callers. |
| 5 | - |
| 6 | - |
| 7 | - |
Consumer method
The simplest approach is to declare a single method parameter, which is then detected as the
message body (payload). The parameter may also be of Pulsar’s Message<> type, in which case the module detects whether to inject the parsed body
or the whole Pulsar message. With multiple arguments, it’s important to annotate the argument that receives
the message payload with @MessageBody, and to annotate the other parameters with @MessageKey (message key) or
@MessageProperties (of type Map<String,String>, representing all message properties/headers to be injected). For headers listed
as method arguments, use @MessageHeader("headerName") on each argument representing a message header. Because of the underlying Pulsar
library, header values (and their corresponding keys) must be strings.
| Don’t confuse @MessageHeaders with @MessageProperties. The former is used by Micronaut messaging and is not applicable
to a method argument, so the latter was created to support mapping all headers/properties onto a single method argument. Still, @MessageHeader from the same package (messaging) is used for a method argument that maps a single header value, to
reuse as many existing annotations as possible. |
An extra parameter of type org.apache.pulsar.client.api.Consumer does not need any annotation. It receives the consumer passed by
default from the underlying Pulsar library. If it is not declared on the method annotated with @PulsarConsumer, it is omitted.
This is especially important for messages that should be of KeyValue type.
Using @PulsarSubscription annotation
This annotation marks a class as a singleton bean that contains one or more methods serving as Pulsar consumers under the same subscription.
Non-required properties:
- name, which sets the Pulsar subscription name to a custom string; recommended to always set manually
- type, one of Pulsar’s subscription types. Subscription types are described in the official Apache Pulsar documentation.
- ackGroupTimeout, the acknowledgment group timeout for shared subscriptions
Type and acknowledgment group timeout default to the Pulsar Java client library values if not set. If the name is not set, this module generates it in a "counter manner" (pulsar-subscription-1,2,3,4…).
| Property | Type | Default | Required | Description |
|---|---|---|---|---|
| name | | | No | User-specified name, or leave blank to let the Pulsar module generate the name |
| type | | | No | Default as in Pulsar official Java library |
| ackGroupTimeout | | | No | Must be a |
Using @PulsarConsumer annotation
This is a method annotation. To use it, you must specify one of value, topic, topics, or topicsPattern.
Properties are processed in the order topic (alias value), topics, topicsPattern, so when more than one is set, only the first
in that order is used and the rest are ignored. Other properties can be omitted; however, it’s good to always specify the consumer name manually,
as it can be used later for debugging or as an injection point for the Pulsar Consumer<T> instances that are generated for each method.
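The processing order described above can be illustrated with a small, library-free sketch. TopicSelection and its select method are hypothetical names used only for illustration; they are not part of micronaut-pulsar, and the module’s real resolution logic may differ in detail.

```java
import java.util.List;

// Hypothetical helper, NOT part of micronaut-pulsar: it only mirrors the
// documented precedence topic (alias value) -> topics -> topicsPattern.
final class TopicSelection {

    /** Which option won, and the value(s) that would be subscribed to. */
    record Selected(String option, List<String> values) { }

    static Selected select(String topic, List<String> topics, String topicsPattern) {
        if (topic != null && !topic.isBlank()) {
            return new Selected("topic", List.of(topic));
        }
        if (topics != null && !topics.isEmpty()) {
            return new Selected("topics", List.copyOf(topics));
        }
        if (topicsPattern != null && !topicsPattern.isBlank()) {
            return new Selected("topicsPattern", List.of(topicsPattern));
        }
        // Assumption: a consumer without any topic option is rejected.
        throw new IllegalArgumentException("One of topic, topics, or topicsPattern must be set");
    }

    public static void main(String[] args) {
        // topic is empty, so topics wins and topicsPattern is ignored.
        Selected selected = select("",
                List.of("persistent://public/default/a"),
                "persistent://public/default/.*");
        System.out.println(selected.option() + " -> " + selected.values());
    }
}
```

In practice this means that setting both topic and topicsPattern on the same annotation silently subscribes to the single topic only.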
| Property | Type | Default | Required | Description |
|---|---|---|---|---|
| topics | | | Yes* | Required unless |
| topicsPattern | | | Yes* | Required unless |
| schema | | | No | If body is different from |
| keySchema | | | No | If message is of key-value type this must be set or default will be used with same resolution style as the schema |
| keyEncoding | | | No | If message is of key-value type it is frequent that the key is sent separately from the payload in which case this value should be set to |
| consumerName | | | No | Consumer name, not required; will be generated automatically if missing |
| subscriptionTopicsMode | | | No | If |
| subscribeAsync | | true | No | Whether to use async when reading Pulsar messages. |
| patternAutoDiscoveryPeriod | | | No | Time delay in seconds after which regex subscriptions should seek new topics. |
| maxRetriesBeforeDlq | | 16 - Pulsar library default | No | Maximum attempts before sending failed message to DLQ |
| deadLetterTopic | | topic name + '-DLQ' Pulsar library default | No | DLQ topic name, if not set will let pulsar decide |
| When using topicsPattern, be sure to create topics before the consumer is started, since Pulsar refresh on new topics tends to take a long time by default. |
| When using PulsarConsumer with a specific schema other than byte[], make sure topics are created and have the same schema the consumer is expecting, especially when using a pattern consumer which listens to multiple topics. Otherwise, the consumer might not connect and could throw org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException. |
Consumer names
Consumer names are important but can be left out. Omitting them triggers sequential assignment of names following the pattern
pulsar-consumer-#. Default name assignment starts from 10 and increases with each consumer.
| Apache Pulsar expects unique names for each consumer within the subscription. To avoid issues in the shared subscription
model, where multiple instances of the same Micronaut messaging application are deployed, use expressions in the consumer
annotation, for example ${pulsarapp.myTestSubscription.consumerXyz.name}, and define that property in your application.yml
(or another property source). Both auto-generated names and names hardcoded in annotations will break at some point in such
deployments, so switch to expressions and assign names dynamically via properties, which also makes it possible to inject
values through CI/CD or ENV variables. |
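To see why counter-based names collide across application instances, consider the following sketch. ConsumerNames is a hypothetical class written for illustration; it assumes the pulsar-consumer- prefix and the starting value of 10 mentioned in the text, and it is not the module’s actual implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration, NOT the module's code: a per-instance counter
// that hands out default consumer names when none is configured.
final class ConsumerNames {

    // Assumption taken from the text: default names start at 10.
    private final AtomicInteger counter = new AtomicInteger(10);

    /** Returns the configured name, or the next generated one if none was given. */
    String resolve(String configuredName) {
        if (configuredName != null && !configuredName.isBlank()) {
            return configuredName;
        }
        return "pulsar-consumer-" + counter.getAndIncrement();
    }

    public static void main(String[] args) {
        // Two application instances each have their own counter...
        ConsumerNames instanceA = new ConsumerNames();
        ConsumerNames instanceB = new ConsumerNames();
        // ...so both generate the same first name and clash within a subscription.
        System.out.println(instanceA.resolve(null));
        System.out.println(instanceB.resolve(null));
    }
}
```

Because every instance restarts its counter, two deployments of the same application produce identical names, which is why names supplied through properties per instance are the safer choice.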
Dead Letter Queue
The Pulsar library uses a DLQ for Shared and Key_Shared subscriptions. It also configures the DLQ topic name as well as the maximum number of redeliveries before a message is put on the DLQ.
To override the default behavior, use the configuration properties use-dead-letter-queue and default-max-retry-dlq. These
properties define the DLQ settings applied by default to all Pulsar consumers during their creation unless explicitly set through the annotation.
5 Pulsar readers
Readers
Pulsar supports both "Consumers" and "Readers". More can be read in the Apache Pulsar documentation.
Creating readers
To initialize a reader, declare a field annotated with @PulsarReader inside any bean or as a constructor argument.
package example;
import io.micronaut.pulsar.annotation.PulsarReader;
import jakarta.inject.Singleton;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Reader;
import java.util.concurrent.CompletableFuture;
@Singleton
public class ReaderExample {
    @PulsarReader(value = "persistent://public/default/messages", readerName = "simple-j-reader") // (1)
    private Reader<String> reader; // (2)
    public CompletableFuture<Message<String>> readNext() { // (3)
        return reader.readNextAsync(); // (4)
    }
}

package example;
import io.micronaut.pulsar.annotation.PulsarReader;
import jakarta.inject.Singleton;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Reader;
import java.util.concurrent.CompletableFuture;
@Singleton
class ReaderExample {
    @PulsarReader(value = "persistent://public/default/messages", readerName = "simple-g-reader") // (1)
    private Reader<String> reader // (2)
    CompletableFuture<Message<String>> readNext() { // (3)
        return reader.readNextAsync() // (4)
    }
}

package example
import io.micronaut.pulsar.annotation.PulsarReader
import jakarta.inject.Singleton
import kotlinx.coroutines.future.await
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Reader
@Singleton
class ReaderExample {
    @PulsarReader(value = "persistent://public/default/messages", readerName = "simple-k-reader") // (1)
    private lateinit var reader: Reader<String> // (2)
    suspend fun readNext(): Message<String> { // (3)
        return reader.readNextAsync().await() // (4)
    }
}

| 1 | Reader annotation with the topic and the reader name |
| 2 | Reader must be of type org.apache.pulsar.client.api.Reader |
| 3 | Using readNextAsync requires a CompletableFuture return type; in Kotlin, awaiting is possible |
| 4 | Calling the read will move the cursor to the next message or give null in case there are no more messages |
The reader name can be autogenerated, but the topic argument must be set. Reader injections cause blocking behaviour, as
Reader instances start creation and wait until Pulsar replies with a successful status. The reader name defaults
to the field or argument name for injection points, or to the method name for a method-annotated reader.
KeyValue readers
To use KeyValue with a reader, the reader must be typed with org.apache.pulsar.common.schema.KeyValue.
It is then possible to set attributes in @PulsarReader such as keyType, which defines which data
serializer is used to extract the key, and keyEncoding, which defines how the key is extracted: INLINE (the key
is part of the message payload) or SEPARATED (the key is stored as the message key).
6 Pulsar producers
Creating producers
To initialize a producer it’s sufficient to annotate an interface with @PulsarProducerClient and annotate methods with @PulsarProducer.
package example;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarProducerClient;
import org.apache.pulsar.client.api.MessageId;
import java.util.concurrent.CompletableFuture;
@PulsarProducerClient // (1)
public interface Producer {
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "kotlin-test-producer") // (2)
    CompletableFuture<MessageId> send(String message); // (3)
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "b-kotlin-test-producer")
    void sendBlocking(String message); // (4)
}

package example;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarProducerClient;
import org.apache.pulsar.client.api.MessageId;
import java.util.concurrent.CompletableFuture;
@PulsarProducerClient // (1)
interface Producer {
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "kotlin-test-producer") // (2)
    CompletableFuture<MessageId> send(String message); // (3)
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "b-kotlin-test-producer")
    void sendBlocking(String message); // (4)
}

package example
import io.micronaut.pulsar.annotation.PulsarProducer
import io.micronaut.pulsar.annotation.PulsarProducerClient
import org.apache.pulsar.client.api.MessageId
@PulsarProducerClient // (1)
interface Producer {
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "kotlin-test-producer") // (2)
    suspend fun send(message: String): MessageId // (3)
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "b-kotlin-test-producer")
    fun sendBlocking(message: String) // (4)
}

| 1 | Annotate the interface with @PulsarProducerClient so that Micronaut processes it without an implementation |
| 2 | Methods are the actual producers, so annotate them with @PulsarProducer |
| 3 | The return type decides whether the message is sent in a blocking or non-blocking manner. |
| 4 | A blocking send that waits until the message is successfully sent or throws an exception on failure. |
Producers can be used in other types of beans as well (Singleton or such).
Producer method
It’s important to note that if the method has more than one argument, @MessageBody must be specified on the argument
intended for the message body, along with @MessageKey, @MessageProperties, or @MessageHeader on the others depending on the desired mapping.
| As with consumers, note that @MessageProperties maps the whole collection of header values, while @MessageHeader maps a single header onto a method argument. More details are in the consumer warning. |
Producer return values
Not counting wrappers for async behaviour (CompletableFuture, RxJava, reactor-core), abstract methods can only have two return types: void or MessageId, since nothing more than sending is done.
Non-abstract methods can have any return type, but a MessageId return value will not be filled with the value
returned by Pulsar. This is because the return value must be defined within the method body, so there is no reliable way to know
which value was intended: the MessageId from Pulsar or some other MessageId value generated
within the body.
The message can be sent before or after the method body is invoked by setting the property sendBefore to true or false respectively.
The default is false, which executes the method body before sending the message.
With the async approach the behaviour is unpredictable, because neither calling the method nor sending the message blocks;
the message may be sent before the execution of the method finishes or even properly starts, since the two can run
on separate threads. The setting is mainly useful for the blocking approach, where the method body can be executed
before sending the message, and if execution throws an exception, sending is skipped. The first parameter of the method is
still used as the message body.
package example;
import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
public class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-java-docs", consumerName = "shared-consumer-jtester") // (3)
    public void messagePrinter(String message) { // (4)
        try {
            String changed = report(message).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //...
    }
    @PulsarProducer(topic = "persistent://public/default/reports-java-docs", producerName = "report-producer-java") // (5)
    public CompletableFuture<String> report(String message) { // (6)
        return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
    }
}

package example;
import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-groovy-docs", consumerName = "shared-consumer-gtester") // (3)
    void messagePrinter(String message) { // (4)
        try {
            String changed = report(message).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //...
    }
    @PulsarProducer(topic = "persistent://public/default/reports-groovy-docs", producerName = "report-producer-groovy") // (5)
    CompletableFuture<String> report(String message) { // (6)
        return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
    }
}

package example
import io.micronaut.pulsar.annotation.PulsarConsumer
import io.micronaut.pulsar.annotation.PulsarProducer
import io.micronaut.pulsar.annotation.PulsarSubscription
import org.apache.pulsar.client.api.SubscriptionType
@PulsarSubscription(subscriptionName = "pulsar-ktest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
open class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-kotlin-docs", consumerName = "shared-consumer-ktester") // (3)
    suspend fun messagePrinter(message: String) { // (4)
        val changed = report(message)
        //...
    }
    @PulsarProducer(topic = "persistent://public/default/reports-kotlin-docs", producerName = "report-producer-kotlin") // (5)
    open suspend fun report(message: String): String { // (6)
        return "Reporting message '$message'" // (7)
    }
}

| 1 | For Kotlin, open is required in non-interface (abstract) classes because of AOP |
| 2 | - |
| 3 | - |
| 4 | - |
| 5 | Annotating the method as a producer |
| 6 | A non-abstract async method. In the Kotlin case, open is needed to allow Micronaut AOP interception. |
| 7 | CompletableFuture or suspend will trigger immediate execution, but reactor or RxJava types need a subscription and rely on the caller or upper layers to provide it. |
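The effect of sendBefore on a blocking, non-abstract producer method can be pictured with the following sketch. SendOrderSketch is a hypothetical stand-in that uses a plain list instead of a Pulsar topic; it assumes, as in the section on producer return values, that sendBefore = false runs the method body first, and it is not how the module itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical illustration, NOT the module's interceptor: models the order
// of "run method body" and "send message" for a blocking producer method.
final class SendOrderSketch {

    /** Stand-in for a topic: records every payload that was "sent". */
    final List<String> sent = new ArrayList<>();

    String invoke(boolean sendBefore, String message, UnaryOperator<String> methodBody) {
        if (sendBefore) {
            sent.add(message);                 // message goes out first
            return methodBody.apply(message);  // a failure here cannot undo the send
        }
        String result = methodBody.apply(message); // an exception here skips the send
        sent.add(message);
        return result;
    }

    public static void main(String[] args) {
        SendOrderSketch producer = new SendOrderSketch();
        try {
            producer.invoke(false, "hello", m -> {
                throw new IllegalStateException("validation failed");
            });
        } catch (IllegalStateException e) {
            // With sendBefore = false, nothing was sent because the body threw.
            System.out.println("sent after failure: " + producer.sent.size());
        }
    }
}
```

This is why the default ordering is convenient for validation-style method bodies: throwing from the body is enough to prevent the message from being produced.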
7 Authentication
JWT
To configure JWT authentication, it’s sufficient to specify a JWT in application.yml under pulsar.authentication.jwt, e.g.:
pulsar.authentication.jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX...

pulsar:
  authentication:
    jwt: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX...

[pulsar]
  [pulsar.authentication]
    jwt="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."

pulsar {
  authentication {
    jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
  }
}

{
  pulsar {
    authentication {
      jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
    }
  }
}

{
  "pulsar": {
    "authentication": {
      "jwt": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
    }
  }
}

io.micronaut.configuration.pulsar.config.AbstractPulsarConfiguration is left public in case you want to implement more options yourself.
It’s planned to support specifying a JWT path, either an HTTP endpoint or URL to retrieve a fresh JWT.
OAuth2
To use OAuth2 authentication, configure three parameters like in the example below to connect to a Pulsar service that uses OAuth2:
pulsar.oauth-issuer-url=https://some-sso.io/...
pulsar.oauth-credentials-url=file:///path/to/file.json
pulsar.audience=localhost:6650

pulsar:
    oauth-issuer-url: https://some-sso.io/...
    oauth-credentials-url: file:///path/to/file.json
    audience: localhost:6650

[pulsar]
  oauth-issuer-url="https://some-sso.io/..."
  oauth-credentials-url="file:///path/to/file.json"
  audience="localhost:6650"

pulsar {
  oauthIssuerUrl = "https://some-sso.io/..."
  oauthCredentialsUrl = "file:///path/to/file.json"
  audience = "localhost:6650"
}

{
  pulsar {
    oauth-issuer-url = "https://some-sso.io/..."
    oauth-credentials-url = "file:///path/to/file.json"
    audience = "localhost:6650"
  }
}

{
  "pulsar": {
    "oauth-issuer-url": "https://some-sso.io/...",
    "oauth-credentials-url": "file:///path/to/file.json",
    "audience": "localhost:6650"
  }
}

The issuer URL is the URL to the OAuth2 server generating tokens and such.
Parameter oauth-credentials-url is used for reading a file containing necessary configuration for authenticating to the OAuth2 provider as a client app. The file should contain everything defined in Pulsar documentation, and type is limited to client_credentials. Below is an example of what the file should look like.
{
    "type": "client_credentials",
    "client_id": "pulsar",
    "client_secret": "1234-abcd-5678-9011-ab56ac4564sa56",
    "issuer_url": "https://my-oauth2-server.com/auth/realms/pulsar-realm"
}

The example shows something similar to what could be set when using KeyCloak or a similar SSO. Audience must be set, but the Pulsar service can be configured to ignore this value, in which case you might put any text with one or more characters. For more details, see the Apache Pulsar documentation. Consult the Java client documentation to understand more about limitations for client_credentials.
The Pulsar library takes care of the OAuth2 JWT refreshing and such. The Pulsar server must be configured to use OAuth2 and role attribute in its configuration files must be specified for Pulsar to be able to detect which kind of "user" is requesting data.
8 Transport encryption
Transport encryption
To use transport encryption you must configure proper connection string as well as provide necessary information about certificate path, ciphers, and protocols as shown in example below.
pulsar.service-url=pulsar+ssl://localhost:6651
pulsar.tls-cert-file-path=path/to/cert.pem
pulsar.tls-ciphers[0]=TLS_RSA_WITH_AES_256_GCM_SHA384
pulsar.tls-ciphers[1]=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
pulsar.tls-protocols[0]=TLSv1.3
pulsar.tls-protocols[1]=TLSv1.2

pulsar:
    service-url: 'pulsar+ssl://localhost:6651'
    tls-cert-file-path: 'path/to/cert.pem'
    tls-ciphers:
        - TLS_RSA_WITH_AES_256_GCM_SHA384
        - TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
    tls-protocols:
        - TLSv1.3
        - TLSv1.2

[pulsar]
  service-url="pulsar+ssl://localhost:6651"
  tls-cert-file-path="path/to/cert.pem"
  tls-ciphers=[
    "TLS_RSA_WITH_AES_256_GCM_SHA384",
    "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
  ]
  tls-protocols=[
    "TLSv1.3",
    "TLSv1.2"
  ]

pulsar {
  serviceUrl = "pulsar+ssl://localhost:6651"
  tlsCertFilePath = "path/to/cert.pem"
  tlsCiphers = ["TLS_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"]
  tlsProtocols = ["TLSv1.3", "TLSv1.2"]
}

{
  pulsar {
    service-url = "pulsar+ssl://localhost:6651"
    tls-cert-file-path = "path/to/cert.pem"
    tls-ciphers = ["TLS_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"]
    tls-protocols = ["TLSv1.3", "TLSv1.2"]
  }
}

{
  "pulsar": {
    "service-url": "pulsar+ssl://localhost:6651",
    "tls-cert-file-path": "path/to/cert.pem",
    "tls-ciphers": ["TLS_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"],
    "tls-protocols": ["TLSv1.3", "TLSv1.2"]
  }
}

It’s necessary to ensure that both server and client certificates match the supported ciphers and protocols. By default,
the hostname is not verified, but verification can be enabled by setting tls-verify-hostname: true.
TLS as an authentication method is not yet supported.
9 Protocol Buffers
Protocol buffers support
To enable native Protocol Buffers messaging support, include the Protocol Buffers support dependency from Micronaut:

implementation("io.micronaut:micronaut-protobuff-support")

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-protobuff-support</artifactId>
</dependency>

10 Multitenancy
Multitenancy
Micronaut supports multitenancy, and so does Apache Pulsar. This module provides a bridge between the two that switches the Pulsar tenant based on the Micronaut tenant in the given context. It uses the Micronaut tenant resolver to instantiate new consumers, producers, or readers for the multitenant approach.
To use it, both dependencies must be added, Micronaut Multitenancy and the Pulsar multitenant module:

implementation("io.micronaut:micronaut-multitenancy")

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-multitenancy</artifactId>
</dependency>

implementation("io.micronaut:pulsar-multitenant")

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>pulsar-multitenant</artifactId>
</dependency>

Configuration for multitenant listeners / producers
To make a reader, consumer, or producer multitenant, simply use ${tenant} in the topic value.
For example: "persistent://${tenant}/public/default". This indicates to the Pulsar module that the client needs to
resolve the tenant name prior to instantiation.
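Conceptually, resolving a multitenant topic amounts to substituting the placeholder with the tenant name obtained from the tenant resolver. The sketch below shows that idea only; TenantTopics is a hypothetical helper, the tenant name "acme" is made up, and the module’s real resolution code is not shown here.

```java
// Hypothetical illustration, NOT the module's code: substitutes the
// ${tenant} placeholder in a topic value with a resolved tenant name.
final class TenantTopics {

    static final String PLACEHOLDER = "${tenant}";

    /** A topic is treated as multitenant when it contains the placeholder. */
    static boolean isMultitenant(String topic) {
        return topic.contains(PLACEHOLDER);
    }

    static String resolve(String topic, String tenantName) {
        if (!isMultitenant(topic)) {
            return topic; // fixed topics are used as-is
        }
        if (tenantName == null || tenantName.isBlank()) {
            // Assumption: a multitenant topic cannot be used without a tenant.
            throw new IllegalStateException("Tenant name must be resolved before creating the client");
        }
        return topic.replace(PLACEHOLDER, tenantName);
    }

    public static void main(String[] args) {
        // "acme" is a made-up tenant name used only for this example.
        System.out.println(resolve("persistent://${tenant}/public/default", "acme"));
    }
}
```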
Dynamic vs fixed tenant names
A good use case for this module is orchestration of messaging clients that share the same workflow, and thus the same namespaces and topics, but
use different tenants. In such a case it’s good to have SystemPropertyTenantResolver or FixedTenantResolver set up in the
Micronaut application, so that each application instance always uses a single tenant. This makes it possible to use the
same code for different tenants by simply specifying the tenant-id before deploying the application. It also makes it
possible to scale messaging clients individually per tenant, or to automate the deployment of messaging clients
together with the creation of tenants in the system.
Tenant names are resolved by the Micronaut Multitenancy module. This module has a couple of out-of-the-box resolvers for tenant names, which can be divided into two groups: fixed / static and dynamic.
Static / Fixed tenant name resolution
The fixed resolvers are FixedTenantResolver and SystemPropertyTenantResolver. The latter is usually used for testing
(it resolves the tenant name only from the system property named tenantId), while the former reads the property value from the
properties file. Since the properties file can be used to inject ENV variables and the like, it covers both purposes.
This approach is the most reasonable one, given that each instance of the application then only reads data sent to
a single Apache Pulsar tenant, which provides further data isolation and independent scaling capabilities based on
tenants.
Dynamic tenant name resolution
The dynamic group resolves the tenant name through cookies, HTTP headers, or similar means. This means there’s no reasonable way for a Micronaut messaging application that uses the Pulsar module to figure out which tenants it should listen or produce to. For producers it’s much easier, assuming that each request to produce a message is linked to some kind of request that contains the tenant description. The same can be said for readers, as sometimes we want to read data to fulfill a request by enriching information or simply reading values from streams. Both cases are easy to implement given that there’s some kind of context.
However, consumers usually need to be started beforehand, on application boot, before any request is made. Producers and readers also sometimes need to work independently of outside requests or any context that would later bring in the needed name. For this reason the dynamic approach is less likely but not uncommon.
However, sometimes we want to provide the tenant name in a "reactive" style, where a new tenant is created across the whole system
and a message is sent. Or a tenant is reported on demand via HTTP, such as a user adding a tenant via a web UI and expecting
instant consumer activation. For such cases this module provides PulsarTenantDiscoveredEvent, which can be fired at
any time. This forces the creation of consumers, producers, or readers specified as beans and instantly activates them for
each new tenant detected in this event. The implementation tries to ensure that no "double" tenant listeners are created
within a single application instance. This means that if you fire two such events concurrently, only one
consumer/producer/reader per specified bean should be created.
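The "no double listeners" guarantee described above can be pictured as an idempotent registration step. The sketch below is a minimal illustration using only the JDK; TenantRegistry and its callback are hypothetical names and do not reflect the module’s internal classes or the actual shape of PulsarTenantDiscoveredEvent.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical illustration, NOT the module's code: activates messaging
// clients at most once per tenant, even when events arrive concurrently.
final class TenantRegistry {

    // Thread-safe set of tenants that already have their clients created.
    private final Set<String> knownTenants = ConcurrentHashMap.newKeySet();

    // Callback that would create consumers/producers/readers for a tenant.
    private final Consumer<String> activateClients;

    TenantRegistry(Consumer<String> activateClients) {
        this.activateClients = activateClients;
    }

    /** Returns true only for the first event that reports a given tenant. */
    boolean onTenantDiscovered(String tenantName) {
        // Set.add is atomic here, so only one caller wins for the same tenant.
        if (!knownTenants.add(tenantName)) {
            return false;
        }
        activateClients.accept(tenantName);
        return true;
    }

    public static void main(String[] args) {
        TenantRegistry registry = new TenantRegistry(
                tenant -> System.out.println("activating clients for " + tenant));
        registry.onTenantDiscovered("tenant-a"); // activates
        registry.onTenantDiscovered("tenant-a"); // ignored, already known
    }
}
```

A bean that loads tenant names from a database on startup could feed each name through the same path, which matches the "static via dynamic" usage described in this section.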
It is also possible to create a bean that would read tenant names from the database and fire up mentioned event for each tenant name. This would make it somewhat of a "static via dynamic" approach. You fire up events as soon as DB query is ready and those are the only tenants that would be made through application lifecycle.
Combined approach?
In some cases it might be useful to separate readers, consumers, and producers and allow each one to resolve tenant names individually, for example letting tenants for consumers be resolved on boot via a DB or properties file while allowing producers and readers to be generated via some HTTP header. Currently, this is not supported, and you would need to override multiple infrastructure classes to achieve it. Later it might be supported via reserved keywords and/or configuration through application properties, allowing each type or even a specific instance to depend on a specific way of resolving the tenant name.
11 Repository
You can find the source code of this project in this repository: