implementation("io.micronaut.gcp:micronaut-gcp-common")
Table of Contents
Micronaut GCP
Provides integration between Micronaut and Google Cloud Platform (GCP)
Version: 5.7.2-SNAPSHOT
1 Introduction
This project provides various extensions to Micronaut to integrate Micronaut with Google Cloud Platform (GCP).
2 Setting up GCP Support
The micronaut-gcp-common
module includes basic setup for running applications on Google Cloud.
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-common</artifactId>
</dependency>
Prerequisites:
-
You should have a Google Cloud Platform project created
-
Configure default project
gcloud config set project YOUR_PROJECT_ID
-
Authenticate with
gcloud auth login
-
Authenticate application default credential with
gcloud auth application-default login
It’s strongly recommended that you use a Service Account for your application.
Google Project ID
The module features a base GoogleCloudConfiguration which you can use to configure or retrieve the GCP Project ID:
Property | Type | Description |
---|---|---|
|
java.lang.String |
Returns the Google project ID for the project. |
You can inject this bean and use the getProjectId()
method to retrieve the configured or detected project ID.
Google Credentials
The module will setup a bean of exposing the com.google.auth.oauth2.GoogleCredentials
instance that are either detected from the local environment or configured by GoogleCredentialsConfiguration:
Property | Type | Description |
---|---|---|
|
java.util.List |
The scopes to use. |
|
java.lang.String |
The location of the service account credential key file. See <a href="https://cloud.google.com/iam/docs/understanding-service-accounts">Understanding Service Accounts</a> for more information on generating a service account key file. |
|
java.lang.String |
The Base64 encoded service account key content. This is not recommended except if you need to encode service account key via an environmental variable. For other use cases, configure <pre>location</pre> instead. |
|
boolean |
Allows disabling Google credentials configuration. This may be useful in situations where you don’t want to authenticate despite having the Google Cloud SDK configured. Default value is true. |
|
boolean |
If the HttpClient based transport should be used for retrieving authentication tokens. Default value is true. Note that if HttpClient is not on the classpath, the GCP SDK’s default transport will be used instead. |
Debug Logging
The underlying GCP SDK libraries use the standard java.util.logging
package (JUL) for log statements. The libraries are fairly conservative in what they log by default. If you need to debug the GCP libraries' activity, especially their GRPC-based communication with the GCP cloud services, it can be useful to turn up the logging level. In order to do this in conjunction with the framework’s SLF4J-based logging, it is necessary to perform some additional setup to enable the JUL bridge library for SLF4J.
There is an unavoidable performance impact to enabled JUL log statements when using the jul-to-slf4j bridge, thus it is advised to be conservative in enabling this configuration, preferably only for debugging purposes.
|
To enable the GCP library debug logging, first add the jul-to-slf4j.jar
dependency to your classpath:
runtimeOnly("org.slf4j:jul-to-slf4j:2.0.9")
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<version>2.0.9</version>
<scope>runtime</scope>
</dependency>
Next you can either enable the JUL bridge class SLF4JBridgeHandler
programmatically during application initialization (such as in the main
method of your application), or by adding the following line to a logging.properties
file on your classpath (see the SLF4JBridgeHandler javadocs for more details):
handlers = org.slf4j.bridge.SLF4JBridgeHandler
Next add the following configuration for LevelChangePropagator
(which eliminates the performance impact of disabled JUL log statements) to your SLF4J configuration:
<configuration>
<contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator"/>
<!-- rest of the configuration file .... -->
</configuration>
Once this is done, you can set the logging level for GCP library classes in the usual manner using SLF4J configuration.
3 Release History
For this project, you can find a list of releases (with release notes) here:
4 Google Cloud Logging Support
Google Cloud Logging integration is available via StackdriverJsonLayout that formats log output using Stackdriver structured logging format.
To enable it add the following dependency to your project:
implementation("io.micronaut.gcp:micronaut-gcp-logging")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-logging</artifactId>
</dependency>
By default if an application is using a CONSOLE appender Stackdriver log parser will consider the entire payload of the message as a text entry, making searching and correlation with tracing impractical.
Logs on the picture above can’t be searched by attributes such as thread
and ansi coloring makes regex searching even more challenging.
When enabled the JSON appender allows logs to be easily filtered:
If you combine this module with the Stackdriver Trace module, all logs will also have a traceId field, making possible to correlate traces with log entries.
|
1 | Visual display of log levels |
2 | Correlated traceId for tracing when you have enabled cloud tracing |
3 | Simplified output without extra fields or ANSI coloring codes |
Configuring logging via console
<configuration>
<!-- Uncomment the next line to enable ANSI color code interpretation -->
<!-- <property name="STDOUT_WITH_JANSI" value="true" /> -->
<include resource="io/micronaut/gcp/logging/logback-json-appender.xml" /> (1)
<root level="INFO">
<appender-ref ref="CONSOLE_JSON" /> (2)
</root>
</configuration>
1 | Import the logback configuration that defines the JsonLayout appender |
2 | Define your root level as CONSOLE_JSON |
Overriding defaults on logging configuration
If you would like to override the fields that are included in the JsonLayout
appender, you can declare it on your own logback configuration instead of including the default from logback-json-appender.xml
:
<configuration>
<appender name="CONSOLE_JSON" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="io.micronaut.gcp.logging.StackdriverJsonLayout">
<projectId>${projectId}</projectId> (1)
<includeTraceId>true</includeTraceId>
<includeSpanId>true</includeSpanId>
<includeLevel>true</includeLevel>
<includeThreadName>false</includeThreadName>
<includeMDC>true</includeMDC>
<includeLoggerName>true</includeLoggerName>
<includeFormattedMessage>true</includeFormattedMessage>
<includeExceptionInMessage>true</includeExceptionInMessage>
<includeContextName>true</includeContextName>
<includeMessage>false</includeMessage>
<includeException>false</includeException>
</layout>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE_JSON" >
</appender-ref>
</root>
</configuration>
1 | You can override the projectId settings via property. If not defined, the default ServiceOptions.getDefaultProjectId() will be used. |
Dynamic appender selection
The JSONLayout
comes in hand when using Google Cloud Logging, but when running locally it will make your console logs unreadable. The default logback-json-appender
configuration includes both a STDOUT
and a CONSOLE_JSON
appenders, as well as a dynamic logback property called google_cloud_logging
.
You can use that variable to switch your logger appender dynamically.
You logging configuration would look like this:
<configuration>
<include resource="io/micronaut/gcp/logging/logback-json-appender.xml" />
<root level="INFO">
<appender-ref ref="${google_cloud_logging}" /> (1)
</root>
</configuration>
1 | Chooses the appropriate appender depending on the environment. |
The environment detection executes a HTTP request to the Google Cloud metadata server. If you rather skip this to improve startup time, just set MICRONAUT_ENVIRONMENTS environment variable or the micronaut.environments System property as described in the reference documentation.
|
5 Stackdriver Trace
The micronaut-gcp-tracing
integrates Micronaut with Cloud Trace from Google Cloud Operations (formerly Stackdriver).
To enable it add the following dependency:
implementation("io.micronaut.gcp:micronaut-gcp-tracing")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-tracing</artifactId>
</dependency>
Then enabling Zipkin tracing in your application configuration:
tracing.zipkin.enabled=true
tracing.zipkin.sampler=probability=1.0
tracing.gcp.tracing.enabled=true
tracing:
zipkin:
enabled: true
sampler:
probability=1.0
gcp:
tracing:
enabled: true
[tracing]
[tracing.zipkin]
enabled=true
sampler="probability=1.0"
[tracing.gcp]
[tracing.gcp.tracing]
enabled=true
tracing {
zipkin {
enabled = true
sampler = "probability=1.0"
}
gcp {
tracing {
enabled = true
}
}
}
{
tracing {
zipkin {
enabled = true
sampler = "probability=1.0"
}
gcp {
tracing {
enabled = true
}
}
}
}
{
"tracing": {
"zipkin": {
"enabled": true,
"sampler": "probability=1.0"
},
"gcp": {
"tracing": {
"enabled": true
}
}
}
}
-
sampler.probability
(optional) Set sampling probability to 100% for dev/testing purposes to observe traces -
gcp.tracing.enabled
(optional) Enable/disable Stackdriver Trace configuration, defaults to true
See the guide for OpenTelemetry Tracing with Google Cloud Trace and the Micronaut Framework to learn more. |
6 Authorizing HTTP Clients
The micronaut-gcp-http-client
module can be used to help authorize service-to-service communication. To get started add the following module:
implementation("io.micronaut.gcp:micronaut-gcp-http-client")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-http-client</artifactId>
</dependency>
You should then configure the service accounts as per the documentation on service-to-service communication and the enable the filter for the outgoing URI paths you wish to include the Google-signed OAuth ID token:
gcp.http.client.auth.patterns[0]=/foo/**
gcp.http.client.auth.patterns[1]=/bar/**
gcp:
http:
client:
auth:
patterns:
- /foo/**
- /bar/**
[gcp]
[gcp.http]
[gcp.http.client]
[gcp.http.client.auth]
patterns=[
"/foo/**",
"/bar/**"
]
gcp {
http {
client {
auth {
patterns = ["/foo/**", "/bar/**"]
}
}
}
}
{
gcp {
http {
client {
auth {
patterns = ["/foo/**", "/bar/**"]
}
}
}
}
}
{
"gcp": {
"http": {
"client": {
"auth": {
"patterns": ["/foo/**", "/bar/**"]
}
}
}
}
}
7 Cloud Function Support
Micronaut GCP includes extended support for Google Cloud Function - designed for serverless workloads.
7.1 Simple Functions
Micronaut GCP offers two ways to write cloud functions with Micronaut. The first way is more low level and involves using Micronaut’s built in support for functions. Simply add the following dependency to your classpath:
implementation("io.micronaut.gcp:micronaut-gcp-function")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-function</artifactId>
</dependency>
Then add the Cloud Function API as a compileOnly
dependency (provided
with Maven):
compileOnly("com.google.cloud.functions:functions-framework-api")
<dependency>
<groupId>com.google.cloud.functions</groupId>
<artifactId>functions-framework-api</artifactId>
<scope>provided</scope>
</dependency>
Now define a class that implements one of Google Cloud Found’s interfaces, for example com.google.cloud.functions.BackgroundFunction
, and extends from io.micronaut.function.executor.FunctionInitializer
.
The following is an example of a BackgroundFunction
that uses Micronaut and Google Cloud Function:
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.background;
import com.google.cloud.functions.*;
import io.micronaut.gcp.function.GoogleFunctionInitializer;
import jakarta.inject.*;
import java.util.*;
public class Example extends GoogleFunctionInitializer // (1)
implements BackgroundFunction<PubSubMessage> { // (2)
@Inject LoggingService loggingService; // (3)
@Override
public void accept(PubSubMessage message, Context context) {
loggingService.logMessage(message);
}
}
class PubSubMessage {
String data;
Map<String, String> attributes;
String messageId;
String publishTime;
}
@Singleton
class LoggingService {
void logMessage(PubSubMessage message) {
// log the message
}
}
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.background
import com.google.cloud.functions.*
import io.micronaut.gcp.function.GoogleFunctionInitializer
import jakarta.inject.*
class Example extends GoogleFunctionInitializer // (1)
implements BackgroundFunction<PubSubMessage> { // (2)
@Inject LoggingService loggingService // (3)
@Override
void accept(PubSubMessage message, Context context) {
loggingService.logMessage(message)
}
}
class PubSubMessage {
String data
Map<String, String> attributes
String messageId
String publishTime
}
@Singleton
class LoggingService {
void logMessage(PubSubMessage message) {
// log the message
}
}
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.background
import com.google.cloud.functions.BackgroundFunction
import com.google.cloud.functions.Context
import io.micronaut.gcp.function.GoogleFunctionInitializer
import jakarta.inject.Inject
import jakarta.inject.Singleton
class Example : GoogleFunctionInitializer(), // (1)
BackgroundFunction<PubSubMessage> { // (2)
@Inject
lateinit var loggingService: LoggingService // (3)
override fun accept(message: PubSubMessage, context: Context) {
loggingService.logMessage(message)
}
}
class PubSubMessage {
var data: String? = null
var attributes: Map<String, String>? = null
var messageId: String? = null
var publishTime: String? = null
}
@Singleton
class LoggingService {
fun logMessage(message: PubSubMessage) {
// log the message
}
}
1 | The function extends from io.micronaut.function.executor.FunctionInitializer |
2 | The function implements com.google.cloud.functions.BackgroundFunction |
3 | Dependency injection can be used on the fields |
When you extend from FunctionInitializer
the Micronaut ApplicationContext
will be initialized and dependency injection will be performed on the function instance. You can use inject any bean using jakarta.inject.Inject
as usual.
Functions require a no argument constructor hence you must use field injection (which requires lateinit in Kotlin) when injecting dependencies into the function itself.
|
The FunctionInitializer
super class provides numerous methods that you can override to customize how the ApplicationContext
is built if desired.
Running Functions Locally
Raw functions cannot be executed locally. They can be tested by instantiating the function and inspecting any side effects by providing mock arguments or mocking dependent beans.
Deployment
When deploying the function to Cloud Function you should use the fully qualified name of the function class as the handler reference.
First build the function with:
$ ./gradlew clean shadowJar
Then cd
into the build/libs
directory (deployment has to be done from the location where the JAR file resides):
$ cd build/libs
To deploy the function make sure you have gcloud
CLI then run:
$ gcloud beta functions deploy myfunction --entry-point example.function.Function --runtime java11 --trigger-http
In the example above myfunction
refers to the name of your function and can be changed to whatever name you prefer to name your function. example.function.Function
refers to the fully qualified name of your function class.
To obtain the trigger URL you can use the following command:
$ YOUR_HTTP_TRIGGER_URL=$(gcloud beta functions describe myfunction --format='value(httpsTrigger.url)')
You can then use this variable to test the function invocation:
$ curl -i $YOUR_HTTP_TRIGGER_URL
7.2 HTTP Functions
It is common to want to take just a slice of a regular Micronaut HTTP server application and deploy it as a function.
Configuration
To facilitate this model, Micronaut GCP includes an additional module that allows you to use regular Micronaut annotations like @Controller
and @Get
to define your functions that can be deployed to cloud function.
With this model you need to add the micronaut-gcp-function-http
dependency to your application:
implementation("io.micronaut.gcp:micronaut-gcp-function-http")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-function-http</artifactId>
</dependency>
And define the Google Function API as a development only dependency:
developmentOnly("com.google.cloud.functions:functions-framework-api")
<dependency>
<groupId>com.google.cloud.functions</groupId>
<artifactId>functions-framework-api</artifactId>
<scope>provided</scope>
</dependency>
Running Functions Locally
First to run the function locally you should then make the regular Micronaut server a developmentOnly
dependency since it is not necessary to include it in the JAR file that will be deployed to Cloud Function:
developmentOnly("io.micronaut:micronaut-http-server-netty")
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-http-server-netty</artifactId>
<scope>provided</scope>
</dependency>
You can then use ./gradlew run
or ./mvnw compile exec:exec
to run the function locally using Micronaut’s Netty-based server.
Alternatively, you could configure the Google Function Framework for Java which includes a Maven plugin, or for Gradle include the following:
configurations {
invoker
}
dependencies {
invoker 'com.google.cloud.functions.invoker:java-function-invoker:1.0.0-beta1'
}
task('runFunction', type: JavaExec, dependsOn: classes) {
main = 'com.google.cloud.functions.invoker.runner.Invoker'
classpath(configurations.invoker)
args(
'--target', 'io.micronaut.gcp.function.http.HttpFunction',
'--classpath', (configurations.runtimeClasspath + sourceSets.main.output).asPath,
'--port', 8081
)
}
With this in place you can run ./gradlew runFunction
to run the function locally.
Deployment
When deploying the function to Cloud Function you should use the HttpFunction class as the handler reference.
First build the function with:
$ ./gradlew clean shadowJar
Then cd
into the build/libs
directory (deployment has to be done from the location where the JAR file resides):
$ cd build/libs
To deploy the function make sure you have gcloud
CLI then run:
$ gcloud beta functions deploy myfunction --entry-point io.micronaut.gcp.function.http.HttpFunction --runtime java11 --trigger-http
In the example above myfunction
refers to the name of your function and can be changed to whatever name you prefer to name your function.
To obtain the trigger URL you can use the following command:
$ YOUR_HTTP_TRIGGER_URL=$(gcloud beta functions describe myfunction --format='value(httpsTrigger.url)')
You can then use this variable to test the function invocation:
$ curl -i $YOUR_HTTP_TRIGGER_URL/hello/John
See the guide for Deploy an HTTP Function to Google Cloud Functions to learn more. |
7.3 CloudEvents Functions
To use CloudEvents, add the following dependency:
implementation("io.micronaut.gcp:micronaut-gcp-function-cloudevents")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-function-cloudevents</artifactId>
</dependency>
In your application, create a class that extends GoogleCloudEventsFunction. Google publishes POJOs for Google Cloud Events. For example, to subscribe to a Google Cloud Storage event, your function may look like this:
public class TestFunction extends GoogleCloudEventsFunction<StorageObjectData> {
@Override
protected void accept(@NonNull CloudEventContext context, @Nullable StorageObjectData data) throws Exception {
}
}
8 Google Cloud Run
Google Cloud Run ia a fully-managed serverless platform for containerized applications.
To learn more see the Micronaut guides for Google Cloud Run
9 Google Cloud Pub/Sub Support
9.1 Introduction
This project provides integration between Micronaut and Google Cloud PubSub. It uses the official Google Cloud Pub/Sub client java libraries to create Publisher and Subscribers while keeping a similar programming model for messaging as the ones defined in Micronaut RabbitMQ and Micronaut Kafka.
Support is provided for the Pull (for long-running processes) and Push (ideal for serverless environments such as Cloud Run) styles of message consumption using a consistent programming model.
The project does not attempt to create any of the resources in Google Cloud such as Topics and Subscription. Make sure your project has the correct resources and the Service Account being used has proper permissions. |
9.2 Quickstart
To add support for Google Cloud Pub/Sub to an existing project, add the following dependencies to your build.
implementation("io.micronaut.gcp:micronaut-gcp-pubsub")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-pubsub</artifactId>
</dependency>
Creating a Pub/Sub Publisher with @PubSubClient
To publish messages to Google Cloud Pub/Sub, just define an interface that is annotated with @PubSubClient.
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
@PubSubClient // (1)
public interface AnimalClient {
@Topic("animals") // (2)
void send(byte[] data); // (3)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
@PubSubClient // (1)
interface AnimalClient {
@Topic("animals") // (2)
void send(byte[] data) // (3)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
@PubSubClient // (1)
interface AnimalClient {
@Topic("animals") // (2)
fun send(data: ByteArray) // (3)
}
1 | The @PubSubClient annotation is used to designate this interface as a client |
2 | The topic animals will be used to publish messages |
3 | The method accepts one argument that will be used as message body |
Make sure your current GCP project has a topic named animals before starting the application.
|
At compile time Micronaut will create an implementation of the above interface. You can then inject that interface on your business methods via @Inject
and use it.
import io.micronaut.gcp.pubsub.support.Animal;
import jakarta.inject.Singleton;
@Singleton
public final class AnimalService {
private final AnimalClient animalClient;
public AnimalService(AnimalClient animalClient) { // (1)
this.animalClient = animalClient;
}
public void someBusinessMethod(Animal animal) {
byte[] serializedBody = serialize(animal);
animalClient.send(serializedBody);
}
private byte[] serialize(Animal animal) { // (2)
return null;
}
}
import jakarta.inject.Singleton;
@Singleton
final class AnimalService {
private final AnimalClient animalClient;
AnimalService(AnimalClient animalClient) { // (1)
this.animalClient = animalClient
}
void someBusinessMethod(Animal animal) {
byte[] serializedBody = serialize(animal)
animalClient.send(serializedBody)
}
private byte[] serialize(Animal animal) { // (2)
return null
}
}
import io.micronaut.gcp.pubsub.support.Animal
import jakarta.inject.Singleton
@Singleton
class AnimalService(private val animalClient: AnimalClient) { // (1)
fun someBusinessMethod(animal: Animal) {
val serializedBody = serialize(animal)
animalClient.send(serializedBody)
}
private fun serialize(animal: Animal): ByteArray { // (2)
return ByteArray(0)
}
}
1 | Constructor based injection |
2 | Your custom serialization logic |
Creating a Pub/Sub Pull Subscriber with @PubSubListener and @Subscription
To listen to Pub/Sub messages you can use @PubSubListener annotation on a class to mark it as a message listener.
The following example would listen on a subscription named animals
that is configured to be attached to the topic of the previous example.
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
@PubSubListener // (1)
public class AnimalListener {
@Subscription("animals") // (2)
public void onMessage(byte[] data) { // (3)
System.out.println("Message received");
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
@PubSubListener // (1)
class AnimalListener {
@Subscription("animals") // (2)
void onMessage(byte[] data) { // (3)
System.out.println("Message received")
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
@PubSubListener // (1)
class AnimalListener {
@Subscription("animals") // (2)
fun onMessage(data: ByteArray) { // (3)
println("Message received")
}
}
1 | Designates this class to be a message listener for Pub/Sub messages |
2 | Messages routed to the animals subscription will be delivered to this method via Pull subscription |
3 | The method has a single argument that contains the serialized body of the Pub/Sub message |
Methods annotated with @Subscription will use a long-running Pull subscription style.
Creating a Pub/Sub Push Subscriber with @PubSubListener and @PushSubscription
Using the Push style of subscription is similar to the above example, only substituting the @PushSubscription annotation instead.
The following example would listen on a subscription named animals-push that is configured to be attached to the same topic of the previous example.
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.PushSubscription;
@PubSubListener // (1)
public class AnimalListener {
@PushSubscription("animals-push") // (2)
public void onPushMessage(byte[] data) { // (3)
System.out.println("Message received");
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.PushSubscription;
@PubSubListener // (1)
class AnimalListener {
@PushSubscription("animals-push") // (2)
void onPushMessage(byte[] data) { // (3)
System.out.println("Message received")
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.PushSubscription;
@PubSubListener // (1)
class AnimalListener {
@PushSubscription("animals-push") // (2)
fun onPushMessage(data: ByteArray) { // (3)
println("Message received")
}
}
1 | Designates this class to be a message listener for Pub/Sub messages |
2 | Messages routed to the animals subscription will be delivered to this method via Push subscription |
3 | The method has a single argument that contains the serialized body of the Pub/Sub message |
Methods annotated with @PushSubscription will the Push subscription style, where messages are delivered to the application via HTTP request.
9.3 Pub/Sub configuration Properties
You can customize certain aspects of the client. Pub/Sub client libraries leverage a ScheduledExecutorService for both message publishing and consumption.
If not specified the framework will configure the framework default Scheduled
executor service to be used for both Publishers and Subscribers.
See ExecutorConfiguration for the full list of options.
You can override it at PubSubConfigurationProperties to make a default value for all clients, or you can setup per Topic Publisher, or Subscription listener as discussed further bellow.
Creating a custom executor is presented on the section Configuring Thread pools .
When the application is shutting down, stopAsync()
is invoked on all of the running GCP library Subscriber instances. The subscribers will attempt to fully process all pending in-memory messages before releasing the configured executor threads. By default, the framework will in turn continue to invoke the bound subscription methods on all @PubSubListener beans until all messages have been processed. To discontinue processing of messages and enable faster shutdown, the gcp.pubsub.nack-on-shutdown
property can be set to true
, which will cause all pending unprocessed messages that have not yet reached a subscriber method to be eagerly nacked, which will cause PubSub to redeliver them according to each subscription’s configuration.
Property | Type | Description |
---|---|---|
|
java.lang.String |
The name of the {@link java.util.concurrent.ScheduledExecutorService} to be used by all {@link com.google.cloud.pubsub.v1.Publisher} instances. Defaults to "scheduled". |
|
java.lang.String |
The name of the {@link java.util.concurrent.ScheduledExecutorService} to be used by all {@link com.google.cloud.pubsub.v1.Subscriber} instances. Defaults to "scheduled". |
|
int |
How often to ping the server to keep the channel alive. Defaults to 5 minutes. |
|
java.lang.String |
Which endpoint the {@link com.google.cloud.pubsub.v1.Publisher} should publish messages to. Defaults to the global endpoint |
|
boolean |
Whether subscribers should stop processing pending in-memory messages and eagerly nack() during application shutdown. Defaults to false. |
9.4 Pub/Sub Publishers
Pub/Sub support in micronaut follows the same pattern used for other types of clients such as the HTTP Client. By annotating an interface with @PubSubClient the framework will create an implementation bean that handles communication with Pub/Sub for you.
Topics
In order to publish messages to Pub/Sub you need a method annotated with a @Topic
annotation.
For each annotated method the framework will create a dedicated Publisher with its own configuration for RetrySettings
, BatchSettings
and its own Executor
.
All settings can be overridden via configuration properties and the appropriate configuration can be passed via the configuration
attribute of the @Topic
annotation.
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubClient // (1)
public interface SimpleClient {
@Topic("animals")
void send(PubsubMessage message); // (2)
@Topic("animals")
void send(byte[] data); // (3)
@Topic("animals")
void send(Animal animal); // (4)
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
@PubSubClient // (1)
interface SimpleClient {
@Topic("animals")
void send(PubsubMessage message) // (2)
@Topic("animals")
void send(byte[] data) // (3)
@Topic("animals")
void send(Animal animal) // (4)
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
@PubSubClient // (1)
interface SimpleClient {
@Topic("animals")
fun send(message: PubsubMessage) // (2)
@Topic("animals")
fun send(data: ByteArray) // (3)
@Topic("animals")
fun send(animal: Animal) // (4)
}
1 | The @PubSubClient enables this interface to be replaced by bean implementation by micronaut. |
2 | Sending a PubsubMessage object skips any SerDes or contentType headers |
3 | Sending a byte array, SerDes will be bypassed and bytes will just be copied, but the default content type will still be application/json |
4 | You can send any domain object as long as a SerDes is configured to handled it. By default application/json is used. |
If a body argument cannot be found, an exception will be thrown. |
Resource naming
On the previous examples you noticed we used a simple naming for the topic such as animals
. Inside Google Cloud however resources are only accessible via their FQN.
In Pub/Sub case topics are named as projects/$PROJECT_ID/topics/$TOPIC_NAME
.
Micronaut integration with GCP will automatically grab the default project id available (please refer to the section Google Project Id ) and convert the simple naming of the resource into a FQN.
You can also support a FQN that uses a different project name, that is useful when your project has to publish message to more than the default project configured for the Service Account.
When publishing to a different project, make sure your service account has the proper access to the resource. |
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubClient
public interface MultipleProjectClient {
@Topic("animals")
void sendUS(Animal animal); // (1)
@Topic("projects/eu-project/topics/animals")
void sendEU(Animal animal); // (2)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubClient
interface MultipleProjectClient {
@Topic("animals")
void sendUS(Animal animal) // (1)
@Topic("projects/eu-project/topics/animals")
void sendEU(Animal animal) // (2)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
@PubSubClient
interface MultipleProjectClient {
@Topic("animals")
fun sendUS(animal: Animal) // (1)
@Topic("projects/eu-project/topics/animals")
fun sendEU(animal: Animal) // (2)
}
1 | This would use the default project configured for the application. |
2 | This would override and publish messages to a different project. |
9.4.1 Content-Type and message serialization
The contents of a PubSubMessage are always a base64 encoded ByteString. This framework provide a way to create custom Serialization/Deserialization as explained in the section Custom Serialization/Deserialization.
By default any body message argument will be serialized via JsonPubSubMessageSerDes unless specified otherwise via the contentType
property of the @Topic annotation.
The rules of message serialization are the following:
-
If the body type is PubSubMessage then SerDes is bypassed completely and no header is added to the message.
-
If the body type is
byte[]
SerDes logic is bypassed, but aContent-Type
header ofapplication/json
will be added unless overwritten by the @Topic annotation. -
For any other type, the type defined by
contentType
will be used to located the correct PubSubMessageSerDes to handle it, if none is passedapplication/json
will be used.
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.http.MediaType;
@PubSubClient
public interface CustomSerDesClient {
@Topic("animals") // (1)
void send(PubsubMessage pubsubMessage);
@Topic("animals") // (2)
void send(byte[] data);
@Topic(value = "animals", contentType = MediaType.IMAGE_GIF) // (3)
void sendWithCustomType(byte[] data);
@Topic("animals") // (4)
void send(Animal animal);
@Topic(value = "animals", contentType = MediaType.APPLICATION_XML) // (5)
void sendWithCustomType(Animal animal);
}
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.http.MediaType;
@PubSubClient
interface CustomSerDesClient {
@Topic("animals") // (1)
void send(PubsubMessage pubsubMessage)
@Topic("animals") // (2)
void send(byte[] data)
@Topic(value = "animals", contentType = MediaType.IMAGE_GIF) // (3)
void sendWithCustomType(byte[] data)
@Topic("animals") // (4)
void send(Animal animal)
@Topic(value = "animals", contentType = MediaType.APPLICATION_XML) // (5)
void sendWithCustomType(Animal animal)
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.http.MediaType
@PubSubClient
interface CustomSerDesClient {
@Topic("animals") // (1)
fun send(pubsubMessage: PubsubMessage)
@Topic("animals") // (2)
fun send(data: ByteArray)
@Topic(value = "animals", contentType = MediaType.IMAGE_GIF) // (3)
fun sendWithCustomType(data: ByteArray)
@Topic("animals") // (4)
fun send(animal: Animal)
@Topic(value = "animals", contentType = MediaType.APPLICATION_XML) // (5)
fun sendWithCustomType(animal: Animal)
}
1 | Using PubsubMessage no SerDes will be used, no headers are added to the message, user is responsible to create the message. |
2 | Using byte array. No SerDes logic is used, however a Content-Type header in this example is added with value application/json |
3 | Using byte array and specifying a contentType. SerDes will be bypassed and a Content-Type header with value image/gif will be added. |
4 | No contentType defined, and JsonPubSubMessageSerDes will be use to serialize the object. |
5 | Using specific contentType , the framework will look for a PubSubMessageSerDes registered for type application/xml user must provide this bean or an exception is thrown at runtime. |
9.4.2 Message Headers
Google Cloud Pub/Sub messages contain a dictionary of message attributes in the form of a Map<String, String>
.
The framework binds those attributes to a @Header annotation that can be used at the class level or to the method or an argument of the method.
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.annotation.MessageHeader;
@PubSubClient
@MessageHeader(name = "application-name", value = "petclinic") // (1)
public interface CustomHeadersClient {
@MessageHeader(name = "status", value = "healthy") // (2)
@Topic("animals")
void sendWithStaticHeaders(Animal animal);
@Topic("animals")
void sendWithDynamicHeaders(Animal animal, @MessageHeader(name = "code") Integer code); // (3)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.MessageHeader;
@PubSubClient
@MessageHeader(name = "application-name", value = "petclinic") // (1)
interface CustomHeadersClient {
@MessageHeader(name = "status", value = "healthy") // (2)
@Topic("animals")
void sendWithStaticHeaders(Animal animal)
@Topic("animals")
void sendWithDynamicHeaders(Animal animal, @MessageHeader(name = "code") Integer code) // (3)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.MessageHeader
@PubSubClient
@MessageHeader(name = "application-name", value = "petclinic") // (1)
interface CustomHeadersClient {
@MessageHeader(name = "status", value = "healthy") // (2)
@Topic("animals")
fun sendWithStaticHeaders(animal: Animal)
@Topic("animals")
fun sendWithDynamicHeaders(animal: Animal, @MessageHeader(name = "code") code: Int) // (3)
}
1 | Headers specified at class level will be added to all annotated methods using @Topic |
2 | You can pass a static value as part of @Header |
3 | Values can also be passed via arguments. A ConversionService will try to convert them into a String value. |
9.4.3 Publisher properties
Pub/Sub allows each Publisher to have its own configuration for things such as executors or batching settings. This is useful when you need to publish messages to topic that uses different SLAs.
The framework allows you to create configurations and then bind those configurations to each topic annotation using the configuration
parameter.
Property | Type | Description |
---|---|---|
gcp.pubsub.publisher.*.executor |
java.lang.String |
Name of the executor to use. Default: scheduled |
gcp.pubsub.publisher.*.retry.total-timeout |
org.threeten.bp.Duration |
How long the logic should keep trying the remote calluntil it gives up completely. Default 600 seconds |
gcp.pubsub.publisher.*.retry.initial-retry-delay |
org.threeten.bp.Duration |
Delay before the first retry. Default: 100ms |
gcp.pubsub.publisher.*.retry.retry-delay-multiplier |
double |
Controls the change in retry delay. The retry delay of the previous call is multiplied by the RetryDelayMultiplier to calculate the retry delay for the next call. Default: 1.3 |
gcp.pubsub.publisher.*.retry.max-retry-delay |
org.threeten.bp.Duration |
Puts a limit on the value of the retry delay, so that the RetryDelayMultiplier can’t increase the retry delay higher than this amount. Default: 60 seconds |
gcp.pubsub.publisher.*.retry.max-attempts |
int |
Defines the maximum number of attempts to perform. Default: 0 |
gcp.pubsub.publisher.*.retry.jittered |
boolean |
Determines if the delay time should be randomized. Default: true |
gcp.pubsub.publisher.*.retry.initial-rpc-timeout |
org.threeten.bp.Duration |
Controls the timeout for the initial RPC. Default: 5 seconds |
gcp.pubsub.publisher.*.retry.rpc-timeout-multiplier |
double |
Controls the change in RPC timeout. The timeout of the previous call is multiplied by the RpcTimeoutMultiplier to calculate the timeout for the next call. Default: 1.0 |
gcp.pubsub.publisher.*.retry.max-rpc-timeout |
org.threeten.bp.Duration |
Puts a limit on the value of the RPC timeout, so that the RpcTimeoutMultiplier can’t increase the RPC timeout higher than this amount. Default 0 |
gcp.pubsub.publisher.*.batching.element-count-threshold |
java.lang.Long |
Set the element count threshold to use for batching. After this many elements are accumulated, they will be wrapped up in a batch and sent. Default: 100 |
gcp.pubsub.publisher.*.batching.request-byte-threshold |
java.lang.Long |
Set the request byte threshold to use for batching. After this many bytes are accumulated, the elements will be wrapped up in a batch and sent. Default 1000 (1Kb) |
gcp.pubsub.publisher.*.batching.delay-threshold |
org.threeten.bp.Duration |
Set the delay threshold to use for batching. After this amount of time has elapsed (counting from the first element added), the elements will be wrapped up in a batch and sent. Default 1ms |
gcp.pubsub.publisher.*.batching.is-enabled |
java.lang.Boolean |
Indicate if the batching is enabled. Default : true |
gcp.pubsub.publisher.*.flow-control.max-outstanding-element-count |
java.lang.Long |
Maximum number of outstanding elements to keep in memory before enforcing flow control. |
gcp.pubsub.publisher.*.flow-control.max-outstanding-request-bytes |
java.lang.Long |
Maximum number of outstanding bytes to keep in memory before enforcing flow control. |
gcp.pubsub.publisher.*.flow-control.limit-exceeded-behavior |
com.google.api.gax.batching.FlowController$LimitExceededBehavior |
The behavior of FlowController when the specified limits are exceeded. Defaults to Ignore. |
For example suppose you have the following configuration:
gcp.pubsub.publisher.batching.executor=batch-executor
gcp.pubsub.publisher.immediate.executor=immediate-executor
gcp.pubsub.publisher.immediate.batching.enabled=false
gcp:
pubsub:
publisher:
batching:
executor: batch-executor
immediate:
executor: immediate-executor
batching:
enabled: false
[gcp]
[gcp.pubsub]
[gcp.pubsub.publisher]
[gcp.pubsub.publisher.batching]
executor="batch-executor"
[gcp.pubsub.publisher.immediate]
executor="immediate-executor"
[gcp.pubsub.publisher.immediate.batching]
enabled=false
gcp {
pubsub {
publisher {
batching {
executor = "batch-executor"
}
immediate {
executor = "immediate-executor"
batching {
enabled = false
}
}
}
}
}
{
gcp {
pubsub {
publisher {
batching {
executor = "batch-executor"
}
immediate {
executor = "immediate-executor"
batching {
enabled = false
}
}
}
}
}
}
{
"gcp": {
"pubsub": {
"publisher": {
"batching": {
"executor": "batch-executor"
},
"immediate": {
"executor": "immediate-executor",
"batching": {
"enabled": false
}
}
}
}
}
}
You can then apply it to individual methods as:
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubClient
public interface CustomConfigurationClient {
@Topic(value = "animals", configuration = "batching") // (1)
void batchSend(Animal animal);
@Topic(value = "animals", configuration = "immediate") // (2)
void send(Animal animal);
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubClient
interface CustomConfigurationClient {
@Topic(value = "animals", configuration = "batching") // (1)
void batchSend(Animal animal)
@Topic(value = "animals", configuration = "immediate") // (2)
void send(Animal animal)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
@PubSubClient
interface CustomConfigurationClient {
@Topic(value = "animals", configuration = "batching")
fun batchSend(animal: Animal) // (1)
@Topic(value = "animals", configuration = "immediate")
fun send(animal: Animal) // (2)
}
1 | The Publisher will be configured using a configuration named batching |
2 | The Publisher will be configured using a configuration named immediate |
FlowControlSettings are actually configured for the BatchingSettings property, due the nature of Google’s Builders the configuration was
flattened at PubSubConfigurationProperties level, and it’s injected it into the RetrySettings later.
|
9.4.4 Retrieving message Ids (broker acknowledge)
All the examples so far have been using void
on the method signature. However getting a message acknowledge from the broker is usually required.
Pub/Sub returns a String
object that contains the message id that the broker generated. Your methods can also be defined using either String
or Single<String>
(for reactive support).
When you define your client you can actually choose between a few different method signatures. Depending on your choice you may get the message acknowledge back and control if the method is blocking or reactive.
-
If your method has
void
as a return, then a blocking call to publish the message is made and you don’t get the message id returned by the Pub/Sub broker. -
If your method return a
String
then a blocking call to publish the message is made and the message id is returned. -
If your method returns
Single<String>
then it’s a reactive call, and you cansubscribe
to the publisher to retrieve the message id.
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import reactor.core.publisher.Mono;
@PubSubClient
public interface CustomReturnClient {
@Topic("animals")
void send(Animal animal); // (1)
@Topic("animals")
String sendWithId(Animal animal); // (2)
@Topic("animals")
Mono<String> sendReactive(Animal animal); //(3)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono
@PubSubClient
interface CustomReturnClient {
@Topic("animals")
void send(Animal animal) // (1)
@Topic("animals")
String sendWithId(Animal animal) // (2)
@Topic("animals")
Mono<String> sendReactive(Animal animal) //(3)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono
@PubSubClient
interface CustomReturnClient {
@Topic("animals")
fun send(animal: Animal) // (1)
@Topic("animals")
fun sendWithId(animal: Animal): String // (2)
@Topic("animals")
fun sendReactive(animal: Animal?): Mono<String> //(3)
}
1 | Blocking call, message id is not returned |
2 | Blocking call, message id is returned as String |
3 | Reactive call |
9.5 Restricting locations and message ordering
Restricting storage locations
Google Cloud Pub/Sub is a globally distributed event platform. When a client publishes a message, the platform stores the message on the nearest region to the publisher. Sometimes regulations such as GDPR impose restrictions on where customer data can live. When using Micronaut integration you can specify the endpoint of the topic, either via the topic annotation or properties, so that Pub/Sub will persist the message data on the specified region. To learn more about this feature visit the resource location restriction page.
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubClient
public interface LocationClient {
@Topic(value = "animals", endpoint = "europe-west1-pubsub.googleapis.com:443") // (1)
void send(Animal animal);
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
@PubSubClient
interface LocationClient {
@Topic(value = "animals", endpoint = "europe-west1-pubsub.googleapis.com:443") // (1)
void send(Animal animal);
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
@PubSubClient
interface LocationClient {
@Topic(value = "animals", endpoint = "europe-west1-pubsub.googleapis.com:443") // (1)
fun send(animal: Animal?)
}
1 | The endpoint attribute enforces that messages sent via this method will be stored on the europe-west1 region. |
Message ordering
Google Cloud Pub/Sub supports message ordering if messages are published to a single location, and specify an ordering key. An example of this would be trading orders placed for a specific symbol. If you use the symbol as the ordering key, all messages regardless of how many publishers are guaranteed to be delivered in order.
To enable message ordering for your publishers, use the @OrderingKey in one of the method’s arguments to declare it an ordering key.
import io.micronaut.gcp.pubsub.annotation.OrderingKey;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Order;
@PubSubClient
public interface OrderClient {
@Topic(value = "orders", endpoint = "us-central1-pubsub.googleapis.com:443") // (1)
void send(Order order, @OrderingKey String key); // (2)
}
import io.micronaut.gcp.pubsub.annotation.OrderingKey
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Order
@PubSubClient
interface OrderClient {
@Topic(value = "orders", endpoint = "us-central1-pubsub.googleapis.com:443") // (1)
void send(Order order, @OrderingKey String key) // (2)
}
import io.micronaut.gcp.pubsub.annotation.OrderingKey
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Order
@PubSubClient
interface OrderClient {
@Topic(value = "orders", endpoint = "us-central1-pubsub.googleapis.com:443") // (1)
fun send(order: Order, @OrderingKey key: String) // (2)
}
1 | Ordering only works on regional endpoints, so you must declare an endpoint first |
2 | Although the Pub/Sub API requires the key to be a String, the framework uses a conversion service to convert other types to String. |
Here’s an example of how to use ordering on your clients:
import io.micronaut.gcp.pubsub.support.Order;
import jakarta.inject.Singleton;
@Singleton
public final class OrderService {
private final OrderClient client;
public OrderService(OrderClient client) {
this.client = client;
}
public void placeOrder() {
Order order = new Order(100, "GOOG");
client.send(order, order.getSymbol());
}
}
import io.micronaut.gcp.pubsub.support.Order
import jakarta.inject.Singleton
@Singleton
class OrderService {
private final OrderClient client
OrderService(OrderClient client) {
this.client = client
}
void placeOrder() {
Order order = new Order(100, "GOOG")
client.send(order, order.getSymbol())
}
}
import io.micronaut.gcp.pubsub.support.Order
import jakarta.inject.Singleton
@Singleton
class OrderService(private val client: OrderClient) {
fun placeOrder() {
val order = Order(100, "GOOG")
client.send(order, order.symbol)
}
}
9.6 Receiving messages via @PubSubListener methods
To start receiving messages you annotate a class with @PubSubListener, the framework will then use AOP to deliver messages to methods annotated with either @Subscription or @PushSubscription.
The semantics for how methods work when annotated with these two different subscription annotations are identical, differing only by the infrastructure that the framework transparently sets up to deliver the messages. In the following examples, @Subscription can be exchanged for @PushSubscription and the behavior will be the same except for some minor differences noted in the following sections. |
Subscriptions
Pull Subscriptions
All methods annotated with @Subscription will be invoked by the framework in response to receiving messages via Pull subscription.
Each annotated method creates an individual Subscriber,
that can be configured using the configuration
parameter of the @Subscription annotation.
Pull subscriptions are long-running processes that continually poll the PubSub service, and are meant to be used in an environment such as Google Kubernetes Engine.
Push Subscriptions
Methods annotated with @PushSubscription will be invoked by the framework in response to receiving messages via Push subscription.
Push messages are sent to the application by the PubSub service via HTTP request, making them an ideal fit for serverless environments such as Cloud Run.
When Push subscriptions are enabled, the application will expose a single HTTP endpoint for processing all push requests, and messages will be routed to the matching PushSubscription
method. The default path for this endpoint is /push
. This endpoint URL must be specified when setting up a Push subscription in GCP.
As Push message handling uses HTTP, you must have a Micronaut HTTP server implementation available on your classpath, or else push handling will be disabled. |
The available parameters of the @PushSubscription annotation are identical to those of @Subscription, except for configuration which is relevant only to Pull subscriptions.
|
Methods annotated with @Subscription or @PushSubscription must be unique in your application.
If two distinct methods try to subscribe to the same Subscription an error is thrown.
This is intended to avoid issues with message Acknowledgement control.
|
The annotated method must have at least one argument that is bound to the body of the message or an exception is thrown. |
Resource naming
Just as described in the Pub/Sub Publisher section, subscriptions also use simple names such as animals
.
Inside Google Cloud however resources are only accessible via their FQN. A Subscription name follows the pattern: projects/$PROJECT_ID/subscriptions/$SUBSCRIPTION_NAME
.
Micronaut integration with GCP will automatically grab the default project id available (please refer to the section Google Project Id ) and convert the simple naming of the resource into a FQN.
You can also pass a FQN as the subscription name.
This is helpful when you need to listen to subscriptions from different projects.
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener // (1)
public class SimpleSubscriber {
@Subscription("animals") // (2)
public void onMessage(Animal animal) {
}
@Subscription("projects/eu-project/subscriptions/animals") // (3)
public void onMessageEU(Animal animal) {
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener // (1)
class SimpleSubscriber {
@Subscription("animals") // (2)
void onMessage(Animal animal) {
}
@Subscription("projects/eu-project/subscriptions/animals") // (3)
void onMessageEU(Animal animal) {
}
}
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener // (1)
class SimpleSubscriber {
@Subscription("animals") // (2)
fun onMessage(animal: Animal) {
}
@Subscription("projects/eu-project/subscriptions/animals") // (3)
fun onMessageEU(animal: Animal) {
}
}
1 | @PubSubListener marks this class to be a Message Listener |
2 | Methods annotated with @Subscription will receive messages from that subscription |
3 | You can also use a FQN for the subscription, specially when you need to access a resource on a different project |
When publishing to a different project, make sure your service account has the proper access to the resource. |
9.6.1 Content-Type and message deserialization
The framework provides a custom serialization/deserialization (SerDes) mechanism for both message producers and message listeners. On the receiving end the rules to deserialize a PubSubMessage are the following:
-
If the
body
argument of the method is of PubSubMessage type, SerDes is bypassed and the "raw" message is copied to the argument. -
If the
body
argument of the method is a byte array, SerDes is bypassed and the byte contents of thePubSubMessage
are copied to the argument. -
If the
body
argument is a Pojo then the following applies:-
The default
Content-Type
isapplication/json
and the framework will use it if not overridden -
If the message contains an attribute
Content-Type
that value is used -
Finally if the @Subscription or @PushSubscription has a
contentType
value this value overrides all of the previous values
-
Automatic SerDes is a nice feature that the framework offers, but sometimes you may need to have access to the PubSubMessage
id.
This is provided via the @MessageId annotation.
Once you annotate an argument of type String
with this annotation, the message id will be copied to that argument.
PubSubMessage ids are always of type String , thus your annotated argument must also be a String .
|
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.http.MediaType;
@PubSubListener
public class ContentTypeSubscriber {
@Subscription("raw-subscription") // (1)
void receiveRaw(byte[] data, @MessageId String id) {
}
@Subscription("native-subscription") // (2)
void receiveNative(PubsubMessage message) {
}
@Subscription("animals") // (3)
void receivePojo(Animal animal, @MessageId String id) {
}
@Subscription(value = "animals-legacy", contentType = MediaType.APPLICATION_XML) // (4)
void receiveXML(Animal animal, @MessageId String id) {
}
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener
class ContentTypeSubscriber {
@Subscription("raw-subscription") // (1)
void receiveRaw(byte[] data, @MessageId String id) {
}
@Subscription("native-subscription") // (2)
void receiveNative(PubsubMessage message) {
}
@Subscription("animals") // (3)
void receivePojo(Animal animal, @MessageId String id) {
}
@Subscription(value = "animals-legacy", contentType = "application/xml") // (4)
void receiveXML(Animal animal, @MessageId String id) {
}
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener
class ContentTypeSubscriber(private val messageProcessor: MessageProcessor) {
@Subscription("raw-subscription")
fun receiveRaw(data: ByteArray, @MessageId id: String) { // (1)
messageProcessor.handleByteArrayMessage(data).block()
}
@Subscription("native-subscription")
fun receiveNative(message: PubsubMessage) { // (2)
messageProcessor.handlePubsubMessage(message).block()
}
@Subscription("animals")
fun receivePojo(animal: Animal, @MessageId id: String) { // (3)
messageProcessor.handleAnimalMessage(animal).block()
}
@Subscription(value = "animals-legacy", contentType = "application/xml")
fun receiveXML(animal: Animal, @MessageId id: String) { // (4)
messageProcessor.handleAnimalMessage(animal).block()
}
}
1 | Bytes are copied, SerDes is bypassed, message id injected for usage |
2 | SerDes is bypassed, PubSubMessage object is copied, no need to use @MessageId |
3 | The framework will try to deserialize this payload. If no Content-Type header is found, will default to application/json |
4 | Uses a custom SerDes and the framework will find a PubSubMessageSerDes that can handle application/xml |
Though the deserialization is identical for @PushSubscription methods, one thing that requires additional consideration is that since Push messages are delivered via HTTP the subscriber methods will be executed on the main HTTP event loop thread by default. Care must be taken not to block the event loop thread.
As a convenience, @PushSubscription methods that are known to use blocking operations during the course of message processing may be annotated with @ExecuteOn. This will cause the invocation of the method to occur in a separate thread using the ExecutorService
specified in the annotation.
If all the subscriber methods in a given @PubSubListener class are known to be blocking, then the @ExecuteOn annotation may be used at the class level instead and all @PushSubscription methods in that class will be executed by the specified ExecutorService
.
If you use the TaskExecutors.BLOCKING ExecutorService , Virtual Threads will be used if available.
|
The following example is equivalent to the preceding one, except using @ExecuteOn at the class level and @PushSubscription methods:
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.PushSubscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.http.MediaType;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
@PubSubListener
@ExecuteOn(TaskExecutors.BLOCKING) // (1)
public class ContentTypePushSubscriber {
@PushSubscription("raw-push-subscription") // (2)
void receiveRaw(byte[] data, @MessageId String id) {
//process with blocking code
}
@PushSubscription("native-push-subscription") // (3)
void receiveNative(PubsubMessage message) {
//process with blocking code
}
@PushSubscription("animals-push") // (4)
void receivePojo(Animal animal, @MessageId String id) {
//process with blocking code
}
@PushSubscription(value = "animals-legacy-push", contentType = MediaType.APPLICATION_XML) // (5)
void receiveXML(Animal animal, @MessageId String id) {
//process with blocking code
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.PushSubscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
@PubSubListener
@ExecuteOn(TaskExecutors.BLOCKING) // (1)
class ContentTypePushSubscriber {
@PushSubscription("raw-push-subscription") // (2)
void receiveRaw(byte[] data, @MessageId String id) {
//process with blocking code
}
@PushSubscription("native-push-subscription") // (3)
void receiveNative(PubsubMessage message) {
//process with blocking code
}
@PushSubscription("animals-push") // (4)
void receivePojo(Animal animal, @MessageId String id) {
//process with blocking code
}
@PushSubscription(value = "animals-legacy-push", contentType = "application/xml") // (5)
void receiveXML(Animal animal, @MessageId String id) {
//process with blocking code
}
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.PushSubscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
@PubSubListener
@ExecuteOn(TaskExecutors.BLOCKING) // (1)
class ContentTypePushSubscriber(private val messageProcessor: MessageProcessor) {
@PushSubscription("raw-push-subscription")
fun receiveRaw(data: ByteArray, @MessageId id: String) { // (2)
messageProcessor.handleByteArrayMessage(data).block()
}
@PushSubscription("native-push-subscription")
fun receiveNative(message: PubsubMessage) { // (3)
messageProcessor.handlePubsubMessage(message).block()
}
@PushSubscription("animals-push")
fun receivePojo(animal: Animal, @MessageId id: String) { // (4)
messageProcessor.handleAnimalMessage(animal).block()
}
@PushSubscription(value = "animals-legacy-push", contentType = "application/xml")
fun receiveXML(animal: Animal, @MessageId id: String) { // (5)
messageProcessor.handleAnimalMessage(animal).block()
}
}
1 | The class is annotated with @ExecuteOn so that all of the @PushSubscription methods will be executed on a separate thread |
2 | Bytes are copied, SerDes is bypassed, message id injected for usage |
3 | SerDes is bypassed, PubSubMessage object is copied, no need to use @MessageId |
4 | The framework will try to deserialize this payload. If no Content-Type header is found, will default to application/json |
5 | Uses a custom SerDes and the framework will find a PubSubMessageSerDes that can handle application/xml |
9.6.2 Receiving and Returning Reactive Types
In addition to byte[], PubsubMessage, and POJOs you can also define listener methods that receive a Reactive type such as a Reactor Mono or a RxJava Single. The same deserialization rules as above will be applied using the type parameter of the Reactive type.
For the conversion to Reactive types to work correctly, you must add either the library Micronaut Reactor or Micronaut RxJava 3 to your application’s dependencies.
For example, using Reactor:
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import reactor.core.publisher.Mono;
@PubSubListener
public class ReactiveSubscriber {
private final MessageProcessor messageProcessor;
public ReactiveSubscriber(MessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor;
}
@Subscription("raw-subscription") // (1)
Mono<Object> receiveRaw(Mono<byte[]> data, @MessageId String id) {
return data.flatMap(messageProcessor::handleByteArrayMessage);
}
@Subscription("native-subscription") // (2)
Mono<Object> receiveNative(Mono<PubsubMessage> message) {
return message.flatMap(messageProcessor::handlePubSubMessage);
}
@Subscription("animals") // (3)
Mono<Object> receivePojo(Mono<Animal> animal, @MessageId String id) {
return animal.flatMap(messageProcessor::handleAnimalMessage);
}
@Subscription(value = "animals-legacy", contentType = "application/xml") // (4)
Mono<Object> receiveXML(Mono<Animal> animal, @MessageId String id) {
return animal.flatMap(messageProcessor::handleAnimalMessage);
}
}
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono
@PubSubListener
class ReactiveSubscriber {
private final MessageProcessor messageProcessor
ReactiveSubscriber(MessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor
}
@Subscription("raw-subscription") // (1)
Mono<Object> receiveRaw(Mono<byte[]> data, @MessageId String id) {
return data.flatMap(messageProcessor::handleByteArrayMessage)
}
@Subscription("native-subscription") // (2)
Mono<Object> receiveNative(Mono<PubsubMessage> message) {
return message.flatMap(messageProcessor::handlePubSubMessage)
}
@Subscription("animals") // (3)
Mono<Object> receivePojo(Mono<Animal> animal, @MessageId String id) {
return animal.flatMap(messageProcessor::handleAnimalMessage)
}
@Subscription(value = "animals-legacy", contentType = "application/xml") // (4)
Mono<Object> receiveXML(Mono<Animal> animal, @MessageId String id) {
return animal.flatMap(messageProcessor::handleAnimalMessage)
}
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono
@PubSubListener
class ReactiveSubscriber(private val messageProcessor: MessageProcessor) {
@Subscription("raw-subscription")
fun receiveRaw(data: Mono<ByteArray>, @MessageId id: String): Mono<Any> { // (1)
return data.flatMap { payload ->
messageProcessor.handleByteArrayMessage(payload)
}
}
@Subscription("native-subscription")
fun receiveNative(message: Mono<PubsubMessage>): Mono<Any> { // (2)
return message.flatMap { payload ->
messageProcessor.handlePubsubMessage(payload)
}
}
@Subscription("animals")
fun receivePojo(message: Mono<Animal>, @MessageId id: String): Mono<Any> { // (3)
return message.flatMap { animal ->
messageProcessor.handleAnimalMessage(animal)
}
}
@Subscription(value = "animals-legacy", contentType = "application/xml")
fun receiveXML(message: Mono<Animal>, @MessageId id: String): Mono<Any> { // (4)
return message.flatMap { animal ->
messageProcessor.handleAnimalMessage(animal)
}
}
}
1 | Bytes are copied and wrapped in a Mono , SerDes is bypassed, message id injected for usage |
2 | SerDes is bypassed, PubSubMessage object is copied and wrapped in a Mono , no need to use @MessageId |
3 | The framework will try to deserialize this payload into Mono<Animal> . If no Content-Type header is found, will default to application/json |
4 | Uses a custom SerDes and the framework will find a PubSubMessageSerDes that can handle application/xml and then pass the deserialized payload as Mono<Animal> . |
Using Reactor with push subscriptions is similar:
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.PushSubscription;
import io.micronaut.gcp.pubsub.support.Animal;
import reactor.core.publisher.Mono;
@PubSubListener
public class ReactivePushSubscriber {
private final MessageProcessor messageProcessor;
public ReactivePushSubscriber(MessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor;
}
@PushSubscription("raw-push-subscription") // (1)
Mono<Object> receiveRaw(Mono<byte[]> data, @MessageId String id) {
return data.flatMap(messageProcessor::handleByteArrayMessage);
}
@PushSubscription("native-push-subscription") // (2)
Mono<Object> receiveNative(Mono<PubsubMessage> message) {
return message.flatMap(messageProcessor::handlePubSubMessage);
}
@PushSubscription("animals-push") // (3)
Mono<Object> receivePojo(Mono<Animal> animal, @MessageId String id) {
return animal.flatMap(messageProcessor::handleAnimalMessage);
}
@PushSubscription(value = "animals-legacy-push", contentType = "application/xml") // (4)
Mono<Object> receiveXML(Mono<Animal> animal, @MessageId String id) {
return animal.flatMap(messageProcessor::handleAnimalMessage);
}
}
import io.micronaut.gcp.pubsub.annotation.PushSubscription
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono
@PubSubListener
class ReactivePushSubscriber {
private final MessageProcessor messageProcessor
ReactivePushSubscriber(MessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor
}
@PushSubscription("raw-push-subscription") // (1)
Mono<Object> receiveRaw(Mono<byte[]> data, @MessageId String id) {
return data.flatMap(messageProcessor::handleByteArrayMessage)
}
@PushSubscription("native-push-subscription") // (2)
Mono<Object> receiveNative(Mono<PubsubMessage> message) {
return message.flatMap(messageProcessor::handlePubSubMessage)
}
@PushSubscription("animals-push") // (3)
Mono<Object> receivePojo(Mono<Animal> animal, @MessageId String id) {
return animal.flatMap(messageProcessor::handleAnimalMessage)
}
@PushSubscription(value = "animals-legacy-push", contentType = "application/xml") // (4)
Mono<Object> receiveXML(Mono<Animal> animal, @MessageId String id) {
return animal.flatMap(messageProcessor::handleAnimalMessage)
}
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.PushSubscription
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono
@PubSubListener
class ReactivePushSubscriber(private val messageProcessor: MessageProcessor) {
@PushSubscription("raw-push-subscription")
fun receiveRaw(data: Mono<ByteArray>, @MessageId id: String): Mono<Any> { // (1)
return data.flatMap { payload ->
messageProcessor.handleByteArrayMessage(payload)
}
}
@PushSubscription("native-push-subscription")
fun receiveNative(message: Mono<PubsubMessage>): Mono<Any> { // (2)
return message.flatMap { payload ->
messageProcessor.handlePubsubMessage(payload)
}
}
@PushSubscription("animals-push")
fun receivePojo(message: Mono<Animal>, @MessageId id: String): Mono<Any> { // (3)
return message.flatMap { animal ->
messageProcessor.handleAnimalMessage(animal)
}
}
@PushSubscription(value = "animals-legacy-push", contentType = "application/xml")
fun receiveXML(message: Mono<Animal>, @MessageId id: String): Mono<Any> { // (4)
return message.flatMap { animal ->
messageProcessor.handleAnimalMessage(animal)
}
}
}
1 | Bytes are copied and wrapped in a Mono , SerDes is bypassed, message id injected for usage |
2 | SerDes is bypassed, PubSubMessage object is copied and wrapped in a Mono , no need to use @MessageId |
3 | The framework will try to deserialize this payload into Mono<Animal> . If no Content-Type header is found, will default to application/json |
4 | Uses a custom SerDes and the framework will find a PubSubMessageSerDes that can handle application/xml and then pass the deserialized payload as Mono<Animal> . |
Note that the above examples all return a Mono<Object>
to allow for a fully non-blocking reactive message processing pipeline. When a Publisher
is returned from a @Subscription
method, it will be subscribed to by the framework and the message will not be auto-acknowledged until the Publisher
completes successfully. If the Publisher
completes with an error, the framework will nack()
the message for re-delivery.
9.6.3 Message Headers
Google Cloud Pub/Sub messages contain a dictionary of message attributes in the form of a Map<String, String>
.
The framework binds those attributes to a @Header annotation that can be used at an argument of the method.
A ConversionService is used to try to convert from the String
value of the attribute to the target type on the method.
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.annotation.MessageHeader;
@PubSubListener
public class CustomHeaderSubscriber {
@Subscription("animals")
public void onMessage(Animal animal, @MessageHeader("Content-Type") String contentType, @MessageHeader("code") Integer code) { // (1)
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.MessageHeader
@PubSubListener
class CustomHeaderSubscriber {
@Subscription("animals")
void onMessage(Animal animal, @MessageHeader("Content-Type") String contentType, @MessageHeader("code") Integer code) { // (1)
}
}
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.MessageHeader
@PubSubListener
class CustomHeaderSubscriber {
@Subscription("animals")
fun onMessage(animal: Animal,
@MessageHeader("Content-Type") contentType: String, @MessageHeader("code") code: Int) { // (1)
}
}
1 | Each annotated argument will be mapped to the corresponding attribute value. The ConversionService will try to convert to the target type |
9.6.4 Pull Subscriber properties
Pub/Sub allows each Pull Subscriber to have its own configuration for things such as executors or flow control settings.
The framework allows you to create configurations and then bind those configurations to each @Subscription using the configuration
parameter.
Property | Type | Description |
---|---|---|
gcp.pubsub.subscriber.*.executor |
java.lang.String |
Name of the executor to use. Default: scheduled |
gcp.pubsub.subscriber.*.parallel-pull-count |
java.lang.Integer |
number of concurrent pulls. Default: 1 |
gcp.pubsub.subscriber.*.max-ack-extension-period |
org.threeten.bp.Duration |
Set the maximum period a message ack deadline will be extended. Default: one hour. |
gcp.pubsub.subscriber.*.max-duration-per-ack-extension |
org.threeten.bp.Duration |
Set the upper bound for a single mod ack extention period. Default: one hour. |
gcp.pubsub.subscriber.*.flow-control.max-outstanding-element-count |
java.lang.Long |
Maximum number of outstanding elements to keep in memory before enforcing flow control. Default: 1000 |
gcp.pubsub.subscriber.*.flow-control.max-outstanding-request-bytes |
java.lang.Long |
Maximum number of outstanding bytes to keep in memory before enforcing flow control. Default: 100 * 1024 * 1024 |
gcp.pubsub.subscriber.*.flow-control.limit-exceeded-behavior |
com.google.api.gax.batching.FlowController$LimitExceededBehavior |
Default: LimitExceededBehavior.Block |
Suppose you have the following configuration for a subscriber:
gcp.pubsub.subscriber.custom.executor=custom-executor
gcp.pubsub.subscriber.custom.parallel-pull-count=4
gcp:
pubsub:
subscriber:
custom:
executor: custom-executor
parallel-pull-count: 4
[gcp]
[gcp.pubsub]
[gcp.pubsub.subscriber]
[gcp.pubsub.subscriber.custom]
executor="custom-executor"
parallel-pull-count=4
gcp {
pubsub {
subscriber {
custom {
executor = "custom-executor"
parallelPullCount = 4
}
}
}
}
{
gcp {
pubsub {
subscriber {
custom {
executor = "custom-executor"
parallel-pull-count = 4
}
}
}
}
}
{
"gcp": {
"pubsub": {
"subscriber": {
"custom": {
"executor": "custom-executor",
"parallel-pull-count": 4
}
}
}
}
}
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener
public class CustomConfigurationSubscriber {
@Subscription(value = "animals", configuration = "custom") // (1)
public void onMessage(Animal animal) {
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener
class CustomConfigurationSubscriber {
@Subscription(value = "animals", configuration = "custom") // (1)
void onMessage(Animal animal) {
}
}
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener
class CustomConfigurationSubscriber {
@Subscription(value = "animals", configuration = "custom") // (1)
fun onMessage(animal: Animal) {
}
}
1 | The Subscriber will be configured using a configuration named custom |
9.6.5 Push Subscriber configuration
Push support is enabled by default if the Micronaut HTTP server support is on the classpath. It can be explicitly disabled via configuration (see the table below).
The push endpoint is exposed at /push
by default. This path is also configurable.
Property | Type | Description |
---|---|---|
|
boolean |
Whether PubSub Push is enabled. |
|
java.lang.String |
The configured path for the PubSub Push HTTP endpoint. Default value "/push" |
9.6.6 Handling message acknowledgement
When messages are delivered to @Subscription and @PushSubscription methods, they are by default auto-acknowledged to the PubSub service if the method returns without exceptions. When an error occurs during processing, a nack
signal will be sent to the service instead, potentially resulting in redelivery of the message.
See the section Consumer ErrorHandling for more information on error handling. |
Google Cloud Pub/Sub controls delivery behavior in response to nack signals at the Subscription level, please refer to the Pub/Sub Subscriber documentation for more information
|
It is possible to have manual acknowledgement control by adding an argument of type Acknowledgement and manually invoking ack()
or nack()
methods..
If you provide an Acknowledgement type in your method and forget to invoke ack() /nack() the framework will log a warning message to let you know if you forgot to manually register an acknowledgement. Messages that are processed without an ack or nack signal being sent could potentially cause undesired behavior that could negatively affect performance.
|
The following example shows usage of manual acknowledgement:
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.Acknowledgement;
import reactor.core.publisher.Mono;
@PubSubListener
public class AcknowledgementSubscriber {
private final MessageProcessor messageProcessor;
public AcknowledgementSubscriber(MessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor;
}
@Subscription("animals")
public void onMessage(Animal animal, Acknowledgement acknowledgement) {
if (Boolean.TRUE.equals(messageProcessor.handleAnimalMessage(animal).block())) {
acknowledgement.ack();
} else {
acknowledgement.nack();
}
}
@Subscription("animals-async")
public Mono<Boolean> onReactiveMessage(Mono<Animal> animal, Acknowledgement acknowledgement) {
return animal.flatMap(messageProcessor::handleAnimalMessage)
.doOnNext(result -> {
if (Boolean.TRUE.equals(result)) {
acknowledgement.ack();
} else {
acknowledgement.nack();
}
});
}
}
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.Acknowledgement
import reactor.core.publisher.Mono
@PubSubListener
class AcknowledgementSubscriber {
MessageProcessor messageProcessor
AcknowledgementSubscriber(MessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor
}
@Subscription("animals")
void onMessage(Animal animal, Acknowledgement acknowledgement) {
if (Boolean.TRUE == messageProcessor.handleAnimalMessage(animal).block()) {
acknowledgement.ack()
} else {
acknowledgement.nack()
}
}
@Subscription("animals-async")
Mono<Boolean> onReactiveMessage(Mono<Animal> animal, Acknowledgement acknowledgement) {
return animal.flatMap(messageProcessor::handleAnimalMessage)
.doOnNext(result -> {
if (Boolean.TRUE == result) {
acknowledgement.ack()
} else {
acknowledgement.nack()
}
})
}
}
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.Acknowledgement
import reactor.core.publisher.Mono
@PubSubListener
class AcknowledgementSubscriber(private val messageProcessor: MessageProcessor) {
@Subscription("animals")
fun onMessage(animal: Animal, acknowledgement: Acknowledgement) {
if (messageProcessor.handleAnimalMessage(animal).block() == true) {
acknowledgement.ack()
messageProcessor.recordAcknowledgement(acknowledgement)
} else {
acknowledgement.nack()
messageProcessor.recordAcknowledgement(acknowledgement)
}
}
@Subscription("animals-async")
fun onReactiveMessage(message: Mono<Animal>, acknowledgement: Acknowledgement): Mono<Boolean> {
return message.flatMap { animal -> messageProcessor.handleAnimalMessage(animal) }
.doOnNext { result ->
if (result) {
acknowledgement.ack()
messageProcessor.recordAcknowledgement(acknowledgement)
} else {
acknowledgement.nack()
messageProcessor.recordAcknowledgement(acknowledgement)
}
}
}
}
The following example shows usage of manual acknowledgement with push messages:
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.PushSubscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.Acknowledgement;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import reactor.core.publisher.Mono;
@PubSubListener
public class AcknowledgementPushSubscriber {
private final MessageProcessor messageProcessor;
public AcknowledgementPushSubscriber(MessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor;
}
@ExecuteOn(TaskExecutors.BLOCKING)
@PushSubscription("animals-push")
public void onMessage(Animal animal, Acknowledgement acknowledgement) {
if (Boolean.TRUE.equals(messageProcessor.handleAnimalMessage(animal).block())) {
acknowledgement.ack();
} else {
acknowledgement.nack();
}
}
@PushSubscription("animals-async-push")
public Mono<Boolean> onReactiveMessage(Mono<Animal> animal, Acknowledgement acknowledgement) {
return animal.flatMap(messageProcessor::handleAnimalMessage)
.doOnNext(result -> {
if (Boolean.TRUE.equals(result)) {
acknowledgement.ack();
} else {
acknowledgement.nack();
}
});
}
}
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import reactor.core.publisher.Mono
@PubSubListener
class AcknowledgementPushSubscriber {
MessageProcessor messageProcessor
AcknowledgementPushSubscriber(MessageProcessor messageProcessor) {
this.messageProcessor = messageProcessor
}
@ExecuteOn(TaskExecutors.BLOCKING)
@PushSubscription("animals-push")
void onMessage(Animal animal, Acknowledgement acknowledgement) {
if (Boolean.TRUE == messageProcessor.handleAnimalMessage(animal).block()) {
acknowledgement.ack()
} else {
acknowledgement.nack()
}
}
@PushSubscription("animals-async-push")
Mono<Boolean> onReactiveMessage(Mono<Animal> animal, Acknowledgement acknowledgement) {
return animal.flatMap(messageProcessor::handleAnimalMessage)
.doOnNext(result -> {
if (Boolean.TRUE == result) {
acknowledgement.ack()
} else {
acknowledgement.nack()
}
})
}
}
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.PushSubscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.Acknowledgement
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import reactor.core.publisher.Mono
@PubSubListener
class AcknowledgementPushSubscriber(private val messageProcessor: MessageProcessor) {
@ExecuteOn(TaskExecutors.BLOCKING)
@PushSubscription("animals-push")
fun onMessage(animal: Animal, acknowledgement: Acknowledgement) {
if (messageProcessor.handleAnimalMessage(animal).block() == true) {
acknowledgement.ack()
messageProcessor.recordAcknowledgement(acknowledgement)
} else {
acknowledgement.nack()
messageProcessor.recordAcknowledgement(acknowledgement)
}
}
@PushSubscription("animals-async-push")
fun onReactiveMessage(message: Mono<Animal>, acknowledgement: Acknowledgement): Mono<Boolean> {
return message.flatMap { animal -> messageProcessor.handleAnimalMessage(animal) }
.doOnNext { result ->
if (result) {
acknowledgement.ack()
messageProcessor.recordAcknowledgement(acknowledgement)
} else {
acknowledgement.nack()
messageProcessor.recordAcknowledgement(acknowledgement)
}
}
}
}
9.6.7 Consumer error handling
During message handling for listeners errors can happen at the framework or at your method level such as:
-
Problems binding method arguments to the message body
-
Content-Type deserialization issues
-
Acknowledgement
-
Uncaught exceptions at the annotated method
The framework provides a Global Error Handler DefaultPubSubMessageReceiverExceptionHandler, this handler will catch any errors and just log it. This behavior is far from ideal as messages would continue to be redelivered since they are not acknowledged.
There’s two ways you can provide error handling.
-
Locally: your @PubSubListener class implements PubSubMessageReceiverExceptionHandler, then any error related to
Subscriptions
of that class will be handled by your class. -
Globally: You define a new implementation of PubSubMessageReceiverExceptionHandler which replaces DefaultPubSubMessageReceiverExceptionHandler.
The PubSubMessageReceiverException contains references to the originating bean that threw the exception as well as a reference to PubSubConsumerState which has state information regarding the message handling.
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.bind.PubSubConsumerState;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener
public class ErrorHandlingSubscriber implements PubSubMessageReceiverExceptionHandler { // (1)
/**
*
* @param animal payload
*/
public void onMessage(Animal animal) {
throw new RuntimeException("error");
}
@Override
public void handle(PubSubMessageReceiverException exception) { // (2)
Object listener = exception.getListener(); // (3)
PubSubConsumerState state = exception.getState(); // (4)
PubsubMessage originalMessage = state.getPubsubMessage();
String contentType = state.getContentType();
//some logic
state.getAckReplyConsumer().ack(); // (5)
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener
class ErrorHandlingSubscriber implements PubSubMessageReceiverExceptionHandler { // (1)
void onMessage(Animal animal) {
throw new RuntimeException("error");
}
@Override
void handle(PubSubMessageReceiverException exception) { // (2)
def listener = exception.listener// (3)
def state = exception.state // (4)
def originalMessage = state.pubsubMessage
def contentType = state.contentType
//some logic
state.getAckReplyConsumer().ack(); // (5)
}
}
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener
class ErrorHandlingSubscriber : PubSubMessageReceiverExceptionHandler { // (1)
fun onMessage(animal: Animal) {
throw RuntimeException("error")
}
override fun handle(exception: PubSubMessageReceiverException) { // (2)
val listener = exception.listener // (3)
val state = exception.state // (4)
val originalMessage = state.pubsubMessage
val contentType = state.contentType
//some logic
state.ackReplyConsumer.ack() // (5)
}
}
1 | Implement PubSubMessageReceiverExceptionHandler to mark this bean as a local error handler |
2 | Method invoked if any error happens during the processing of this PubSubListener class |
3 | Reference to the class that originated the exception |
4 | Contains information related to the subscription |
5 | Depending on your use case you can ack() of nack() the message |
9.6.8 Custom Parameter Binding
Default Binding Functionality
Consumer argument binding is achieved through an ArgumentBinderRegistry that is specific for binding consumers from Pub/Sub messages. The class responsible for this is the PubSubBinderRegistry. The registry supports argument binders that are used based on an annotation applied to the argument or the argument type. All argument binders must implement either PubSubAnnotatedArgumentBinder or PubSubTypeArgumentBinder. The exception to that rule is the PubSubDefaultArgumentBinder which is used when no other binders support a given argument.
When an argument needs bound, the PubSubConsumerState is used as the source of all of the available data. The binder registry follows a small sequence of steps to attempt to find a binder that supports the argument.
-
Search the annotation based binders for one that matches any annotation on the argument that is annotated with @Bindable.
-
Search the type based binders for one that matches or is a subclass of the argument type.
-
Return the default binder.
Custom Binding
To inject your own argument binding behavior, it is as simple as registering a bean. The existing binder registry will inject it and include it in the normal processing.
Annotation Binding
A custom annotation can be created to bind consumer arguments. A custom binder can then be created to use that annotation and the PubSubConsumerState to supply a value for the argument. The value may in fact come from anywhere, however for the purposes of this documentation, we will show how you would create an annotation to bind the message publish time.
import io.micronaut.core.bind.annotation.Bindable;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
@Bindable // (1)
public @interface MessagePublishTime {
}
import io.micronaut.core.bind.annotation.Bindable;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target([ElementType.PARAMETER])
@Bindable // (1)
@interface MessagePublishTime {
}
import io.micronaut.core.bind.annotation.Bindable
@MustBeDocumented
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.VALUE_PARAMETER)
@Bindable // (1)
annotation class MessagePublishTime
1 | The @Bindable annotation is required for the annotation to be considered for binding. |
import com.google.protobuf.util.Timestamps;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;
import jakarta.inject.Singleton;
@Singleton // (1)
public class MessagePublishTimeAnnotationBinder implements PubSubAnnotatedArgumentBinder<MessagePublishTime> { // (2)
private final ConversionService conversionService;
public MessagePublishTimeAnnotationBinder(ConversionService conversionService) { // (3)
this.conversionService = conversionService;
}
@Override
public Class<MessagePublishTime> getAnnotationType() {
return MessagePublishTime.class;
}
@Override
public BindingResult<Object> bind(ArgumentConversionContext<Object> context, PubSubConsumerState source) {
Long epochMillis = Timestamps.toMillis(source.getPubsubMessage().getPublishTime()); // (4)
return () -> conversionService.convert(epochMillis, context); // (5)
}
}
import com.google.protobuf.util.Timestamps
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import jakarta.inject.Singleton;
@Singleton // (1)
class MessagePublishTimeAnnotationBinder implements PubSubAnnotatedArgumentBinder<MessagePublishTime> { // (2)
private final ConversionService conversionService;
public MessagePublishTimeAnnotationBinder(ConversionService conversionService) { // (3)
this.conversionService = conversionService;
}
@Override
Class<MessagePublishTime> getAnnotationType() {
MessagePublishTime
}
@Override
public BindingResult<Object> bind(ArgumentConversionContext<Object> context, PubSubConsumerState source) {
def epochMillis = Timestamps.toMillis(source.pubsubMessage.publishTime) // (4)
return { -> conversionService.convert(epochMillis, context) } // (5)
}
}
import com.google.protobuf.util.Timestamps
import io.micronaut.core.bind.ArgumentBinder
import io.micronaut.core.bind.ArgumentBinder.BindingResult
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import jakarta.inject.Singleton
@Singleton // (1)
class MessagePublishTimeAnnotationBinder(private val conversionService: ConversionService) // (3)
: PubSubAnnotatedArgumentBinder<MessagePublishTime> // (2)
{
override fun getAnnotationType(): Class<MessagePublishTime> {
return MessagePublishTime::class.java
}
override fun bind(context: ArgumentConversionContext<Any>, source: PubSubConsumerState): BindingResult<Any> {
val epochMillis = Timestamps.toMillis(source.pubsubMessage.publishTime) // (4)
return ArgumentBinder.BindingResult { conversionService.convert(epochMillis, context) } // (5)
}
}
1 | The class is made a bean by annotating with @Singleton . |
2 | The custom annotation is used as the generic type for the interface. |
3 | The conversion service is injected into the instance. |
4 | The message publish time is retrieved from the message state. |
5 | The value is converted to the argument type. For example this allows the argument to be a String even though the epochMillis is a Long . |
You could also return a com.google.protobuf.Timestamp if you register the appropriate type converter to the conversion service, but such example is outside of the scope of this documentation.
|
The annotation can now be used on the argument in a consumer method.
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener
public class CustomBindingSubscriber {
/**
*
* @param animal payload
* @param publishTime message publish time
*/
public void onMessage(Animal animal, @MessagePublishTime Long publishTime) { // (1)
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener
class CustomBindingSubscriber {
/**
*
* @param animal payload
* @param publishTime message publish time
*/
void onMessage(Animal animal, @MessagePublishTime Long publishTime) { // (1)
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener
class CustomBindingSubscriber {
/**
*
* @param animal payload
* @param publishTime message publish time
*/
fun onMessage(animal: Animal, @MessagePublishTime publishTime: Long) { // (1)
}
}
1 | The message publish time will now be passed to the annotated argument |
9.7 Message Serialization/Deserialization (SerDes)
The serialization and deserialization of message bodies is handled through instances of PubSubMessageSerDes. The ser-des (Serializer/Deserializer) is responsible for both serialization and deserialization of Pub/Sub message bodies into the message body types defined in your clients and consumers methods.
The ser-des are managed by a PubSubMessageSerDesRegistry.
All ser-des beans are injected in order into the registry and then searched for when serialization or deserialization is needed.
The search is based on the Content-Type
defined for the message, the framework default is application/json
.
If a ser-des can’t be located an exception is thrown.
You can supply your own ser-des by simply registering a bean of type PubSubMessageSerDes.
For example let’s say you want to implement a ser-des that uses java native object serialization instead of the Json serialization provided.
First you need to define a custom mimeType for that, we will use application/x.java
, following the best principles for handling mime types.
This is a fictional example, java serialization is not ideal, and its not portable. |
import io.micronaut.core.serialize.exceptions.SerializationException;
import io.micronaut.core.type.Argument;
import jakarta.inject.Singleton;
import java.io.*;
@Singleton // (1)
public class JavaMessageSerDes implements PubSubMessageSerDes {
@Override
public String supportedType() { // (2)
return "application/x.java";
}
@Override
public Object deserialize(byte[] data, Argument<?> type) {
ByteArrayInputStream bin = new ByteArrayInputStream(data);
Object result = null;
try {
ObjectInputStream reader = new ObjectInputStream(bin);
result = reader.readObject();
} catch (Exception e) {
throw new SerializationException("Failed to deserialize object", e);
}
return result;
}
@Override
public byte[] serialize(Object data) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
ObjectOutputStream writer = new ObjectOutputStream(baos);
writer.writeObject(data);
} catch (IOException e) {
throw new SerializationException("Failed to serialize object", e);
}
return baos.toByteArray();
}
}
import io.micronaut.core.serialize.exceptions.SerializationException;
import io.micronaut.core.type.Argument;
import jakarta.inject.Singleton;
import java.io.*;
@Singleton // (1)
class JavaMessageSerDes implements PubSubMessageSerDes {
@Override
String supportedType() { // (2)
"application/x.java"
}
@Override
public Object deserialize(byte[] data, Argument<?> type) {
ByteArrayInputStream bin = new ByteArrayInputStream(data)
Object result = null
try {
ObjectInputStream reader = new ObjectInputStream(bin)
result = reader.readObject();
} catch (Exception e) {
throw new SerializationException("Failed to deserialize object", e)
}
return result;
}
@Override
byte[] serialize(Object data) {
ByteArrayOutputStream baos = new ByteArrayOutputStream()
try {
ObjectOutputStream writer = new ObjectOutputStream(baos)
writer.writeObject(data)
} catch (IOException e) {
throw new SerializationException("Failed to serialize object", e)
}
return baos.toByteArray()
}
}
import io.micronaut.core.serialize.exceptions.SerializationException
import io.micronaut.core.type.Argument
import java.io.*
import jakarta.inject.Singleton
@Singleton // (1)
class JavaMessageSerDes : PubSubMessageSerDes {
override fun supportedType(): String { // (2)
return "application/x.java"
}
override fun deserialize(data: ByteArray, type: Argument<*>): Any {
val bin = ByteArrayInputStream(data)
var result: Any? = null
result = try {
val reader = ObjectInputStream(bin)
reader.readObject()
} catch (e: Exception) {
throw SerializationException("Failed to deserialize object", e)
}
return result
}
override fun serialize(data: Any): ByteArray {
val baos = ByteArrayOutputStream()
try {
val writer = ObjectOutputStream(baos)
writer.writeObject(data)
} catch (e: IOException) {
throw SerializationException("Failed to serialize object", e)
}
return baos.toByteArray()
}
}
1 | The class is declared as a singleton so it will be registered with the context |
2 | The contentType defines what kind of messages will this implementation be able to handle |
To publish messages with this ser-des set the Content-Type for @Topic
@Topic(value = "animals", contentType = "application/x.java")
If the messages that are arriving on your subscriber already contain a Content-Type
header with this type the framework will pick it up, or you can just force it on the @Suibscription
annotation too.
@Subscription(value = "animals", contentType = "application/x.java")
9.8 Configuring Thread pools
The framework allows users to configure separate thread pools to be used for Publishers and Subscriber. You can set a reference to a pre-defined executor at Pub/Sub configuration properties, or create custom configurations for publishers and subscribers. Micronaut supports definition of custom ExecutorService implementations, see ExecutorConfiguration for the full list of options.
Pub/Sub client libraries require an ScheduledExecutorService for both Publisher and Subscriber make sure your custom executor is of type scheduled or an error will be thrown.
|
For example:
custom
thread poolmicronaut.executors.custom.type=scheduled
micronaut.executors.custom.nThreads=32
micronaut:
executors:
custom:
type: scheduled
nThreads: 32
[micronaut]
[micronaut.executors]
[micronaut.executors.custom]
type="scheduled"
nThreads=32
micronaut {
executors {
custom {
type = "scheduled"
nThreads = 32
}
}
}
{
micronaut {
executors {
custom {
type = "scheduled"
nThreads = 32
}
}
}
}
{
"micronaut": {
"executors": {
"custom": {
"type": "scheduled",
"nThreads": 32
}
}
}
}
If no configuration is supplied, the framework will use the default named scheduled
executor.
9.9 Using Google Cloud Pub/Sub emulator
Google Cloud Pub/Sub has a local emulator to enable developers to test their applications locally with no need to connect to the cloud Pub/Sub service.
The framework supports automatic switching of the TransportChannelProvider
to use PUBSUB_EMULATOR_HOST
if this variable is set on the environment.
Make sure that you also set GCP_PROJECT_ID to be the same as the project you have configured the emulator to use. Otherwise the framework may pick up the projectId used by the default credentials.
|
You will need to manually configure topics and subscriptions on the emulator if you want to test locally. You can easily create resources using the Pub/Sub REST interface to create topics and subscriptions.
For instance, the following curl
command would create a topic named micronaut-devices
on the test-project
project.
curl -XPUT $PUBSUB_EMULATOR_HOST/v1/projects/test-project/topics/micronaut-devices
9.10 Testing Push Subscribers
The Pub/Sub emulator does not provide any built-in support for simulating Push messages. That said, testing your PushSubscription endpoints can be done simply by using Micronaut’s HttpClient in to simulate delivery of Push messages.
Push Messages follow a specified JSON format, which can be constructed and serialized with the PushRequest record.
For example:
@MicronautTest
@Property(name = "spec.name", value = "ContentTypePushSubscriberTest")
@Property(name = "gcp.projectId", value = "test-project")
class ContentTypePushSubscriberSpec {
@Inject
JsonMapper jsonMapper;
@Test
void testJsonPojo() throws IOException {
Animal dog = new Animal("dog");
String encodedData = Base64.getEncoder().encodeToString(jsonMapper.writeValueAsBytes(dog)); // (1)
PushRequest request = new PushRequest("projects/test-project/subscriptions/animals-push", // (2)
new PushRequest.PushMessage(new HashMap<>(), encodedData, "1", "2021-02-26T19:13:55.749Z"));
HttpResponse<?> response = pushClient.toBlocking().exchange(HttpRequest.POST("/push", request)); // (3)
Assertions.assertEquals(HttpStatus.OK, response.getStatus());
}
}
@MicronautTest
@Property(name = "spec.name", value = "ContentTypePushSubscriberSpec")
@Property(name = "gcp.projectId", value = "test-project")
class ContentTypePushSubscriberSpec extends Specification {
@Inject
JsonMapper jsonMapper
void "receive pojo message from json"() {
given:
Animal dog = new Animal("dog")
String encodedData = Base64.getEncoder().encodeToString(jsonMapper.writeValueAsBytes(dog)) // (1)
PushRequest request = new PushRequest("projects/test-project/subscriptions/animals-push", // (2)
new PushRequest.PushMessage(new HashMap<>(), encodedData, "1", "2021-02-26T19:13:55.749Z"))
when:
HttpResponse response = pushClient.toBlocking().exchange(HttpRequest.POST("/push", request)) // (3)
then:
response.status == HttpStatus.OK
}
}
1 | The data to be sent must be encoded as a Base64 String |
2 | The fully qualified subscription name must be specified |
3 | The message is sent to the /push endpoint |
10 Google Cloud Secret Manager Support
Google Cloud Secret Manager is a secure and convenient storage system for API keys, passwords, certificates, and other sensitive data. Applications can also use it to store configuration files and use it as secure distributed repository of metadata.
To add support for Google Cloud Secret Manager to an existing project, add the following dependencies to your build.
implementation("io.micronaut.gcp:micronaut-gcp-secret-manager")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-secret-manager</artifactId>
</dependency>
10.1 Distributed Configuration
You can leverage Distributed Configuration and rely on Google Cloud Secret Manager to store your configuration files.
To enable it, add a bootstrap configuration file to src/main/resources/
:
micronaut.application.name=hello-world
micronaut.config-client.enabled=true
micronaut:
application:
name: hello-world
config-client:
enabled: true
[micronaut]
[micronaut.application]
name="hello-world"
[micronaut.config-client]
enabled=true
micronaut {
application {
name = "hello-world"
}
configClient {
enabled = true
}
}
{
micronaut {
application {
name = "hello-world"
}
config-client {
enabled = true
}
}
}
{
"micronaut": {
"application": {
"name": "hello-world"
},
"config-client": {
"enabled": true
}
}
}
Make sure you have configured the correct credentials on your project following the Setting up GCP section. And that the service account you designated your application has proper rights to read secrets. Follow the official Access Control guide for Secret Manager if you need more information. |
Configuration file resolution
It’s important to note that Google Cloud Secret Manager does not support file extensions.
When you upload your application configuration file (e.g. application.yml
or application.properties
) remove the file extension.
Micronaut tries to circumvent this limitation, adopting a convention when fetching configuration files. The following table displays the possible secrets that are fetched by default:
Name |
Description |
|
Configuration shared by all applications |
|
Environment specific configuration, for instance if you have |
|
Application specific configuration, example |
|
Application specific configuration bound to an environment, example |
Supported formats
Internally the framework will try to read the files (that lack extensions) on the following order: yaml
, json
, properties
.
The first PropertySourceLoader that succeeds reading the contents of the secrets interrupts the chain.
Avoid loading default config files
If, for some reason, you don’t want to fetch these default config files, you can set gcp.secret-manager.default-config-enabled
to false
:
micronaut.application.name=hello-world
micronaut.config-client.enabled=true
gcp.secret-manager.default-config-enabled=false
micronaut:
application:
name: hello-world
config-client:
enabled: true
gcp:
secret-manager:
default-config-enabled: false
[micronaut]
[micronaut.application]
name="hello-world"
[micronaut.config-client]
enabled=true
[gcp]
[gcp.secret-manager]
default-config-enabled=false
micronaut {
application {
name = "hello-world"
}
configClient {
enabled = true
}
}
gcp {
secretManager {
defaultConfigEnabled = false
}
}
{
micronaut {
application {
name = "hello-world"
}
config-client {
enabled = true
}
}
gcp {
secret-manager {
default-config-enabled = false
}
}
}
{
"micronaut": {
"application": {
"name": "hello-world"
},
"config-client": {
"enabled": true
}
},
"gcp": {
"secret-manager": {
"default-config-enabled": false
}
}
}
Loading non default config files
It may be desired to sometimes load a secret as a PropertySource that does not follow the above convention.
You can whitelist
secrets that you would like to loaded as PropertySources using the following configuration:
micronaut.application.name=hello-world
micronaut.config-client.enabled=true
gcp.secret-manager.custom-configs[0]=my-first-config
gcp.secret-manager.custom-configs[1]=my-second-config
micronaut:
application:
name: hello-world
config-client:
enabled: true
gcp:
secret-manager:
custom-configs:
- my-first-config
- my-second-config
[micronaut]
[micronaut.application]
name="hello-world"
[micronaut.config-client]
enabled=true
[gcp]
[gcp.secret-manager]
custom-configs=[
"my-first-config",
"my-second-config"
]
micronaut {
application {
name = "hello-world"
}
configClient {
enabled = true
}
}
gcp {
secretManager {
customConfigs = ["my-first-config", "my-second-config"]
}
}
{
micronaut {
application {
name = "hello-world"
}
config-client {
enabled = true
}
}
gcp {
secret-manager {
custom-configs = ["my-first-config", "my-second-config"]
}
}
}
{
"micronaut": {
"application": {
"name": "hello-world"
},
"config-client": {
"enabled": true
}
},
"gcp": {
"secret-manager": {
"custom-configs": ["my-first-config", "my-second-config"]
}
}
}
The secrets you list on custom-configs must be valid configuration files and not single keys.
|
Loading key/value pairs into a PropertySource
Google Cloud Secret Manager can also host single keys mapping to a single String value.
For instance one could have a DB_PASSWORD
secret, instead of storing it on a configuration file.
Because Secret Manager does not offer a hierarchical approach, loading all secrets in a project is not a viable solution.
Instead you need to whitelist the keys you would like to pre-fetch such as the following config demonstrates:
micronaut.application.name=hello-world
micronaut.config-client.enabled=true
gcp.secret-manager.keys[0]=DB_PASSWORD
gcp.secret-manager.keys[1]=dbUser
micronaut:
application:
name: hello-world
config-client:
enabled: true
gcp:
secret-manager:
keys:
- DB_PASSWORD
- dbUser
[micronaut]
[micronaut.application]
name="hello-world"
[micronaut.config-client]
enabled=true
[gcp]
[gcp.secret-manager]
keys=[
"DB_PASSWORD",
"dbUser"
]
micronaut {
application {
name = "hello-world"
}
configClient {
enabled = true
}
}
gcp {
secretManager {
keys = ["DB_PASSWORD", "dbUser"]
}
}
{
micronaut {
application {
name = "hello-world"
}
config-client {
enabled = true
}
}
gcp {
secret-manager {
keys = ["DB_PASSWORD", "dbUser"]
}
}
}
{
"micronaut": {
"application": {
"name": "hello-world"
},
"config-client": {
"enabled": true
}
},
"gcp": {
"secret-manager": {
"keys": ["DB_PASSWORD", "dbUser"]
}
}
}
The framework will load all Secret Manager keys and prefix them with a sm
key. The table below shows how entries could be accessed:
Secret Name |
Micronaut resolved property |
|
|
|
|
Secret Versioning
Google Cloud Secret Manager supports secret versioning.
For the distributed configuration use case Micronaut will always load the latest
version of each secret into a property source file.
It’s currently not supported specifying what version of your application
configuration file should be loaded.
10.2 Low-Level Secret Manager Client access
Accessing Secret Manager via client libraries
If you require access to Secret Manager via the Google client java libraries, Micronaut provides a bean of type SecretManagerServiceClient for injection.
import com.google.cloud.secretmanager.v1.AccessSecretVersionRequest;
import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;
public final class GoogleClientExample {
private final SecretManagerServiceClient client;
public GoogleClientExample(SecretManagerServiceClient googleSecretManagerClient) { // (1)
this.client = googleSecretManagerClient;
}
@EventListener
public void onStartup(StartupEvent event) {
AccessSecretVersionResponse response = client.accessSecretVersion(AccessSecretVersionRequest
.newBuilder()
.setName(SecretVersionName.of("my-cloud-project", "secretName", "latest").toString())
.build());
String secret = response.getPayload().getData().toStringUtf8();
}
}
import com.google.cloud.secretmanager.v1.AccessSecretVersionRequest;
import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;
class GoogleClientExample {
private final SecretManagerServiceClient client
GoogleClientExample(SecretManagerServiceClient googleSecretManagerClient) { // (1)
this.client = googleSecretManagerClient;
}
@EventListener
void onStartup(StartupEvent event) {
AccessSecretVersionResponse response = client.accessSecretVersion(AccessSecretVersionRequest
.newBuilder()
.setName(SecretVersionName.of("my-cloud-project", "secretName", "latest").toString())
.build())
String secret = response.getPayload().getData().toStringUtf8()
}
}
import com.google.cloud.secretmanager.v1.AccessSecretVersionRequest
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient
import com.google.cloud.secretmanager.v1.SecretVersionName
import io.micronaut.context.event.StartupEvent
import io.micronaut.runtime.event.annotation.EventListener
class GoogleClientExample (private val client: SecretManagerServiceClient) { // (1)
@EventListener
fun onStartup(event: StartupEvent) {
val response = client.accessSecretVersion(AccessSecretVersionRequest
.newBuilder()
.setName(SecretVersionName.of("my-cloud-project", "secretName", "latest").toString())
.build())
val secret = response.payload.data.toStringUtf8()
}
}
1 | Inject SecretManagerServiceClient |
Micronaut Secret Manager Client
If you rather not deal with Google gRPC libraries, internally the framework uses a wrapper client: SecretManagerClient that provides a reactive based approach:
import io.micronaut.context.event.StartupEvent;
import io.micronaut.gcp.secretmanager.client.SecretManagerClient;
import io.micronaut.gcp.secretmanager.client.VersionedSecret;
import io.micronaut.runtime.event.annotation.EventListener;
import reactor.core.publisher.Mono;
public final class ClientExample {
private final SecretManagerClient client;
public ClientExample(SecretManagerClient client) {
this.client = client;
}
@EventListener
public void onStartup(StartupEvent event) {
Mono<VersionedSecret> secret = Mono.from(client.getSecret("secretId")); // (1)
Mono<VersionedSecret> v2 = Mono.from(client.getSecret("secretId", "v2")); //(2)
Mono<VersionedSecret> fromOtherProject = Mono.from(client.getSecret("secretId", "latest", "another-project-id")); //(3)
}
}
import io.micronaut.context.event.StartupEvent
import io.micronaut.gcp.secretmanager.client.SecretManagerClient
import io.micronaut.gcp.secretmanager.client.VersionedSecret
import io.micronaut.runtime.event.annotation.EventListener
import reactor.core.publisher.Mono
class ClientExample {
private final SecretManagerClient client
ClientExample(SecretManagerClient client) {
this.client = client
}
@EventListener
void onStartup(StartupEvent event) {
Mono<VersionedSecret> secret = client.getSecret("secretId") // (1)
Mono<VersionedSecret> v2 = client.getSecret("secretId", "v2") //(2)
Mono<VersionedSecret> fromOtherProject = client.getSecret("secretId", "latest", "another-project-id") //(3)
}
}
import io.micronaut.context.event.StartupEvent
import io.micronaut.gcp.secretmanager.client.SecretManagerClient
import io.micronaut.runtime.event.annotation.EventListener
class ClientExample(private val client: SecretManagerClient) {
@EventListener
fun onStartup(event: StartupEvent) {
val secret = client.getSecret("secretId") // (1)
val v2 = client.getSecret("secretId", "v2") //(2)
val fromOtherProject = client.getSecret("secretId", "latest", "another-project-id") //(3)
}
}
1 | Uses latest version and current project from GoogleCloudCredentials |
2 | Uses current project but overrides version |
3 | Fetches a secret from a different project |
You can use the client libraries at any point in time of your runtime, however it’s advised that secrets should be loaded only once during application startup, to reduce latency of the remote call as well as costs associated with secret retrieval. |
See the guide for Use Google Secret Manager to store MySQL credentials to learn more. |
11 Google Cloud Storage Support
Micronaut provides a high-level, uniform object storage API that works across the major cloud providers: Micronaut Object Storage.
To get started, select the object-storage-gcp
feature in Micronaut Launch, or add the following dependency:
implementation("io.micronaut.objectstorage:micronaut-object-storage-gcp")
<dependency>
<groupId>io.micronaut.objectstorage</groupId>
<artifactId>micronaut-object-storage-gcp</artifactId>
</dependency>
For more information, check the Micronaut Object Storage GCP support documentation.
See the guide for Use the Micronaut Object Storage API to store files in Google Cloud Storage to learn more. |
12 Guides
See the following list of guides to learn more about working with Google Cloud Platform in the Micronaut Framework:
13 Repository
You can find the source code of this project in this repository: