implementation("io.micronaut:micronaut-pulsar")
Micronaut Pulsar
Integration between Micronaut and Apache Pulsar
1 Introduction
This project includes integration between Micronaut and Apache Pulsar.
2 Release History
For this project, you can find a list of releases (with release notes) here:
1.2.0
- Added multitenancy support module
1.1.0
- Micronaut 3.2.3 minimum version
- Using new JSON mapper
- Key-Value messages support
- Message Headers support
- Protobuf native messages support using micronaut protobuf dependency
- Apache Pulsar dependency upgraded to 2.9.1
1.0.0
- Micronaut 2.1.3 minimum version
- Authorization via JWT
- Service URL Provider resolver
- Basic support for consumers
- Basic support for producers
- Basic support for reader
3 Quick Start
To configure the Apache Pulsar client, first add the micronaut-pulsar module dependency:
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-pulsar</artifactId>
</dependency>
Then configure the URI of the Pulsar cluster or standalone server to communicate with by setting pulsar.service-url:
pulsar.service-url=pulsar://localhost:6650
pulsar:
service-url: pulsar://localhost:6650
[pulsar]
service-url="pulsar://localhost:6650"
pulsar {
serviceUrl = "pulsar://localhost:6650"
}
{
pulsar {
service-url = "pulsar://localhost:6650"
}
}
As this module is based on the official Java client from Apache Pulsar, see the official documentation for detailed information on service URL format.
Alternatively, a service URL provider can be used, either by annotating a bean with @PulsarServiceUrlProvider or by setting the provider name in pulsar.service-url-provider:
pulsar.service-url-provider=BeanName
pulsar:
service-url-provider: BeanName
[pulsar]
service-url-provider="BeanName"
pulsar {
serviceUrlProvider = "BeanName"
}
{
pulsar {
service-url-provider = "BeanName"
}
}
in which case the implementing class must be annotated with @Named with a value equal to the one in the configuration. In both cases, the bean must implement the org.apache.pulsar.client.api.ServiceUrlProvider interface.
After configuring the Pulsar cluster URL, the Micronaut Pulsar module will be able to produce a bean of type org.apache.pulsar.client.api.PulsarClient. This bean will be a Singleton from which all producers and consumers can be created. Since PulsarClient supports a URL provider which can switch URLs to clusters on demand, there’s no need to have multiple clients.
4 Pulsar consumers
Creating consumers
To create a consumer, annotate any bean method with @PulsarConsumer and define a topic. Methods that are part of beans annotated with @PulsarSubscription will try to fetch the subscription name from it. Otherwise, make sure to set the subscriptionName property in the @PulsarConsumer annotation. Subscription beans are singletons.
package example;
import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
public class ConsumerProducer { // (2)
@PulsarConsumer(topic = "persistent://public/default/messages-java-docs", consumerName = "shared-consumer-jtester") // (3)
public void messagePrinter(String message) { // (4)
try {
String changed = report(message).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
//...
}
@PulsarProducer(topic = "persistent://public/default/reports-java-docs", producerName = "report-producer-java") // (5)
public CompletableFuture<String> report(String message) { // (6)
return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
}
}
package example;
import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
class ConsumerProducer { // (2)
@PulsarConsumer(topic = "persistent://public/default/messages-groovy-docs", consumerName = "shared-consumer-gtester") // (3)
void messagePrinter(String message) { // (4)
try {
String changed = report(message).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
//...
}
@PulsarProducer(topic = "persistent://public/default/reports-groovy-docs", producerName = "report-producer-groovy") // (5)
CompletableFuture<String> report(String message) { // (6)
return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
}
}
package example
import io.micronaut.pulsar.annotation.PulsarConsumer
import io.micronaut.pulsar.annotation.PulsarProducer
import io.micronaut.pulsar.annotation.PulsarSubscription
import org.apache.pulsar.client.api.SubscriptionType
@PulsarSubscription(subscriptionName = "pulsar-ktest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
open class ConsumerProducer { // (2)
@PulsarConsumer(topic = "persistent://public/default/messages-kotlin-docs", consumerName = "shared-consumer-ktester") // (3)
suspend fun messagePrinter(message: String) { // (4)
val changed = report(message)
//...
}
@PulsarProducer(topic = "persistent://public/default/reports-kotlin-docs", producerName = "report-producer-kotlin") // (5)
open suspend fun report(message: String): String { // (6)
return "Reporting message '$message'" // (7)
}
}
1 | The class holding consumers can be annotated with @PulsarSubscription. It is also possible to set the subscription in @PulsarConsumer, which allows consumers to be located in other beans such as a @Singleton. |
2 | - |
3 | Methods that process messages, in other words consumers, must be annotated with @PulsarConsumer, and one of the topic options must be specified. |
4 | Using CompletableFuture, reactor-core, or RxJava return types, or Kotlin suspend, allows the body of the method to be executed asynchronously. Method invocations are always async because the underlying Pulsar library uses CompletableFuture. The async return type mostly gives developers control over execution flow from the method body or external callers. |
5 | - |
6 | - |
7 | - |
Consumer method
A simple approach is to provide just one parameter in the method argument list, which is then treated as the message body (payload). The parameter can also be of Pulsar's Message<> type, in which case the whole Pulsar message is injected instead of the parsed body. With multiple arguments, it’s important to annotate the argument that receives the message payload with @MessageBody, and to annotate the other parameters with @MessageKey (message key) or @MessageProperties (of type Map<String,String>, which receives all message properties/headers). For headers listed individually as method arguments, use @MessageHeader(headerName) on each argument representing a message header. Because of the underlying Pulsar library, header keys and values must be strings.
Don’t confuse @MessageHeaders with @MessageProperties. The former is used by Micronaut messaging and is not applicable to a method argument, so the latter was created to support mapping all headers/properties onto a single method argument. Still, @MessageHeader from the same package (messaging) is used on a method argument to map a single header value, in order to reuse existing annotations as much as possible.
An extra parameter of type org.apache.pulsar.client.api.Consumer does not need any annotation. It is the consumer passed by default from the underlying Pulsar library. If not present on the method annotated with @PulsarConsumer, it is omitted.
This is especially important for messages that should be of KeyValue type.
Using @PulsarSubscription annotation
This annotation marks a class as a singleton bean that contains one or more methods serving as Pulsar consumers under same subscription.
Non-required properties:
- name, which sets the Pulsar subscription name to a custom string; recommended to always set manually
- type, one of Pulsar’s subscription types. Subscription types are described in the official Apache Pulsar documentation.
- ackGroupTimeout, the acknowledgment group timeout for shared subscriptions
Consumer names are generated non-randomly, so names contain sequence numbers. This might be fine in a single-instance deployment but can cause errors with replicas, because each instance of the app uses the same generating method and starts again from 1. Pulsar expects unique names for each consumer within the subscription.
In a multi-deployment environment (for example, the same Micronaut app replicated a couple of times), subscriptions use a fixed subscription name and consumer name, which causes problems because consumers should be unique within the subscription. To avoid this, make use of Micronaut's annotation parsing by providing an expression inside the consumerName property. Read more about it in the official docs. In short, the expression in consumerName is parsed, so you can use something like a system property #{pulsar.subscriptionX.consumerYname} and set the property for each deployment.
Type and acknowledgment group timeout default to Pulsar Java client library values if not set. Name will be generated by this module in a "counter manner" (pulsar-subscription-1,2,3,4…).
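The naming problem described above can be illustrated with a small plain-Java model. This is only a sketch: ConsumerNameSketch, the deploymentId field, and the "consumer-" prefix are hypothetical and do not reproduce the module's real name generator. It only shows why a per-instance counter collides across replicas and how a per-deployment value keeps names apart.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java model; ConsumerNameSketch and the "consumer-" prefix are
// hypothetical and do not reproduce the module's real name generator.
final class ConsumerNameSketch {
    private final AtomicInteger sequence = new AtomicInteger();
    private final String deploymentId; // null models a replica without a per-deployment property

    ConsumerNameSketch(String deploymentId) {
        this.deploymentId = deploymentId;
    }

    // Every application instance restarts its own sequence at 1.
    String nextName() {
        int n = sequence.incrementAndGet();
        return deploymentId == null
                ? "consumer-" + n
                : "consumer-" + deploymentId + "-" + n;
    }

    public static void main(String[] args) {
        // Two replicas without a distinguishing value generate the same first name.
        System.out.println(new ConsumerNameSketch(null).nextName()
                .equals(new ConsumerNameSketch(null).nextName()));
        // A per-deployment value keeps the generated names apart.
        System.out.println(new ConsumerNameSketch("replica-a").nextName()
                .equals(new ConsumerNameSketch("replica-b").nextName()));
    }
}
```

In a real application the per-deployment value would come from configuration resolved inside consumerName, as described above, rather than from a constructor argument.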
Property | Type | Default | Required | Description |
---|---|---|---|---|
name | | | No | User-specified name, or leave blank to let the Pulsar module generate the name |
type | | | No | Default as in Pulsar official Java library |
ackGroupTimeout | | | No | Must be a |
Using @PulsarConsumer annotation
This is a method annotation. To use it you must specify one of value, topic, topics, or topicsPattern.
Properties are processed in the order topic (alias value), topics, topicsPattern, so when more than one is set, only the first in that order is used and the rest are ignored. Other properties may be omitted; however, it’s good to always specify the consumer name manually, as it can be used later for debugging or as an injection point for the Pulsar Consumer<T> that is generated for each method.
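The precedence rule can be pictured with a short plain-Java model. This is a sketch under assumptions, not the module's code: TopicSelection and its resolve method are hypothetical names, and treating an empty string or empty array as "not set" is also an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative model only: TopicSelection and resolve are hypothetical
// names, not part of the Micronaut Pulsar API.
final class TopicSelection {
    final String kind;          // "topic", "topics" or "topicsPattern"
    final List<String> values;  // the topic names, or the single pattern

    TopicSelection(String kind, List<String> values) {
        this.kind = kind;
        this.values = values;
    }

    // Mirrors the documented order: topic (alias value), then topics, then topicsPattern.
    // Assumption: empty strings and empty arrays count as "not set".
    static TopicSelection resolve(String topic, String[] topics, String topicsPattern) {
        if (topic != null && !topic.isEmpty()) {
            return new TopicSelection("topic", List.of(topic));
        }
        if (topics != null && topics.length > 0) {
            return new TopicSelection("topics", Arrays.asList(topics));
        }
        if (topicsPattern != null && !topicsPattern.isEmpty()) {
            return new TopicSelection("topicsPattern", List.of(topicsPattern));
        }
        throw new IllegalArgumentException("One of topic, topics or topicsPattern must be set");
    }

    public static void main(String[] args) {
        // topics wins over topicsPattern because topic is empty.
        TopicSelection s = resolve("", new String[] {"a", "b"}, "persistent://public/default/.*");
        System.out.println(s.kind + " " + s.values);
    }
}
```

Setting only the property you actually intend to use avoids relying on this ordering at all.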
Property | Type | Default | Required | Description |
---|---|---|---|---|
topics | | | Yes* | Required unless |
topicsPattern | | | Yes* | Required unless |
schema | | | No | If body is different from |
keySchema | | | No | If message is of key-value type this must be set or default will be used with same resolution style as the schema |
keyEncoding | | | No | If message is of key-value type it is frequent that the key is sent separately from the payload in which case this value should be set to |
consumerName | | | No | Consumer name, not required |
subscriptionTopicsMode | | | No | If |
subscribeAsync | | true | No | Whether to use async when reading Pulsar messages. |
patternAutoDiscoveryPeriod | | | No | Time delay in seconds after which regex subscriptions should seek new topics. |
When using topicsPattern, be sure to create topics before the consumer is started, since Pulsar's refresh for new topics tends to take a long time by default.
When using PulsarConsumer with a specific schema other than byte[], make sure topics are created and have the same schema the consumer is expecting, especially when using a pattern consumer which listens to multiple topics. Otherwise, the consumer might not connect and could throw org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException.
Dead Letter Queue
By default, the Pulsar Java library does not configure a DLQ. Instead, it redelivers failed messages for as long as possible, which floods the consumer(s) until the message is finally received. Here, failed means that the message was delivered but the consumer negatively acknowledged it, while received means delivered and acknowledged by the consumer. However, the reason for failure might be bad JSON formatting (where JSON is being used) or similar, in which case redelivery only creates unnecessary traffic to the consumer(s) and the message will never be delivered, resulting in consumers either slowing down due to the high redelivery count or eventually dropping due to high load. For this reason Micronaut Pulsar uses a DLQ by default and retries only 3 times. To change these options, use the configuration properties use-dead-letter-queue and default-max-retry-dlq. These properties apply by default to all Pulsar consumers unless explicitly set otherwise.
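The difference between endless redelivery and a bounded retry with a DLQ can be sketched as a plain-Java simulation. Nothing here is Pulsar or Micronaut Pulsar code: DeadLetterSketch and deliver are hypothetical names, and the sketch assumes that "retry 3 times" means three redeliveries after the first delivery attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java simulation; DeadLetterSketch is a hypothetical name and none of
// this is Micronaut Pulsar or Pulsar client code.
final class DeadLetterSketch {
    final List<String> deadLetters = new ArrayList<>();
    private final int maxRetries;

    DeadLetterSketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    // Returns the number of delivery attempts made for the message.
    // The handler returns true to acknowledge and false to negatively acknowledge.
    int deliver(String message, Predicate<String> handler) {
        int attempts = 0;
        while (attempts <= maxRetries) { // first delivery plus maxRetries redeliveries (assumption)
            attempts++;
            if (handler.test(message)) {
                return attempts; // acknowledged, nothing more to do
            }
        }
        deadLetters.add(message); // retries exhausted: park the message instead of redelivering forever
        return attempts;
    }

    public static void main(String[] args) {
        DeadLetterSketch sketch = new DeadLetterSketch(3);
        // A payload that can never be parsed is negatively acknowledged every time.
        int attempts = sketch.deliver("{not json", m -> false);
        System.out.println(attempts + " attempts, dead letters: " + sketch.deadLetters);
    }
}
```

A message that can never be processed stops consuming delivery capacity after a fixed number of attempts, which is the motivation given above for enabling the DLQ by default.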
5 Pulsar readers
Readers
Pulsar supports both "Consumers" and "Readers". More can be read in the Pulsar documentation.
Creating readers
To initialize a reader, declare a field annotated with @PulsarReader inside any bean, or use the annotation on a constructor argument.
package example;
import io.micronaut.pulsar.annotation.PulsarReader;
import jakarta.inject.Singleton;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Reader;
import java.util.concurrent.CompletableFuture;
@Singleton
public class ReaderExample {
@PulsarReader(value = "persistent://public/default/messages", readerName = "simple-j-reader") // (1)
private Reader<String> reader; // (2)
public CompletableFuture<Message<String>> readNext() { // (3)
return reader.readNextAsync(); // (4)
}
}
package example;
import io.micronaut.pulsar.annotation.PulsarReader;
import jakarta.inject.Singleton;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Reader;
import java.util.concurrent.CompletableFuture;
@Singleton
class ReaderExample {
@PulsarReader(value = "persistent://public/default/messages", readerName = "simple-g-reader") // (1)
private Reader<String> reader // (2)
CompletableFuture<Message<String>> readNext() { // (3)
return reader.readNextAsync() // (4)
}
}
package example
import io.micronaut.pulsar.annotation.PulsarReader
import jakarta.inject.Singleton
import kotlinx.coroutines.future.await
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Reader
@Singleton
class ReaderExample {
@PulsarReader(value = "persistent://public/default/messages", readerName = "simple-k-reader") // (1)
private lateinit var reader: Reader<String> // (2)
suspend fun readNext(): Message<String> { // (3)
return reader.readNextAsync().await() // (4)
}
}
1 | Reader annotation with the topic and the reader name |
2 | Reader must be of type org.apache.pulsar.client.api.Reader |
3 | Using readNextAsync requires CompletableFuture; in Kotlin, awaiting is possible |
4 | Calling the read will move the cursor to the next message or give null in case there are no more messages |
The reader name can be autogenerated, but the topic argument must be set. Reader injections cause blocking behaviour, as Reader instances will start creation and wait until Pulsar replies with a successful status. The reader name defaults to the field or argument name for injection points, or to the method name for a method-annotated reader.
KeyValue readers
In order to use KeyValue with a reader, the reader argument type must be org.apache.pulsar.common.schema.KeyValue. It is then possible to set attributes in @PulsarReader such as keyType, which defines which type of data serializer is used to extract the key, and keyEncoding, which defines how the key is extracted: INLINE (the key is part of the message payload) or SEPARATED (the key is stored as a message key).
6 Pulsar producers
Creating producers
To initialize a producer it’s sufficient to annotate an interface with @PulsarProducerClient and annotate methods with @PulsarProducer.
package example;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarProducerClient;
import org.apache.pulsar.client.api.MessageId;
import java.util.concurrent.CompletableFuture;
@PulsarProducerClient // (1)
public interface Producer {
@PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "kotlin-test-producer") // (2)
CompletableFuture<MessageId> send(String message); // (3)
@PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "b-kotlin-test-producer")
void sendBlocking(String message); // (4)
}
package example;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarProducerClient;
import org.apache.pulsar.client.api.MessageId;
import java.util.concurrent.CompletableFuture;
@PulsarProducerClient // (1)
interface Producer {
@PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "kotlin-test-producer") // (2)
CompletableFuture<MessageId> send(String message); // (3)
@PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "b-kotlin-test-producer")
void sendBlocking(String message); // (4)
}
package example
import io.micronaut.pulsar.annotation.PulsarProducer
import io.micronaut.pulsar.annotation.PulsarProducerClient
import org.apache.pulsar.client.api.MessageId
@PulsarProducerClient // (1)
interface Producer {
@PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "kotlin-test-producer") // (2)
suspend fun send(message: String): MessageId // (3)
@PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "b-kotlin-test-producer")
fun sendBlocking(message: String) // (4)
}
1 | Annotate the interface with @PulsarProducerClient so that Micronaut processes it without an implementation |
2 | Methods are the actual producers, so annotate them with @PulsarProducer |
3 | The return type decides whether the message is sent in a blocking or non-blocking manner. |
4 | A blocking send that waits until the message is successfully sent or throws an exception on failure. |
Producers can be used in other types of beans as well (Singleton or such).
Producer method
It’s important to note that if the method has more than one argument, @MessageBody must be specified on the argument intended for the message body, along with @MessageKey, @MessageProperties, or @MessageHeader on the other arguments, depending on the desired mapping.
As with consumers, use @MessageProperties for a collection of header values but @MessageHeader for mapping a single header onto a method argument. More details are in the consumer warning.
Producer return values
Not counting wrappers for async behaviour (CompletableFuture, RxJava, reactor-core), abstract methods can only have two return types: void or MessageId, since nothing more than sending will be done.
Non-abstract methods can have any return type, but a MessageId return value will not be filled in with Pulsar's resulting value. This is because the return value must be defined within the method body, so there is no reliable way to know what the intended return value was: the MessageId from Pulsar, or some other MessageId value generated within the body.
Methods can be invoked after or before sending the message by setting the property sendBefore to true or false respectively. The default is false, which executes the method body before sending the message. In the async approach the behaviour is unpredictable, as neither calling the method nor sending the message blocks, so the message may still be sent before the execution of the method finishes, or even before it properly starts, because the two can run on separate threads. This is mainly useful for the blocking approach, where it’s possible to execute the method body before sending the message, and if execution throws an exception, sending is skipped. The first parameter of the method is still used as the message body.
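The ordering in the blocking case can be modelled in a few lines of plain Java. This is a sketch under assumptions: SendOrderSketch and invoke are hypothetical names rather than the module's interceptor, and the model takes sendBefore = false to mean "run the method body first" (the default described above) and sendBefore = true to mean "send first".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of the blocking case; SendOrderSketch and invoke are
// hypothetical names, not the module's interceptor.
final class SendOrderSketch {
    final List<String> events = new ArrayList<>();

    <T> T invoke(boolean sendBefore, String body, Supplier<T> methodBody) {
        if (sendBefore) {
            send(body);               // the message goes out first
            return methodBody.get();  // then the method body runs
        }
        T result = methodBody.get();  // default: body first; an exception here skips the send
        send(body);
        return result;
    }

    // Stands in for the real producer call and only records that a send happened.
    private void send(String body) {
        events.add("sent:" + body);
    }

    public static void main(String[] args) {
        SendOrderSketch sketch = new SendOrderSketch();
        sketch.invoke(false, "hello", () -> {
            sketch.events.add("body");
            return "done";
        });
        System.out.println(sketch.events);
    }
}
```

With the default ordering, a method body that throws prevents the message from being sent at all, which makes it a natural place for validation in blocking producers.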
package example;
import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
public class ConsumerProducer { // (2)
@PulsarConsumer(topic = "persistent://public/default/messages-java-docs", consumerName = "shared-consumer-jtester") // (3)
public void messagePrinter(String message) { // (4)
try {
String changed = report(message).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
//...
}
@PulsarProducer(topic = "persistent://public/default/reports-java-docs", producerName = "report-producer-java") // (5)
public CompletableFuture<String> report(String message) { // (6)
return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
}
}
package example;
import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
class ConsumerProducer { // (2)
@PulsarConsumer(topic = "persistent://public/default/messages-groovy-docs", consumerName = "shared-consumer-gtester") // (3)
void messagePrinter(String message) { // (4)
try {
String changed = report(message).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
//...
}
@PulsarProducer(topic = "persistent://public/default/reports-groovy-docs", producerName = "report-producer-groovy") // (5)
CompletableFuture<String> report(String message) { // (6)
return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
}
}
package example
import io.micronaut.pulsar.annotation.PulsarConsumer
import io.micronaut.pulsar.annotation.PulsarProducer
import io.micronaut.pulsar.annotation.PulsarSubscription
import org.apache.pulsar.client.api.SubscriptionType
@PulsarSubscription(subscriptionName = "pulsar-ktest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
open class ConsumerProducer { // (2)
@PulsarConsumer(topic = "persistent://public/default/messages-kotlin-docs", consumerName = "shared-consumer-ktester") // (3)
suspend fun messagePrinter(message: String) { // (4)
val changed = report(message)
//...
}
@PulsarProducer(topic = "persistent://public/default/reports-kotlin-docs", producerName = "report-producer-kotlin") // (5)
open suspend fun report(message: String): String { // (6)
return "Reporting message '$message'" // (7)
}
}
1 | For Kotlin, open is required in non-interface (abstract) classes because of AOP |
2 | - |
3 | - |
4 | - |
5 | Annotating a method as a producer |
6 | A non-abstract async method. In the Kotlin case, open is needed to allow Micronaut AOP interception. |
7 | CompletableFuture or suspend will trigger immediate execution, but reactor or RxJava need a subscription and rely on the caller or upper layers to provide it. |
7 Authentication
JWT
To configure JWT authentication it’s sufficient to specify a JWT in application.yml under pulsar.authentication-jwt, e.g.:
pulsar.authentication.jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX...
pulsar:
authentication:
jwt: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX...
[pulsar]
[pulsar.authentication]
jwt="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
pulsar {
authentication {
jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
}
}
{
pulsar {
authentication {
jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
}
}
}
io.micronaut.configuration.pulsar.config.AbstractPulsarConfiguration is left public in case you want to implement more options yourself.
Support for specifying a JWT path, either an HTTP endpoint or a URL from which to retrieve a fresh JWT, is planned.
OAuth2
To use OAuth2 authentication, configure three parameters like in the example below to connect to a Pulsar service that uses OAuth2:
pulsar.oauth-issuer-url=https://some-sso.io/...
pulsar.oauth-credentials-url=file:///path/to/file.json
pulsar.audience=localhost:6650
pulsar:
oauth-issuer-url: https://some-sso.io/...
oauth-credentials-url: file:///path/to/file.json
audience: localhost:6650
[pulsar]
oauth-issuer-url="https://some-sso.io/..."
oauth-credentials-url="file:///path/to/file.json"
audience="localhost:6650"
pulsar {
oauthIssuerUrl = "https://some-sso.io/..."
oauthCredentialsUrl = "file:///path/to/file.json"
audience = "localhost:6650"
}
{
pulsar {
oauth-issuer-url = "https://some-sso.io/..."
oauth-credentials-url = "file:///path/to/file.json"
audience = "localhost:6650"
}
}
The issuer URL is the URL of the OAuth2 server that generates tokens.
The oauth-credentials-url parameter is used for reading a file containing the configuration necessary for authenticating to the OAuth2 provider as a client app. The file should contain everything defined in the Pulsar documentation, and type is limited to client_credentials. Below is an example of what the file should look like.
{
"type": "client_credentials",
"client_id": "pulsar",
"client_secret": "1234-abcd-5678-9011-ab56ac4564sa56",
"issuer_url": "https://my-oauth2-server.com/auth/realms/pulsar-realm"
}
The example shows something similar to what could be set when using KeyCloak or a similar SSO. Audience must be set, but the Pulsar service can be configured to ignore this value, in which case you may put any text with one or more characters. For more details, see the Apache Pulsar documentation. Consult the Java client documentation to understand more about the limitations of client_credentials.
The Pulsar library takes care of refreshing the OAuth2 JWT. The Pulsar server must be configured to use OAuth2, and the role attribute must be specified in its configuration files for Pulsar to be able to detect which kind of "user" is requesting data.
8 Transport encryption
Transport encryption
To use transport encryption you must configure the proper connection string and provide the necessary information about the certificate path, ciphers, and protocols, as shown in the example below.
pulsar.service-url=pulsar+ssl://localhost:6651
pulsar.tls-cert-file-path=path/to/cert.pem
pulsar.tls-ciphers[0]=TLS_RSA_WITH_AES_256_GCM_SHA384
pulsar.tls-ciphers[1]=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
pulsar.tls-protocols[0]=TLSv1.3
pulsar.tls-protocols[1]=TLSv1.2
pulsar:
service-url: 'pulsar+ssl://localhost:6651'
tls-cert-file-path: 'path/to/cert.pem'
tls-ciphers:
- TLS_RSA_WITH_AES_256_GCM_SHA384
- TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
tls-protocols:
- TLSv1.3
- TLSv1.2
[pulsar]
service-url="pulsar+ssl://localhost:6651"
tls-cert-file-path="path/to/cert.pem"
tls-ciphers=[
"TLS_RSA_WITH_AES_256_GCM_SHA384",
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
]
tls-protocols=[
"TLSv1.3",
"TLSv1.2"
]
pulsar {
serviceUrl = "pulsar+ssl://localhost:6651"
tlsCertFilePath = "path/to/cert.pem"
tlsCiphers = ["TLS_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"]
tlsProtocols = ["TLSv1.3", "TLSv1.2"]
}
{
pulsar {
service-url = "pulsar+ssl://localhost:6651"
tls-cert-file-path = "path/to/cert.pem"
tls-ciphers = ["TLS_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"]
tls-protocols = ["TLSv1.3", "TLSv1.2"]
}
}
It’s necessary to ensure that both server and client certificates match the supported ciphers and protocols. By default the hostname is not verified, but verification can be enabled by setting tls-verify-hostname: true.
TLS as an authentication method is not yet supported.
9 Protocol Buffers
Protocol buffers support
To enable native Protocol Buffers messaging support, you need to include the Google Protocol Buffers and Micronaut Protobuf dependencies:
implementation("io.micronaut:protobuf-java")
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
implementation("io.micronaut:micronaut-protobuff-support")
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-protobuff-support</artifactId>
</dependency>
10 Repository
You can find the source code of this project in this repository: