Micronaut Pulsar

Integration between Micronaut and Apache Pulsar

Version:

1 Introduction

This project includes integration between Micronaut and Apache Pulsar.

2 Release History

For this project, you can find a list of releases (with release notes) here:

1.2.0

  • Added multitenancy support module

1.1.0

  • Micronaut 3.2.3 minimum version

  • Using new JSON mapper

  • Key-Value messages support

  • Message Headers support

  • Protobuf native messages support using micronaut protobuf dependency

  • Apache Pulsar dependency upgraded to 2.9.1

1.0.0

  • Micronaut 2.1.3 minimum version

  • Authorization via JWT

  • Service URL Provider resolver

  • Basic support for consumers

  • Basic support for producers

  • Basic support for reader

3 Quick Start

To configure the Apache Pulsar client, first add the micronaut-pulsar module dependency:

implementation("io.micronaut:micronaut-pulsar")
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-pulsar</artifactId>
</dependency>

Then configure the URI of the Pulsar cluster or standalone server to communicate with:

Configuring pulsar.service-url
pulsar.service-url=pulsar://localhost:6650
pulsar:
  service-url: pulsar://localhost:6650
[pulsar]
  service-url="pulsar://localhost:6650"
pulsar {
  serviceUrl = "pulsar://localhost:6650"
}
{
  pulsar {
    service-url = "pulsar://localhost:6650"
  }
}
As this module is based on the official Java client from Apache Pulsar, see the official documentation for detailed information on service URL format.
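A malformed service URL is easy to miss until the client fails to connect. The following is a minimal sketch of a sanity check that uses only java.net.URI from the JDK. The class and method names are hypothetical and not part of Micronaut Pulsar, and the check assumes a single-host URL of the form scheme://host:port with one of the two schemes used in this guide.

```java
import java.net.URI;
import java.util.Set;

/** Hypothetical helper, not part of Micronaut Pulsar: sanity-checks a single-host service URL. */
final class ServiceUrlCheck {

    // Schemes that appear in this guide; assumed to be the only ones of interest here.
    private static final Set<String> SCHEMES = Set.of("pulsar", "pulsar+ssl");

    private ServiceUrlCheck() {
    }

    /** Returns true when the URL has a known scheme, a host, and an explicit port. */
    static boolean looksValid(String serviceUrl) {
        try {
            URI uri = URI.create(serviceUrl);
            return uri.getScheme() != null
                    && SCHEMES.contains(uri.getScheme())
                    && uri.getHost() != null
                    && uri.getPort() > 0;
        } catch (IllegalArgumentException e) {
            return false; // not parseable as a URI at all
        }
    }

    /** Returns true when the scheme indicates a TLS connection. */
    static boolean usesTls(String serviceUrl) {
        return "pulsar+ssl".equals(URI.create(serviceUrl).getScheme());
    }
}
```

For example, ServiceUrlCheck.looksValid("pulsar://localhost:6650") accepts the value configured above, while a value without a scheme, such as "localhost:6650", is rejected.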

Alternatively, a service URL provider can be supplied, either by annotating a bean with @PulsarServiceUrlProvider or by setting the provider's bean name in pulsar.service-url-provider:

pulsar.service-url-provider=BeanName
pulsar:
  service-url-provider: BeanName
[pulsar]
  service-url-provider="BeanName"
pulsar {
  serviceUrlProvider = "BeanName"
}
{
  pulsar {
    service-url-provider = "BeanName"
  }
}

in which case the implementing class must be annotated with @Named, with a value equal to the one in the configuration. In both cases, the bean must implement the org.apache.pulsar.client.api.ServiceUrlProvider interface.

After the Pulsar cluster URL is configured, the Micronaut Pulsar module can produce a bean of type org.apache.pulsar.client.api.PulsarClient. This bean is a singleton from which all producers and consumers are created. Since PulsarClient supports a URL provider that can switch cluster URLs on demand, there is no need for multiple clients.

4 Pulsar consumers

Creating consumers

To create a consumer, annotate any bean method with @PulsarConsumer and define a topic. Methods in beans annotated with @PulsarSubscription take the subscription name from that annotation. Otherwise, set the subscriptionName property in the @PulsarConsumer annotation. Subscription beans are singletons.

package example;

import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
public class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-java-docs", consumerName = "shared-consumer-jtester") // (3)
    public void messagePrinter(String message) { // (4)
        try {
            String changed = report(message).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //...
    }


    @PulsarProducer(topic = "persistent://public/default/reports-java-docs", producerName = "report-producer-java") // (5)
    public CompletableFuture<String> report(String message) { // (6)
        return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
    }
}
package example;

import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-groovy-docs", consumerName = "shared-consumer-gtester") // (3)
    void messagePrinter(String message) { // (4)
        try {
            String changed = report(message).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //...
    }


    @PulsarProducer(topic = "persistent://public/default/reports-groovy-docs", producerName = "report-producer-groovy") // (5)
    CompletableFuture<String> report(String message) { // (6)
        return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
    }
}
package example

import io.micronaut.pulsar.annotation.PulsarConsumer
import io.micronaut.pulsar.annotation.PulsarProducer
import io.micronaut.pulsar.annotation.PulsarSubscription
import org.apache.pulsar.client.api.SubscriptionType

@PulsarSubscription(subscriptionName = "pulsar-ktest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
open class ConsumerProducer { // (2)

    @PulsarConsumer(topic = "persistent://public/default/messages-kotlin-docs", consumerName = "shared-consumer-ktester") // (3)
    suspend fun messagePrinter(message: String) { // (4)
        val changed = report(message)
        //...
    }


    @PulsarProducer(topic = "persistent://public/default/reports-kotlin-docs", producerName = "report-producer-kotlin") // (5)
    open suspend fun report(message: String): String { // (6)
        return "Reporting message '$message'" // (7)
    }
}
1 The class holding consumers can be annotated with @PulsarSubscription. The subscription can also be set on @PulsarConsumer, which allows consumers to live in other beans such as a @Singleton.
2 -
3 Methods that process messages, in other words consumers, must be annotated with @PulsarConsumer, and one of the topic options must be specified.
4 Using CompletableFuture, reactor-core, or RxJava return types, or Kotlin suspend, allows the method body to execute asynchronously. Method invocations are always asynchronous because the underlying Pulsar library uses CompletableFuture. An async return type mainly gives developers control over the execution flow from the method body or from external callers.
5 -
6 -
7 -

Consumer method

The simplest approach is to declare a single method parameter, which is treated as the message body (payload) by default. The parameter may also be of Pulsar's Message<> type, in which case the whole Pulsar message is injected instead of the parsed body. With multiple parameters, annotate the payload parameter with @MessageBody and the others with @MessageKey (the message key) or @MessageProperties (a Map<String,String> holding all message properties/headers). To inject individual headers, use @MessageHeader("headerName") on each parameter that represents a header. Because of the underlying Pulsar library, header keys and values must be strings.

Don’t confuse @MessageHeaders with @MessageProperties. The former is used by Micronaut messaging and cannot be applied to a method argument, so the latter was created to map all headers/properties onto a single method argument. @MessageHeader from the same (messaging) package is still used on a method argument to map a single header value, so that existing annotations are reused as much as possible.

An extra parameter of type org.apache.pulsar.client.api.Consumer needs no annotation. It receives the consumer that the underlying Pulsar library passes by default. If the method annotated with @PulsarConsumer does not declare it, it is simply omitted.

This is especially important for messages that should be of KeyValue type.
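
Because header keys and values must be strings, typed values have to be converted before they are passed as message properties. A minimal sketch of such a conversion, using only the JDK; the helper name is hypothetical and the choice to skip null values is an assumption of this sketch, not documented module behaviour.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: flattens typed values into string-only message properties. */
final class PropertyStrings {

    private PropertyStrings() {
    }

    /** Converts each value with String.valueOf, skipping null values; keeps insertion order. */
    static Map<String, String> toStringProperties(Map<String, ?> typed) {
        Map<String, String> result = new LinkedHashMap<>();
        typed.forEach((key, value) -> {
            if (value != null) {
                result.put(key, String.valueOf(value));
            }
        });
        return result;
    }
}
```

The resulting Map<String,String> has the shape expected by a parameter annotated with @MessageProperties; the receiving side is then responsible for parsing the strings back into typed values.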

Using @PulsarSubscription annotation

This annotation marks a class as a singleton bean that contains one or more methods serving as Pulsar consumers under the same subscription.

Non-required properties:

  • name, which sets the Pulsar subscription name to a custom string; recommended to always set manually

  • type, one of Pulsar’s subscription types. Subscription types are described in the official Apache Pulsar documentation.

  • ackGroupTimeout, the acknowledgment group timeout for shared subscriptions

Consumer names are generated sequentially rather than randomly, so they contain sequence numbers. This may be fine in a single-instance deployment, but it can cause errors with replicas: every instance of the app uses the same generation method and starts again from 1, while Pulsar expects consumer names to be unique within a subscription.
The same problem arises when a multi-deployment environment (for example, the same Micronaut app replicated several times) uses a fixed subscription name and a fixed consumer name. To avoid this, use Micronaut's annotation parsing by providing an expression in the consumerName property. Read more about it in the official docs. In short, an expression used as the consumerName is parsed, so it can refer to a property such as #{pulsar.subscriptionX.consumerYname} that is set differently for each deployment.

Type and acknowledgment group timeout default to the Pulsar Java client library values if not set. The name is generated by this module in a "counter manner" (pulsar-subscription-1,2,3,4…​).
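
The collision between replicas can be illustrated with a simplified stand-in for counter-based naming. This is only a sketch under the assumption that names have the form prefix-N; it is not the module's actual generator, and the prefixes used below are illustrative.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified stand-in for counter-based naming; not the module's actual generator. */
final class CounterNames {

    private final AtomicInteger counter = new AtomicInteger();
    private final String prefix;

    CounterNames(String prefix) {
        this.prefix = prefix;
    }

    /** Returns prefix-1, prefix-2, ... in call order. */
    String next() {
        return prefix + "-" + counter.incrementAndGet();
    }
}
```

Two generators created with the same prefix, standing in for two replicas of the same app, both return a name ending in -1 on their first call. Giving each one a deployment-specific prefix, which is the role of the per-deployment property described above, keeps the names distinct.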

Property | Type | Default | Required | Description
name | java.lang.String | - | No | User-specified name, or leave blank to let the Pulsar module generate the name
type | org.apache.pulsar.client.api.SubscriptionType | Exclusive | No | Default as in Pulsar official Java library
ackGroupTimeout | java.lang.String | - | No | Must be a Duration-parsable string.
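
A value for ackGroupTimeout can be checked ahead of time. The sketch below assumes that "Duration-parsable" means the ISO-8601 format accepted by java.time.Duration.parse; the module may accept other notations as well, and the helper name is hypothetical.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.Optional;

/** Hypothetical helper: checks whether a string is an ISO-8601 duration. */
final class AckTimeoutCheck {

    private AckTimeoutCheck() {
    }

    /** Returns the parsed duration, or empty when the text is not a valid ISO-8601 duration. */
    static Optional<Duration> parse(String value) {
        try {
            return Optional.of(Duration.parse(value));
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }
}
```

Under that assumption, "PT0.1S" stands for 100 milliseconds and "PT5S" for 5 seconds, while free-form text such as "soon" is rejected.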

Using @PulsarConsumer annotation

This is a method annotation. To use it, you must specify one of value, topic, topics, or topicsPattern. The properties are processed in the order topic (alias value), topics, topicsPattern, so when more than one is set, only the first in that order is used and the rest are ignored. Other properties may be omitted; however, it is good practice to always specify the consumer name manually, since it can be used later for debugging or at injection points for the Pulsar Consumer<T> instances that are generated for each method.
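
The processing order can be modelled in a few lines. This is an illustrative sketch of the documented precedence only, with hypothetical names; it is not the module's real resolution code, and treating empty strings and empty lists as "not set" is an assumption.

```java
import java.util.List;
import java.util.Optional;

/** Illustrative model of the documented precedence; not the module's real code. */
final class TopicSelection {

    enum Kind { SINGLE_TOPIC, TOPIC_LIST, PATTERN }

    final Kind kind;
    final String value;

    private TopicSelection(Kind kind, String value) {
        this.kind = kind;
        this.value = value;
    }

    /** Picks the first property that is set, in the order topic, topics, topicsPattern. */
    static Optional<TopicSelection> resolve(String topic, List<String> topics, String topicsPattern) {
        if (topic != null && !topic.isEmpty()) {
            return Optional.of(new TopicSelection(Kind.SINGLE_TOPIC, topic));
        }
        if (topics != null && !topics.isEmpty()) {
            return Optional.of(new TopicSelection(Kind.TOPIC_LIST, String.join(",", topics)));
        }
        if (topicsPattern != null && !topicsPattern.isEmpty()) {
            return Optional.of(new TopicSelection(Kind.PATTERN, topicsPattern));
        }
        return Optional.empty(); // nothing set: the annotation would be invalid
    }
}
```

When both a topic and a topicsPattern are supplied, resolve returns a SINGLE_TOPIC selection and the pattern is never consulted, which mirrors the "rest are ignored" rule.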

Property | Type | Default | Required | Description
topics | java.lang.String[] | - | Yes* | Required unless topicsPattern is specified. Has priority over topicsPattern
topicsPattern | java.lang.String | - | Yes* | Required unless topics is specified. Regex for listening to multiple topics.
schema | MessageSchema | MessageSchema.BYTES or MessageSchema.JSON* | No | If body is different from byte[] but MessageSchema is Byte, will default to JSON.
keySchema | MessageSchema | MessageSchema.BYTES or MessageSchema.JSON* | No | If message is of key-value type this must be set or default will be used with same resolution style as the schema
keyEncoding | KeyValueEncodingType | KeyValueEncodingType.INLINE | No | If message is of key-value type it is frequent that the key is sent separately from the payload in which case this value should be set to KeyValueEncodingType.SEPARATED; otherwise Pulsar will try to extract key from the message payload (body)
consumerName | java.lang.String | - | No | Consumer name, not required
subscriptionTopicsMode | org.apache.pulsar.client.api.RegexSubscriptionMode | - | No | If topicsPattern is set, this can be one of PersistentOnly, NonPersistentOnly, or AllTopics. Not required, set by Pulsar library if not defined.
subscribeAsync | boolean | true | No | Whether to use async when reading Pulsar messages.
patternAutoDiscoveryPeriod | int | - | No | Time delay in seconds after which regex subscriptions should seek new topics.
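
The schema default marked with * can be read as a small decision rule. The sketch below uses a local stand-in enum rather than the module's MessageSchema type and implements only the rule stated in the table; the real resolution may take more cases into account.

```java
/** Illustrative model of the schema default described in the table; names are hypothetical. */
final class SchemaDefaults {

    /** Local stand-in for the two schema values named in the table. */
    enum Schema { BYTES, JSON }

    private SchemaDefaults() {
    }

    /** Falls back to JSON when the declared schema is BYTES but the body is not a byte[]. */
    static Schema effectiveSchema(Class<?> bodyType, Schema declared) {
        if (declared == Schema.BYTES && bodyType != byte[].class) {
            return Schema.JSON;
        }
        return declared;
    }
}
```

In this model, a method whose body parameter is a byte[] keeps BYTES, while any other body type combined with BYTES is treated as JSON.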

When using topicsPattern, be sure to create the topics before the consumer is started, since by default Pulsar takes a long time to pick up new topics.
When using @PulsarConsumer with a schema other than byte[], make sure the topics are created and have the same schema the consumer is expecting, especially when using a pattern consumer that listens to multiple topics. Otherwise, the consumer might not connect and could throw org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException.

Dead Letter Queue

By default, the Pulsar Java library does not configure a dead letter queue (DLQ). Instead, it redelivers failed messages for as long as possible, which floods the consumer(s) until the message is finally received. Here, failed means that the message was delivered but the consumer negatively acknowledged it; received means delivered and acknowledged by the consumer. The reason for a failure may be permanent, such as badly formatted JSON (when JSON is in use). Such a message will never be processed successfully, yet it keeps generating traffic to the consumer(s), which eventually slow down because of the high redelivery count or drop out under the load. For this reason, Micronaut Pulsar uses a DLQ by default and retries only 3 times. To change this behaviour, use the configuration properties use-dead-letter-queue and default-max-retry-dlq. These properties apply to all Pulsar consumers unless a consumer explicitly sets otherwise.
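
The effect of a retry limit can be shown with a toy simulation. It is a sketch only, not how Pulsar implements redelivery: the handler returns true to acknowledge and false to negatively acknowledge, and "retries only 3 times" is assumed to mean one initial delivery followed by up to 3 redeliveries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Toy simulation of redelivery with a dead letter queue; not how Pulsar implements it. */
final class DlqSimulation {

    final List<String> deadLetters = new ArrayList<>();
    private final int maxRetries;

    DlqSimulation(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /**
     * Delivers the message, redelivering after each negative acknowledgement.
     * Returns the total number of delivery attempts.
     */
    int deliver(String message, Predicate<String> handler) {
        int attempts = 0;
        while (attempts <= maxRetries) { // first delivery plus maxRetries redeliveries
            attempts++;
            if (handler.test(message)) {
                return attempts; // acknowledged, nothing more to do
            }
        }
        deadLetters.add(message); // retries exhausted: park the message instead of looping forever
        return attempts;
    }
}
```

With a limit of 3, a message that can never be parsed is attempted 4 times and then moved to deadLetters, so it stops competing with healthy messages for consumer capacity.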

5 Pulsar readers

Readers

Pulsar supports both "Consumers" and "Readers". More can be read in the Pulsar documentation.

Creating readers

To initialize a reader, declare a field annotated with @PulsarReader inside any bean or as a constructor argument.

package example;

import io.micronaut.pulsar.annotation.PulsarReader;
import jakarta.inject.Singleton;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Reader;

import java.util.concurrent.CompletableFuture;


@Singleton
public class ReaderExample {

    @PulsarReader(value = "persistent://public/default/messages", readerName = "simple-j-reader") // (1)
    private Reader<String> reader; // (2)

    public CompletableFuture<Message<String>> readNext() { // (3)
        return reader.readNextAsync(); // (4)
    }
}
package example;

import io.micronaut.pulsar.annotation.PulsarReader;
import jakarta.inject.Singleton;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Reader;

import java.util.concurrent.CompletableFuture;


@Singleton
class ReaderExample {

    @PulsarReader(value = "persistent://public/default/messages", readerName = "simple-g-reader") // (1)
    private Reader<String> reader // (2)

    CompletableFuture<Message<String>> readNext() { // (3)
        return reader.readNextAsync() // (4)
    }
}
package example

import io.micronaut.pulsar.annotation.PulsarReader
import jakarta.inject.Singleton
import kotlinx.coroutines.future.await
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Reader

@Singleton
class ReaderExample {
    @PulsarReader(value = "persistent://public/default/messages", readerName = "simple-k-reader") // (1)
    private lateinit var reader: Reader<String> // (2)

    suspend fun readNext(): Message<String> { // (3)
        return reader.readNextAsync().await() // (4)
    }
}
1 Reader annotation with the topic and the reader name
2 The reader must be of type org.apache.pulsar.client.api.Reader
3 Using readNextAsync requires CompletableFuture; in Kotlin, awaiting is possible
4 Calling the read will move the cursor to the next message or give null in case there are no more messages

The reader name can be autogenerated, but the topic argument must be set. Reader injection is blocking: creation of the Reader instance starts and then waits until Pulsar replies with a successful status. The reader name defaults to the field or argument name for injection points, or to the method name for a method-annotated reader.

KeyValue readers

To use KeyValue with a reader, the reader's message type must be org.apache.pulsar.common.schema.KeyValue. It is then possible to set attributes on @PulsarReader such as keyType, which defines which data serializer is used to extract the key, and keyEncoding, which defines how the key is extracted: INLINE (the key is part of the message payload) or SEPARATED (the key is stored as the message key).

6 Pulsar producers

Creating producers

To initialize a producer it’s sufficient to annotate an interface with @PulsarProducerClient and annotate methods with @PulsarProducer.

package example;

import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarProducerClient;
import org.apache.pulsar.client.api.MessageId;

import java.util.concurrent.CompletableFuture;

@PulsarProducerClient // (1)
public interface Producer {
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "kotlin-test-producer") // (2)
    CompletableFuture<MessageId> send(String message); // (3)

    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "b-kotlin-test-producer")
    void sendBlocking(String message); // (4)
}
package example;

import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarProducerClient;
import org.apache.pulsar.client.api.MessageId;

import java.util.concurrent.CompletableFuture;

@PulsarProducerClient // (1)
interface Producer {
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "kotlin-test-producer") // (2)
    CompletableFuture<MessageId> send(String message); // (3)

    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "b-kotlin-test-producer")
    void sendBlocking(String message); // (4)
}
package example

import io.micronaut.pulsar.annotation.PulsarProducer
import io.micronaut.pulsar.annotation.PulsarProducerClient
import org.apache.pulsar.client.api.MessageId

@PulsarProducerClient // (1)
interface Producer {
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "kotlin-test-producer") // (2)
    suspend fun send(message: String): MessageId // (3)
    @PulsarProducer(topic = "persistent://public/default/messages-kotlin-docs", producerName = "b-kotlin-test-producer")
    fun sendBlocking(message: String) // (4)
}
1 Annotate the interface with @PulsarProducerClient so that Micronaut processes it without an implementation
2 Methods are the actual producers, so annotate them with @PulsarProducer
3 The return type decides whether the message is sent in a blocking or non-blocking manner.
4 A blocking send that waits until the message is successfully sent or throws an exception on failure.

Producers can be used in other types of beans as well (Singleton or such).
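
The difference between the two return styles can be sketched with plain CompletableFuture code. The methods below are hypothetical stand-ins, not Micronaut Pulsar or Pulsar client API, and a String is used in place of a real MessageId.

```java
import java.util.concurrent.CompletableFuture;

/** Stand-ins for the two send styles; not Micronaut Pulsar or Pulsar client API. */
final class SendStyles {

    private SendStyles() {
    }

    /** Non-blocking: returns immediately with a future that completes with a fake message id. */
    static CompletableFuture<String> sendAsync(String message) {
        return CompletableFuture.supplyAsync(() -> "id-of-" + message);
    }

    /** Blocking: waits for the async send to finish and rethrows any failure to the caller. */
    static String sendBlocking(String message) {
        return sendAsync(message).join();
    }
}
```

A caller of sendAsync decides when, or whether, to wait for the result, for example by chaining thenApply, whereas a caller of sendBlocking only continues once the send has finished or failed.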

Producer method

Note that if the method has more than one argument, @MessageBody must be specified on the argument intended as the message body, and @MessageKey, @MessageProperties, or @MessageHeader on the others, depending on the desired mapping.

As with consumers, note that @MessageProperties maps the whole collection of headers, while @MessageHeader maps a single header onto a method argument. More details are in the consumer warning.

Producer return values

Not counting wrappers for async behaviour (CompletableFuture, RxJava, reactor-core), abstract methods can have only 2 return types: void or MessageId, since nothing more than sending is done.

Non-abstract methods can have any return type, but a MessageId return value will not be filled with Pulsar's resulting value. The return value must be defined within the method body, so there is no reliable way to know what was intended: the MessageId from Pulsar or some other MessageId value generated within the body.

The message can be sent before or after the method body is invoked, by setting the property sendBefore to true or false respectively. The default is false, which executes the method body before the message is sent. With the async approach the ordering is unpredictable: neither calling the method nor sending the message blocks, and the two can run on separate threads, so the message may still be sent before execution of the method finishes or even properly starts. The property is mainly useful with the blocking approach, where the method body can execute before the message is sent and, if execution throws an exception, sending is skipped. The first parameter of the method is still used as the message body.
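
The two orderings in the blocking case can be modelled as follows. This is a toy sketch, not the module's interceptor; it assumes that sendBefore = true means the message is sent before the method body runs, as the property name suggests, and it records the steps in a list instead of sending anything.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the two orderings in the blocking case; not the module's interceptor. */
final class SendOrder {

    private SendOrder() {
    }

    /** Runs "send" and the method body in the configured order and returns the steps taken. */
    static List<String> invoke(boolean sendBefore, Runnable body) {
        List<String> log = new ArrayList<>();
        try {
            if (sendBefore) {
                log.add("send");
                body.run();
                log.add("body");
            } else {
                body.run();
                log.add("body");
                log.add("send"); // only reached when the body did not throw
            }
        } catch (RuntimeException e) {
            log.add("failed: " + e.getMessage());
        }
        return log;
    }
}
```

With sendBefore = false, a body that throws leaves no "send" entry in the log, which corresponds to sending being skipped. With sendBefore = true, the message has already gone out by the time the body fails.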

package example;

import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
public class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-java-docs", consumerName = "shared-consumer-jtester") // (3)
    public void messagePrinter(String message) { // (4)
        try {
            String changed = report(message).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //...
    }


    @PulsarProducer(topic = "persistent://public/default/reports-java-docs", producerName = "report-producer-java") // (5)
    public CompletableFuture<String> report(String message) { // (6)
        return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
    }
}
package example;

import io.micronaut.pulsar.annotation.PulsarConsumer;
import io.micronaut.pulsar.annotation.PulsarProducer;
import io.micronaut.pulsar.annotation.PulsarSubscription;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
class ConsumerProducer { // (2)
    @PulsarConsumer(topic = "persistent://public/default/messages-groovy-docs", consumerName = "shared-consumer-gtester") // (3)
    void messagePrinter(String message) { // (4)
        try {
            String changed = report(message).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //...
    }


    @PulsarProducer(topic = "persistent://public/default/reports-groovy-docs", producerName = "report-producer-groovy") // (5)
    CompletableFuture<String> report(String message) { // (6)
        return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // (7)
    }
}
package example

import io.micronaut.pulsar.annotation.PulsarConsumer
import io.micronaut.pulsar.annotation.PulsarProducer
import io.micronaut.pulsar.annotation.PulsarSubscription
import org.apache.pulsar.client.api.SubscriptionType

@PulsarSubscription(subscriptionName = "pulsar-ktest-subscription", subscriptionType = SubscriptionType.Shared) // (1)
open class ConsumerProducer { // (2)

    @PulsarConsumer(topic = "persistent://public/default/messages-kotlin-docs", consumerName = "shared-consumer-ktester") // (3)
    suspend fun messagePrinter(message: String) { // (4)
        val changed = report(message)
        //...
    }


    @PulsarProducer(topic = "persistent://public/default/reports-kotlin-docs", producerName = "report-producer-kotlin") // (5)
    open suspend fun report(message: String): String { // (6)
        return "Reporting message '$message'" // (7)
    }
}
1 For Kotlin, open is required in non-interface (abstract) classes because of AOP
2 -
3 -
4 -
5 Annotating method as a producer
6 A non-abstract async method. In the Kotlin case, open is needed to allow Micronaut AOP interception.
7 CompletableFuture or suspend will trigger immediate execution, but reactor or RxJava types need a subscription and rely on the caller or upper layers to provide it.
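
The contrast in callout 7 can be sketched with JDK types only. A java.util.function.Supplier is used here as a stand-in for a lazy reactive type, which is an assumption made to keep the example free of reactor or RxJava dependencies; the class and the run counter are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Contrasts eager and lazy execution; the Supplier stands in for a lazy reactive type. */
final class EagerVsLazy {

    private EagerVsLazy() {
    }

    /** Work is scheduled as soon as this method is called. */
    static CompletableFuture<String> eager(String message, AtomicInteger runs) {
        return CompletableFuture.supplyAsync(() -> {
            runs.incrementAndGet();
            return "Reporting message " + message;
        });
    }

    /** Nothing runs until the caller invokes get(), comparable to subscribing. */
    static Supplier<String> lazy(String message, AtomicInteger runs) {
        return () -> {
            runs.incrementAndGet();
            return "Reporting message " + message;
        };
    }
}
```

Calling eager starts the work even if the returned future is never inspected, while the value returned by lazy does nothing until something asks for its result.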

7 Authentication

JWT

To configure JWT authentication it’s sufficient to specify a JWT in application.yml under pulsar.authentication-jwt, e.g.:

pulsar.authentication.jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX...
pulsar:
  authentication:
    jwt: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX...
[pulsar]
  [pulsar.authentication]
    jwt="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
pulsar {
  authentication {
    jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
  }
}
{
  pulsar {
    authentication {
      jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpX..."
    }
  }
}

io.micronaut.configuration.pulsar.config.AbstractPulsarConfiguration is left public in case you want to implement more options yourself.

It’s planned to support specifying a JWT path, either an HTTP endpoint or URL to retrieve a fresh JWT.

OAuth2

To use OAuth2 authentication, configure three parameters like in the example below to connect to a Pulsar service that uses OAuth2:

pulsar.oauth-issuer-url=https://some-sso.io/...
pulsar.oauth-credentials-url=file:///path/to/file.json
pulsar.audience=localhost:6650
pulsar:
    oauth-issuer-url: https://some-sso.io/...
    oauth-credentials-url: file:///path/to/file.json
    audience: localhost:6650
[pulsar]
  oauth-issuer-url="https://some-sso.io/..."
  oauth-credentials-url="file:///path/to/file.json"
  audience="localhost:6650"
pulsar {
  oauthIssuerUrl = "https://some-sso.io/..."
  oauthCredentialsUrl = "file:///path/to/file.json"
  audience = "localhost:6650"
}
{
  pulsar {
    oauth-issuer-url = "https://some-sso.io/..."
    oauth-credentials-url = "file:///path/to/file.json"
    audience = "localhost:6650"
  }
}

The issuer URL is the URL to the OAuth2 server generating tokens and such.

Parameter oauth-credentials-url is used for reading a file containing necessary configuration for authenticating to the OAuth2 provider as a client app. The file should contain everything defined in Pulsar documentation, and type is limited to client_credentials. Below is an example of what the file should look like.

{
    "type": "client_credentials",
    "client_id": "pulsar",
    "client_secret": "1234-abcd-5678-9011-ab56ac4564sa56",
    "issuer_url": "https://my-oauth2-server.com/auth/realms/pulsar-realm"
}

The example resembles what could be set when using Keycloak or a similar SSO. The audience must be set, but the Pulsar service can be configured to ignore this value, in which case any text of one or more characters will do. For more details, see the Apache Pulsar documentation. Consult the Java client documentation to understand the limitations of client_credentials.

The Pulsar library takes care of refreshing the OAuth2 JWT. The Pulsar server must be configured to use OAuth2, and the role attribute must be specified in its configuration files so that Pulsar can detect which kind of "user" is requesting data.

8 Transport encryption

Transport encryption

To use transport encryption, you must configure a proper connection string and provide the certificate path, ciphers, and protocols, as shown in the example below.

pulsar.service-url=pulsar+ssl://localhost:6651
pulsar.tls-cert-file-path=path/to/cert.pem
pulsar.tls-ciphers[0]=TLS_RSA_WITH_AES_256_GCM_SHA384
pulsar.tls-ciphers[1]=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
pulsar.tls-protocols[0]=TLSv1.3
pulsar.tls-protocols[1]=TLSv1.2
pulsar:
    service-url: 'pulsar+ssl://localhost:6651'
    tls-cert-file-path: 'path/to/cert.pem'
    tls-ciphers:
        - TLS_RSA_WITH_AES_256_GCM_SHA384
        - TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
    tls-protocols:
        - TLSv1.3
        - TLSv1.2
[pulsar]
  service-url="pulsar+ssl://localhost:6651"
  tls-cert-file-path="path/to/cert.pem"
  tls-ciphers=[
    "TLS_RSA_WITH_AES_256_GCM_SHA384",
    "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
  ]
  tls-protocols=[
    "TLSv1.3",
    "TLSv1.2"
  ]
pulsar {
  serviceUrl = "pulsar+ssl://localhost:6651"
  tlsCertFilePath = "path/to/cert.pem"
  tlsCiphers = ["TLS_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"]
  tlsProtocols = ["TLSv1.3", "TLSv1.2"]
}
{
  pulsar {
    service-url = "pulsar+ssl://localhost:6651"
    tls-cert-file-path = "path/to/cert.pem"
    tls-ciphers = ["TLS_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"]
    tls-protocols = ["TLSv1.3", "TLSv1.2"]
  }
}

It’s necessary to ensure that both server and client certificates match the supported ciphers and protocols. By default, the hostname is not verified, but verification can be enabled by setting tls-verify-hostname: true.

TLS as authentication method is not yet supported.
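
Whether the configured ciphers and protocols are available on the client side can be checked before deployment. The sketch below queries the JVM's default SSLContext, which is an assumption: the Pulsar client may use a different TLS provider, in which case the JVM's lists are only an approximation. The class name is hypothetical.

```java
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;

/** Hypothetical helper: compares configured TLS settings with what the JVM reports. */
final class TlsSupportCheck {

    private TlsSupportCheck() {
    }

    /** Returns the requested names that are absent from the supported set, in request order. */
    static List<String> missing(List<String> requested, Set<String> supported) {
        List<String> result = new ArrayList<>();
        for (String name : requested) {
            if (!supported.contains(name)) {
                result.add(name);
            }
        }
        return result;
    }

    /** Protocols supported by the JVM's default SSLContext. */
    static Set<String> jvmProtocols() {
        return new HashSet<>(Arrays.asList(supportedParameters().getProtocols()));
    }

    /** Cipher suites supported by the JVM's default SSLContext. */
    static Set<String> jvmCipherSuites() {
        return new HashSet<>(Arrays.asList(supportedParameters().getCipherSuites()));
    }

    private static SSLParameters supportedParameters() {
        try {
            return SSLContext.getDefault().getSupportedSSLParameters();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext available", e);
        }
    }
}
```

For instance, TlsSupportCheck.missing(List.of("TLSv1.3", "TLSv1.2"), TlsSupportCheck.jvmProtocols()) returns the configured protocols that the running JVM does not report as supported; an empty list means all of them are available.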

9 Protocol Buffers

Protocol buffers support

To enable native Protocol Buffers messaging support, include both the Google Protocol Buffers dependency and the Micronaut one:

implementation("com.google.protobuf:protobuf-java")
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
</dependency>
implementation("io.micronaut.grpc:micronaut-protobuff-support")
<dependency>
    <groupId>io.micronaut.grpc</groupId>
    <artifactId>micronaut-protobuff-support</artifactId>
</dependency>

10 Repository

You can find the source code of this project in this repository: