implementation("io.micronaut.gcp:micronaut-gcp-common:4.10.2")
Micronaut GCP
Provides integration between Micronaut and Google Cloud Platform (GCP)
Version:
1 Introduction
This project provides various extensions to Micronaut to integrate Micronaut with Google Cloud Platform (GCP).
2 Setting up GCP Support
The micronaut-gcp-common
module includes basic setup for running applications on Google Cloud.
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-common</artifactId>
<version>4.10.2</version>
</dependency>
Prerequisites:
- You should have a Google Cloud Platform project created.
- Configure the default project with gcloud config set project YOUR_PROJECT_ID
- Authenticate with gcloud auth login
- Authenticate the application default credentials with gcloud auth application-default login
It’s strongly recommended that you use a Service Account for your application.
Google Project ID
The module features a base GoogleCloudConfiguration which you can use to configure or retrieve the GCP Project ID:
Property | Type | Description |
---|---|---|
| java.lang.String | Sets the project id to use. |
You can inject this bean and use the getProjectId() method to retrieve the configured or detected project ID.
Google Credentials
The module sets up a bean exposing the com.google.auth.oauth2.GoogleCredentials instance, which is either detected from the local environment or configured by GoogleCredentialsConfiguration:
Property | Type | Description |
---|---|---|
| boolean | Allows disabling Google credentials configuration. This may be useful in situations where you don’t want to authenticate despite having the Google Cloud SDK configured. Default value is true. |
| java.util.List | The default scopes to associate with the application to access specific APIs. See Google Scopes (https://developers.google.com/identity/protocols/googlescopes) for a complete list. Leave this empty if you don’t need additional API access. |
| java.lang.String | Sets the location of the service account credential key file. |
| java.lang.String | Sets the Base64 encoded service account key content. |
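The last property above expects the key file content as a Base64 string. The following is a minimal sketch of producing such a value with the JDK's java.util.Base64. The class and method names are hypothetical, the key content is a placeholder, and the use of the standard (not URL-safe) Base64 alphabet is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of Micronaut GCP.
final class EncodedKeyExample {

    // Base64-encodes the raw JSON text of a service account key (standard alphabet, no line breaks).
    static String encodeKey(String keyJson) {
        return Base64.getEncoder().encodeToString(keyJson.getBytes(StandardCharsets.UTF_8));
    }

    // Reverses encodeKey; useful to check that the value survives a round trip.
    static String decodeKey(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Placeholder content; a real key file is a much larger JSON document.
        String keyJson = "{\"type\":\"service_account\"}";
        String encoded = encodeKey(keyJson);
        System.out.println(encoded);
        System.out.println(decodeKey(encoded).equals(keyJson));
    }
}
```

The encoded string can then be supplied through configuration instead of a path to a key file, which may be convenient where mounting files is awkward.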
3 Release History
For this project, you can find a list of releases (with release notes) here:
4 Google Cloud Logging Support
Google Cloud Logging integration is available via StackdriverJsonLayout, which formats log output using the Stackdriver structured logging format.
To enable it, add the following dependency to your project:
implementation("io.micronaut.gcp:micronaut-gcp-logging:4.10.2")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-logging</artifactId>
<version>4.10.2</version>
</dependency>
By default, if an application uses a CONSOLE appender, the Stackdriver log parser treats the entire payload of the message as a text entry, making searching and correlation with tracing impractical.
Such logs can’t be searched by attributes such as thread, and ANSI coloring makes regex searching even more challenging.
When enabled, the JSON appender allows logs to be easily filtered:
1 | Visual display of log levels |
2 | Correlated traceId for tracing when you have enabled cloud tracing |
3 | Simplified output without extra fields or ANSI coloring codes |
If you combine this module with the Stackdriver Trace module, all logs will also have a traceId field, making it possible to correlate traces with log entries.
Configuring logging via console
<configuration>
<!-- Uncomment the next line to enable ANSI color code interpretation -->
<!-- <property name="STDOUT_WITH_JANSI" value="true" /> -->
<include resource="io/micronaut/gcp/logging/logback-json-appender.xml" /> (1)
<root level="INFO">
<appender-ref ref="CONSOLE_JSON" /> (2)
</root>
</configuration>
1 | Import the logback configuration that defines the JsonLayout appender |
2 | Reference the CONSOLE_JSON appender from your root logger |
Overriding defaults on logging configuration
If you would like to override the fields that are included in the JsonLayout appender, you can declare it in your own logback configuration instead of including the default from logback-json-appender.xml:
<configuration>
<appender name="CONSOLE_JSON" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="io.micronaut.gcp.logging.StackdriverJsonLayout">
<projectId>${projectId}</projectId> (1)
<includeTraceId>true</includeTraceId>
<includeSpanId>true</includeSpanId>
<includeLevel>true</includeLevel>
<includeThreadName>false</includeThreadName>
<includeMDC>true</includeMDC>
<includeLoggerName>true</includeLoggerName>
<includeFormattedMessage>true</includeFormattedMessage>
<includeExceptionInMessage>true</includeExceptionInMessage>
<includeContextName>true</includeContextName>
<includeMessage>false</includeMessage>
<includeException>false</includeException>
</layout>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE_JSON" >
</appender-ref>
</root>
</configuration>
1 | You can override the projectId settings via property. If not defined, the default ServiceOptions.getDefaultProjectId() will be used. |
Dynamic appender selection
The JsonLayout comes in handy when using Google Cloud Logging, but when running locally it will make your console logs unreadable. The default logback-json-appender configuration includes both a STDOUT and a CONSOLE_JSON appender, as well as a dynamic logback property called google_cloud_logging.
You can use that variable to switch your logger appender dynamically.
Your logging configuration would look like this:
<configuration>
<include resource="io/micronaut/gcp/logging/logback-json-appender.xml" />
<root level="INFO">
<appender-ref ref="${google_cloud_logging}" /> (1)
</root>
</configuration>
1 | Chooses the appropriate appender depending on the environment. |
The environment detection executes an HTTP request to the Google Cloud metadata server. If you would rather skip this to improve startup time, set the MICRONAUT_ENVIRONMENTS environment variable or the micronaut.environments System property as described in the reference documentation.
5 Stackdriver Trace
The micronaut-gcp-tracing module integrates Micronaut with Cloud Trace from Google Cloud Operations (formerly Stackdriver).
To enable it, add the following dependency:
implementation("io.micronaut.gcp:micronaut-gcp-tracing:4.10.2")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-tracing</artifactId>
<version>4.10.2</version>
</dependency>
Then enable Zipkin tracing in your application configuration:
tracing.zipkin.enabled=true
tracing.zipkin.sampler.probability=1.0
tracing.gcp.tracing.enabled=true
tracing:
  zipkin:
    enabled: true
    sampler:
      probability: 1.0
  gcp:
    tracing:
      enabled: true
[tracing]
[tracing.zipkin]
enabled=true
[tracing.zipkin.sampler]
probability=1.0
[tracing.gcp]
[tracing.gcp.tracing]
enabled=true
tracing {
zipkin {
enabled = true
sampler {
probability = 1.0
}
}
gcp {
tracing {
enabled = true
}
}
}
{
tracing {
zipkin {
enabled = true
sampler {
probability = 1.0
}
}
gcp {
tracing {
enabled = true
}
}
}
}
{
"tracing": {
"zipkin": {
"enabled": true,
"sampler": {
"probability": 1.0
}
},
"gcp": {
"tracing": {
"enabled": true
}
}
}
}
- sampler.probability (optional): Set sampling probability to 100% for dev/testing purposes to observe traces
- gcp.tracing.enabled (optional): Enable/disable Stackdriver Trace configuration, defaults to true
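To make the effect of sampler.probability concrete, here is a minimal sketch of a probability-based sampling decision. It is an independent illustration, not the algorithm used by Zipkin or by this module, and the class name is hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration of probability sampling; not the tracer's own sampler.
final class ProbabilitySamplerSketch {

    private final double probability;

    ProbabilitySamplerSketch(double probability) {
        if (probability < 0.0 || probability > 1.0) {
            throw new IllegalArgumentException("probability must be between 0.0 and 1.0");
        }
        this.probability = probability;
    }

    // nextDouble() returns a value in [0.0, 1.0), so 1.0 always samples and 0.0 never does.
    boolean shouldSample() {
        return ThreadLocalRandom.current().nextDouble() < probability;
    }

    public static void main(String[] args) {
        ProbabilitySamplerSketch always = new ProbabilitySamplerSketch(1.0);
        ProbabilitySamplerSketch tenth = new ProbabilitySamplerSketch(0.1);
        System.out.println("probability 1.0 sampled: " + always.shouldSample());
        System.out.println("probability 0.1 sampled: " + tenth.shouldSample());
    }
}
```

With a probability of 1.0 every request is traced, which is convenient during development but usually too costly for production traffic.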
6 Authorizing HTTP Clients
The micronaut-gcp-http-client module can be used to help authorize service-to-service communication. To get started, add the following module:
implementation("io.micronaut.gcp:micronaut-gcp-http-client")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-http-client</artifactId>
</dependency>
You should then configure the service accounts as per the documentation on service-to-service communication, and then enable the filter for the outgoing URI paths in which you wish to include the Google-signed OAuth ID token:
gcp.http.client.auth.patterns[0]=/foo/**
gcp.http.client.auth.patterns[1]=/bar/**
gcp:
  http:
    client:
      auth:
        patterns:
          - /foo/**
          - /bar/**
[gcp]
[gcp.http]
[gcp.http.client]
[gcp.http.client.auth]
patterns=[
"/foo/**",
"/bar/**"
]
gcp {
http {
client {
auth {
patterns = ["/foo/**", "/bar/**"]
}
}
}
}
{
gcp {
http {
client {
auth {
patterns = ["/foo/**", "/bar/**"]
}
}
}
}
}
{
"gcp": {
"http": {
"client": {
"auth": {
"patterns": ["/foo/**", "/bar/**"]
}
}
}
}
}
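The patterns configured above select which outgoing request paths receive the ID token. The following is a simplified sketch of that selection, assuming only a trailing /** wildcard needs to be handled. The real filter's matcher may support more pattern syntax, and the class and method names here are hypothetical.

```java
import java.util.List;

// Hypothetical, simplified path matching; not the filter's actual implementation.
final class AuthPatternSketch {

    // Only a trailing "/**" wildcard is understood; any other pattern must match exactly.
    static boolean matches(String pattern, String path) {
        if (pattern.endsWith("/**")) {
            String prefix = pattern.substring(0, pattern.length() - 3);
            return path.equals(prefix) || path.startsWith(prefix + "/");
        }
        return pattern.equals(path);
    }

    // True when any configured pattern covers the outgoing request path.
    static boolean shouldAuthorize(List<String> patterns, String path) {
        return patterns.stream().anyMatch(p -> matches(p, path));
    }

    public static void main(String[] args) {
        List<String> patterns = List.of("/foo/**", "/bar/**");
        System.out.println(shouldAuthorize(patterns, "/foo/orders/1"));
        System.out.println(shouldAuthorize(patterns, "/baz"));
    }
}
```

Under these assumptions a request to /foo/orders/1 would carry the token, while a request to /baz would not.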
7 Cloud Function Support
Micronaut GCP includes extended support for Google Cloud Function - designed for serverless workloads.
7.1 Simple Functions
Micronaut GCP offers two ways to write cloud functions with Micronaut. The first way is more low level and involves using Micronaut’s built in support for functions. Simply add the following dependency to your classpath:
implementation("io.micronaut.gcp:micronaut-gcp-function")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-function</artifactId>
</dependency>
Then add the Cloud Function API as a compileOnly dependency (provided with Maven):
compileOnly("com.google.cloud.functions:functions-framework-api:1.0.1")
<dependency>
<groupId>com.google.cloud.functions</groupId>
<artifactId>functions-framework-api</artifactId>
<version>1.0.1</version>
<scope>provided</scope>
</dependency>
Now define a class that implements one of Google Cloud Function’s interfaces, for example com.google.cloud.functions.BackgroundFunction, and extends from io.micronaut.function.executor.FunctionInitializer.
The following is an example of a BackgroundFunction that uses Micronaut and Google Cloud Function:
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.background;
import com.google.cloud.functions.*;
import io.micronaut.gcp.function.GoogleFunctionInitializer;
import jakarta.inject.*;
import java.util.*;
public class Example extends GoogleFunctionInitializer // (1)
implements BackgroundFunction<PubSubMessage> { // (2)
@Inject LoggingService loggingService; // (3)
@Override
public void accept(PubSubMessage message, Context context) {
loggingService.logMessage(message);
}
}
class PubSubMessage {
String data;
Map<String, String> attributes;
String messageId;
String publishTime;
}
@Singleton
class LoggingService {
void logMessage(PubSubMessage message) {
// log the message
}
}
package example.background
import com.google.cloud.functions.*
import io.micronaut.gcp.function.GoogleFunctionInitializer
import jakarta.inject.*
class Example extends GoogleFunctionInitializer // (1)
implements BackgroundFunction<PubSubMessage> { // (2)
@Inject LoggingService loggingService // (3)
@Override
void accept(PubSubMessage message, Context context) {
loggingService.logMessage(message)
}
}
class PubSubMessage {
String data
Map<String, String> attributes
String messageId
String publishTime
}
@Singleton
class LoggingService {
void logMessage(PubSubMessage message) {
// log the message
}
}
package example.background
import com.google.cloud.functions.BackgroundFunction
import com.google.cloud.functions.Context
import io.micronaut.gcp.function.GoogleFunctionInitializer
import jakarta.inject.Inject
import jakarta.inject.Singleton
class Example : GoogleFunctionInitializer(), // (1)
BackgroundFunction<PubSubMessage> { // (2)
@Inject
lateinit var loggingService: LoggingService // (3)
override fun accept(message: PubSubMessage, context: Context) {
loggingService.logMessage(message)
}
}
class PubSubMessage {
var data: String? = null
var attributes: Map<String, String>? = null
var messageId: String? = null
var publishTime: String? = null
}
@Singleton
class LoggingService {
fun logMessage(message: PubSubMessage) {
// log the message
}
}
1 | The function extends from io.micronaut.function.executor.FunctionInitializer |
2 | The function implements com.google.cloud.functions.BackgroundFunction |
3 | Dependency injection can be used on the fields |
When you extend from FunctionInitializer, the Micronaut ApplicationContext will be initialized and dependency injection will be performed on the function instance. You can inject any bean using jakarta.inject.Inject as usual.
Functions require a no-argument constructor, hence you must use field injection (which requires lateinit in Kotlin) when injecting dependencies into the function itself.
The FunctionInitializer superclass provides numerous methods that you can override to customize how the ApplicationContext is built if desired.
Running Functions Locally
Raw functions cannot be executed locally. They can be tested by instantiating the function and inspecting any side effects by providing mock arguments or mocking dependent beans.
Deployment
When deploying the function to Cloud Function you should use the fully qualified name of the function class as the handler reference.
First build the function with:
$ ./gradlew clean shadowJar
Then cd into the build/libs directory (deployment has to be done from the location where the JAR file resides):
$ cd build/libs
To deploy the function, make sure you have the gcloud CLI installed, then run:
$ gcloud beta functions deploy myfunction --entry-point example.function.Function --runtime java11 --trigger-http
In the example above, myfunction refers to the name of your function and can be changed to any name you prefer. example.function.Function refers to the fully qualified name of your function class.
To obtain the trigger URL you can use the following command:
$ YOUR_HTTP_TRIGGER_URL=$(gcloud beta functions describe myfunction --format='value(httpsTrigger.url)')
You can then use this variable to test the function invocation:
$ curl -i $YOUR_HTTP_TRIGGER_URL
7.2 HTTP Functions
It is common to want to take just a slice of a regular Micronaut HTTP server application and deploy it as a function.
Configuration
To facilitate this model, Micronaut GCP includes an additional module that allows you to use regular Micronaut annotations like @Controller and @Get to define your functions that can be deployed to Cloud Function.
With this model you need to add the micronaut-gcp-function-http dependency to your application:
implementation("io.micronaut.gcp:micronaut-gcp-function-http:4.10.2")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-function-http</artifactId>
<version>4.10.2</version>
</dependency>
And define the Google Function API as a development only dependency:
developmentOnly("com.google.cloud.functions:functions-framework-api:1.0.1")
<dependency>
<groupId>com.google.cloud.functions</groupId>
<artifactId>functions-framework-api</artifactId>
<version>1.0.1</version>
<scope>provided</scope>
</dependency>
Running Functions Locally
To run the function locally, first make the regular Micronaut server a developmentOnly dependency, since it is not necessary to include it in the JAR file that will be deployed to Cloud Function:
developmentOnly("io.micronaut:micronaut-http-server-netty")
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-http-server-netty</artifactId>
<scope>provided</scope>
</dependency>
You can then use ./gradlew run or ./mvnw compile exec:exec to run the function locally using Micronaut’s Netty-based server.
Alternatively, you could configure the Google Function Framework for Java which includes a Maven plugin, or for Gradle include the following:
configurations {
invoker
}
dependencies {
invoker 'com.google.cloud.functions.invoker:java-function-invoker:1.0.0-beta1'
}
task('runFunction', type: JavaExec, dependsOn: classes) {
main = 'com.google.cloud.functions.invoker.runner.Invoker'
classpath(configurations.invoker)
args(
'--target', 'io.micronaut.gcp.function.http.HttpFunction',
'--classpath', (configurations.runtimeClasspath + sourceSets.main.output).asPath,
'--port', 8081
)
}
With this in place you can run ./gradlew runFunction to run the function locally.
Deployment
When deploying the function to Cloud Function you should use the HttpFunction class as the handler reference.
First build the function with:
$ ./gradlew clean shadowJar
Then cd into the build/libs directory (deployment has to be done from the location where the JAR file resides):
$ cd build/libs
To deploy the function, make sure you have the gcloud CLI installed, then run:
$ gcloud beta functions deploy myfunction --entry-point io.micronaut.gcp.function.http.HttpFunction --runtime java11 --trigger-http
In the example above, myfunction refers to the name of your function and can be changed to any name you prefer.
To obtain the trigger URL you can use the following command:
$ YOUR_HTTP_TRIGGER_URL=$(gcloud beta functions describe myfunction --format='value(httpsTrigger.url)')
You can then use this variable to test the function invocation:
$ curl -i $YOUR_HTTP_TRIGGER_URL/hello/John
7.3 CloudEvents Functions
To use CloudEvents, add the following dependency:
implementation("io.micronaut.gcp:micronaut-gcp-function-cloudevents")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-function-cloudevents</artifactId>
</dependency>
In your application, create a class that extends GoogleCloudEventsFunction. Google publishes POJOs for Google Cloud Events. For example, to subscribe to a Google Cloud Storage event, your function may look like this:
public class TestFunction extends GoogleCloudEventsFunction<StorageObjectData> {
@Override
protected void accept(@NonNull CloudEventContext context, @Nullable StorageObjectData data) throws Exception {
}
}
8 Google Cloud Pub/Sub Support
8.1 Introduction
This project provides integration between Micronaut and Google Cloud Pub/Sub. It uses the official Google Cloud Pub/Sub Java client libraries to create Publishers and Subscribers while keeping a programming model for messaging similar to the ones defined in Micronaut RabbitMQ and Micronaut Kafka.
The project does not attempt to create any of the resources in Google Cloud such as Topics and Subscriptions. Make sure your project has the correct resources and the Service Account being used has proper permissions.
8.2 Quickstart
To add support for Google Cloud Pub/Sub to an existing project, add the following dependencies to your build.
implementation("io.micronaut.gcp:micronaut-gcp-pubsub")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-pubsub</artifactId>
</dependency>
Creating a Pub/Sub Publisher with @PubSubClient
To publish messages to Google Cloud Pub/Sub, just define an interface that is annotated with @PubSubClient.
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
@PubSubClient // (1)
public interface AnimalClient {
@Topic("animals") // (2)
void send(byte[] data); // (3)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
@PubSubClient // (1)
interface AnimalClient {
@Topic("animals") // (2)
void send(byte[] data) // (3)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
@PubSubClient // (1)
interface AnimalClient {
@Topic("animals") // (2)
fun send(data: ByteArray) // (3)
}
1 | The @PubSubClient annotation is used to designate this interface as a client |
2 | The topic animals will be used to publish messages |
3 | The method accepts one argument that will be used as message body |
Make sure your current GCP project has a topic named animals before starting the application.
At compile time Micronaut will create an implementation of the above interface. You can then inject that interface into your business classes via @Inject and use it.
import io.micronaut.gcp.pubsub.support.Animal;
import jakarta.inject.Singleton;
@Singleton
public final class AnimalService {
private final AnimalClient animalClient;
public AnimalService(AnimalClient animalClient) { // (1)
this.animalClient = animalClient;
}
public void someBusinessMethod(Animal animal) {
byte[] serializedBody = serialize(animal);
animalClient.send(serializedBody);
}
private byte[] serialize(Animal animal) { // (2)
return null;
}
}
import jakarta.inject.Singleton;
@Singleton
final class AnimalService {
private final AnimalClient animalClient;
AnimalService(AnimalClient animalClient) { // (1)
this.animalClient = animalClient
}
void someBusinessMethod(Animal animal) {
byte[] serializedBody = serialize(animal)
animalClient.send(serializedBody)
}
private byte[] serialize(Animal animal) { // (2)
return null
}
}
import io.micronaut.gcp.pubsub.support.Animal
import jakarta.inject.Singleton
@Singleton
class AnimalService(private val animalClient: AnimalClient) { // (1)
fun someBusinessMethod(animal: Animal) {
val serializedBody = serialize(animal)
animalClient.send(serializedBody)
}
private fun serialize(animal: Animal): ByteArray { // (2)
return ByteArray(0)
}
}
1 | Constructor based injection |
2 | Your custom serialization logic |
Creating a Pub/Sub Subscriber with @PubSubListener
To listen to Pub/Sub messages you can use the @PubSubListener annotation on a class to mark it as a message listener.
The following example would listen on a subscription named animals that is configured to be attached to the topic of the previous example.
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
@PubSubListener // (1)
public class AnimalListener {
/**
*
* @param data raw data
*/
@Subscription("animals") // (2)
public void onMessage(byte[] data) { // (3)
System.out.println("Message received");
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
@PubSubListener // (1)
class AnimalListener {
@Subscription("animals") // (2)
void onMessage(byte[] data) { // (3)
System.out.println("Message received")
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
@PubSubListener // (1)
class AnimalListener {
@Subscription("animals") // (2)
fun onMessage(data: ByteArray) { // (3)
println("Message received")
}
}
1 | Designates this class to be a message listener for Pub/Sub messages |
2 | Messages routed to the animals subscription will be delivered to this method |
3 | The method has a single argument that contains the serialized body of the Pub/Sub message |
8.3 Pub/Sub configuration Properties
You can customize certain aspects of the client. Pub/Sub client libraries leverage a ScheduledExecutorService for both message publishing and consumption.
If not specified, the framework will configure its default scheduled executor service to be used for both Publishers and Subscribers.
See ExecutorConfiguration for the full list of options.
You can override it in PubSubConfigurationProperties to set a default value for all clients, or you can set it up per Topic publisher or Subscription listener, as discussed further below.
Creating a custom executor is presented in the section Configuring Thread pools.
Property | Type | Description |
---|---|---|
| int | How often to ping the server to keep the channel alive. Default: 5 minutes. |
| java.lang.String | Name of the java.util.concurrent.ScheduledExecutorService to be used by all com.google.cloud.pubsub.v1.Publisher instances. Default: "scheduled" |
| java.lang.String | Name of the java.util.concurrent.ScheduledExecutorService to be used by all com.google.cloud.pubsub.v1.Subscriber instances. Default: "scheduled" |
8.4 Pub/Sub Publishers
Pub/Sub support in Micronaut follows the same pattern used for other types of clients such as the HTTP Client. By annotating an interface with @PubSubClient the framework will create an implementation bean that handles communication with Pub/Sub for you.
Topics
In order to publish messages to Pub/Sub you need a method annotated with a @Topic annotation.
For each annotated method the framework will create a dedicated Publisher with its own configuration for RetrySettings, BatchSettings and its own Executor.
All settings can be overridden via configuration properties and the appropriate configuration can be passed via the configuration attribute of the @Topic annotation.
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubClient // (1)
public interface SimpleClient {
@Topic("animals")
void send(PubsubMessage message); // (2)
@Topic("animals")
void send(byte[] data); // (3)
@Topic("animals")
void send(Animal animal); // (4)
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
@PubSubClient // (1)
interface SimpleClient {
@Topic("animals")
void send(PubsubMessage message) // (2)
@Topic("animals")
void send(byte[] data) // (3)
@Topic("animals")
void send(Animal animal) // (4)
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
@PubSubClient // (1)
interface SimpleClient {
@Topic("animals")
fun send(message: PubsubMessage) // (2)
@Topic("animals")
fun send(data: ByteArray) // (3)
@Topic("animals")
fun send(animal: Animal) // (4)
}
1 | The @PubSubClient annotation lets Micronaut replace this interface with a generated bean implementation. |
2 | Sending a PubsubMessage object skips any SerDes or contentType headers |
3 | When sending a byte array, SerDes is bypassed and the bytes are copied as-is, but the default content type will still be application/json |
4 | You can send any domain object as long as a SerDes is configured to handle it. By default application/json is used. |
If a body argument cannot be found, an exception will be thrown.
Resource naming
In the previous examples we used a simple name for the topic, such as animals. Inside Google Cloud, however, resources are only accessible via their fully qualified name (FQN).
In the case of Pub/Sub, topics are named projects/$PROJECT_ID/topics/$TOPIC_NAME.
The Micronaut integration with GCP will automatically grab the default project id available (please refer to the section Google Project ID) and convert the simple name of the resource into an FQN.
You can also supply an FQN that uses a different project name, which is useful when your application has to publish messages to projects other than the default project configured for the Service Account.
When publishing to a different project, make sure your service account has the proper access to the resource.
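The naming rule described above can be sketched as follows. This is an illustration of the conversion only, not the framework's code. The class name, method name and the us-project project id are hypothetical.

```java
// Hypothetical helper showing the simple-name to FQN conversion for topics.
final class TopicNames {

    // Expands a simple topic name to projects/{project}/topics/{topic};
    // names that are already fully qualified are returned unchanged.
    static String toFullyQualified(String topic, String defaultProjectId) {
        if (topic.startsWith("projects/")) {
            return topic;
        }
        return "projects/" + defaultProjectId + "/topics/" + topic;
    }

    public static void main(String[] args) {
        // prints projects/us-project/topics/animals
        System.out.println(toFullyQualified("animals", "us-project"));
        // prints projects/eu-project/topics/animals
        System.out.println(toFullyQualified("projects/eu-project/topics/animals", "us-project"));
    }
}
```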
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubClient
public interface MultipleProjectClient {
@Topic("animals")
void sendUS(Animal animal); // (1)
@Topic("projects/eu-project/topics/animals")
void sendEU(Animal animal); // (2)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubClient
interface MultipleProjectClient {
@Topic("animals")
void sendUS(Animal animal) // (1)
@Topic("projects/eu-project/topics/animals")
void sendEU(Animal animal) // (2)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
@PubSubClient
interface MultipleProjectClient {
@Topic("animals")
fun sendUS(animal: Animal) // (1)
@Topic("projects/eu-project/topics/animals")
fun sendEU(animal: Animal) // (2)
}
1 | This would use the default project configured for the application. |
2 | This would override and publish messages to a different project. |
8.4.1 Content-Type and message serialization
The contents of a PubsubMessage are always a base64 encoded ByteString. This framework provides a way to create custom Serialization/Deserialization as explained in the section Custom Serialization/Deserialization.
By default any body message argument will be serialized via JsonPubSubMessageSerDes unless specified otherwise via the contentType property of the @Topic annotation.
The rules of message serialization are the following:
- If the body type is PubsubMessage then SerDes is bypassed completely and no header is added to the message.
- If the body type is byte[], SerDes logic is bypassed, but a Content-Type header of application/json will be added unless overwritten by the @Topic annotation.
- For any other type, the type defined by contentType will be used to locate the correct PubSubMessageSerDes to handle it; if none is passed, application/json will be used.
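The three rules above can be summarized in a small decision sketch. It is written against a hypothetical stand-in type rather than the real com.google.pubsub.v1.PubsubMessage so that it compiles on its own, and it models only the header and SerDes decisions, not the framework's implementation.

```java
import java.util.Optional;

// Hypothetical model of the serialization rules; all names are illustrative.
final class ContentTypeRules {

    // Stand-in for com.google.pubsub.v1.PubsubMessage.
    static final class RawMessage { }

    static final String DEFAULT_CONTENT_TYPE = "application/json";

    // declaredContentType is the contentType set on @Topic, or null when it is not set.
    // Returns the Content-Type header to attach, or empty when no header is added.
    static Optional<String> contentTypeHeader(Object body, String declaredContentType) {
        if (body instanceof RawMessage) {
            return Optional.empty(); // rule 1: the message is sent untouched
        }
        // rules 2 and 3: byte[] and domain objects share the same header choice
        return Optional.of(declaredContentType != null ? declaredContentType : DEFAULT_CONTENT_TYPE);
    }

    // A SerDes is consulted only for bodies that are neither raw messages nor byte arrays.
    static boolean usesSerDes(Object body) {
        return !(body instanceof RawMessage) && !(body instanceof byte[]);
    }

    public static void main(String[] args) {
        System.out.println(contentTypeHeader(new RawMessage(), null));
        System.out.println(contentTypeHeader(new byte[0], "image/gif"));
        System.out.println(contentTypeHeader("a domain object", null) + " serdes=" + usesSerDes("a domain object"));
    }
}
```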
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.http.MediaType;
@PubSubClient
public interface CustomSerDesClient {
@Topic("animals") // (1)
void send(PubsubMessage pubsubMessage);
@Topic("animals") // (2)
void send(byte[] data);
@Topic(value = "animals", contentType = MediaType.IMAGE_GIF) // (3)
void sendWithCustomType(byte[] data);
@Topic("animals") // (4)
void send(Animal animal);
@Topic(value = "animals", contentType = MediaType.APPLICATION_XML) // (5)
void sendWithCustomType(Animal animal);
}
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.http.MediaType;
@PubSubClient
interface CustomSerDesClient {
@Topic("animals") // (1)
void send(PubsubMessage pubsubMessage)
@Topic("animals") // (2)
void send(byte[] data)
@Topic(value = "animals", contentType = MediaType.IMAGE_GIF) // (3)
void sendWithCustomType(byte[] data)
@Topic("animals") // (4)
void send(Animal animal)
@Topic(value = "animals", contentType = MediaType.APPLICATION_XML) // (5)
void sendWithCustomType(Animal animal)
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.http.MediaType
@PubSubClient
interface CustomSerDesClient {
@Topic("animals") // (1)
fun send(pubsubMessage: PubsubMessage)
@Topic("animals") // (2)
fun send(data: ByteArray)
@Topic(value = "animals", contentType = MediaType.IMAGE_GIF) // (3)
fun sendWithCustomType(data: ByteArray)
@Topic("animals") // (4)
fun send(animal: Animal)
@Topic(value = "animals", contentType = MediaType.APPLICATION_XML) // (5)
fun sendWithCustomType(animal: Animal)
}
1 | With a PubsubMessage argument, no SerDes is used and no headers are added; the caller is responsible for building the message. |
2 | With a byte array, SerDes is bypassed. Because no contentType is set in this example, a Content-Type header with the value application/json is added. |
3 | With a byte array and an explicit contentType, SerDes is bypassed and a Content-Type header with the value image/gif is added. |
4 | No contentType is defined, so JsonPubSubMessageSerDes is used to serialize the object. |
5 | With an explicit contentType, the framework looks for a PubSubMessageSerDes registered for application/xml. You must provide this bean, or an exception is thrown at runtime. |
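The rules illustrated by the five methods above can be condensed into a small decision function. The following is a minimal, framework-free sketch and not the library's implementation: RawMessage is a hypothetical stand-in for com.google.pubsub.v1.PubsubMessage, and the class and method names are invented for illustration.

```java
import java.util.Optional;

class PublishContentTypeSketch {
    static final String DEFAULT_CONTENT_TYPE = "application/json";

    /** Hypothetical stand-in for com.google.pubsub.v1.PubsubMessage. */
    public static final class RawMessage { }

    /** True when the body has to go through a PubSubMessageSerDes. */
    public static boolean needsSerDes(Object body) {
        return !(body instanceof RawMessage) && !(body instanceof byte[]);
    }

    /** Content-Type header this sketch would attach to the outgoing message, if any. */
    public static Optional<String> contentTypeHeader(Object body, String topicContentType) {
        if (body instanceof RawMessage) {
            // The caller built the message, so nothing is added to it.
            return Optional.empty();
        }
        boolean declared = topicContentType != null && !topicContentType.isEmpty();
        return Optional.of(declared ? topicContentType : DEFAULT_CONTENT_TYPE);
    }
}
```

For instance, contentTypeHeader(new byte[0], "image/gif") corresponds to case (3), while contentTypeHeader(animal, null) corresponds to case (4) and falls back to application/json.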
8.4.2 Message Headers
Google Cloud Pub/Sub messages contain a dictionary of message attributes in the form of a Map<String, String>.
The framework binds those attributes to the @MessageHeader annotation, which can be applied at the class level, to a method, or to a method argument.
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.annotation.MessageHeader;
@PubSubClient
@MessageHeader(name = "application-name", value = "petclinic") // (1)
public interface CustomHeadersClient {
@MessageHeader(name = "status", value = "healthy") // (2)
@Topic("animals")
void sendWithStaticHeaders(Animal animal);
@Topic("animals")
void sendWithDynamicHeaders(Animal animal, @MessageHeader(name = "code") Integer code); // (3)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.MessageHeader;
@PubSubClient
@MessageHeader(name = "application-name", value = "petclinic") // (1)
interface CustomHeadersClient {
@MessageHeader(name = "status", value = "healthy") // (2)
@Topic("animals")
void sendWithStaticHeaders(Animal animal)
@Topic("animals")
void sendWithDynamicHeaders(Animal animal, @MessageHeader(name = "code") Integer code) // (3)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.MessageHeader
@PubSubClient
@MessageHeader(name = "application-name", value = "petclinic") // (1)
interface CustomHeadersClient {
@MessageHeader(name = "status", value = "healthy") // (2)
@Topic("animals")
fun sendWithStaticHeaders(animal: Animal)
@Topic("animals")
fun sendWithDynamicHeaders(animal: Animal, @MessageHeader(name = "code") code: Int) // (3)
}
1 | Headers specified at class level will be added to all annotated methods using @Topic |
2 | You can pass a static value as part of @MessageHeader |
3 | Values can also be passed via arguments. A ConversionService will try to convert them into a String value. |
8.4.3 Publisher properties
Pub/Sub allows each Publisher to have its own configuration for things such as executors or batching settings. This is useful when you publish messages to topics that have different SLAs.
The framework allows you to create configurations and then bind them to each @Topic annotation using the configuration parameter.
Property | Type | Description |
---|---|---|
gcp.pubsub.publisher.*.executor | java.lang.String | Name of the executor to use. Default: scheduled |
gcp.pubsub.publisher.*.retry.total-timeout | org.threeten.bp.Duration | How long the logic should keep trying the remote call until it gives up completely. Default: 600 seconds |
gcp.pubsub.publisher.*.retry.initial-retry-delay | org.threeten.bp.Duration | Delay before the first retry. Default: 100ms |
gcp.pubsub.publisher.*.retry.retry-delay-multiplier | double | Controls the change in retry delay. The retry delay of the previous call is multiplied by the RetryDelayMultiplier to calculate the retry delay for the next call. Default: 1.3 |
gcp.pubsub.publisher.*.retry.max-retry-delay | org.threeten.bp.Duration | Puts a limit on the value of the retry delay, so that the RetryDelayMultiplier can’t increase the retry delay higher than this amount. Default: 60 seconds |
gcp.pubsub.publisher.*.retry.max-attempts | int | Defines the maximum number of attempts to perform. Default: 0 |
gcp.pubsub.publisher.*.retry.jittered | boolean | Determines if the delay time should be randomized. Default: true |
gcp.pubsub.publisher.*.retry.initial-rpc-timeout | org.threeten.bp.Duration | Controls the timeout for the initial RPC. Default: 5 seconds |
gcp.pubsub.publisher.*.retry.rpc-timeout-multiplier | double | Controls the change in RPC timeout. The timeout of the previous call is multiplied by the RpcTimeoutMultiplier to calculate the timeout for the next call. Default: 1.0 |
gcp.pubsub.publisher.*.retry.max-rpc-timeout | org.threeten.bp.Duration | Puts a limit on the value of the RPC timeout, so that the RpcTimeoutMultiplier can’t increase the RPC timeout higher than this amount. Default: 0 |
gcp.pubsub.publisher.*.batching.element-count-threshold | java.lang.Long | Set the element count threshold to use for batching. After this many elements are accumulated, they will be wrapped up in a batch and sent. Default: 100 |
gcp.pubsub.publisher.*.batching.request-byte-threshold | java.lang.Long | Set the request byte threshold to use for batching. After this many bytes are accumulated, the elements will be wrapped up in a batch and sent. Default: 1000 (1 KB) |
gcp.pubsub.publisher.*.batching.delay-threshold | org.threeten.bp.Duration | Set the delay threshold to use for batching. After this amount of time has elapsed (counting from the first element added), the elements will be wrapped up in a batch and sent. Default: 1ms |
gcp.pubsub.publisher.*.batching.is-enabled | java.lang.Boolean | Indicates whether batching is enabled. Default: true |
gcp.pubsub.publisher.*.flow-control.max-outstanding-element-count | java.lang.Long | Maximum number of outstanding elements to keep in memory before enforcing flow control. |
gcp.pubsub.publisher.*.flow-control.max-outstanding-request-bytes | java.lang.Long | Maximum number of outstanding bytes to keep in memory before enforcing flow control. |
gcp.pubsub.publisher.*.flow-control.limit-exceeded-behavior | com.google.api.gax.batching.FlowController$LimitExceededBehavior | The behavior of FlowController when the specified limits are exceeded. Defaults to Ignore. |
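To get a feel for how the retry properties interact, the arithmetic described in the table can be worked through by hand. The sketch below is an illustration only: it ignores jitter (enabled by default, which randomizes the actual delays) as well as the total and RPC timeouts, and the class and method names are hypothetical rather than part of the Pub/Sub client.

```java
import java.util.ArrayList;
import java.util.List;

class RetryDelaySketch {
    /**
     * Un-jittered delay before each retry: the delay starts at initialMillis,
     * is multiplied by the multiplier after every attempt, and is capped at maxMillis.
     */
    public static List<Long> retryDelaysMillis(long initialMillis, double multiplier,
                                               long maxMillis, int retries) {
        List<Long> delays = new ArrayList<>();
        double delay = initialMillis;
        for (int i = 0; i < retries; i++) {
            // Round to whole milliseconds and apply the max-retry-delay cap.
            delays.add(Math.min(Math.round(delay), maxMillis));
            delay = delay * multiplier;
        }
        return delays;
    }
}
```

With the defaults from the table (100 ms initial delay, multiplier 1.3, 60 second cap), retryDelaysMillis(100, 1.3, 60_000, 5) yields 100, 130, 169, 220 and 286 milliseconds.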
For example suppose you have the following configuration:
gcp.pubsub.publisher.batching.executor=batch-executor
gcp.pubsub.publisher.immediate.executor=immediate-executor
gcp.pubsub.publisher.immediate.batching.enabled=false
gcp:
  pubsub:
    publisher:
      batching:
        executor: batch-executor
      immediate:
        executor: immediate-executor
        batching:
          enabled: false
[gcp]
[gcp.pubsub]
[gcp.pubsub.publisher]
[gcp.pubsub.publisher.batching]
executor="batch-executor"
[gcp.pubsub.publisher.immediate]
executor="immediate-executor"
[gcp.pubsub.publisher.immediate.batching]
enabled=false
gcp {
pubsub {
publisher {
batching {
executor = "batch-executor"
}
immediate {
executor = "immediate-executor"
batching {
enabled = false
}
}
}
}
}
{
gcp {
pubsub {
publisher {
batching {
executor = "batch-executor"
}
immediate {
executor = "immediate-executor"
batching {
enabled = false
}
}
}
}
}
}
{
"gcp": {
"pubsub": {
"publisher": {
"batching": {
"executor": "batch-executor"
},
"immediate": {
"executor": "immediate-executor",
"batching": {
"enabled": false
}
}
}
}
}
}
You can then apply it to individual methods as:
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubClient
public interface CustomConfigurationClient {
@Topic(value = "animals", configuration = "batching") // (1)
void batchSend(Animal animal);
@Topic(value = "animals", configuration = "immediate") // (2)
void send(Animal animal);
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubClient
interface CustomConfigurationClient {
@Topic(value = "animals", configuration = "batching") // (1)
void batchSend(Animal animal)
@Topic(value = "animals", configuration = "immediate") // (2)
void send(Animal animal)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
@PubSubClient
interface CustomConfigurationClient {
@Topic(value = "animals", configuration = "batching")
fun batchSend(animal: Animal) // (1)
@Topic(value = "animals", configuration = "immediate")
fun send(animal: Animal) // (2)
}
1 | The Publisher will be configured using a configuration named batching |
2 | The Publisher will be configured using a configuration named immediate |
FlowControlSettings are actually configured on the BatchingSettings property. Due to the nature of Google’s builders, the configuration was flattened at the PubSubConfigurationProperties level and is injected into the RetrySettings later. |
8.4.4 Retrieving message Ids (broker acknowledge)
All the examples so far have used void as the method return type. However, getting a message acknowledgement from the broker is usually required.
Pub/Sub returns a String that contains the message id generated by the broker. Your methods can therefore also return either String or a reactive type such as Mono<String>.
When you define your client, you can choose between a few different method signatures. Depending on your choice, you may get the message acknowledgement back, and you control whether the call is blocking or reactive.
-
If your method returns void, a blocking call to publish the message is made and you don’t get the message id returned by the Pub/Sub broker.
-
If your method returns a String, a blocking call to publish the message is made and the message id is returned.
-
If your method returns Mono<String>, the call is reactive, and you can subscribe to the publisher to retrieve the message id.
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import reactor.core.publisher.Mono;
@PubSubClient
public interface CustomReturnClient {
@Topic("animals")
void send(Animal animal); // (1)
@Topic("animals")
String sendWithId(Animal animal); // (2)
@Topic("animals")
Mono<String> sendReactive(Animal animal); //(3)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono
@PubSubClient
interface CustomReturnClient {
@Topic("animals")
void send(Animal animal) // (1)
@Topic("animals")
String sendWithId(Animal animal) // (2)
@Topic("animals")
Mono<String> sendReactive(Animal animal) //(3)
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono
@PubSubClient
interface CustomReturnClient {
@Topic("animals")
fun send(animal: Animal) // (1)
@Topic("animals")
fun sendWithId(animal: Animal): String // (2)
@Topic("animals")
fun sendReactive(animal: Animal?): Mono<String> //(3)
}
1 | Blocking call, message id is not returned |
2 | Blocking call, message id is returned as String |
3 | Reactive call |
8.5 Restricting locations and message ordering
Restricting storage locations
Google Cloud Pub/Sub is a globally distributed event platform. When a client publishes a message, the platform stores the message in the region nearest to the publisher. Sometimes regulations such as GDPR impose restrictions on where customer data can live. When using the Micronaut integration, you can specify the endpoint of the topic so that Pub/Sub persists the message data in the specified region. To learn more about this feature, visit the resource location restriction page.
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubClient
public interface LocationClient {
@Topic(value = "animals", endpoint = "europe-west1-pubsub.googleapis.com:443") // (1)
void send(Animal animal);
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
@PubSubClient
interface LocationClient {
@Topic(value = "animals", endpoint = "europe-west1-pubsub.googleapis.com:443") // (1)
void send(Animal animal);
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
@PubSubClient
interface LocationClient {
@Topic(value = "animals", endpoint = "europe-west1-pubsub.googleapis.com:443") // (1)
fun send(animal: Animal?)
}
1 | The endpoint attribute enforces that messages sent via this method will be stored on the europe-west1 region. |
Message ordering
Google Cloud Pub/Sub supports message ordering if messages are published to a single location and specify an ordering key. An example of this would be trading orders placed for a specific symbol. If you use the symbol as the ordering key, all messages for that symbol are guaranteed to be delivered in order, regardless of how many publishers there are.
To enable message ordering for your publishers, annotate one of the method’s arguments with @OrderingKey to declare it the ordering key.
import io.micronaut.gcp.pubsub.annotation.OrderingKey;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Order;
@PubSubClient
public interface OrderClient {
@Topic(value = "orders", endpoint = "us-central1-pubsub.googleapis.com:443") // (1)
void send(Order order, @OrderingKey String key); // (2)
}
import io.micronaut.gcp.pubsub.annotation.OrderingKey
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Order
@PubSubClient
interface OrderClient {
@Topic(value = "orders", endpoint = "us-central1-pubsub.googleapis.com:443") // (1)
void send(Order order, @OrderingKey String key) // (2)
}
import io.micronaut.gcp.pubsub.annotation.OrderingKey
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Order
@PubSubClient
interface OrderClient {
@Topic(value = "orders", endpoint = "us-central1-pubsub.googleapis.com:443") // (1)
fun send(order: Order, @OrderingKey key: String) // (2)
}
1 | Ordering only works on regional endpoints, so you must declare an endpoint first |
2 | Although the Pub/Sub API requires the key to be a String, the framework uses a conversion service to convert other types to String. |
Here’s an example of how to use ordering on your clients:
import io.micronaut.gcp.pubsub.support.Order;
import jakarta.inject.Singleton;
@Singleton
public final class OrderService {
private final OrderClient client;
public OrderService(OrderClient client) {
this.client = client;
}
public void placeOrder() {
Order order = new Order(100, "GOOG");
client.send(order, order.getSymbol());
}
}
import io.micronaut.gcp.pubsub.support.Order
import jakarta.inject.Singleton
@Singleton
class OrderService {
private final OrderClient client
OrderService(OrderClient client) {
this.client = client
}
void placeOrder() {
Order order = new Order(100, "GOOG")
client.send(order, order.getSymbol())
}
}
import io.micronaut.gcp.pubsub.support.Order
import jakarta.inject.Singleton
@Singleton
class OrderService(private val client: OrderClient) {
fun placeOrder() {
val order = Order(100, "GOOG")
client.send(order, order.symbol)
}
}
8.6 Receiving messages via @PubSubListener methods
To start receiving messages, annotate a class with @PubSubListener. The framework will then use AOP to deliver messages to methods annotated with @Subscription.
Subscriptions
All methods annotated with @Subscription will be invoked by the framework.
Each annotated method creates an individual Subscriber, which can be configured using the configuration parameter of the @Subscription annotation.
Methods annotated with @Subscription must be unique in your application. If two distinct methods try to subscribe to the same Subscription, an error is thrown. This is intended to avoid issues with message acknowledgement control. |
The annotated method must have at least one argument that is bound to the body of the message, or an exception is thrown. |
Resource naming
As described in the Pub/Sub Publisher section, the subscription examples also use simple names such as animals.
Inside Google Cloud, however, resources are only accessible via their fully qualified name (FQN). A Subscription name follows the pattern: projects/$PROJECT_ID/subscriptions/$SUBSCRIPTION_NAME.
The Micronaut integration with GCP automatically picks up the default project id available (please refer to the section Google Project ID) and converts the simple name of the resource into an FQN.
You can also pass an FQN as the subscription name.
This is helpful when you need to listen to subscriptions from different projects.
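The naming rule can be pictured as a tiny helper. This is only a sketch of the behaviour described here, under the assumption that any name starting with projects/ is already fully qualified; the class and method names are hypothetical, and the framework's own resolution logic may differ in detail.

```java
class SubscriptionNameSketch {
    /**
     * Expands a simple subscription name into the pattern
     * projects/$PROJECT_ID/subscriptions/$SUBSCRIPTION_NAME and leaves
     * names that already look fully qualified untouched.
     */
    public static String toFullyQualifiedName(String name, String defaultProjectId) {
        if (name.startsWith("projects/")) {
            return name; // assumed to be an FQN already
        }
        return "projects/" + defaultProjectId + "/subscriptions/" + name;
    }
}
```

With a default project of my-project (a made-up id), the simple name animals becomes projects/my-project/subscriptions/animals, while projects/eu-project/subscriptions/animals is passed through unchanged.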
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener // (1)
public class SimpleSubscriber {
@Subscription("animals") // (2)
public void onMessage(Animal animal) {
}
@Subscription("projects/eu-project/subscriptions/animals") // (3)
public void onMessageEU(Animal animal) {
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener // (1)
class SimpleSubscriber {
@Subscription("animals") // (2)
void onMessage(Animal animal) {
}
@Subscription("projects/eu-project/subscriptions/animals") // (3)
void onMessageEU(Animal animal) {
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener // (1)
class SimpleSubscriber {
@Subscription("animals") // (2)
fun onMessage(animal: Animal) {
}
@Subscription("projects/eu-project/subscriptions/animals") // (3)
fun onMessageEU(animal: Animal) {
}
}
1 | @PubSubListener marks this class to be a Message Listener |
2 | Methods annotated with @Subscription will receive messages from that subscription |
3 | You can also use an FQN for the subscription, especially when you need to access a resource in a different project |
When subscribing to a resource in a different project, make sure your service account has the proper access to the resource. |
8.6.1 Content-Type and message deserialization
The framework provides a custom serialization/deserialization (SerDes) mechanism for both message producers and message listeners. On the receiving end, the rules to deserialize a PubsubMessage are the following:
-
If the body argument of the method is of PubsubMessage type, SerDes is bypassed and the "raw" message is copied to the argument.
-
If the body argument of the method is a byte array, SerDes is bypassed and the byte contents of the PubsubMessage are copied to the argument.
-
If the body argument is a POJO, then the following applies:
-
The default Content-Type is application/json, and the framework uses it if it is not overridden.
-
If the message contains a Content-Type attribute, that value is used.
-
Finally, if the @Subscription has a contentType value, this value overrides all of the previous values.
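The precedence for POJO bodies can be written down as a short function. This is a minimal sketch of the three rules above, not the framework's code: the class and method names are hypothetical, and treating a null or empty contentType as "not set" is an assumption made for the example.

```java
import java.util.Map;

class ReceiveContentTypeSketch {
    static final String DEFAULT_CONTENT_TYPE = "application/json";

    /**
     * Resolves the content type used to pick a SerDes for a POJO body.
     * Highest precedence first: @Subscription contentType, then the message's
     * Content-Type attribute, then the application/json default.
     */
    public static String resolve(String subscriptionContentType, Map<String, String> attributes) {
        if (subscriptionContentType != null && !subscriptionContentType.isEmpty()) {
            return subscriptionContentType;
        }
        String fromMessage = attributes.get("Content-Type");
        if (fromMessage != null && !fromMessage.isEmpty()) {
            return fromMessage;
        }
        return DEFAULT_CONTENT_TYPE;
    }
}
```

A message carrying Content-Type: text/plain therefore resolves to text/plain on a plain subscription, but to application/xml when the subscription declares contentType = "application/xml".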
Automatic SerDes is a convenient feature, but sometimes you may need access to the PubsubMessage id.
This is provided via the @MessageId annotation.
Once you annotate an argument of type String with this annotation, the message id will be copied to that argument.
PubsubMessage ids are always of type String; for that reason, your annotated argument must also be a String. |
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener
public class ContentTypeSubscriber {
@Subscription("raw-subscription") // (1)
void receiveRaw(byte[] data, @MessageId String id) {
}
@Subscription("native-subscription") // (2)
void receiveNative(PubsubMessage message) {
}
@Subscription("animals") // (3)
void receivePojo(Animal animal, @MessageId String id) {
}
@Subscription(value = "animals-legacy", contentType = "application/xml") // (4)
void receiveXML(Animal animal, @MessageId String id) {
}
}
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener
class ContentTypeSubscriber {
@Subscription("raw-subscription") // (1)
void receiveRaw(byte[] data, @MessageId String id) {
}
@Subscription("native-subscription") // (2)
void receiveNative(PubsubMessage message) {
}
@Subscription("animals") // (3)
void receivePojo(Animal animal, @MessageId String id) {
}
@Subscription(value = "animals-legacy", contentType = "application/xml") // (4)
void receiveXML(Animal animal, @MessageId String id) {
}
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener
class ContentTypeSubscriber {
@Subscription("raw-subscription")
fun receiveRaw(data: ByteArray, @MessageId id: String) { // (1)
}
@Subscription("native-subscription")
fun receiveNative(message: PubsubMessage) { // (2)
}
@Subscription("animals")
fun receivePojo(animal: Animal, @MessageId id: String) { // (3)
}
@Subscription(value = "animals-legacy", contentType = "application/xml")
fun receiveXML(animal: Animal, @MessageId id: String) { // (4)
}
}
1 | Bytes are copied, SerDes is bypassed, and the message id is injected for use |
2 | SerDes is bypassed and the PubsubMessage object is copied; there is no need to use @MessageId |
3 | The framework will try to deserialize this payload. If no Content-Type header is found, it defaults to application/json |
4 | Uses a custom SerDes: the framework will look for a PubSubMessageSerDes that can handle application/xml |
8.6.2 Message Headers
Google Cloud Pub/Sub messages contain a dictionary of message attributes in the form of a Map<String, String>.
The framework binds those attributes to the @MessageHeader annotation, which can be applied to an argument of the method.
A ConversionService is used to try to convert the String value of the attribute to the target type of the argument.
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.annotation.MessageHeader;
@PubSubListener
public class CustomHeaderSubscriber {
@Subscription("animals")
public void onMessage(Animal animal, @MessageHeader("Content-Type") String contentType, @MessageHeader("code") Integer code) { // (1)
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.MessageHeader;
@PubSubListener
class CustomHeaderSubscriber {
@Subscription("animals")
void onMessage(Animal animal, @MessageHeader("Content-Type") String contentType, @MessageHeader("code") Integer code) { // (1)
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.MessageHeader
@PubSubListener
class CustomHeaderSubscriber {
@Subscription("animals")
fun onMessage(animal: Animal,
@MessageHeader("Content-Type") contentType: String, @MessageHeader("code") code: Int) { // (1)
}
}
1 | Each annotated argument will be mapped to the corresponding attribute value. The ConversionService will try to convert to the target type |
8.6.3 Subscriber properties
Pub/Sub allows each Subscriber to have its own configuration for things such as executors or flow control settings.
The framework allows you to create configurations and then bind them to each @Subscription using the configuration parameter.
Property | Type | Description |
---|---|---|
gcp.pubsub.subscriber.*.executor | java.lang.String | Name of the executor to use. Default: scheduled |
gcp.pubsub.subscriber.*.parallel-pull-count | java.lang.Integer | Number of concurrent pulls. Default: 1 |
gcp.pubsub.subscriber.*.max-ack-extension-period | org.threeten.bp.Duration | Set the maximum period a message ack deadline will be extended. Default: one hour. |
gcp.pubsub.subscriber.*.max-duration-per-ack-extension | org.threeten.bp.Duration | Set the upper bound for a single mod ack extension period. Default: one hour. |
gcp.pubsub.subscriber.*.flow-control.max-outstanding-element-count | java.lang.Long | Maximum number of outstanding elements to keep in memory before enforcing flow control. Default: 1000 |
gcp.pubsub.subscriber.*.flow-control.max-outstanding-request-bytes | java.lang.Long | Maximum number of outstanding bytes to keep in memory before enforcing flow control. Default: 100 * 1024 * 1024 |
gcp.pubsub.subscriber.*.flow-control.limit-exceeded-behavior | com.google.api.gax.batching.FlowController$LimitExceededBehavior | The behavior of FlowController when the specified limits are exceeded. Default: LimitExceededBehavior.Block |
Suppose you have the following configuration for a subscriber:
gcp.pubsub.subscriber.custom.executor=custom-executor
gcp.pubsub.subscriber.custom.parallel-pull-count=4
gcp:
  pubsub:
    subscriber:
      custom:
        executor: custom-executor
        parallel-pull-count: 4
[gcp]
[gcp.pubsub]
[gcp.pubsub.subscriber]
[gcp.pubsub.subscriber.custom]
executor="custom-executor"
parallel-pull-count=4
gcp {
pubsub {
subscriber {
custom {
executor = "custom-executor"
parallelPullCount = 4
}
}
}
}
{
gcp {
pubsub {
subscriber {
custom {
executor = "custom-executor"
parallel-pull-count = 4
}
}
}
}
}
{
"gcp": {
"pubsub": {
"subscriber": {
"custom": {
"executor": "custom-executor",
"parallel-pull-count": 4
}
}
}
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener
public class CustomConfigurationSubscriber {
@Subscription(value = "animals", configuration = "custom") // (1)
public void onMessage(Animal animal) {
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener
class CustomConfigurationSubscriber {
@Subscription(value = "animals", configuration = "custom") // (1)
void onMessage(Animal animal) {
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener
class CustomConfigurationSubscriber {
@Subscription(value = "animals", configuration = "custom") // (1)
fun onMessage(animal: Animal) {
}
}
1 | The Subscriber will be configured using a configuration named custom |
8.6.4 Handling message acknowledgement
Message delivery is handled by an IntroductionAdvice class, PubSubConsumerAdvice, which invokes the method annotated with @Subscription.
By default, messages are automatically acknowledged if the method returns without exceptions. See the section Consumer error handling for more information on error handling.
You can take manual control of acknowledgement by adding an argument of type Acknowledgement and invoking its ack() or nack() method yourself.
Google Cloud Pub/Sub controls acknowledgement at the Subscription level; please refer to the Pub/Sub Subscriber documentation for more information.
If you declare an Acknowledgement argument in your method and forget to invoke ack() or nack(), the framework logs a warning to let you know that no acknowledgement was registered. |
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.Acknowledgement;
@PubSubListener
public class AcknowledgementSubscriber {
@Subscription("animals")
public void onMessage(Animal animal, Acknowledgement acknowledgement) { // (1)
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.Acknowledgement;
@PubSubListener
class AcknowledgementSubscriber {
@Subscription("animals")
void onMessage(Animal animal, Acknowledgement acknowledgement) { // (1)
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.Acknowledgement
@PubSubListener
class AcknowledgementSubscriber {
@Subscription("animals")
fun onMessage(animal: Animal, acknowledgement: Acknowledgement) { // (1)
}
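}
1 | Declaring an Acknowledgement argument gives the method manual control; invoke ack() or nack() on it inside the method body |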
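The listener bodies in this section are left empty. A typical manual acknowledgement flow might look like the sketch below, which is framework-free so it can be run on its own: Ack is a hypothetical stand-in for io.micronaut.messaging.Acknowledgement, the payload is a plain String instead of Animal, and the validation in process is invented for the example.

```java
class ManualAckSketch {
    /** Hypothetical stand-in for io.micronaut.messaging.Acknowledgement. */
    public interface Ack {
        void ack();
        void nack();
    }

    /** Acknowledges only after successful processing; otherwise asks for redelivery. */
    public static void onMessage(String payload, Ack acknowledgement) {
        try {
            process(payload);
            acknowledgement.ack();
        } catch (RuntimeException e) {
            acknowledgement.nack();
        }
    }

    /** Invented business logic: rejects blank payloads. */
    static void process(String payload) {
        if (payload.trim().isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
    }
}
```

Calling ack() or nack() on every code path, as done here, also avoids the warning the framework logs when an Acknowledgement argument is never used.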
8.6.5 Consumer error handling
While a listener handles a message, errors can occur either in the framework or in your method, for example:
-
Problems binding method arguments to the message body
-
Content-Type deserialization issues
-
Acknowledgement
-
Uncaught exceptions at the annotated method
The framework provides a global error handler, DefaultPubSubMessageReceiverExceptionHandler, which catches any error and simply logs it. This behavior is far from ideal, because messages that are not acknowledged continue to be redelivered.
There are two ways you can provide error handling:
-
Locally: your @PubSubListener class implements PubSubMessageReceiverExceptionHandler. Any error related to the subscriptions of that class is then handled by your class.
-
Globally: you define a new implementation of PubSubMessageReceiverExceptionHandler that replaces DefaultPubSubMessageReceiverExceptionHandler.
The PubSubMessageReceiverException contains a reference to the originating bean that threw the exception, as well as a reference to PubSubConsumerState, which holds state information about the message handling.
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.bind.PubSubConsumerState;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener
public class ErrorHandlingSubscriber implements PubSubMessageReceiverExceptionHandler { // (1)
/**
 * Always fails, so that the local exception handler below is invoked.
 *
 * @param animal payload
 */
@Subscription("animals")
public void onMessage(Animal animal) {
throw new RuntimeException("error");
}
@Override
public void handle(PubSubMessageReceiverException exception) { // (2)
Object listener = exception.getListener(); // (3)
PubSubConsumerState state = exception.getState(); // (4)
PubsubMessage originalMessage = state.getPubsubMessage();
String contentType = state.getContentType();
//some logic
state.getAckReplyConsumer().ack(); // (5)
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener
class ErrorHandlingSubscriber implements PubSubMessageReceiverExceptionHandler { // (1)
@Subscription("animals")
void onMessage(Animal animal) {
throw new RuntimeException("error");
}
@Override
void handle(PubSubMessageReceiverException exception) { // (2)
def listener = exception.listener// (3)
def state = exception.state // (4)
def originalMessage = state.pubsubMessage
def contentType = state.contentType
//some logic
state.getAckReplyConsumer().ack(); // (5)
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener
class ErrorHandlingSubscriber : PubSubMessageReceiverExceptionHandler { // (1)
fun onMessage(animal: Animal) {
throw RuntimeException("error")
}
override fun handle(exception: PubSubMessageReceiverException) { // (2)
val listener = exception.listener // (3)
val state = exception.state // (4)
val originalMessage = state.pubsubMessage
val contentType = state.contentType
//some logic
state.ackReplyConsumer.ack() // (5)
}
}
1 | Implement PubSubMessageReceiverExceptionHandler to mark this bean as a local error handler |
2 | Method invoked if any error happens during the processing of this PubSubListener class |
3 | Reference to the class that originated the exception |
4 | Contains information related to the subscription |
5 | Depending on your use case you can ack() or nack() the message |
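As a rough illustration of callout 5, the choice between ack() and nack() can be isolated in a small policy. The sketch below is plain Java with no Pub/Sub dependency. The AckDecision enum, the AckPolicy class, and the choice of which exceptions count as permanent or transient are all hypothetical and not part of the framework.

```java
import java.io.IOException;

/** Hypothetical outcome of an error handler; not a Micronaut or Pub/Sub type. */
enum AckDecision { ACK, NACK }

/** Hypothetical policy: acknowledge poison messages, redeliver on transient faults. */
final class AckPolicy {
    private AckPolicy() { }

    static AckDecision decide(Throwable error) {
        // Unwrap one level, since handlers usually receive a wrapper exception.
        Throwable cause = error.getCause() != null ? error.getCause() : error;
        if (cause instanceof IllegalArgumentException) {
            // The payload itself is bad; redelivery would fail again, so ack it.
            return AckDecision.ACK;
        }
        if (cause instanceof IOException) {
            // Likely a transient fault; nack so that the message is redelivered.
            return AckDecision.NACK;
        }
        // Assumption: default to ack to avoid endless redelivery loops.
        return AckDecision.ACK;
    }
}
```

Inside handle(...), the returned value would select between calling ack() and nack() on state.getAckReplyConsumer().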
8.6.6 Custom Parameter Binding
Default Binding Functionality
Consumer argument binding is achieved through an ArgumentBinderRegistry that is specific for binding consumers from Pub/Sub messages. The class responsible for this is the PubSubBinderRegistry. The registry supports argument binders that are used based on an annotation applied to the argument or the argument type. All argument binders must implement either PubSubAnnotatedArgumentBinder or PubSubTypeArgumentBinder. The exception to that rule is the PubSubDefaultArgumentBinder which is used when no other binders support a given argument.
When an argument needs to be bound, the PubSubConsumerState is used as the source of all the available data. The binder registry follows a short sequence of steps to find a binder that supports the argument.
-
Search the annotation based binders for one that matches any annotation on the argument that is annotated with @Bindable.
-
Search the type based binders for one that matches or is a subclass of the argument type.
-
Return the default binder.
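The lookup order above can be sketched in plain Java. This is only a model of the three steps, not the real PubSubBinderRegistry: SketchBinder and SketchBinderRegistry are hypothetical names, and step 2 reads "matches or is a subclass of the argument type" literally, which is an assumption about the intended semantics.

```java
import java.lang.annotation.Annotation;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for an argument binder; it only carries a name here. */
final class SketchBinder {
    final String name;

    SketchBinder(String name) {
        this.name = name;
    }
}

/** Hypothetical registry mirroring the three lookup steps. */
final class SketchBinderRegistry {
    private final Map<Class<? extends Annotation>, SketchBinder> byAnnotation = new LinkedHashMap<>();
    private final Map<Class<?>, SketchBinder> byType = new LinkedHashMap<>();
    private final SketchBinder defaultBinder = new SketchBinder("default");

    void registerAnnotated(Class<? extends Annotation> annotation, SketchBinder binder) {
        byAnnotation.put(annotation, binder);
    }

    void registerTyped(Class<?> type, SketchBinder binder) {
        byType.put(type, binder);
    }

    /** bindableAnnotation is null when the argument carries no bindable annotation. */
    SketchBinder find(Class<? extends Annotation> bindableAnnotation, Class<?> argumentType) {
        // Step 1: annotation-based binders.
        if (bindableAnnotation != null && byAnnotation.containsKey(bindableAnnotation)) {
            return byAnnotation.get(bindableAnnotation);
        }
        // Step 2: type-based binders; exact match first, then a registered subclass.
        SketchBinder exact = byType.get(argumentType);
        if (exact != null) {
            return exact;
        }
        for (Map.Entry<Class<?>, SketchBinder> entry : byType.entrySet()) {
            if (argumentType.isAssignableFrom(entry.getKey())) {
                return entry.getValue();
            }
        }
        // Step 3: nothing matched, so fall back to the default binder.
        return defaultBinder;
    }
}
```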
Custom Binding
To inject your own argument binding behavior, it is as simple as registering a bean. The existing binder registry will inject it and include it in the normal processing.
Annotation Binding
A custom annotation can be created to bind consumer arguments. A custom binder can then be created to use that annotation and the PubSubConsumerState to supply a value for the argument. The value may in fact come from anywhere; for the purposes of this documentation, however, we will show how to create an annotation that binds the message publish time.
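import io.micronaut.core.bind.annotation.Bindable;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
@Bindable // (1)
public @interface MessagePublishTime {
}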
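import io.micronaut.core.bind.annotation.Bindable
import java.lang.annotation.Documented
import java.lang.annotation.ElementType
import java.lang.annotation.Retention
import java.lang.annotation.RetentionPolicy
import java.lang.annotation.Target
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target([ElementType.PARAMETER])
@Bindable // (1)
@interface MessagePublishTime {
}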
import io.micronaut.core.bind.annotation.Bindable
@MustBeDocumented
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.VALUE_PARAMETER)
@Bindable // (1)
annotation class MessagePublishTime
1 | The @Bindable annotation is required for the annotation to be considered for binding. |
import com.google.protobuf.util.Timestamps;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;
import jakarta.inject.Singleton;
@Singleton // (1)
public class MessagePublishTimeAnnotationBinder implements PubSubAnnotatedArgumentBinder<MessagePublishTime> { // (2)
private final ConversionService<?> conversionService;
public MessagePublishTimeAnnotationBinder(ConversionService<?> conversionService) { // (3)
this.conversionService = conversionService;
}
@Override
public Class<MessagePublishTime> getAnnotationType() {
return MessagePublishTime.class;
}
@Override
public BindingResult<Object> bind(ArgumentConversionContext<Object> context, PubSubConsumerState source) {
Long epochMillis = Timestamps.toMillis(source.getPubsubMessage().getPublishTime()); // (4)
return () -> conversionService.convert(epochMillis, context); // (5)
}
}
import com.google.protobuf.util.Timestamps
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import jakarta.inject.Singleton;
@Singleton // (1)
class MessagePublishTimeAnnotationBinder implements PubSubAnnotatedArgumentBinder<MessagePublishTime> { // (2)
private final ConversionService<?> conversionService;
public MessagePublishTimeAnnotationBinder(ConversionService<?> conversionService) { // (3)
this.conversionService = conversionService;
}
@Override
Class<MessagePublishTime> getAnnotationType() {
MessagePublishTime
}
@Override
public BindingResult<Object> bind(ArgumentConversionContext<Object> context, PubSubConsumerState source) {
def epochMillis = Timestamps.toMillis(source.pubsubMessage.publishTime) // (4)
return { -> conversionService.convert(epochMillis, context) } // (5)
}
}
import com.google.protobuf.util.Timestamps
import io.micronaut.core.bind.ArgumentBinder
import io.micronaut.core.bind.ArgumentBinder.BindingResult
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import jakarta.inject.Singleton
@Singleton // (1)
class MessagePublishTimeAnnotationBinder(private val conversionService: ConversionService<*>) // (3)
: PubSubAnnotatedArgumentBinder<MessagePublishTime> // (2)
{
override fun getAnnotationType(): Class<MessagePublishTime> {
return MessagePublishTime::class.java
}
override fun bind(context: ArgumentConversionContext<Any>, source: PubSubConsumerState): BindingResult<Any> {
val epochMillis = Timestamps.toMillis(source.pubsubMessage.publishTime) // (4)
return ArgumentBinder.BindingResult { conversionService.convert(epochMillis, context) } // (5)
}
}
1 | The class is made a bean by annotating it with @Singleton. |
2 | The custom annotation is used as the generic type for the interface. |
3 | The conversion service is injected into the instance. |
4 | The message publish time is retrieved from the message state. |
5 | The value is converted to the argument type. For example, this allows the argument to be a String even though epochMillis is a Long. |
You could also return a com.google.protobuf.Timestamp if you register the appropriate type converter with the conversion service, but such an example is outside the scope of this documentation.
The annotation can now be used on the argument in a consumer method.
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener
public class CustomBindingSubscriber {
/**
*
* @param animal payload
* @param publishTime message publish time
*/
public void onMessage(Animal animal, @MessagePublishTime Long publishTime) { // (1)
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.support.Animal;
@PubSubListener
class CustomBindingSubscriber {
/**
*
* @param animal payload
* @param publishTime message publish time
*/
void onMessage(Animal animal, @MessagePublishTime Long publishTime) { // (1)
}
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.support.Animal
@PubSubListener
class CustomBindingSubscriber {
/**
*
* @param animal payload
* @param publishTime message publish time
*/
fun onMessage(animal: Animal, @MessagePublishTime publishTime: Long) { // (1)
}
}
1 | The message publish time will now be passed to the annotated argument |
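Because the binder hands the publish time over as epoch milliseconds, a consumer will often want to turn it into a java.time.Instant. The helper below is a minimal sketch in plain Java. PublishTimes is a hypothetical name, and toEpochMillis only mirrors the seconds-and-nanos arithmetic that a protobuf timestamp implies; it is not a framework API.

```java
import java.time.Instant;

/** Hypothetical helper; the name PublishTimes is not part of the framework. */
final class PublishTimes {
    private PublishTimes() { }

    /** Combines a timestamp's seconds and nanos fields into epoch milliseconds. */
    static long toEpochMillis(long seconds, int nanos) {
        return Math.addExact(Math.multiplyExact(seconds, 1000L), nanos / 1_000_000L);
    }

    /** Converts the Long bound by @MessagePublishTime into an Instant. */
    static Instant toInstant(long publishTimeMillis) {
        return Instant.ofEpochMilli(publishTimeMillis);
    }
}
```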
8.7 Message Serialization/Deserialization (SerDes)
The serialization and deserialization of message bodies is handled through instances of PubSubMessageSerDes. The ser-des (Serializer/Deserializer) is responsible for both serializing and deserializing Pub/Sub message bodies into the message body types defined in your client and consumer methods.
The ser-des are managed by a PubSubMessageSerDesRegistry.
All ser-des beans are injected in order into the registry and then searched when serialization or deserialization is needed.
The search is based on the Content-Type defined for the message; the framework default is application/json.
If a ser-des can’t be located, an exception is thrown.
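The lookup by Content-Type can be pictured with a small, framework-free model. Everything in this sketch is hypothetical (SketchSerDes, SketchSerDesRegistry, TextSerDes, and the rule that the first ser-des registered for a type wins); it only illustrates indexing by content type and failing when nothing matches.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for PubSubMessageSerDes. */
interface SketchSerDes {
    String supportedType();

    byte[] serialize(Object data);
}

/** Hypothetical registry: indexes ser-des by content type and fails on a miss. */
final class SketchSerDesRegistry {
    private final Map<String, SketchSerDes> byContentType = new HashMap<>();

    SketchSerDesRegistry(List<SketchSerDes> serDes) {
        for (SketchSerDes candidate : serDes) {
            // Assumption: the first ser-des registered for a type wins.
            byContentType.putIfAbsent(candidate.supportedType(), candidate);
        }
    }

    SketchSerDes find(String contentType) {
        SketchSerDes found = byContentType.get(contentType);
        if (found == null) {
            throw new IllegalStateException("No ser-des for Content-Type " + contentType);
        }
        return found;
    }
}

/** Toy ser-des that writes String.valueOf(data) as UTF-8 text. */
final class TextSerDes implements SketchSerDes {
    @Override
    public String supportedType() {
        return "text/plain";
    }

    @Override
    public byte[] serialize(Object data) {
        return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
    }
}
```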
You can supply your own ser-des by simply registering a bean of type PubSubMessageSerDes.
For example, let’s say you want to implement a ser-des that uses Java native object serialization instead of the JSON serialization provided.
First you need to define a custom MIME type for it. We will use application/x.java, following the usual conventions for custom MIME types.
This is a fictional example; Java serialization is not ideal, and it’s not portable.
import io.micronaut.core.serialize.exceptions.SerializationException;
import io.micronaut.core.type.Argument;
import jakarta.inject.Singleton;
import java.io.*;
@Singleton // (1)
public class JavaMessageSerDes implements PubSubMessageSerDes {
@Override
public String supportedType() { // (2)
return "application/x.java";
}
@Override
public Object deserialize(byte[] data, Argument<?> type) {
ByteArrayInputStream bin = new ByteArrayInputStream(data);
Object result = null;
try {
ObjectInputStream reader = new ObjectInputStream(bin);
result = reader.readObject();
} catch (Exception e) {
throw new SerializationException("Failed to deserialize object", e);
}
return result;
}
@Override
public byte[] serialize(Object data) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
ObjectOutputStream writer = new ObjectOutputStream(baos);
writer.writeObject(data);
} catch (IOException e) {
throw new SerializationException("Failed to serialize object", e);
}
return baos.toByteArray();
}
}
import io.micronaut.core.serialize.exceptions.SerializationException;
import io.micronaut.core.type.Argument;
import jakarta.inject.Singleton;
import java.io.*;
@Singleton // (1)
class JavaMessageSerDes implements PubSubMessageSerDes {
@Override
String supportedType() { // (2)
"application/x.java"
}
@Override
public Object deserialize(byte[] data, Argument<?> type) {
ByteArrayInputStream bin = new ByteArrayInputStream(data)
Object result = null
try {
ObjectInputStream reader = new ObjectInputStream(bin)
result = reader.readObject();
} catch (Exception e) {
throw new SerializationException("Failed to deserialize object", e)
}
return result;
}
@Override
byte[] serialize(Object data) {
ByteArrayOutputStream baos = new ByteArrayOutputStream()
try {
ObjectOutputStream writer = new ObjectOutputStream(baos)
writer.writeObject(data)
} catch (IOException e) {
throw new SerializationException("Failed to serialize object", e)
}
return baos.toByteArray()
}
}
import io.micronaut.core.serialize.exceptions.SerializationException
import io.micronaut.core.type.Argument
import java.io.*
import jakarta.inject.Singleton
@Singleton // (1)
class JavaMessageSerDes : PubSubMessageSerDes {
override fun supportedType(): String { // (2)
return "application/x.java"
}
override fun deserialize(data: ByteArray, type: Argument<*>): Any {
val bin = ByteArrayInputStream(data)
var result: Any? = null
result = try {
val reader = ObjectInputStream(bin)
reader.readObject()
} catch (e: Exception) {
throw SerializationException("Failed to deserialize object", e)
}
return result
}
override fun serialize(data: Any): ByteArray {
val baos = ByteArrayOutputStream()
try {
val writer = ObjectOutputStream(baos)
writer.writeObject(data)
} catch (e: IOException) {
throw SerializationException("Failed to serialize object", e)
}
return baos.toByteArray()
}
}
1 | The class is declared as a singleton so it will be registered with the context |
2 | The content type defines which messages this implementation is able to handle |
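To check the serialization logic in isolation, the same java.io calls can be exercised in a round trip without any Pub/Sub types. JavaSerializationRoundTrip is a hypothetical helper written for this illustration. It uses try-with-resources so that the streams are flushed and closed before the bytes are read, and it throws standard JDK exceptions rather than Micronaut's SerializationException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Hypothetical helper mirroring the serialize/deserialize pair of the ser-des. */
final class JavaSerializationRoundTrip {
    private JavaSerializationRoundTrip() { }

    static byte[] serialize(Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes everything into baos.
        try (ObjectOutputStream writer = new ObjectOutputStream(baos)) {
            writer.writeObject(data);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to serialize object", e);
        }
        return baos.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream reader = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return reader.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to deserialize object", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unknown class in payload", e);
        }
    }
}
```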
To publish messages with this ser-des, set the Content-Type on @Topic:
@Topic(value = "animals", contentType = "application/x.java")
If the messages arriving at your subscriber already contain a Content-Type header with this type, the framework will pick it up; alternatively, you can force it on the @Subscription annotation:
@Subscription(value = "animals", contentType = "application/x.java")
8.8 Configuring Thread pools
The framework allows users to configure separate thread pools for Publishers and Subscribers. You can reference a pre-defined executor in the Pub/Sub configuration properties, or create custom configurations for publishers and subscribers. Micronaut supports the definition of custom ExecutorService implementations; see ExecutorConfiguration for the full list of options.
The Pub/Sub client libraries require a ScheduledExecutorService for both Publisher and Subscriber. Make sure your custom executor is of type scheduled, or an error will be thrown.
For example, a custom thread pool:
micronaut.executors.custom.type=scheduled
micronaut.executors.custom.nThreads=32
micronaut:
  executors:
    custom:
      type: scheduled
      nThreads: 32
[micronaut]
[micronaut.executors]
[micronaut.executors.custom]
type="scheduled"
nThreads=32
micronaut {
executors {
custom {
type = "scheduled"
nThreads = 32
}
}
}
{
micronaut {
executors {
custom {
type = "scheduled"
nThreads = 32
}
}
}
}
{
"micronaut": {
"executors": {
"custom": {
"type": "scheduled",
"nThreads": 32
}
}
}
}
If no configuration is supplied, the framework will use the default executor named scheduled.
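The requirement for a scheduled executor can be seen with the JDK alone: a fixed thread pool is an ExecutorService but not a ScheduledExecutorService. The sketch below uses only java.util.concurrent. ExecutorKinds is a hypothetical name, and scheduledPool is merely comparable to a type=scheduled configuration; how Micronaut actually builds its executors is not shown here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/** Hypothetical check; the class name is illustrative only. */
final class ExecutorKinds {
    private ExecutorKinds() { }

    /** True when the executor can also schedule delayed or periodic tasks. */
    static boolean isScheduled(ExecutorService executor) {
        return executor instanceof ScheduledExecutorService;
    }

    /** Builds a pool comparable to type=scheduled with the given thread count. */
    static ScheduledExecutorService scheduledPool(int nThreads) {
        return Executors.newScheduledThreadPool(nThreads);
    }
}
```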
8.9 Using Google Cloud Pub/Sub emulator
Google Cloud Pub/Sub has a local emulator to enable developers to test their applications locally with no need to connect to the cloud Pub/Sub service.
The framework supports automatic switching of the TransportChannelProvider to use PUBSUB_EMULATOR_HOST if this variable is set in the environment.
Make sure that you also set GCP_PROJECT_ID to the same project that the emulator is configured to use. Otherwise the framework may pick up the project ID used by the default credentials.
You will need to manually configure topics and subscriptions on the emulator if you want to test locally. You can create these resources using the Pub/Sub REST interface.
For instance, the following curl command creates a topic named micronaut-devices in the test-project project.
curl -XPUT $PUBSUB_EMULATOR_HOST/v1/projects/test-project/topics/micronaut-devices
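The same request can be built from Java with the JDK HTTP client (Java 11 or later), which is convenient in test setup code. The sketch below only builds the requests and does not send them. EmulatorRequests is a hypothetical helper, and it assumes the emulator host is given as host:port without a scheme (for example localhost:8085) and that the subscription is created through the REST path /v1/projects/{project}/subscriptions/{subscription} with a JSON body naming the topic.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds emulator requests; it does not send them. */
final class EmulatorRequests {
    private EmulatorRequests() { }

    /** Equivalent of the curl command above: a PUT with an empty body. */
    static HttpRequest createTopic(String emulatorHost, String project, String topic) {
        URI uri = URI.create("http://" + emulatorHost + "/v1/projects/" + project + "/topics/" + topic);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** Creates a subscription attached to an existing topic. */
    static HttpRequest createSubscription(String emulatorHost, String project,
                                          String subscription, String topic) {
        URI uri = URI.create("http://" + emulatorHost + "/v1/projects/" + project
                + "/subscriptions/" + subscription);
        String body = "{\"topic\":\"projects/" + project + "/topics/" + topic + "\"}";
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

A request built this way can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) while the emulator is running.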
9 Google Cloud Secret Manager Support
Google Cloud Secret Manager is a secure and convenient storage system for API keys, passwords, certificates, and other sensitive data. Applications can also use it to store configuration files and as a secure distributed repository of metadata.
To add support for Google Cloud Secret Manager to an existing project, add the following dependencies to your build.
implementation("io.micronaut.gcp:micronaut-gcp-secret-manager")
<dependency>
<groupId>io.micronaut.gcp</groupId>
<artifactId>micronaut-gcp-secret-manager</artifactId>
</dependency>
9.1 Distributed Configuration
You can leverage Distributed Configuration and rely on Google Cloud Secret Manager to store your configuration files.
To enable it, add a bootstrap configuration file to src/main/resources/:
micronaut.application.name=hello-world
micronaut.config-client.enabled=true
micronaut:
  application:
    name: hello-world
  config-client:
    enabled: true
[micronaut]
[micronaut.application]
name="hello-world"
[micronaut.config-client]
enabled=true
micronaut {
application {
name = "hello-world"
}
configClient {
enabled = true
}
}
{
micronaut {
application {
name = "hello-world"
}
config-client {
enabled = true
}
}
}
{
"micronaut": {
"application": {
"name": "hello-world"
},
"config-client": {
"enabled": true
}
}
}
Make sure you have configured the correct credentials on your project following the Setting up GCP section, and that the service account you designated for your application has the proper rights to read secrets. Follow the official Access Control guide for Secret Manager if you need more information.
Configuration file resolution
It’s important to note that Google Cloud Secret Manager does not support file extensions.
When you upload your application configuration file (e.g. application.yml or application.properties), remove the file extension.
Micronaut works around this limitation by adopting a convention when fetching configuration files. The following table displays the possible secrets that are fetched by default:
Name |
Description |
|
Configuration shared by all applications |
|
Environment specific configuration, for instance if you have |
|
Application specific configuration, example |
|
Application specific configuration bound to an environment, example |
Supported formats
Internally, the framework will try to read the files (which lack extensions) in the following order: yaml, json, properties.
The first PropertySourceLoader that succeeds in reading the contents of a secret interrupts the chain.
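The "first loader that succeeds wins" behavior can be modeled as a simple chain. This sketch is framework-free: LoaderChain is a hypothetical class, and each loader is represented as a function that returns an empty Optional when it cannot parse the contents, which is an assumption about how failure is signaled rather than the real PropertySourceLoader contract.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical chain: tries each loader in order and stops at the first success. */
final class LoaderChain {
    private final List<Function<String, Optional<Map<String, Object>>>> loaders;

    LoaderChain(List<Function<String, Optional<Map<String, Object>>>> loaders) {
        this.loaders = loaders;
    }

    Optional<Map<String, Object>> load(String secretContents) {
        for (Function<String, Optional<Map<String, Object>>> loader : loaders) {
            Optional<Map<String, Object>> result = loader.apply(secretContents);
            if (result.isPresent()) {
                // The first loader that succeeds interrupts the chain.
                return result;
            }
        }
        // No loader could read the contents.
        return Optional.empty();
    }
}
```

With loaders supplied in the order yaml, json, properties, a secret that only the JSON loader can read never reaches the properties loader.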
Loading non default config files
Sometimes you may want to load a secret as a PropertySource even though it does not follow the above convention.
You can whitelist the secrets that you would like to be loaded as PropertySources using the following configuration:
micronaut.application.name=hello-world
micronaut.config-client.enabled=true
gcp.secret-manager.custom-configs[0]=my-first-config
gcp.secret-manager.custom-configs[1]=my-second-config
micronaut:
  application:
    name: hello-world
  config-client:
    enabled: true
gcp:
  secret-manager:
    custom-configs:
      - my-first-config
      - my-second-config
[micronaut]
[micronaut.application]
name="hello-world"
[micronaut.config-client]
enabled=true
[gcp]
[gcp.secret-manager]
custom-configs=[
"my-first-config",
"my-second-config"
]
micronaut {
application {
name = "hello-world"
}
configClient {
enabled = true
}
}
gcp {
secretManager {
customConfigs = ["my-first-config", "my-second-config"]
}
}
{
micronaut {
application {
name = "hello-world"
}
config-client {
enabled = true
}
}
gcp {
secret-manager {
custom-configs = ["my-first-config", "my-second-config"]
}
}
}
{
"micronaut": {
"application": {
"name": "hello-world"
},
"config-client": {
"enabled": true
}
},
"gcp": {
"secret-manager": {
"custom-configs": ["my-first-config", "my-second-config"]
}
}
}
The secrets you list in custom-configs must be valid configuration files, not single keys.
Loading key/value pairs into a PropertySource
Google Cloud Secret Manager can also host single keys, each mapping to a single String value.
For instance, one could have a DB_PASSWORD secret instead of storing the password in a configuration file.
Because Secret Manager does not offer a hierarchical approach, loading all secrets in a project is not a viable solution.
Instead, you need to whitelist the keys you would like to pre-fetch, as the following configuration demonstrates:
micronaut.application.name=hello-world
micronaut.config-client.enabled=true
gcp.secret-manager.keys[0]=DB_PASSWORD
gcp.secret-manager.keys[1]=dbUser
micronaut:
  application:
    name: hello-world
  config-client:
    enabled: true
gcp:
  secret-manager:
    keys:
      - DB_PASSWORD
      - dbUser
[micronaut]
[micronaut.application]
name="hello-world"
[micronaut.config-client]
enabled=true
[gcp]
[gcp.secret-manager]
keys=[
"DB_PASSWORD",
"dbUser"
]
micronaut {
application {
name = "hello-world"
}
configClient {
enabled = true
}
}
gcp {
secretManager {
keys = ["DB_PASSWORD", "dbUser"]
}
}
{
micronaut {
application {
name = "hello-world"
}
config-client {
enabled = true
}
}
gcp {
secret-manager {
keys = ["DB_PASSWORD", "dbUser"]
}
}
}
{
"micronaut": {
"application": {
"name": "hello-world"
},
"config-client": {
"enabled": true
}
},
"gcp": {
"secret-manager": {
"keys": ["DB_PASSWORD", "dbUser"]
}
}
}
The framework will load all the whitelisted Secret Manager keys and prefix them with an sm key. The table below shows how the entries can be accessed:
Secret Name |
Micronaut resolved property |
|
|
|
|
Secret Versioning
Google Cloud Secret Manager supports secret versioning.
For the distributed configuration use case, Micronaut will always load the latest version of each secret into a property source.
Specifying which version of your application configuration file should be loaded is currently not supported.
9.2 Low-Level Secret Manager Client access
Accessing Secret Manager via client libraries
If you require access to Secret Manager via the Google client Java libraries, Micronaut provides a bean of type SecretManagerServiceClient for injection.
import com.google.cloud.secretmanager.v1.AccessSecretVersionRequest;
import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;
public final class GoogleClientExample {
private final SecretManagerServiceClient client;
public GoogleClientExample(SecretManagerServiceClient googleSecretManagerClient) { // (1)
this.client = googleSecretManagerClient;
}
@EventListener
public void onStartup(StartupEvent event) {
AccessSecretVersionResponse response = client.accessSecretVersion(AccessSecretVersionRequest
.newBuilder()
.setName(SecretVersionName.of("my-cloud-project", "secretName", "latest").toString())
.build());
String secret = response.getPayload().getData().toStringUtf8();
}
}
import com.google.cloud.secretmanager.v1.AccessSecretVersionRequest;
import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;
class GoogleClientExample {
private final SecretManagerServiceClient client
GoogleClientExample(SecretManagerServiceClient googleSecretManagerClient) { // (1)
this.client = googleSecretManagerClient;
}
@EventListener
void onStartup(StartupEvent event) {
AccessSecretVersionResponse response = client.accessSecretVersion(AccessSecretVersionRequest
.newBuilder()
.setName(SecretVersionName.of("my-cloud-project", "secretName", "latest").toString())
.build())
String secret = response.getPayload().getData().toStringUtf8()
}
}
import com.google.cloud.secretmanager.v1.AccessSecretVersionRequest
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient
import com.google.cloud.secretmanager.v1.SecretVersionName
import io.micronaut.context.event.StartupEvent
import io.micronaut.runtime.event.annotation.EventListener
class GoogleClientExample (private val client: SecretManagerServiceClient) { // (1)
@EventListener
fun onStartup(event: StartupEvent) {
val response = client.accessSecretVersion(AccessSecretVersionRequest
.newBuilder()
.setName(SecretVersionName.of("my-cloud-project", "secretName", "latest").toString())
.build())
val secret = response.payload.data.toStringUtf8()
}
}
1 | Inject SecretManagerServiceClient |
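The name passed to setName(...) in the examples is a plain resource path. The sketch below reproduces that path with string concatenation so the shape is visible without the Google client on the classpath. SecretVersionPath is a hypothetical helper, and the format shown (projects/{project}/secrets/{secret}/versions/{version}) is the Secret Manager resource name format as we understand it, so treat it as an assumption to verify against the Google documentation.

```java
/** Hypothetical helper showing the resource name of a secret version. */
final class SecretVersionPath {
    private SecretVersionPath() { }

    /** Builds projects/{project}/secrets/{secret}/versions/{version}. */
    static String of(String project, String secret, String version) {
        return "projects/" + project + "/secrets/" + secret + "/versions/" + version;
    }
}
```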
Micronaut Secret Manager Client
If you would rather not deal with the Google gRPC libraries, you can use the wrapper client that the framework uses internally, SecretManagerClient, which provides a reactive approach:
import io.micronaut.context.event.StartupEvent;
import io.micronaut.gcp.secretmanager.client.SecretManagerClient;
import io.micronaut.gcp.secretmanager.client.VersionedSecret;
import io.micronaut.runtime.event.annotation.EventListener;
import reactor.core.publisher.Mono;
public final class ClientExample {
private final SecretManagerClient client;
public ClientExample(SecretManagerClient client) {
this.client = client;
}
@EventListener
public void onStartup(StartupEvent event) {
Mono<VersionedSecret> secret = Mono.from(client.getSecret("secretId")); // (1)
Mono<VersionedSecret> v2 = Mono.from(client.getSecret("secretId", "v2")); //(2)
Mono<VersionedSecret> fromOtherProject = Mono.from(client.getSecret("secretId", "latest", "another-project-id")); //(3)
}
}
import io.micronaut.context.event.StartupEvent
import io.micronaut.gcp.secretmanager.client.SecretManagerClient
import io.micronaut.gcp.secretmanager.client.VersionedSecret
import io.micronaut.runtime.event.annotation.EventListener
import reactor.core.publisher.Mono
class ClientExample {
private final SecretManagerClient client
ClientExample(SecretManagerClient client) {
this.client = client
}
@EventListener
void onStartup(StartupEvent event) {
Mono<VersionedSecret> secret = Mono.from(client.getSecret("secretId")) // (1)
Mono<VersionedSecret> v2 = Mono.from(client.getSecret("secretId", "v2")) //(2)
Mono<VersionedSecret> fromOtherProject = Mono.from(client.getSecret("secretId", "latest", "another-project-id")) //(3)
}
}
import io.micronaut.context.event.StartupEvent
import io.micronaut.gcp.secretmanager.client.SecretManagerClient
import io.micronaut.runtime.event.annotation.EventListener
class ClientExample(private val client: SecretManagerClient) {
@EventListener
fun onStartup(event: StartupEvent) {
val secret = client.getSecret("secretId") // (1)
val v2 = client.getSecret("secretId", "v2") //(2)
val fromOtherProject = client.getSecret("secretId", "latest", "another-project-id") //(3)
}
}
1 | Uses latest version and current project from GoogleCloudCredentials |
2 | Uses current project but overrides version |
3 | Fetches a secret from a different project |
You can use the client libraries at any point during runtime; however, it’s advised to load secrets only once, during application startup, to reduce both the latency of the remote call and the costs associated with secret retrieval.
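One way to follow this advice is to wrap the lookup in a value that is resolved once and then reused. The sketch below is plain Java. CachedSecret is a hypothetical class, and the Supplier passed to it stands in for whatever call actually fetches the secret.

```java
import java.util.function.Supplier;

/** Hypothetical cache: resolves a secret on first use and reuses the value afterwards. */
final class CachedSecret implements Supplier<String> {
    private final Supplier<String> loader;
    private volatile String value;

    CachedSecret(Supplier<String> loader) {
        this.loader = loader;
    }

    @Override
    public String get() {
        String current = value;
        if (current == null) {
            synchronized (this) {
                current = value;
                if (current == null) {
                    // The remote call happens at most once for a non-null result.
                    current = loader.get();
                    value = current;
                }
            }
        }
        return current;
    }
}
```

Note that a loader returning null is not cached in this sketch and would be invoked again on the next call.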
10 Generating Native Images
Some GCP modules require Native Image Support for Google Cloud Libraries to build GraalVM native images.
The library is only needed when compiling the native image.
For Gradle add the following dependency:
dependencies {
...
nativeImageCompileOnly("com.google.cloud:native-image-support")
}
For Maven add the following profile:
<profiles>
<profile>
<id>graalVM</id>
<activation>
<property>
<name>packaging</name>
<value>native-image</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>native-image-support</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
11 Google Cloud Storage Support
Micronaut provides a high-level, uniform object storage API that works across the major cloud providers: Micronaut Object Storage.
To get started, select the object-storage-gcp feature in Micronaut Launch, or add the following dependency:
implementation("io.micronaut.objectstorage:micronaut-object-storage-gcp")
<dependency>
<groupId>io.micronaut.objectstorage</groupId>
<artifactId>micronaut-object-storage-gcp</artifactId>
</dependency>
For more information, check the Micronaut Object Storage GCP support documentation.
12 Repository
You can find the source code of this project in this repository: