Micronaut GCP

Provides integration between Micronaut and Google Cloud Platform (GCP)

Version:

1 Introduction

This project provides extensions that integrate Micronaut with Google Cloud Platform (GCP).

2 Setting up GCP Support

The micronaut-gcp-common module includes basic setup for running applications on Google Cloud.

implementation("io.micronaut.gcp:micronaut-gcp-common")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-common</artifactId>
</dependency>

Prerequisites:

  1. Create a Google Cloud Platform project

  2. Install the gcloud CLI

  3. Configure the default project with gcloud config set project YOUR_PROJECT_ID

  4. Authenticate with gcloud auth login

  5. Authenticate the application default credentials with gcloud auth application-default login

It’s strongly recommended that you use a Service Account for your application.

Google Project ID

The module features a base GoogleCloudConfiguration which you can use to configure or retrieve the GCP Project ID:

Table 1. Configuration Properties for GoogleCloudConfiguration
Property Type Description

gcp.project-id

java.lang.String

Returns the Google project ID for the project.

You can inject this bean and use the getProjectId() method to retrieve the configured or detected project ID.

Google Credentials

The module sets up a bean exposing the com.google.auth.oauth2.GoogleCredentials instance, which is either detected from the local environment or configured by GoogleCredentialsConfiguration:

Table 2. Configuration Properties for GoogleCredentialsConfiguration
Property Type Description

gcp.credentials.scopes

java.util.List

The scopes to use.

gcp.credentials.location

java.lang.String

The location of the service account credential key file. See Understanding Service Accounts (https://cloud.google.com/iam/docs/understanding-service-accounts) for more information on generating a service account key file.

gcp.credentials.encoded-key

java.lang.String

The Base64-encoded service account key content. This is not recommended unless you need to pass the service account key through an environment variable. For other use cases, configure location instead.

gcp.credentials.enabled

boolean

Allows disabling Google credentials configuration. This may be useful in situations where you don’t want to authenticate despite having the Google Cloud SDK configured. Default value is true.

gcp.credentials.use-http-client

boolean

Whether the HttpClient-based transport should be used for retrieving authentication tokens. Default value is true. Note that if HttpClient is not on the classpath, the GCP SDK’s default transport will be used instead.
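
The gcp.credentials.encoded-key property described above expects the key file content as Base64 text. The following is a minimal sketch of producing such a value with the JDK only, assuming standard (not URL-safe) Base64 is what the module expects. EncodedKeySketch, encodeKey and decodeKey are hypothetical names, and the JSON in main is a placeholder rather than a real key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class EncodedKeySketch {

    /** Encodes the raw bytes of a service account key file as Base64 text. */
    static String encodeKey(byte[] keyFileContent) {
        return Base64.getEncoder().encodeToString(keyFileContent);
    }

    /** Decodes the Base64 text back to the original JSON, useful for a sanity check. */
    static String decodeKey(String encodedKey) {
        return new String(Base64.getDecoder().decode(encodedKey), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Placeholder content (assumption); in practice read the bytes of the key file instead.
        String placeholderKey = "{\"type\":\"service_account\",\"project_id\":\"my-project\"}";
        String encoded = encodeKey(placeholderKey.getBytes(StandardCharsets.UTF_8));
        System.out.println(encoded);
    }
}
```

The printed value could then be supplied through an environment variable and used as the value of gcp.credentials.encoded-key.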

Debug Logging

The underlying GCP SDK libraries use the standard java.util.logging package (JUL) for log statements. The libraries are fairly conservative in what they log by default. If you need to debug the GCP libraries' activity, especially their gRPC-based communication with the GCP cloud services, it can be useful to turn up the logging level. To do this in conjunction with the framework’s SLF4J-based logging, some additional setup is needed to enable the JUL bridge library for SLF4J.

There is an unavoidable performance impact to enabled JUL log statements when using the jul-to-slf4j bridge, thus it is advised to be conservative in enabling this configuration, preferably only for debugging purposes.

To enable the GCP library debug logging, first add the jul-to-slf4j.jar dependency to your classpath:

runtimeOnly("org.slf4j:jul-to-slf4j:2.0.9")
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>jul-to-slf4j</artifactId>
    <version>2.0.9</version>
    <scope>runtime</scope>
</dependency>

Next you can either enable the JUL bridge class SLF4JBridgeHandler programmatically during application initialization (such as in the main method of your application), or by adding the following line to a logging.properties file on your classpath (see the SLF4JBridgeHandler javadocs for more details):

 handlers = org.slf4j.bridge.SLF4JBridgeHandler

Next, add the following configuration for LevelChangePropagator (which eliminates the performance impact of disabled JUL log statements) to your Logback configuration:

<configuration>
  <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator"/>
   <!-- rest of the configuration file .... -->
</configuration>

Once this is done, you can set the logging level for GCP library classes in the usual manner using SLF4J configuration.

3 Release History

For this project, you can find a list of releases (with release notes) here:

4 Google Cloud Logging Support

Google Cloud Logging integration is available via StackdriverJsonLayout that formats log output using Stackdriver structured logging format.

To enable it add the following dependency to your project:

implementation("io.micronaut.gcp:micronaut-gcp-logging")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-logging</artifactId>
</dependency>

By default, if an application uses a CONSOLE appender, the Stackdriver log parser treats the entire payload of the message as a text entry, making searching and correlation with tracing impractical.

Console log output

Logs in the picture above can’t be searched by attributes such as thread, and ANSI coloring makes regex searching even more challenging.

When enabled the JSON appender allows logs to be easily filtered:

If you combine this module with the Stackdriver Trace module, all logs will also have a traceId field, making it possible to correlate traces with log entries.
JSON log output
1 Visual display of log levels
2 Correlated traceId for tracing when you have enabled cloud tracing
3 Simplified output without extra fields or ANSI coloring codes

Configuring logging via console

<configuration>
    <!-- Uncomment the next line to enable ANSI color code interpretation -->
    <!-- <property name="STDOUT_WITH_JANSI" value="true" /> -->
    <include resource="io/micronaut/gcp/logging/logback-json-appender.xml" /> (1)
    <root level="INFO">
        <appender-ref ref="CONSOLE_JSON" /> (2)
    </root>
</configuration>
1 Import the logback configuration that defines the JsonLayout appender
2 Reference the CONSOLE_JSON appender from your root logger

Overriding defaults on logging configuration

If you would like to override the fields that are included in the JsonLayout appender, you can declare it on your own logback configuration instead of including the default from logback-json-appender.xml:

<configuration>
    <appender name="CONSOLE_JSON" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="io.micronaut.gcp.logging.StackdriverJsonLayout">
                <projectId>${projectId}</projectId> (1)
                <includeTraceId>true</includeTraceId>
                <includeSpanId>true</includeSpanId>
                <includeLevel>true</includeLevel>
                <includeThreadName>false</includeThreadName>
                <includeMDC>true</includeMDC>
                <includeLoggerName>true</includeLoggerName>
                <includeFormattedMessage>true</includeFormattedMessage>
                <includeExceptionInMessage>true</includeExceptionInMessage>
                <includeContextName>true</includeContextName>
                <includeMessage>false</includeMessage>
                <includeException>false</includeException>
            </layout>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="CONSOLE_JSON" />
    </root>
</configuration>
1 You can override the projectId settings via property. If not defined, the default ServiceOptions.getDefaultProjectId() will be used.

Dynamic appender selection

The JSONLayout comes in handy when using Google Cloud Logging, but when running locally it will make your console logs unreadable. The default logback-json-appender configuration includes both a STDOUT and a CONSOLE_JSON appender, as well as a dynamic logback property called google_cloud_logging.

You can use that variable to switch your logger appender dynamically.

Your logging configuration would look like this:

<configuration>
    <include resource="io/micronaut/gcp/logging/logback-json-appender.xml" />
    <root level="INFO">
        <appender-ref ref="${google_cloud_logging}" /> (1)
    </root>
</configuration>
1 Chooses the appropriate appender depending on the environment.
The environment detection executes an HTTP request to the Google Cloud metadata server. If you would rather skip this to improve startup time, set the MICRONAUT_ENVIRONMENTS environment variable or the micronaut.environments System property as described in the reference documentation.

5 Stackdriver Trace

The micronaut-gcp-tracing module integrates Micronaut with Cloud Trace from Google Cloud Operations (formerly Stackdriver).

To enable it add the following dependency:

implementation("io.micronaut.gcp:micronaut-gcp-tracing")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-tracing</artifactId>
</dependency>

Then enable Zipkin tracing in your application configuration:

Enabling Stackdriver Trace
tracing.zipkin.enabled=true
tracing.zipkin.sampler.probability=1.0
tracing.gcp.tracing.enabled=true
tracing:
    zipkin:
        enabled: true
        sampler:
            probability: 1.0
    gcp:
        tracing:
            enabled: true
[tracing]
  [tracing.zipkin]
    enabled=true
    [tracing.zipkin.sampler]
      probability=1.0
  [tracing.gcp]
    [tracing.gcp.tracing]
      enabled=true
tracing {
  zipkin {
    enabled = true
    sampler {
      probability = 1.0
    }
  }
  gcp {
    tracing {
      enabled = true
    }
  }
}
{
  tracing {
    zipkin {
      enabled = true
      sampler {
        probability = 1.0
      }
    }
    gcp {
      tracing {
        enabled = true
      }
    }
  }
}
{
  "tracing": {
    "zipkin": {
      "enabled": true,
      "sampler": {
        "probability": 1.0
      }
    },
    "gcp": {
      "tracing": {
        "enabled": true
      }
    }
  }
}
  • sampler.probability (optional) Set sampling probability to 100% for dev/testing purposes to observe traces

  • gcp.tracing.enabled (optional) Enable/disable Stackdriver Trace configuration, defaults to true

6 Authorizing HTTP Clients

The micronaut-gcp-http-client module can be used to help authorize service-to-service communication. To get started add the following module:

implementation("io.micronaut.gcp:micronaut-gcp-http-client")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-http-client</artifactId>
</dependency>

You should then configure the service accounts as per the documentation on service-to-service communication and then enable the filter for the outgoing URI paths that should include the Google-signed OAuth ID token:

gcp.http.client.auth.patterns[0]=/foo/**
gcp.http.client.auth.patterns[1]=/bar/**
gcp:
  http:
    client:
      auth:
        patterns:
          - /foo/**
          - /bar/**
[gcp]
  [gcp.http]
    [gcp.http.client]
      [gcp.http.client.auth]
        patterns=[
          "/foo/**",
          "/bar/**"
        ]
gcp {
  http {
    client {
      auth {
        patterns = ["/foo/**", "/bar/**"]
      }
    }
  }
}
{
  gcp {
    http {
      client {
        auth {
          patterns = ["/foo/**", "/bar/**"]
        }
      }
    }
  }
}
{
  "gcp": {
    "http": {
      "client": {
        "auth": {
          "patterns": ["/foo/**", "/bar/**"]
        }
      }
    }
  }
}

7 Cloud Function Support

Micronaut GCP includes extended support for Google Cloud Functions, which is designed for serverless workloads.

7.1 Simple Functions

Micronaut GCP offers two ways to write cloud functions with Micronaut. The first way is more low level and involves using Micronaut’s built in support for functions. Simply add the following dependency to your classpath:

implementation("io.micronaut.gcp:micronaut-gcp-function")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-function</artifactId>
</dependency>

Then add the Cloud Function API as a compileOnly dependency (provided with Maven):

compileOnly("com.google.cloud.functions:functions-framework-api")
<dependency>
    <groupId>com.google.cloud.functions</groupId>
    <artifactId>functions-framework-api</artifactId>
    <scope>provided</scope>
</dependency>

Now define a class that implements one of Google Cloud Functions' interfaces, for example com.google.cloud.functions.BackgroundFunction, and extends io.micronaut.gcp.function.GoogleFunctionInitializer.

The following is an example of a BackgroundFunction that uses Micronaut and Google Cloud Function:

/*
 * Copyright 2017-2020 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package example.background;

import com.google.cloud.functions.*;
import io.micronaut.gcp.function.GoogleFunctionInitializer;

import jakarta.inject.*;
import java.util.*;

public class Example extends GoogleFunctionInitializer // (1)
        implements BackgroundFunction<PubSubMessage> { // (2)

    @Inject LoggingService loggingService; // (3)

    @Override
    public void accept(PubSubMessage message, Context context) {
        loggingService.logMessage(message);
    }
}

class PubSubMessage {
    String data;
    Map<String, String> attributes;
    String messageId;
    String publishTime;
}

@Singleton
class LoggingService {

    void logMessage(PubSubMessage message) {
        // log the message
    }
}
package example.background

import com.google.cloud.functions.*
import io.micronaut.gcp.function.GoogleFunctionInitializer

import jakarta.inject.*

class Example extends GoogleFunctionInitializer // (1)
        implements BackgroundFunction<PubSubMessage> { // (2)

    @Inject LoggingService loggingService // (3)

    @Override
    void accept(PubSubMessage message, Context context) {
        loggingService.logMessage(message)
    }
}

class PubSubMessage {
    String data
    Map<String, String> attributes
    String messageId
    String publishTime
}

@Singleton
class LoggingService {

    void logMessage(PubSubMessage message) {
        // log the message
    }
}
package example.background

import com.google.cloud.functions.BackgroundFunction
import com.google.cloud.functions.Context
import io.micronaut.gcp.function.GoogleFunctionInitializer
import jakarta.inject.Inject
import jakarta.inject.Singleton

class Example : GoogleFunctionInitializer(), // (1)
        BackgroundFunction<PubSubMessage> { // (2)
    @Inject
    lateinit var loggingService: LoggingService // (3)

    override fun accept(message: PubSubMessage, context: Context) {
        loggingService.logMessage(message)
    }
}

class PubSubMessage {
    var data: String? = null
    var attributes: Map<String, String>? = null
    var messageId: String? = null
    var publishTime: String? = null
}

@Singleton
class LoggingService {

    fun logMessage(message: PubSubMessage) {
        // log the message
    }
}
1 The function extends io.micronaut.gcp.function.GoogleFunctionInitializer
2 The function implements com.google.cloud.functions.BackgroundFunction
3 Dependency injection can be used on the fields

When you extend GoogleFunctionInitializer, the Micronaut ApplicationContext is initialized and dependency injection is performed on the function instance. You can inject any bean using jakarta.inject.Inject as usual.

Functions require a no argument constructor hence you must use field injection (which requires lateinit in Kotlin) when injecting dependencies into the function itself.

The FunctionInitializer super class provides numerous methods that you can override to customize how the ApplicationContext is built if desired.
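
The logMessage placeholder in the example above leaves the logging itself open. For Pub/Sub-triggered background functions, Google delivers the data field of the event as a Base64-encoded string, so a logging service would typically decode it first. The following is a minimal sketch using only the JDK. PubSubDataSketch and decodeData are hypothetical names, and the nested PubSubMessage is a stand-in for the class in the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PubSubDataSketch {

    /** Stand-in for the PubSubMessage class from the example above. */
    static class PubSubMessage {
        String data;
        String messageId;
    }

    /** Returns the decoded payload, or an empty string when the message carries no data. */
    static String decodeData(PubSubMessage message) {
        if (message.data == null) {
            return "";
        }
        byte[] raw = Base64.getDecoder().decode(message.data);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulate an incoming event by encoding a payload the way Pub/Sub would deliver it.
        PubSubMessage message = new PubSubMessage();
        message.messageId = "1";
        message.data = Base64.getEncoder()
                .encodeToString("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeData(message)); // prints "hello"
    }
}
```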

Running Functions Locally

Raw functions cannot be executed locally. They can be tested by instantiating the function and inspecting any side effects by providing mock arguments or mocking dependent beans.

Deployment

When deploying the function to Cloud Function you should use the fully qualified name of the function class as the handler reference.

First build the function with:

$ ./gradlew clean shadowJar

Then cd into the build/libs directory (deployment has to be done from the location where the JAR file resides):

$ cd build/libs

To deploy the function, make sure you have the gcloud CLI installed, then run:

$ gcloud beta functions deploy myfunction --entry-point example.function.Function --runtime java11 --trigger-http

In the example above, myfunction refers to the name of your function and can be changed to any name you prefer. example.function.Function refers to the fully qualified name of your function class.

To obtain the trigger URL you can use the following command:

$ YOUR_HTTP_TRIGGER_URL=$(gcloud beta functions describe myfunction --format='value(httpsTrigger.url)')

You can then use this variable to test the function invocation:

$ curl -i $YOUR_HTTP_TRIGGER_URL

7.2 HTTP Functions

It is common to want to take just a slice of a regular Micronaut HTTP server application and deploy it as a function.

Configuration

To facilitate this model, Micronaut GCP includes an additional module that allows you to use regular Micronaut annotations like @Controller and @Get to define your functions that can be deployed to cloud function.

With this model you need to add the micronaut-gcp-function-http dependency to your application:

implementation("io.micronaut.gcp:micronaut-gcp-function-http")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-function-http</artifactId>
</dependency>

And define the Google Function API as a development only dependency:

developmentOnly("com.google.cloud.functions:functions-framework-api")
<dependency>
    <groupId>com.google.cloud.functions</groupId>
    <artifactId>functions-framework-api</artifactId>
    <scope>provided</scope>
</dependency>

Running Functions Locally

To run the function locally, first make the regular Micronaut server a developmentOnly dependency, since it does not need to be included in the JAR file that will be deployed to Cloud Function:

developmentOnly("io.micronaut:micronaut-http-server-netty")
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-http-server-netty</artifactId>
    <scope>provided</scope>
</dependency>

You can then use ./gradlew run or ./mvnw compile exec:exec to run the function locally using Micronaut’s Netty-based server.

Alternatively, you could configure the Google Function Framework for Java which includes a Maven plugin, or for Gradle include the following:

Configuring the Function framework in Gradle
configurations {
    invoker
}

dependencies {
    invoker 'com.google.cloud.functions.invoker:java-function-invoker:1.0.0-beta1'
}


task('runFunction', type: JavaExec, dependsOn: classes) {
    main = 'com.google.cloud.functions.invoker.runner.Invoker'
    classpath(configurations.invoker)
    args(
            '--target', 'io.micronaut.gcp.function.http.HttpFunction',
            '--classpath', (configurations.runtimeClasspath + sourceSets.main.output).asPath,
            '--port', 8081

    )
}

With this in place you can run ./gradlew runFunction to run the function locally.

Deployment

When deploying the function to Cloud Function you should use the HttpFunction class as the handler reference.

First build the function with:

$ ./gradlew clean shadowJar

Then cd into the build/libs directory (deployment has to be done from the location where the JAR file resides):

$ cd build/libs

To deploy the function, make sure you have the gcloud CLI installed, then run:

$ gcloud beta functions deploy myfunction --entry-point io.micronaut.gcp.function.http.HttpFunction --runtime java11 --trigger-http

In the example above, myfunction refers to the name of your function and can be changed to any name you prefer.

To obtain the trigger URL you can use the following command:

$ YOUR_HTTP_TRIGGER_URL=$(gcloud beta functions describe myfunction --format='value(httpsTrigger.url)')

You can then use this variable to test the function invocation:

$ curl -i $YOUR_HTTP_TRIGGER_URL/hello/John
See the guide for Deploy an HTTP Function to Google Cloud Functions to learn more.
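
The curl invocation shown earlier in this section could also be made from Java, for instance in a smoke test after deployment. The following is a minimal sketch using the JDK's java.net.http client. TriggerInvocationSketch and buildRequest are hypothetical names, and the sketch assumes the trigger URL has been exported as the YOUR_HTTP_TRIGGER_URL environment variable and that the application exposes the /hello/{name} route used in the curl example.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerInvocationSketch {

    /** Builds a GET request for the /hello/{name} route behind the trigger URL. */
    static HttpRequest buildRequest(String triggerUrl, String name) {
        return HttpRequest.newBuilder(URI.create(triggerUrl + "/hello/" + name))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumes the shell variable from the deployment steps was exported.
        String triggerUrl = System.getenv("YOUR_HTTP_TRIGGER_URL");
        if (triggerUrl == null) {
            System.err.println("YOUR_HTTP_TRIGGER_URL is not set");
            return;
        }
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(triggerUrl, "John"), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```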

7.3 CloudEvents Functions

To use CloudEvents, add the following dependency:

implementation("io.micronaut.gcp:micronaut-gcp-function-cloudevents")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-function-cloudevents</artifactId>
</dependency>

In your application, create a class that extends GoogleCloudEventsFunction. Google publishes POJOs for Google Cloud Events. For example, to subscribe to a Google Cloud Storage event, your function may look like this:

public class TestFunction extends GoogleCloudEventsFunction<StorageObjectData> {
    @Override
    protected void accept(@NonNull CloudEventContext context, @Nullable StorageObjectData data) throws Exception {
    }
}

8 Google Cloud Run

Google Cloud Run is a fully-managed serverless platform for containerized applications.

To learn more see the Micronaut guides for Google Cloud Run

9 Google Cloud Pub/Sub Support

9.1 Introduction

This project provides integration between Micronaut and Google Cloud Pub/Sub. It uses the official Google Cloud Pub/Sub Java client libraries to create Publishers and Subscribers while keeping a programming model for messaging similar to the ones defined in Micronaut RabbitMQ and Micronaut Kafka.

Support is provided for the Pull (for long-running processes) and Push (ideal for serverless environments such as Cloud Run) styles of message consumption using a consistent programming model.

The project does not attempt to create any of the resources in Google Cloud, such as Topics and Subscriptions. Make sure your project has the correct resources and that the Service Account being used has the proper permissions.

9.2 Quickstart

To add support for Google Cloud Pub/Sub to an existing project, add the following dependency to your build.

implementation("io.micronaut.gcp:micronaut-gcp-pubsub")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-pubsub</artifactId>
</dependency>

Creating a Pub/Sub Publisher with @PubSubClient

To publish messages to Google Cloud Pub/Sub, just define an interface that is annotated with @PubSubClient.

import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;

@PubSubClient // (1)
public interface AnimalClient {

    @Topic("animals") // (2)
    void send(byte[] data); // (3)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;

@PubSubClient // (1)
interface AnimalClient {

    @Topic("animals") // (2)
    void send(byte[] data) // (3)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic

@PubSubClient // (1)
interface AnimalClient {

	@Topic("animals") // (2)
	fun send(data: ByteArray) // (3)

}
1 The @PubSubClient annotation is used to designate this interface as a client
2 The topic animals will be used to publish messages
3 The method accepts one argument that will be used as message body
Make sure your current GCP project has a topic named animals before starting the application.

At compile time, Micronaut creates an implementation of the above interface. You can then inject that interface into your own beans and use it.

import io.micronaut.gcp.pubsub.support.Animal;
import jakarta.inject.Singleton;

@Singleton
public final class AnimalService {
    private final AnimalClient animalClient;

    public AnimalService(AnimalClient animalClient) { // (1)
        this.animalClient = animalClient;
    }

    public void someBusinessMethod(Animal animal) {
        byte[] serializedBody = serialize(animal);
        animalClient.send(serializedBody);
    }

    private byte[] serialize(Animal animal) { // (2)
        return null;
    }
}
import jakarta.inject.Singleton;

@Singleton
final class AnimalService {
    private final AnimalClient animalClient;

    AnimalService(AnimalClient animalClient) { // (1)
        this.animalClient = animalClient
    }

    void someBusinessMethod(Animal animal) {
        byte[] serializedBody = serialize(animal)
        animalClient.send(serializedBody)
    }

    private byte[] serialize(Animal animal) { // (2)
        return null
    }
}
import io.micronaut.gcp.pubsub.support.Animal
import jakarta.inject.Singleton

@Singleton
class AnimalService(private val animalClient: AnimalClient)  { // (1)

	fun someBusinessMethod(animal: Animal) {
		val serializedBody = serialize(animal)
		animalClient.send(serializedBody)
	}

	private fun serialize(animal: Animal): ByteArray { // (2)
		return ByteArray(0)
	}

}
1 Constructor based injection
2 Your custom serialization logic
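
The serialize method in the examples above is left as a placeholder. The following is a minimal sketch of one possible implementation that writes a small UTF-8 JSON document by hand, using only the JDK. The Animal class here is a hypothetical stand-in with a single name field (the real Animal type may look different), and a real application would more likely delegate to a JSON library.

```java
import java.nio.charset.StandardCharsets;

public class AnimalSerializationSketch {

    /** Hypothetical stand-in for the Animal type; the real class may have different fields. */
    static final class Animal {
        final String name;

        Animal(String name) {
            this.name = name;
        }
    }

    /** Serializes the animal as a small UTF-8 encoded JSON document. */
    static byte[] serialize(Animal animal) {
        // Escapes backslashes and double quotes only; other control characters are not handled.
        String escaped = animal.name.replace("\\", "\\\\").replace("\"", "\\\"");
        String json = "{\"name\":\"" + escaped + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = serialize(new Animal("dog"));
        System.out.println(new String(body, StandardCharsets.UTF_8)); // prints {"name":"dog"}
    }
}
```

The resulting byte array is what would be passed to a byte[]-based client method such as animalClient.send.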

Creating a Pub/Sub Pull Subscriber with @PubSubListener and @Subscription

To listen to Pub/Sub messages you can use @PubSubListener annotation on a class to mark it as a message listener.

The following example would listen on a subscription named animals that is configured to be attached to the topic of the previous example.

import io.micronaut.gcp.pubsub.annotation.PubSubListener;

import io.micronaut.gcp.pubsub.annotation.Subscription;

@PubSubListener // (1)
public class AnimalListener {

    @Subscription("animals") // (2)
    public void onMessage(byte[] data) { // (3)
        System.out.println("Message received");
    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;

import io.micronaut.gcp.pubsub.annotation.Subscription;

@PubSubListener // (1)
class AnimalListener {

    @Subscription("animals") // (2)
    void onMessage(byte[] data) { // (3)
        System.out.println("Message received")
    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;

import io.micronaut.gcp.pubsub.annotation.Subscription;

@PubSubListener // (1)
class AnimalListener {

	@Subscription("animals") // (2)
	fun onMessage(data: ByteArray) { // (3)
		println("Message received")
	}

}
1 Designates this class to be a message listener for Pub/Sub messages
2 Messages routed to the animals subscription will be delivered to this method via Pull subscription
3 The method has a single argument that contains the serialized body of the Pub/Sub message

Methods annotated with @Subscription will use a long-running Pull subscription style.

Creating a Pub/Sub Push Subscriber with @PubSubListener and @PushSubscription

Using the Push style of subscription is similar to the above example, except that you use the @PushSubscription annotation instead.

The following example would listen on a subscription named animals-push that is configured to be attached to the same topic of the previous example.

import io.micronaut.gcp.pubsub.annotation.PubSubListener;

import io.micronaut.gcp.pubsub.annotation.PushSubscription;

@PubSubListener // (1)
public class AnimalListener {

    @PushSubscription("animals-push") // (2)
    public void onPushMessage(byte[] data) { // (3)
        System.out.println("Message received");
    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;

import io.micronaut.gcp.pubsub.annotation.PushSubscription;

@PubSubListener // (1)
class AnimalListener {

    @PushSubscription("animals-push") // (2)
    void onPushMessage(byte[] data) { // (3)
        System.out.println("Message received")
    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;

import io.micronaut.gcp.pubsub.annotation.PushSubscription;

@PubSubListener // (1)
class AnimalListener {

    @PushSubscription("animals-push") // (2)
    fun onPushMessage(data: ByteArray) { // (3)
        println("Message received")
    }

}
1 Designates this class to be a message listener for Pub/Sub messages
2 Messages routed to the animals-push subscription will be delivered to this method via Push subscription
3 The method has a single argument that contains the serialized body of the Pub/Sub message

Methods annotated with @PushSubscription will use the Push subscription style, where messages are delivered to the application via HTTP request.

9.3 Pub/Sub configuration Properties

You can customize certain aspects of the client. Pub/Sub client libraries leverage a ScheduledExecutorService for both message publishing and consumption.

If not specified, the framework configures its default scheduled executor service for both Publishers and Subscribers. See ExecutorConfiguration for the full list of options.

You can override it in PubSubConfigurationProperties to set a default for all clients, or you can configure it per Topic Publisher or Subscription listener, as discussed further below.

Creating a custom executor is covered in the section Configuring Thread pools.

When the application is shutting down, stopAsync() is invoked on all of the running GCP library Subscriber instances. The subscribers will attempt to fully process all pending in-memory messages before releasing the configured executor threads. By default, the framework will in turn continue to invoke the bound subscription methods on all @PubSubListener beans until all messages have been processed. To discontinue processing of messages and enable faster shutdown, the gcp.pubsub.nack-on-shutdown property can be set to true, which will cause all pending unprocessed messages that have not yet reached a subscriber method to be eagerly nacked, which will cause PubSub to redeliver them according to each subscription’s configuration.

Table 1. Configuration Properties for PubSubConfigurationProperties
Property Type Description

gcp.pubsub.publishing-executor

java.lang.String

The name of the java.util.concurrent.ScheduledExecutorService to be used by all com.google.cloud.pubsub.v1.Publisher instances. Defaults to "scheduled".

gcp.pubsub.subscribing-executor

java.lang.String

The name of the java.util.concurrent.ScheduledExecutorService to be used by all com.google.cloud.pubsub.v1.Subscriber instances. Defaults to "scheduled".

gcp.pubsub.keep-alive-interval-minutes

int

How often to ping the server to keep the channel alive. Defaults to 5 minutes.

gcp.pubsub.topic-endpoint

java.lang.String

Which endpoint the com.google.cloud.pubsub.v1.Publisher should publish messages to. Defaults to the global endpoint.

gcp.pubsub.nack-on-shutdown

boolean

Whether subscribers should stop processing pending in-memory messages and eagerly nack() during application shutdown. Defaults to false.

9.4 Pub/Sub Publishers

Pub/Sub support in Micronaut follows the same pattern used for other types of clients such as the HTTP Client. When you annotate an interface with @PubSubClient, the framework creates an implementation bean that handles communication with Pub/Sub for you.

Topics

In order to publish messages to Pub/Sub you need a method annotated with a @Topic annotation. For each annotated method the framework will create a dedicated Publisher with its own configuration for RetrySettings, BatchSettings and its own Executor. All settings can be overridden via configuration properties and the appropriate configuration can be passed via the configuration attribute of the @Topic annotation.

import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubClient // (1)
public interface SimpleClient {

    @Topic("animals")
    void send(PubsubMessage message); // (2)

    @Topic("animals")
    void send(byte[] data); // (3)

    @Topic("animals")
    void send(Animal animal); // (4)
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal


@PubSubClient // (1)
interface SimpleClient {

    @Topic("animals")
    void send(PubsubMessage message) // (2)

    @Topic("animals")
    void send(byte[] data) // (3)

    @Topic("animals")
    void send(Animal animal) // (4)
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal

@PubSubClient // (1)
interface SimpleClient {

	@Topic("animals")
	fun send(message: PubsubMessage) // (2)

	@Topic("animals")
	fun send(data: ByteArray) // (3)

	@Topic("animals")
	fun send(animal: Animal) // (4)
}
1 The @PubSubClient annotation tells Micronaut to generate a bean implementation of this interface.
2 Sending a PubsubMessage object skips SerDes entirely and no contentType header is added.
3 Sending a byte array bypasses SerDes and the bytes are copied as-is, but the default content type is still application/json.
4 You can send any domain object as long as a SerDes is configured to handle it. By default application/json is used.
If a body argument cannot be found, an exception will be thrown.

Resource naming

The previous examples used a simple name for the topic, such as animals. Inside Google Cloud, however, resources are only accessible via their fully qualified name (FQN). In Pub/Sub, topics are named projects/$PROJECT_ID/topics/$TOPIC_NAME. The Micronaut integration with GCP automatically picks up the default project id (please refer to the section Google Project Id) and converts the simple resource name into an FQN. You can also supply an FQN that uses a different project name, which is useful when your application has to publish messages to projects other than the default one configured for the Service Account.

When publishing to a different project, make sure your service account has the proper access to the resource.
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubClient
public interface MultipleProjectClient {

    @Topic("animals")
    void sendUS(Animal animal); // (1)

    @Topic("projects/eu-project/topics/animals")
    void sendEU(Animal animal); // (2)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubClient
interface MultipleProjectClient {

    @Topic("animals")
    void sendUS(Animal animal) // (1)

    @Topic("projects/eu-project/topics/animals")
    void sendEU(Animal animal) // (2)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal

@PubSubClient
interface MultipleProjectClient {

	@Topic("animals")
	fun sendUS(animal: Animal) // (1)

	@Topic("projects/eu-project/topics/animals")
	fun sendEU(animal: Animal) // (2)

}
1 This would use the default project configured for the application.
2 This would override and publish messages to a different project.
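The name expansion described above can be sketched in plain Java. This is only an illustration of the rule, not the framework's code: the class name PubSubResourceNames, the method topicFqn, and the "starts with projects/" check are assumptions made for this example.

```java
// Illustrative sketch only: the class and method names are hypothetical
// and are not part of the Micronaut GCP API.
final class PubSubResourceNames {

    private PubSubResourceNames() {
    }

    /**
     * Expands a simple topic name into projects/$PROJECT_ID/topics/$TOPIC_NAME.
     * A name that already looks like an FQN is returned unchanged (assumed check).
     */
    static String topicFqn(String name, String defaultProjectId) {
        if (name.startsWith("projects/")) {
            return name;
        }
        return "projects/" + defaultProjectId + "/topics/" + name;
    }

    public static void main(String[] args) {
        // Simple name: the default project id is used.
        System.out.println(topicFqn("animals", "us-project"));
        // FQN: the project named in the FQN wins over the default.
        System.out.println(topicFqn("projects/eu-project/topics/animals", "us-project"));
    }
}
```

With a default project of us-project, the first call yields projects/us-project/topics/animals, which mirrors callout 1, while the second is left untouched, which mirrors callout 2.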

9.4.1 Content-Type and message serialization

The contents of a PubsubMessage are always a base64 encoded ByteString. The framework provides a way to create custom serialization/deserialization, as explained in the section Custom Serialization/Deserialization.

By default any body message argument will be serialized via JsonPubSubMessageSerDes unless specified otherwise via the contentType property of the @Topic annotation.

The rules of message serialization are the following:

  1. If the body type is PubsubMessage then SerDes is bypassed completely and no header is added to the message.

  2. If the body type is byte[], SerDes logic is bypassed, but a Content-Type header of application/json will be added unless overridden by the @Topic annotation.

  3. For any other type, the type defined by contentType will be used to locate the correct PubSubMessageSerDes to handle it. If none is passed, application/json will be used.

import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.http.MediaType;

@PubSubClient
public interface CustomSerDesClient {

    @Topic("animals") // (1)
    void send(PubsubMessage pubsubMessage);

    @Topic("animals") // (2)
    void send(byte[] data);

    @Topic(value = "animals", contentType = MediaType.IMAGE_GIF) // (3)
    void sendWithCustomType(byte[] data);

    @Topic("animals") // (4)
    void send(Animal animal);

    @Topic(value = "animals", contentType = MediaType.APPLICATION_XML) // (5)
    void sendWithCustomType(Animal animal);

}
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.http.MediaType;

@PubSubClient
interface CustomSerDesClient {

    @Topic("animals") // (1)
    void send(PubsubMessage pubsubMessage)

    @Topic("animals") // (2)
    void send(byte[] data)

    @Topic(value = "animals", contentType = MediaType.IMAGE_GIF) // (3)
    void sendWithCustomType(byte[] data)

    @Topic("animals") // (4)
    void send(Animal animal)

    @Topic(value = "animals", contentType = MediaType.APPLICATION_XML) // (5)
    void sendWithCustomType(Animal animal)

}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.http.MediaType

@PubSubClient
interface CustomSerDesClient {

	@Topic("animals") // (1)
	fun send(pubsubMessage: PubsubMessage)

	@Topic("animals")  // (2)
	fun send(data: ByteArray)

	@Topic(value = "animals", contentType = MediaType.IMAGE_GIF) // (3)
	fun sendWithCustomType(data: ByteArray)

	@Topic("animals") // (4)
	fun send(animal: Animal)

	@Topic(value = "animals", contentType = MediaType.APPLICATION_XML) // (5)
	fun sendWithCustomType(animal: Animal)

}
1 Using PubsubMessage, no SerDes is used and no headers are added to the message. The user is responsible for creating the message.
2 Using a byte array. No SerDes logic is used, but a Content-Type header with value application/json is added in this example.
3 Using a byte array and specifying a contentType. SerDes is bypassed and a Content-Type header with value image/gif is added.
4 No contentType defined, so JsonPubSubMessageSerDes will be used to serialize the object.
5 Using a specific contentType. The framework looks for a PubSubMessageSerDes registered for type application/xml. The user must provide this bean or an exception is thrown at runtime.
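The three serialization rules of this section can be summarized as a small decision function. The sketch below is a plain-Java model of those rules, not the framework's implementation. The names PublishSerializationRules, BodyKind and Outcome are hypothetical, and treating a null or empty contentType as "not set" is an assumption.

```java
// Illustrative model of the publishing rules; all names are hypothetical.
final class PublishSerializationRules {

    static final String DEFAULT_CONTENT_TYPE = "application/json";

    /** The three kinds of body argument the rules distinguish. */
    enum BodyKind { PUBSUB_MESSAGE, BYTE_ARRAY, DOMAIN_OBJECT }

    /** Result of applying the rules to one @Topic method. */
    static final class Outcome {
        final boolean serDesUsed;
        final String contentTypeHeader; // null means no header is added

        Outcome(boolean serDesUsed, String contentTypeHeader) {
            this.serDesUsed = serDesUsed;
            this.contentTypeHeader = contentTypeHeader;
        }
    }

    /** topicContentType models the contentType attribute of @Topic (null or empty = not set). */
    static Outcome apply(BodyKind kind, String topicContentType) {
        String effective = (topicContentType == null || topicContentType.isEmpty())
                ? DEFAULT_CONTENT_TYPE
                : topicContentType;
        switch (kind) {
            case PUBSUB_MESSAGE:
                return new Outcome(false, null);      // rule 1: no SerDes, no header
            case BYTE_ARRAY:
                return new Outcome(false, effective); // rule 2: no SerDes, header added
            default:
                return new Outcome(true, effective);  // rule 3: SerDes chosen by content type
        }
    }

    public static void main(String[] args) {
        Outcome gif = apply(BodyKind.BYTE_ARRAY, "image/gif");
        System.out.println(gif.serDesUsed + " " + gif.contentTypeHeader);
    }
}
```

Each of the five methods in CustomSerDesClient maps onto one call of apply: for instance, callout 3 corresponds to apply(BodyKind.BYTE_ARRAY, "image/gif").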

9.4.2 Message Headers

Google Cloud Pub/Sub messages contain a dictionary of message attributes in the form of a Map<String, String>. The framework binds those attributes to the @MessageHeader annotation, which can be used at the class level, on a method, or on a method argument.

import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.annotation.MessageHeader;

@PubSubClient
@MessageHeader(name = "application-name", value = "petclinic") // (1)
public interface CustomHeadersClient {

    @MessageHeader(name = "status", value = "healthy") // (2)
    @Topic("animals")
    void sendWithStaticHeaders(Animal animal);

    @Topic("animals")
    void sendWithDynamicHeaders(Animal animal, @MessageHeader(name = "code") Integer code); // (3)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.MessageHeader;

@PubSubClient
@MessageHeader(name = "application-name", value = "petclinic") // (1)
interface CustomHeadersClient {

    @MessageHeader(name = "status", value = "healthy") // (2)
    @Topic("animals")
    void sendWithStaticHeaders(Animal animal)

    @Topic("animals")
    void sendWithDynamicHeaders(Animal animal, @MessageHeader(name = "code") Integer code) // (3)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.MessageHeader


@PubSubClient
@MessageHeader(name = "application-name", value = "petclinic") // (1)
interface CustomHeadersClient {

	@MessageHeader(name = "status", value = "healthy") // (2)
	@Topic("animals")
	fun sendWithStaticHeaders(animal: Animal)

	@Topic("animals")
	fun sendWithDynamicHeaders(animal: Animal, @MessageHeader(name = "code") code: Int) // (3)

}
1 Headers specified at class level will be added to all annotated methods using @Topic
2 You can pass a static value as part of @MessageHeader
3 Values can also be passed via arguments. A ConversionService will try to convert them into a String value.
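To make the result of the three header sources concrete, here is a plain-Java sketch of how they could end up in one Map<String, String> of message attributes. It is not the framework's code: the class HeaderMergeSketch is hypothetical, String.valueOf stands in for the ConversionService, and the order in which later sources overwrite earlier ones is an assumption, since the text above does not specify what happens on a name clash.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only; the class name and the merge order are assumptions.
final class HeaderMergeSketch {

    private HeaderMergeSketch() {
    }

    static Map<String, String> merge(Map<String, String> classLevel,
                                     Map<String, String> methodLevel,
                                     Map<String, Object> argumentLevel) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.putAll(classLevel);   // headers declared on the interface
        attributes.putAll(methodLevel);  // static headers declared on the method
        for (Map.Entry<String, Object> entry : argumentLevel.entrySet()) {
            // Stand-in for the ConversionService: every value becomes a String.
            attributes.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
        return attributes;
    }

    public static void main(String[] args) {
        Map<String, String> merged = merge(
                Collections.singletonMap("application-name", "petclinic"),
                Collections.<String, String>emptyMap(),
                Collections.<String, Object>singletonMap("code", 42));
        System.out.println(merged);
    }
}
```

For sendWithDynamicHeaders(animal, 42) in the example above, a model like this would produce the attributes application-name=petclinic and code=42, with the Integer argument converted to the String "42".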

9.4.3 Publisher properties

Pub/Sub allows each Publisher to have its own configuration for things such as executors or batching settings. This is useful when you need to publish messages to topics that have different SLAs.

The framework allows you to create configurations and then bind those configurations to each topic annotation using the configuration parameter.

Table 1. Configuration properties for PublisherConfigurationProperties
Property Type Description

gcp.pubsub.publisher.*.executor

java.lang.String

Name of the executor to use. Default: scheduled

gcp.pubsub.publisher.*.retry.total-timeout

org.threeten.bp.Duration

How long the logic should keep trying the remote call until it gives up completely. Default: 600 seconds

gcp.pubsub.publisher.*.retry.initial-retry-delay

org.threeten.bp.Duration

Delay before the first retry. Default: 100ms

gcp.pubsub.publisher.*.retry.retry-delay-multiplier

double

Controls the change in retry delay. The retry delay of the previous call is multiplied by the RetryDelayMultiplier to calculate the retry delay for the next call. Default: 1.3

gcp.pubsub.publisher.*.retry.max-retry-delay

org.threeten.bp.Duration

Puts a limit on the value of the retry delay, so that the RetryDelayMultiplier can’t increase the retry delay higher than this amount. Default: 60 seconds

gcp.pubsub.publisher.*.retry.max-attempts

int

Defines the maximum number of attempts to perform. Default: 0

gcp.pubsub.publisher.*.retry.jittered

boolean

Determines if the delay time should be randomized. Default: true

gcp.pubsub.publisher.*.retry.initial-rpc-timeout

org.threeten.bp.Duration

Controls the timeout for the initial RPC. Default: 5 seconds

gcp.pubsub.publisher.*.retry.rpc-timeout-multiplier

double

Controls the change in RPC timeout. The timeout of the previous call is multiplied by the RpcTimeoutMultiplier to calculate the timeout for the next call. Default: 1.0

gcp.pubsub.publisher.*.retry.max-rpc-timeout

org.threeten.bp.Duration

Puts a limit on the value of the RPC timeout, so that the RpcTimeoutMultiplier can’t increase the RPC timeout higher than this amount. Default: 0

gcp.pubsub.publisher.*.batching.element-count-threshold

java.lang.Long

Set the element count threshold to use for batching. After this many elements are accumulated, they will be wrapped up in a batch and sent. Default: 100

gcp.pubsub.publisher.*.batching.request-byte-threshold

java.lang.Long

Set the request byte threshold to use for batching. After this many bytes are accumulated, the elements will be wrapped up in a batch and sent. Default: 1000 bytes (1 KB)

gcp.pubsub.publisher.*.batching.delay-threshold

org.threeten.bp.Duration

Set the delay threshold to use for batching. After this amount of time has elapsed (counting from the first element added), the elements will be wrapped up in a batch and sent. Default: 1ms

gcp.pubsub.publisher.*.batching.is-enabled

java.lang.Boolean

Indicates if batching is enabled. Default: true

gcp.pubsub.publisher.*.flow-control.max-outstanding-element-count

java.lang.Long

Maximum number of outstanding elements to keep in memory before enforcing flow control.

gcp.pubsub.publisher.*.flow-control.max-outstanding-request-bytes

java.lang.Long

Maximum number of outstanding bytes to keep in memory before enforcing flow control.

gcp.pubsub.publisher.*.flow-control.limit-exceeded-behavior

com.google.api.gax.batching.FlowController$LimitExceededBehavior

The behavior of FlowController when the specified limits are exceeded. Defaults to Ignore.
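As a worked example of how the retry properties in the table interact, the following plain-Java sketch computes the sequence of retry delays from initial-retry-delay, retry-delay-multiplier and max-retry-delay. It is a simplified model written for this guide, not the Google client library's code: the class RetryDelaySchedule is hypothetical, jitter is ignored (so real delays will differ when jittered is true), and total-timeout and max-attempts are not applied.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of exponential retry delays; the class name is hypothetical.
final class RetryDelaySchedule {

    private RetryDelaySchedule() {
    }

    /**
     * Returns the first {@code count} retry delays in milliseconds: each delay is the
     * previous one times the multiplier, never exceeding maxDelayMs.
     */
    static List<Long> delaysMillis(long initialDelayMs, double multiplier, long maxDelayMs, int count) {
        List<Long> delays = new ArrayList<>();
        double delay = initialDelayMs;
        for (int i = 0; i < count; i++) {
            delays.add(Math.min(Math.round(delay), maxDelayMs));
            delay = Math.min(delay * multiplier, (double) maxDelayMs);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Defaults from the table: 100ms initial delay, multiplier 1.3, 60 second cap.
        System.out.println(delaysMillis(100, 1.3, 60_000, 5));
    }
}
```

With the defaults from the table, the first delays in this model are 100ms, 130ms and 169ms, and the delay keeps growing by a factor of 1.3 until it reaches the 60 second cap.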

For example, suppose you have the following configuration:

gcp.pubsub.publisher.batching.executor=batch-executor
gcp.pubsub.publisher.immediate.executor=immediate-executor
gcp.pubsub.publisher.immediate.batching.enabled=false
gcp:
  pubsub:
    publisher:
      batching:
        executor: batch-executor
      immediate:
        executor: immediate-executor
        batching:
          enabled: false
[gcp]
  [gcp.pubsub]
    [gcp.pubsub.publisher]
      [gcp.pubsub.publisher.batching]
        executor="batch-executor"
      [gcp.pubsub.publisher.immediate]
        executor="immediate-executor"
        [gcp.pubsub.publisher.immediate.batching]
          enabled=false
gcp {
  pubsub {
    publisher {
      batching {
        executor = "batch-executor"
      }
      immediate {
        executor = "immediate-executor"
        batching {
          enabled = false
        }
      }
    }
  }
}
{
  gcp {
    pubsub {
      publisher {
        batching {
          executor = "batch-executor"
        }
        immediate {
          executor = "immediate-executor"
          batching {
            enabled = false
          }
        }
      }
    }
  }
}
{
  "gcp": {
    "pubsub": {
      "publisher": {
        "batching": {
          "executor": "batch-executor"
        },
        "immediate": {
          "executor": "immediate-executor",
          "batching": {
            "enabled": false
          }
        }
      }
    }
  }
}

You can then apply it to individual methods as:

import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubClient
public interface CustomConfigurationClient {

    @Topic(value = "animals", configuration = "batching") // (1)
    void batchSend(Animal animal);

    @Topic(value = "animals", configuration = "immediate") // (2)
    void send(Animal animal);

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubClient
interface CustomConfigurationClient {

    @Topic(value = "animals", configuration = "batching") // (1)
    void batchSend(Animal animal)

    @Topic(value = "animals", configuration = "immediate") // (2)
    void send(Animal animal)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal

@PubSubClient
interface CustomConfigurationClient {
	@Topic(value = "animals", configuration = "batching")
	fun batchSend(animal: Animal)  // (1)

	@Topic(value = "animals", configuration = "immediate")
	fun send(animal: Animal)  // (2)
}
1 The Publisher will be configured using a configuration named batching
2 The Publisher will be configured using a configuration named immediate
FlowControlSettings are actually configured on the BatchingSettings property. Due to the nature of Google’s builders, the configuration was flattened at the PubSubConfigurationProperties level and is injected into the BatchingSettings later.

9.4.4 Retrieving message Ids (broker acknowledge)

All the examples so far have used void in the method signature. However, you often need the broker’s acknowledgement of the message.

Pub/Sub returns a String object that contains the message id that the broker generated. Your methods can also be defined using either String or Mono<String> (for reactive support).

When you define your client you can choose between a few different method signatures. Depending on your choice, you may get the message acknowledgement back and control whether the method is blocking or reactive.

  1. If your method returns void, a blocking call to publish the message is made and you don’t get the message id returned by the Pub/Sub broker.

  2. If your method returns a String, a blocking call to publish the message is made and the message id is returned.

  3. If your method returns Mono<String>, it’s a reactive call, and you can subscribe to the publisher to retrieve the message id.

import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import reactor.core.publisher.Mono;

@PubSubClient
public interface CustomReturnClient {

    @Topic("animals")
    void send(Animal animal); // (1)

    @Topic("animals")
    String sendWithId(Animal animal); // (2)

    @Topic("animals")
    Mono<String> sendReactive(Animal animal); //(3)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono


@PubSubClient
interface CustomReturnClient {

    @Topic("animals")
    void send(Animal animal) // (1)

    @Topic("animals")
    String sendWithId(Animal animal) // (2)

    @Topic("animals")
    Mono<String> sendReactive(Animal animal) //(3)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono


@PubSubClient
interface CustomReturnClient {

	@Topic("animals")
	fun send(animal: Animal) // (1)

	@Topic("animals")
	fun sendWithId(animal: Animal): String // (2)

	@Topic("animals")
	fun sendReactive(animal: Animal?): Mono<String> //(3)
}
1 Blocking call, message id is not returned
2 Blocking call, message id is returned as String
3 Reactive call

9.5 Restricting locations and message ordering

Restricting storage locations

Google Cloud Pub/Sub is a globally distributed event platform. When a client publishes a message, the platform stores the message in the region nearest to the publisher. Sometimes regulations such as GDPR impose restrictions on where customer data can live. When using the Micronaut integration you can specify the endpoint of the topic, either via the topic annotation or via properties, so that Pub/Sub persists the message data in the specified region. To learn more about this feature visit the resource location restriction page.

import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubClient
public interface LocationClient {
    @Topic(value = "animals", endpoint = "europe-west1-pubsub.googleapis.com:443") // (1)
    void send(Animal animal);
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal

@PubSubClient
interface LocationClient {
    @Topic(value = "animals", endpoint = "europe-west1-pubsub.googleapis.com:443") // (1)
    void send(Animal animal);
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal

@PubSubClient
interface LocationClient {
	@Topic(value = "animals", endpoint = "europe-west1-pubsub.googleapis.com:443") // (1)
	fun send(animal: Animal?)
}
1 The endpoint attribute enforces that messages sent via this method will be stored on the europe-west1 region.

Message ordering

Google Cloud Pub/Sub supports message ordering if messages are published to a single location and specify an ordering key. An example of this would be trading orders placed for a specific symbol. If you use the symbol as the ordering key, all messages for that symbol are guaranteed to be delivered in order, regardless of how many publishers there are.

To enable message ordering for your publishers, annotate one of the method’s arguments with @OrderingKey to declare it the ordering key.

import io.micronaut.gcp.pubsub.annotation.OrderingKey;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Order;

@PubSubClient
public interface OrderClient {
    @Topic(value = "orders", endpoint = "us-central1-pubsub.googleapis.com:443") // (1)
    void send(Order order, @OrderingKey String key); // (2)
}
import io.micronaut.gcp.pubsub.annotation.OrderingKey
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Order

@PubSubClient
interface OrderClient {
    @Topic(value = "orders", endpoint = "us-central1-pubsub.googleapis.com:443") // (1)
    void send(Order order, @OrderingKey String key) // (2)
}
import io.micronaut.gcp.pubsub.annotation.OrderingKey
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Order

@PubSubClient
interface OrderClient {
	@Topic(value = "orders", endpoint = "us-central1-pubsub.googleapis.com:443") // (1)
	fun send(order: Order, @OrderingKey key: String) // (2)
}
1 Ordering only works on regional endpoints, so you must declare an endpoint first
2 Although the Pub/Sub API requires the key to be a String, the framework uses a conversion service to convert other types to String.

Here’s an example of how to use ordering on your clients:

import io.micronaut.gcp.pubsub.support.Order;
import jakarta.inject.Singleton;

@Singleton
public final class OrderService {

    private final OrderClient client;

    public OrderService(OrderClient client) {
        this.client = client;
    }

    public void placeOrder() {
        Order order = new Order(100, "GOOG");
        client.send(order, order.getSymbol());
    }

}
import io.micronaut.gcp.pubsub.support.Order
import jakarta.inject.Singleton

@Singleton
class OrderService {

    private final OrderClient client

    OrderService(OrderClient client) {
        this.client = client
    }

    void placeOrder() {
        Order order = new Order(100, "GOOG")
        client.send(order, order.getSymbol())
    }

}
import io.micronaut.gcp.pubsub.support.Order
import jakarta.inject.Singleton

@Singleton
class OrderService(private val client: OrderClient) {
	fun placeOrder() {
		val order = Order(100, "GOOG")
		client.send(order, order.symbol)
	}
}
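The guarantee described in this section is per ordering key, not global. The plain-Java sketch below models that idea without calling Pub/Sub: messages that share a key keep their publish order, while nothing is implied about the relative order of different keys. The class OrderingKeyModel and its method are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model of per-key ordering; it does not use the Pub/Sub client.
final class OrderingKeyModel {

    private OrderingKeyModel() {
    }

    /**
     * Each input element is a {orderingKey, payload} pair in publish order.
     * Returns, per key, the payloads in the order they were published.
     */
    static Map<String, List<String>> perKeyOrder(List<String[]> published) {
        Map<String, List<String>> perKey = new LinkedHashMap<>();
        for (String[] message : published) {
            perKey.computeIfAbsent(message[0], key -> new ArrayList<>()).add(message[1]);
        }
        return perKey;
    }

    public static void main(String[] args) {
        List<String[]> published = Arrays.asList(
                new String[] {"GOOG", "buy 100"},
                new String[] {"AAPL", "buy 5"},
                new String[] {"GOOG", "sell 40"});
        // The two GOOG orders stay in publish order relative to each other.
        System.out.println(perKeyOrder(published).get("GOOG"));
    }
}
```

In terms of the OrderService example, every call to client.send(order, order.getSymbol()) with the symbol GOOG lands in the same ordered sequence, independent of orders placed for other symbols.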

9.6 Receiving messages via @PubSubListener methods

To start receiving messages, annotate a class with @PubSubListener. The framework will then use AOP to deliver messages to methods annotated with either @Subscription or @PushSubscription.

The semantics for how methods work when annotated with these two different subscription annotations are identical, differing only by the infrastructure that the framework transparently sets up to deliver the messages. In the following examples, @Subscription can be exchanged for @PushSubscription and the behavior will be the same except for some minor differences noted in the following sections.

Subscriptions

Pull Subscriptions

All methods annotated with @Subscription will be invoked by the framework in response to receiving messages via Pull subscription. Each annotated method creates an individual Subscriber, that can be configured using the configuration parameter of the @Subscription annotation.

Pull subscriptions are long-running processes that continually poll the PubSub service, and are meant to be used in an environment such as Google Kubernetes Engine.

Push Subscriptions

Methods annotated with @PushSubscription will be invoked by the framework in response to receiving messages via Push subscription.

Push messages are sent to the application by the PubSub service via HTTP request, making them an ideal fit for serverless environments such as Cloud Run.

When Push subscriptions are enabled, the application will expose a single HTTP endpoint for processing all push requests, and messages will be routed to the matching PushSubscription method. The default path for this endpoint is /push. This endpoint URL must be specified when setting up a Push subscription in GCP.

As Push message handling uses HTTP, you must have a Micronaut HTTP server implementation available on your classpath, or else push handling will be disabled.
The available parameters of the @PushSubscription annotation are identical to those of @Subscription, except for configuration which is relevant only to Pull subscriptions.
Methods annotated with @Subscription or @PushSubscription must be unique in your application. If two distinct methods try to subscribe to the same Subscription an error is thrown. This is intended to avoid issues with message Acknowledgement control.
The annotated method must have at least one argument that is bound to the body of the message or an exception is thrown.

Resource naming

Just as described in the Pub/Sub Publisher section, subscriptions also use simple names such as animals. Inside Google Cloud, however, resources are only accessible via their FQN. A Subscription name follows the pattern: projects/$PROJECT_ID/subscriptions/$SUBSCRIPTION_NAME. The Micronaut integration with GCP will automatically grab the default project id available (please refer to the section Google Project Id) and convert the simple naming of the resource into a FQN. You can also pass a FQN as the subscription name. This is helpful when you need to listen to subscriptions from different projects.

import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener // (1)
public class SimpleSubscriber {

    @Subscription("animals") // (2)
    public void onMessage(Animal animal) {

    }

    @Subscription("projects/eu-project/subscriptions/animals") // (3)
    public void onMessageEU(Animal animal) {

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener // (1)
class SimpleSubscriber {

    @Subscription("animals") // (2)
    void onMessage(Animal animal) {

    }

    @Subscription("projects/eu-project/subscriptions/animals") // (3)
    void onMessageEU(Animal animal) {

    }
}
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener // (1)
class SimpleSubscriber {

	@Subscription("animals") // (2)
	fun onMessage(animal: Animal) {
	}

	@Subscription("projects/eu-project/subscriptions/animals") // (3)
	fun onMessageEU(animal: Animal) {
	}
}
1 @PubSubListener marks this class to be a Message Listener
2 Methods annotated with @Subscription will receive messages from that subscription
3 You can also use a FQN for the subscription, especially when you need to access a resource on a different project
When subscribing to a resource in a different project, make sure your service account has the proper access to the resource.

9.6.1 Content-Type and message deserialization

The framework provides a custom serialization/deserialization (SerDes) mechanism for both message producers and message listeners. On the receiving end the rules to deserialize a PubsubMessage are the following:

  1. If the body argument of the method is of PubsubMessage type, SerDes is bypassed and the "raw" message is copied to the argument.

  2. If the body argument of the method is a byte array, SerDes is bypassed and the byte contents of the PubsubMessage are copied to the argument.

  3. If the body argument is a POJO then the following applies:

    1. The default Content-Type is application/json and the framework will use it if not overridden

    2. If the message contains an attribute Content-Type that value is used

    3. Finally if the @Subscription or @PushSubscription has a contentType value this value overrides all of the previous values
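The precedence for POJO bodies listed above can be written down as a small function. This is a plain-Java sketch of the rule as stated, not the framework's code: the class ListenerContentTypeResolver is hypothetical, and treating null or an empty string as "not set" is an assumption.

```java
// Illustrative sketch of the content type precedence; the class name is hypothetical.
final class ListenerContentTypeResolver {

    static final String DEFAULT_CONTENT_TYPE = "application/json";

    private ListenerContentTypeResolver() {
    }

    /**
     * @param messageAttribute   value of the message's Content-Type attribute, or null
     * @param annotationContentType contentType set on @Subscription or @PushSubscription, or null
     */
    static String resolve(String messageAttribute, String annotationContentType) {
        String contentType = DEFAULT_CONTENT_TYPE;              // step 1: default
        if (messageAttribute != null && !messageAttribute.isEmpty()) {
            contentType = messageAttribute;                     // step 2: message attribute
        }
        if (annotationContentType != null && !annotationContentType.isEmpty()) {
            contentType = annotationContentType;                // step 3: annotation wins
        }
        return contentType;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null));
        System.out.println(resolve("application/xml", null));
        System.out.println(resolve("application/json", "application/xml"));
    }
}
```

The resolved value is what determines which PubSubMessageSerDes is looked up to deserialize the message body.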

Automatic SerDes is a nice feature that the framework offers, but sometimes you may need access to the PubsubMessage id. This is provided via the @MessageId annotation. Once you annotate an argument of type String with this annotation, the message id will be copied to that argument.

PubsubMessage ids are always of type String, thus your annotated argument must also be a String.
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.http.MediaType;

@PubSubListener
public class ContentTypeSubscriber {

    @Subscription("raw-subscription") // (1)
    void receiveRaw(byte[] data, @MessageId String id) {
    }

    @Subscription("native-subscription") // (2)
    void receiveNative(PubsubMessage message) {
    }

    @Subscription("animals") // (3)
    void receivePojo(Animal animal, @MessageId String id) {
    }

    @Subscription(value = "animals-legacy", contentType = MediaType.APPLICATION_XML) // (4)
    void receiveXML(Animal animal, @MessageId String id) {
    }

}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener
class ContentTypeSubscriber {

    @Subscription("raw-subscription") // (1)
    void receiveRaw(byte[] data, @MessageId String id) {
    }

    @Subscription("native-subscription") // (2)
    void receiveNative(PubsubMessage message) {
    }

    @Subscription("animals") // (3)
    void receivePojo(Animal animal, @MessageId String id) {
    }

    @Subscription(value = "animals-legacy", contentType = "application/xml") // (4)
    void receiveXML(Animal animal, @MessageId String id) {
    }

}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener
class ContentTypeSubscriber(private val messageProcessor: MessageProcessor) {
    @Subscription("raw-subscription")
    fun receiveRaw(data: ByteArray, @MessageId id: String) { // (1)
        messageProcessor.handleByteArrayMessage(data).block()
    }

    @Subscription("native-subscription")
    fun receiveNative(message: PubsubMessage) { // (2)
        messageProcessor.handlePubsubMessage(message).block()
    }

    @Subscription("animals")
    fun receivePojo(animal: Animal, @MessageId id: String) { // (3)
        messageProcessor.handleAnimalMessage(animal).block()
    }

    @Subscription(value = "animals-legacy", contentType = "application/xml")
    fun receiveXML(animal: Animal, @MessageId id: String) { // (4)
        messageProcessor.handleAnimalMessage(animal).block()
    }
}
1 Bytes are copied, SerDes is bypassed, message id injected for usage
2 SerDes is bypassed, the PubsubMessage object is copied, no need to use @MessageId
3 The framework will try to deserialize this payload. If no Content-Type header is found, it defaults to application/json
4 Uses a custom SerDes and the framework will find a PubSubMessageSerDes that can handle application/xml
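
The content-type rule in callouts 3 and 4 can be pictured with a small plain-Java sketch. It is not the framework's implementation: the class name `SerDesLookupSketch`, its `register` and `decode` methods, and the string-returning decoders are all hypothetical, chosen only to show a lookup keyed by content type that falls back to application/json when no Content-Type attribute is present.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a SerDes lookup; not the Micronaut API.
final class SerDesLookupSketch {

    static final String DEFAULT_CONTENT_TYPE = "application/json";

    // One decoder per supported content type.
    private final Map<String, Function<byte[], String>> decoders = new HashMap<>();

    void register(String contentType, Function<byte[], String> decoder) {
        decoders.put(contentType, decoder);
    }

    // Picks the decoder named by the Content-Type attribute, defaulting to JSON.
    String decode(byte[] payload, Map<String, String> attributes) {
        String contentType = attributes.getOrDefault("Content-Type", DEFAULT_CONTENT_TYPE);
        Function<byte[], String> decoder = decoders.get(contentType);
        if (decoder == null) {
            throw new IllegalStateException("No decoder registered for " + contentType);
        }
        return decoder.apply(payload);
    }

    public static void main(String[] args) {
        SerDesLookupSketch lookup = new SerDesLookupSketch();
        lookup.register("application/json", bytes -> "json:" + new String(bytes, StandardCharsets.UTF_8));
        lookup.register("application/xml", bytes -> "xml:" + new String(bytes, StandardCharsets.UTF_8));

        byte[] payload = "{\"name\":\"dog\"}".getBytes(StandardCharsets.UTF_8);
        // No Content-Type attribute: the JSON decoder is chosen.
        System.out.println(lookup.decode(payload, Map.of()));
        // Explicit Content-Type attribute: the XML decoder is chosen.
        System.out.println(lookup.decode(payload, Map.of("Content-Type", "application/xml")));
    }
}
```

In the real framework the payload would be turned into the method's argument type (for example Animal) rather than a tagged string; the sketch only shows the selection step.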

Deserialization is identical for @PushSubscription methods. However, because push messages are delivered via HTTP, the subscriber methods are executed on the main HTTP event loop thread by default, so care must be taken not to block that thread.

As a convenience, @PushSubscription methods that are known to use blocking operations during the course of message processing may be annotated with @ExecuteOn. This will cause the invocation of the method to occur in a separate thread using the ExecutorService specified in the annotation.

If all the subscriber methods in a given @PubSubListener class are known to be blocking, then the @ExecuteOn annotation may be used at the class level instead and all @PushSubscription methods in that class will be executed by the specified ExecutorService.

If you use the TaskExecutors.BLOCKING ExecutorService, Virtual Threads will be used if available.

The following example is equivalent to the preceding one, except that it uses @ExecuteOn at the class level and @PushSubscription methods:

import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.PushSubscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.http.MediaType;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;

@PubSubListener
@ExecuteOn(TaskExecutors.BLOCKING) // (1)
public class ContentTypePushSubscriber {

    @PushSubscription("raw-push-subscription") // (2)
    void receiveRaw(byte[] data, @MessageId String id) {
        //process with blocking code
    }

    @PushSubscription("native-push-subscription") // (3)
    void receiveNative(PubsubMessage message) {
        //process with blocking code
    }

    @PushSubscription("animals-push") // (4)
    void receivePojo(Animal animal, @MessageId String id) {
        //process with blocking code
    }

    @PushSubscription(value = "animals-legacy-push", contentType = MediaType.APPLICATION_XML) // (5)
    void receiveXML(Animal animal, @MessageId String id) {
        //process with blocking code
    }

}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.PushSubscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn

@PubSubListener
@ExecuteOn(TaskExecutors.BLOCKING) // (1)
class ContentTypePushSubscriber {

    @PushSubscription("raw-push-subscription") // (2)
    void receiveRaw(byte[] data, @MessageId String id) {
        //process with blocking code
    }

    @PushSubscription("native-push-subscription") // (3)
    void receiveNative(PubsubMessage message) {
        //process with blocking code
    }

    @PushSubscription("animals-push") // (4)
    void receivePojo(Animal animal, @MessageId String id) {
        //process with blocking code
    }

    @PushSubscription(value = "animals-legacy-push", contentType = "application/xml") // (5)
    void receiveXML(Animal animal, @MessageId String id) {
        //process with blocking code
    }

}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.PushSubscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn


@PubSubListener
@ExecuteOn(TaskExecutors.BLOCKING) // (1)
class ContentTypePushSubscriber(private val messageProcessor: MessageProcessor) {
    @PushSubscription("raw-push-subscription")
    fun receiveRaw(data: ByteArray, @MessageId id: String) { // (2)
        messageProcessor.handleByteArrayMessage(data).block()
    }

    @PushSubscription("native-push-subscription")
    fun receiveNative(message: PubsubMessage) { // (3)
        messageProcessor.handlePubsubMessage(message).block()
    }

    @PushSubscription("animals-push")
    fun receivePojo(animal: Animal, @MessageId id: String) { // (4)
        messageProcessor.handleAnimalMessage(animal).block()
    }

    @PushSubscription(value = "animals-legacy-push", contentType = "application/xml")
    fun receiveXML(animal: Animal, @MessageId id: String) { // (5)
        messageProcessor.handleAnimalMessage(animal).block()
    }
}
1 The class is annotated with @ExecuteOn so that all of the @PushSubscription methods will be executed on a separate thread
2 Bytes are copied, SerDes is bypassed, message id injected for usage
3 SerDes is bypassed, the PubsubMessage object is copied, no need to use @MessageId
4 The framework will try to deserialize this payload. If no Content-Type header is found, it defaults to application/json
5 Uses a custom SerDes and the framework will find a PubSubMessageSerDes that can handle application/xml
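
To make the threading point concrete, here is a minimal plain-JDK sketch of what moving a blocking handler off the calling thread looks like. It does not use Micronaut at all: `OffloadSketch`, `runOn`, and the "blocking-worker" pool are hypothetical names standing in for the ExecutorService that @ExecuteOn would select.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK illustration only; Micronaut's @ExecuteOn wiring is not shown here.
final class OffloadSketch {

    // Runs the handler on the given executor and reports which thread executed it.
    static CompletableFuture<String> runOn(Executor executor, Runnable blockingHandler) {
        return CompletableFuture.supplyAsync(() -> {
            blockingHandler.run();
            return Thread.currentThread().getName();
        }, executor);
    }

    public static void main(String[] args) {
        // Hypothetical pool standing in for the executor named in the annotation.
        ExecutorService blocking =
            Executors.newFixedThreadPool(2, task -> new Thread(task, "blocking-worker"));
        try {
            // join() is used only so the demo can print the result.
            String worker = runOn(blocking, () -> { /* blocking work would go here */ }).join();
            System.out.println("caller=" + Thread.currentThread().getName()
                + ", handler ran on=" + worker);
        } finally {
            blocking.shutdown();
        }
    }
}
```

The caller thread only schedules the work; the handler body runs on a worker thread, which is the property that keeps an event loop free.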

9.6.2 Receiving and Returning Reactive Types

In addition to byte[], PubsubMessage, and POJOs, you can also define listener methods that receive a Reactive type such as a Reactor Mono or an RxJava Single. The same deserialization rules as above are applied using the type parameter of the Reactive type.

For the conversion to Reactive types to work correctly, you must add either the Micronaut Reactor or the Micronaut RxJava 3 library to your application’s dependencies.

For example, using Reactor:

Using Reactive Types
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import reactor.core.publisher.Mono;

@PubSubListener
public class ReactiveSubscriber {

    private final MessageProcessor messageProcessor;

    public ReactiveSubscriber(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor;
    }

    @Subscription("raw-subscription") // (1)
    Mono<Object> receiveRaw(Mono<byte[]> data, @MessageId String id) {
        return data.flatMap(messageProcessor::handleByteArrayMessage);
    }

    @Subscription("native-subscription") // (2)
    Mono<Object> receiveNative(Mono<PubsubMessage> message) {
        return message.flatMap(messageProcessor::handlePubSubMessage);
    }

    @Subscription("animals") // (3)
    Mono<Object> receivePojo(Mono<Animal> animal, @MessageId String id) {
        return animal.flatMap(messageProcessor::handleAnimalMessage);
    }

    @Subscription(value = "animals-legacy", contentType = "application/xml") // (4)
    Mono<Object> receiveXML(Mono<Animal> animal, @MessageId String id) {
        return animal.flatMap(messageProcessor::handleAnimalMessage);
    }
}
Using Reactive Types
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono


@PubSubListener
class ReactiveSubscriber {

    private final MessageProcessor messageProcessor

    ReactiveSubscriber(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor
    }

    @Subscription("raw-subscription") // (1)
    Mono<Object> receiveRaw(Mono<byte[]> data, @MessageId String id) {
        return data.flatMap(messageProcessor::handleByteArrayMessage)
    }

    @Subscription("native-subscription") // (2)
    Mono<Object> receiveNative(Mono<PubsubMessage> message) {
        return message.flatMap(messageProcessor::handlePubSubMessage)
    }

    @Subscription("animals") // (3)
    Mono<Object> receivePojo(Mono<Animal> animal, @MessageId String id) {
        return animal.flatMap(messageProcessor::handleAnimalMessage)
    }

    @Subscription(value = "animals-legacy", contentType = "application/xml") // (4)
    Mono<Object> receiveXML(Mono<Animal> animal, @MessageId String id) {
        return animal.flatMap(messageProcessor::handleAnimalMessage)
    }

}
Using Reactive Types
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono


@PubSubListener
class ReactiveSubscriber(private val messageProcessor: MessageProcessor) {

    @Subscription("raw-subscription")
    fun receiveRaw(data: Mono<ByteArray>, @MessageId id: String): Mono<Any> { // (1)
        return data.flatMap { payload ->
            messageProcessor.handleByteArrayMessage(payload)
        }
    }

    @Subscription("native-subscription")
    fun receiveNative(message: Mono<PubsubMessage>): Mono<Any> { // (2)
        return message.flatMap { payload ->
            messageProcessor.handlePubsubMessage(payload)
        }
    }

    @Subscription("animals")
    fun receivePojo(message: Mono<Animal>, @MessageId id: String): Mono<Any> { // (3)
        return message.flatMap { animal ->
            messageProcessor.handleAnimalMessage(animal)
        }
    }

    @Subscription(value = "animals-legacy", contentType = "application/xml")
    fun receiveXML(message: Mono<Animal>, @MessageId id: String): Mono<Any> { // (4)
        return message.flatMap { animal ->
            messageProcessor.handleAnimalMessage(animal)
        }
    }
}
1 Bytes are copied and wrapped in a Mono, SerDes is bypassed, message id injected for usage
2 SerDes is bypassed, the PubsubMessage object is copied and wrapped in a Mono, no need to use @MessageId
3 The framework will try to deserialize this payload into Mono<Animal>. If no Content-Type header is found, it defaults to application/json
4 Uses a custom SerDes and the framework will find a PubSubMessageSerDes that can handle application/xml and then pass the deserialized payload as Mono<Animal>.

Using Reactor with push subscriptions is similar:

Using Reactive Types With Push Subscriptions
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.PushSubscription;
import io.micronaut.gcp.pubsub.support.Animal;
import reactor.core.publisher.Mono;

@PubSubListener
public class ReactivePushSubscriber {

    private final MessageProcessor messageProcessor;

    public ReactivePushSubscriber(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor;
    }

    @PushSubscription("raw-push-subscription") // (1)
    Mono<Object> receiveRaw(Mono<byte[]> data, @MessageId String id) {
        return data.flatMap(messageProcessor::handleByteArrayMessage);
    }

    @PushSubscription("native-push-subscription") // (2)
    Mono<Object> receiveNative(Mono<PubsubMessage> message) {
        return message.flatMap(messageProcessor::handlePubSubMessage);
    }

    @PushSubscription("animals-push") // (3)
    Mono<Object> receivePojo(Mono<Animal> animal, @MessageId String id) {
        return animal.flatMap(messageProcessor::handleAnimalMessage);
    }

    @PushSubscription(value = "animals-legacy-push", contentType = "application/xml") // (4)
    Mono<Object> receiveXML(Mono<Animal> animal, @MessageId String id) {
        return animal.flatMap(messageProcessor::handleAnimalMessage);
    }
}
Using Reactive Types With Push Subscriptions
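import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.PushSubscription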
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono


@PubSubListener
class ReactivePushSubscriber {

    private final MessageProcessor messageProcessor

    ReactivePushSubscriber(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor
    }

    @PushSubscription("raw-push-subscription") // (1)
    Mono<Object> receiveRaw(Mono<byte[]> data, @MessageId String id) {
        return data.flatMap(messageProcessor::handleByteArrayMessage)
    }

    @PushSubscription("native-push-subscription") // (2)
    Mono<Object> receiveNative(Mono<PubsubMessage> message) {
        return message.flatMap(messageProcessor::handlePubSubMessage)
    }

    @PushSubscription("animals-push") // (3)
    Mono<Object> receivePojo(Mono<Animal> animal, @MessageId String id) {
        return animal.flatMap(messageProcessor::handleAnimalMessage)
    }

    @PushSubscription(value = "animals-legacy-push", contentType = "application/xml") // (4)
    Mono<Object> receiveXML(Mono<Animal> animal, @MessageId String id) {
        return animal.flatMap(messageProcessor::handleAnimalMessage)
    }

}
Using Reactive Types With Push Subscriptions
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.PushSubscription
import io.micronaut.gcp.pubsub.support.Animal
import reactor.core.publisher.Mono


@PubSubListener
class ReactivePushSubscriber(private val messageProcessor: MessageProcessor) {

    @PushSubscription("raw-push-subscription")
    fun receiveRaw(data: Mono<ByteArray>, @MessageId id: String): Mono<Any> { // (1)
        return data.flatMap { payload ->
            messageProcessor.handleByteArrayMessage(payload)
        }
    }

    @PushSubscription("native-push-subscription")
    fun receiveNative(message: Mono<PubsubMessage>): Mono<Any> { // (2)
        return message.flatMap { payload ->
            messageProcessor.handlePubsubMessage(payload)
        }
    }

    @PushSubscription("animals-push")
    fun receivePojo(message: Mono<Animal>, @MessageId id: String): Mono<Any> { // (3)
        return message.flatMap { animal ->
            messageProcessor.handleAnimalMessage(animal)
        }
    }

    @PushSubscription(value = "animals-legacy-push", contentType = "application/xml")
    fun receiveXML(message: Mono<Animal>, @MessageId id: String): Mono<Any> { // (4)
        return message.flatMap { animal ->
            messageProcessor.handleAnimalMessage(animal)
        }
    }
}
1 Bytes are copied and wrapped in a Mono, SerDes is bypassed, message id injected for usage
2 SerDes is bypassed, the PubsubMessage object is copied and wrapped in a Mono, no need to use @MessageId
3 The framework will try to deserialize this payload into Mono<Animal>. If no Content-Type header is found, it defaults to application/json
4 Uses a custom SerDes and the framework will find a PubSubMessageSerDes that can handle application/xml and then pass the deserialized payload as Mono<Animal>.

Note that the above examples all return a Mono<Object> to allow for a fully non-blocking reactive message processing pipeline. When a Publisher is returned from a @Subscription method, it will be subscribed to by the framework and the message will not be auto-acknowledged until the Publisher completes successfully. If the Publisher completes with an error, the framework will nack() the message for re-delivery.
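
The deferred acknowledgement described above can be sketched without Reactor. The example below swaps in the JDK's CompletableFuture for the returned Publisher, so it is an analogy rather than the framework's code; `DeferredAckSketch`, `settle`, and the recorded signal strings are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// CompletableFuture stands in for the returned Publisher; not the Micronaut API.
final class DeferredAckSketch {

    // Records the signals that would be sent to Pub/Sub, in order.
    final List<String> signals = new ArrayList<>();

    // Sends ack only after the pipeline completes; sends nack if it fails.
    void settle(String messageId, CompletableFuture<?> pipeline) {
        pipeline.whenComplete((value, error) ->
            signals.add((error == null ? "ack:" : "nack:") + messageId));
    }

    public static void main(String[] args) {
        DeferredAckSketch sketch = new DeferredAckSketch();

        CompletableFuture<String> ok = new CompletableFuture<>();
        sketch.settle("m1", ok);
        // Nothing has been sent yet: processing is still pending.
        System.out.println("pending: " + sketch.signals);
        ok.complete("done");

        CompletableFuture<String> failed = new CompletableFuture<>();
        sketch.settle("m2", failed);
        failed.completeExceptionally(new IllegalStateException("processing failed"));

        System.out.println("settled: " + sketch.signals);
    }
}
```

The point of the design is that the acknowledgement decision is tied to the outcome of the asynchronous pipeline, not to the moment the listener method returns.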

9.6.3 Message Headers

Google Cloud Pub/Sub messages contain a dictionary of message attributes in the form of a Map<String, String>.

The framework binds those attributes to method arguments annotated with @MessageHeader.

A ConversionService is used to try to convert from the String value of the attribute to the target type on the method.

import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.annotation.MessageHeader;

@PubSubListener
public class CustomHeaderSubscriber {

    @Subscription("animals")
    public void onMessage(Animal animal, @MessageHeader("Content-Type") String contentType, @MessageHeader("code") Integer code) { // (1)

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.MessageHeader

@PubSubListener
class CustomHeaderSubscriber {

    @Subscription("animals")
    void onMessage(Animal animal, @MessageHeader("Content-Type") String contentType, @MessageHeader("code") Integer code) { // (1)

    }

}
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.MessageHeader

@PubSubListener
class CustomHeaderSubscriber {

    @Subscription("animals")
    fun onMessage(animal: Animal,
                  @MessageHeader("Content-Type") contentType: String,
                  @MessageHeader("code") code: Int) { // (1)
    }
}
1 Each annotated argument will be mapped to the corresponding attribute value. The ConversionService will try to convert to the target type
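
As a rough picture of the attribute-to-argument conversion, the sketch below converts String attribute values to a requested target type with a small hand-written converter table. It is not Micronaut's ConversionService: `AttributeConversionSketch`, its `convert` method, and the choice to return an empty Optional for missing or unconvertible values are assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical converter table; the real work is done by Micronaut's ConversionService.
final class AttributeConversionSketch {

    private static final Map<Class<?>, Function<String, ?>> CONVERTERS = new HashMap<>();

    static {
        CONVERTERS.put(String.class, value -> value);
        CONVERTERS.put(Integer.class, Integer::valueOf);
        CONVERTERS.put(Long.class, Long::valueOf);
        CONVERTERS.put(Boolean.class, Boolean::valueOf);
    }

    // Converts one attribute to the requested type; empty if missing or unconvertible.
    static <T> Optional<T> convert(Map<String, String> attributes, String name, Class<T> type) {
        String raw = attributes.get(name);
        Function<String, ?> converter = CONVERTERS.get(type);
        if (raw == null || converter == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(type.cast(converter.apply(raw)));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Pub/Sub attributes are always String to String.
        Map<String, String> attributes = Map.of("Content-Type", "application/json", "code", "42");
        System.out.println(convert(attributes, "Content-Type", String.class));
        System.out.println(convert(attributes, "code", Integer.class));
    }
}
```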

9.6.4 Pull Subscriber properties

Pub/Sub allows each Pull Subscriber to have its own configuration for things such as executors or flow control settings.

The framework allows you to create configurations and then bind those configurations to each @Subscription using the configuration parameter.

Table 1. Configuration properties for SubscriberConfigurationProperties
Property Type Description

gcp.pubsub.subscriber.*.executor

java.lang.String

Name of the executor to use. Default: scheduled

gcp.pubsub.subscriber.*.parallel-pull-count

java.lang.Integer

Number of concurrent pulls. Default: 1

gcp.pubsub.subscriber.*.max-ack-extension-period

org.threeten.bp.Duration

Set the maximum period a message ack deadline will be extended. Default: one hour.

gcp.pubsub.subscriber.*.max-duration-per-ack-extension

org.threeten.bp.Duration

Set the upper bound for a single mod ack extension period. Default: one hour.

gcp.pubsub.subscriber.*.flow-control.max-outstanding-element-count

java.lang.Long

Maximum number of outstanding elements to keep in memory before enforcing flow control. Default: 1000

gcp.pubsub.subscriber.*.flow-control.max-outstanding-request-bytes

java.lang.Long

Maximum number of outstanding bytes to keep in memory before enforcing flow control. Default: 100 * 1024 * 1024

gcp.pubsub.subscriber.*.flow-control.limit-exceeded-behavior

com.google.api.gax.batching.FlowController$LimitExceededBehavior

Default: LimitExceededBehavior.Block
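
The flow-control settings can be thought of as a bounded pool of slots. The following sketch models only the element-count limit with a JDK Semaphore; it is an illustration under that assumption, not the Pub/Sub client's implementation, and `FlowControlSketch`, `tryAdmit`, `admit`, and `release` are hypothetical names.

```java
import java.util.concurrent.Semaphore;

// Models max-outstanding-element-count only; byte limits are not covered.
final class FlowControlSketch {

    private final Semaphore outstanding;

    FlowControlSketch(int maxOutstandingElementCount) {
        this.outstanding = new Semaphore(maxOutstandingElementCount);
    }

    // Non-blocking probe: true if another message may be admitted right now.
    boolean tryAdmit() {
        return outstanding.tryAcquire();
    }

    // Blocking admission, in the spirit of LimitExceededBehavior.Block.
    void admit() throws InterruptedException {
        outstanding.acquire();
    }

    // Called once a message has been settled, freeing one slot.
    void release() {
        outstanding.release();
    }

    public static void main(String[] args) {
        FlowControlSketch flow = new FlowControlSketch(2);
        System.out.println(flow.tryAdmit()); // first slot taken
        System.out.println(flow.tryAdmit()); // second slot taken
        System.out.println(flow.tryAdmit()); // limit reached
        flow.release();
        System.out.println(flow.tryAdmit()); // a slot is free again
    }
}
```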

Suppose you have the following configuration for a subscriber:

gcp.pubsub.subscriber.custom.executor=custom-executor
gcp.pubsub.subscriber.custom.parallel-pull-count=4
gcp:
  pubsub:
    subscriber:
      custom:
        executor: custom-executor
        parallel-pull-count: 4
[gcp]
  [gcp.pubsub]
    [gcp.pubsub.subscriber]
      [gcp.pubsub.subscriber.custom]
        executor="custom-executor"
        parallel-pull-count=4
gcp {
  pubsub {
    subscriber {
      custom {
        executor = "custom-executor"
        parallelPullCount = 4
      }
    }
  }
}
{
  gcp {
    pubsub {
      subscriber {
        custom {
          executor = "custom-executor"
          parallel-pull-count = 4
        }
      }
    }
  }
}
{
  "gcp": {
    "pubsub": {
      "subscriber": {
        "custom": {
          "executor": "custom-executor",
          "parallel-pull-count": 4
        }
      }
    }
  }
}
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener
public class CustomConfigurationSubscriber {

    @Subscription(value = "animals", configuration = "custom") // (1)
    public void onMessage(Animal animal) {

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener
class CustomConfigurationSubscriber {

    @Subscription(value = "animals", configuration = "custom") // (1)
    void onMessage(Animal animal) {

    }

}
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener
class CustomConfigurationSubscriber {

	@Subscription(value = "animals", configuration = "custom") // (1)
	fun onMessage(animal: Animal) {

	}

}
1 The Subscriber will be configured using a configuration named custom

9.6.5 Push Subscriber configuration

Push support is enabled by default if the Micronaut HTTP server support is on the classpath. It can be explicitly disabled via configuration (see the table below).

The push endpoint is exposed at /push by default. This path is also configurable.

Table 1. Configuration Properties for PubSubPushConfigurationProperties
Property Type Description

gcp.pubsub.push.enabled

boolean

Whether PubSub Push is enabled.

gcp.pubsub.push.path

java.lang.String

The configured path for the PubSub Push HTTP endpoint. Default value "/push"

9.6.6 Handling message acknowledgement

When messages are delivered to @Subscription and @PushSubscription methods, they are by default auto-acknowledged to the PubSub service if the method returns without exceptions. When an error occurs during processing, a nack signal will be sent to the service instead, potentially resulting in redelivery of the message.

See the section Consumer error handling for more information on error handling.
Google Cloud Pub/Sub controls delivery behavior in response to nack signals at the Subscription level. Please refer to the Pub/Sub Subscriber documentation for more information.

You can take manual control of acknowledgement by adding an argument of type Acknowledgement and invoking its ack() or nack() method yourself.

If your method takes an Acknowledgement argument but returns without invoking ack() or nack(), the framework logs a warning. Messages processed without an ack or nack signal can cause undesired behavior that negatively affects performance.
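
The "forgot to acknowledge" check can be sketched in plain Java. This is a simplified model rather than the framework's code: `ManualAckSketch`, `TrackingAck`, `dispatch`, and the warning text are hypothetical, and the sketch covers only a synchronous handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Simplified model of manual acknowledgement tracking; not the Micronaut API.
final class ManualAckSketch {

    // Hypothetical acknowledgement handle that records the first signal only.
    static final class TrackingAck {
        private final AtomicReference<String> signal = new AtomicReference<>();

        void ack() {
            signal.compareAndSet(null, "ack");
        }

        void nack() {
            signal.compareAndSet(null, "nack");
        }

        String signal() {
            return signal.get();
        }
    }

    // Collected instead of logged so the behavior is easy to inspect.
    final List<String> warnings = new ArrayList<>();

    // Invokes the handler, then warns if it returned without acking or nacking.
    String dispatch(String messageId, Consumer<TrackingAck> handler) {
        TrackingAck ack = new TrackingAck();
        handler.accept(ack);
        if (ack.signal() == null) {
            warnings.add("Message " + messageId + " was neither acked nor nacked");
        }
        return ack.signal();
    }

    public static void main(String[] args) {
        ManualAckSketch sketch = new ManualAckSketch();
        System.out.println(sketch.dispatch("m1", TrackingAck::ack));
        System.out.println(sketch.dispatch("m2", TrackingAck::nack));
        System.out.println(sketch.dispatch("m3", ack -> { /* handler forgot to settle */ }));
        System.out.println(sketch.warnings);
    }
}
```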

The following example shows usage of manual acknowledgement:

Manual Acknowledgement of Pull Messages
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.Acknowledgement;
import reactor.core.publisher.Mono;

@PubSubListener
public class AcknowledgementSubscriber {

    private final MessageProcessor messageProcessor;

    public AcknowledgementSubscriber(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor;
    }

    @Subscription("animals")
    public void onMessage(Animal animal, Acknowledgement acknowledgement) {
        if (Boolean.TRUE.equals(messageProcessor.handleAnimalMessage(animal).block())) {
            acknowledgement.ack();
        } else {
            acknowledgement.nack();
        }
    }

    @Subscription("animals-async")
    public Mono<Boolean> onReactiveMessage(Mono<Animal> animal, Acknowledgement acknowledgement) {
        return animal.flatMap(messageProcessor::handleAnimalMessage)
            .doOnNext(result -> {
                if (Boolean.TRUE.equals(result)) {
                    acknowledgement.ack();
                } else {
                    acknowledgement.nack();
                }
            });
    }

}
Manual Acknowledgement of Pull Messages
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.Acknowledgement
import reactor.core.publisher.Mono


@PubSubListener
class AcknowledgementSubscriber {

    MessageProcessor messageProcessor

    AcknowledgementSubscriber(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor
    }

    @Subscription("animals")
    void onMessage(Animal animal, Acknowledgement acknowledgement) {
        if (Boolean.TRUE == messageProcessor.handleAnimalMessage(animal).block()) {
            acknowledgement.ack()
        } else {
            acknowledgement.nack()
        }
    }

    @Subscription("animals-async")
    Mono<Boolean> onReactiveMessage(Mono<Animal> animal, Acknowledgement acknowledgement) {
        return animal.flatMap(messageProcessor::handleAnimalMessage)
                .doOnNext(result -> {
                    if (Boolean.TRUE == result) {
                        acknowledgement.ack()
                    } else {
                        acknowledgement.nack()
                    }
                })
    }

}
Manual Acknowledgement of Pull Messages
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.Acknowledgement
import reactor.core.publisher.Mono


@PubSubListener
class AcknowledgementSubscriber(private val messageProcessor: MessageProcessor) {

    @Subscription("animals")
    fun onMessage(animal: Animal, acknowledgement: Acknowledgement) {
        if (messageProcessor.handleAnimalMessage(animal).block() == true) {
            acknowledgement.ack()
            messageProcessor.recordAcknowledgement(acknowledgement)
        } else {
            acknowledgement.nack()
            messageProcessor.recordAcknowledgement(acknowledgement)
        }
    }

    @Subscription("animals-async")
    fun onReactiveMessage(message: Mono<Animal>, acknowledgement: Acknowledgement): Mono<Boolean> {
        return message.flatMap { animal -> messageProcessor.handleAnimalMessage(animal) }
            .doOnNext { result ->
                if (result) {
                    acknowledgement.ack()
                    messageProcessor.recordAcknowledgement(acknowledgement)
                } else {
                    acknowledgement.nack()
                    messageProcessor.recordAcknowledgement(acknowledgement)
                }
            }
    }
}

The following example shows usage of manual acknowledgement with push messages:

Manual Acknowledgement of Push Messages
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.PushSubscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.Acknowledgement;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import reactor.core.publisher.Mono;

@PubSubListener
public class AcknowledgementPushSubscriber {

    private final MessageProcessor messageProcessor;

    public AcknowledgementPushSubscriber(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor;
    }

    @ExecuteOn(TaskExecutors.BLOCKING)
    @PushSubscription("animals-push")
    public void onMessage(Animal animal, Acknowledgement acknowledgement) {
        if (Boolean.TRUE.equals(messageProcessor.handleAnimalMessage(animal).block())) {
            acknowledgement.ack();
        } else {
            acknowledgement.nack();
        }
    }

    @PushSubscription("animals-async-push")
    public Mono<Boolean> onReactiveMessage(Mono<Animal> animal, Acknowledgement acknowledgement) {
        return animal.flatMap(messageProcessor::handleAnimalMessage)
            .doOnNext(result -> {
                if (Boolean.TRUE.equals(result)) {
                    acknowledgement.ack();
                } else {
                    acknowledgement.nack();
                }
            });
    }

}
Manual Acknowledgement of Push Messages
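import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.PushSubscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.Acknowledgement
import io.micronaut.scheduling.TaskExecutors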
import io.micronaut.scheduling.annotation.ExecuteOn
import reactor.core.publisher.Mono

@PubSubListener
class AcknowledgementPushSubscriber {

    MessageProcessor messageProcessor

    AcknowledgementPushSubscriber(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor
    }

    @ExecuteOn(TaskExecutors.BLOCKING)
    @PushSubscription("animals-push")
    void onMessage(Animal animal, Acknowledgement acknowledgement) {
        if (Boolean.TRUE == messageProcessor.handleAnimalMessage(animal).block()) {
            acknowledgement.ack()
        } else {
            acknowledgement.nack()
        }
    }

    @PushSubscription("animals-async-push")
    Mono<Boolean> onReactiveMessage(Mono<Animal> animal, Acknowledgement acknowledgement) {
        return animal.flatMap(messageProcessor::handleAnimalMessage)
                .doOnNext(result -> {
                    if (Boolean.TRUE == result) {
                        acknowledgement.ack()
                    } else {
                        acknowledgement.nack()
                    }
                })
    }

}
Manual Acknowledgement of Push Messages
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.PushSubscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.Acknowledgement
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import reactor.core.publisher.Mono


@PubSubListener
class AcknowledgementPushSubscriber(private val messageProcessor: MessageProcessor) {

    @ExecuteOn(TaskExecutors.BLOCKING)
    @PushSubscription("animals-push")
    fun onMessage(animal: Animal, acknowledgement: Acknowledgement) {
        if (messageProcessor.handleAnimalMessage(animal).block() == true) {
            acknowledgement.ack()
            messageProcessor.recordAcknowledgement(acknowledgement)
        } else {
            acknowledgement.nack()
            messageProcessor.recordAcknowledgement(acknowledgement)
        }
    }

    @PushSubscription("animals-async-push")
    fun onReactiveMessage(message: Mono<Animal>, acknowledgement: Acknowledgement): Mono<Boolean> {
        return message.flatMap { animal -> messageProcessor.handleAnimalMessage(animal) }
            .doOnNext { result ->
                if (result) {
                    acknowledgement.ack()
                    messageProcessor.recordAcknowledgement(acknowledgement)
                } else {
                    acknowledgement.nack()
                    messageProcessor.recordAcknowledgement(acknowledgement)
                }
            }
    }
}

9.6.7 Consumer error handling

During message handling, errors can occur at the framework level or inside your listener method, for example:

  • Problems binding method arguments to the message body

  • Content-Type deserialization issues

  • Acknowledgement

  • Uncaught exceptions at the annotated method

The framework provides a global error handler, DefaultPubSubMessageReceiverExceptionHandler, which catches any error and simply logs it. This behavior is far from ideal, because messages that are not acknowledged continue to be redelivered.

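There are two ways you can provide error handling: globally, by supplying your own handler in place of the default one, or locally, by having a @PubSubListener class implement PubSubMessageReceiverExceptionHandler.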

The PubSubMessageReceiverException contains references to the originating bean that threw the exception as well as a reference to PubSubConsumerState which has state information regarding the message handling.

import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.context.annotation.Requires;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.bind.PubSubConsumerState;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener
public class ErrorHandlingSubscriber implements PubSubMessageReceiverExceptionHandler { // (1)
    /**
     *
     * @param animal payload
     */
    public void onMessage(Animal animal) {
        throw new RuntimeException("error");
    }

    @Override
    public void handle(PubSubMessageReceiverException exception) { // (2)

        Object listener = exception.getListener(); // (3)
        PubSubConsumerState state = exception.getState(); // (4)
        PubsubMessage originalMessage = state.getPubsubMessage();
        String contentType = state.getContentType();
        //some logic
        state.getAckReplyConsumer().ack(); // (5)

    }
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener
class ErrorHandlingSubscriber implements PubSubMessageReceiverExceptionHandler { // (1)

    void onMessage(Animal animal) {
        throw new RuntimeException("error");
    }

    @Override
    void handle(PubSubMessageReceiverException exception) { // (2)

        def listener = exception.listener // (3)
        def state = exception.state // (4)
        def originalMessage = state.pubsubMessage
        def contentType = state.contentType
        //some logic
        state.getAckReplyConsumer().ack(); // (5)

    }
}
import io.micronaut.context.annotation.Requires
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener
class ErrorHandlingSubscriber : PubSubMessageReceiverExceptionHandler { // (1)

	fun onMessage(animal: Animal) {
		throw RuntimeException("error")
	}

	override fun handle(exception: PubSubMessageReceiverException) { // (2)

		val listener = exception.listener // (3)
		val state = exception.state // (4)
		val originalMessage = state.pubsubMessage
		val contentType = state.contentType
		//some logic
		state.ackReplyConsumer.ack() // (5)

	}
}
1 Implement PubSubMessageReceiverExceptionHandler to mark this bean as a local error handler
2 Method invoked if any error happens during the processing of this PubSubListener class
3 Reference to the class that originated the exception
4 Contains information related to the subscription
5 Depending on your use case you can ack() or nack() the message
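
Whether to ack() or nack() usually comes down to whether a retry could succeed. The following is a minimal sketch of one possible policy, not something Micronaut GCP provides: the AckDecision enum and the ErrorAckPolicy helper are hypothetical names, and the choice of which exception types count as transient is an assumption made for illustration.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Micronaut GCP: decides whether a failed
// message should be acknowledged (dropped) or negatively acknowledged (redelivered).
enum AckDecision { ACK, NACK }

final class ErrorAckPolicy {

    private ErrorAckPolicy() { }

    // Assumption for illustration: I/O problems and timeouts are treated as
    // transient; anything else (for example a malformed payload) as permanent.
    static AckDecision decide(Throwable error) {
        // Walk the cause chain, because the framework exception wraps the original error.
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof IOException || t instanceof TimeoutException) {
                return AckDecision.NACK; // a retry may succeed, so ask for redelivery
            }
        }
        return AckDecision.ACK; // a retry would fail again, so stop redelivery
    }
}
```

Inside handle(...), the result of ErrorAckPolicy.decide(exception) could then select between ack() and nack() on state.getAckReplyConsumer().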

9.6.8 Custom Parameter Binding

Default Binding Functionality

Consumer argument binding is achieved through an ArgumentBinderRegistry that is specific for binding consumers from Pub/Sub messages. The class responsible for this is the PubSubBinderRegistry. The registry supports argument binders that are used based on an annotation applied to the argument or the argument type. All argument binders must implement either PubSubAnnotatedArgumentBinder or PubSubTypeArgumentBinder. The exception to that rule is the PubSubDefaultArgumentBinder which is used when no other binders support a given argument.

When an argument needs to be bound, the PubSubConsumerState is used as the source of all of the available data. The binder registry follows a small sequence of steps to attempt to find a binder that supports the argument.

  1. Search the annotation based binders for one that matches any annotation on the argument that is annotated with @Bindable.

  2. Search the type based binders for one that matches or is a subclass of the argument type.

  3. Return the default binder.
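
The lookup order above can be sketched in plain Java. This is a simplified stand-in and not the actual PubSubBinderRegistry: the class BinderLookupSketch is hypothetical, a binder is represented only by a descriptive String, and the type check follows the wording of step 2 literally.

```java
import java.lang.annotation.Annotation;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the three-step binder lookup.
final class BinderLookupSketch {

    private final Map<Class<? extends Annotation>, String> annotationBinders = new LinkedHashMap<>();
    private final Map<Class<?>, String> typeBinders = new LinkedHashMap<>();
    private final String defaultBinder = "default";

    void registerAnnotationBinder(Class<? extends Annotation> annotation, String binder) {
        annotationBinders.put(annotation, binder);
    }

    void registerTypeBinder(Class<?> type, String binder) {
        typeBinders.put(type, binder);
    }

    // bindableAnnotation may be null when the argument carries no @Bindable annotation.
    String findBinder(Class<?> argumentType, Class<? extends Annotation> bindableAnnotation) {
        // Step 1: annotation-based binders.
        if (bindableAnnotation != null && annotationBinders.containsKey(bindableAnnotation)) {
            return annotationBinders.get(bindableAnnotation);
        }
        // Step 2: type-based binders whose type matches or is a subclass of the argument type.
        for (Map.Entry<Class<?>, String> entry : typeBinders.entrySet()) {
            if (argumentType.isAssignableFrom(entry.getKey())) {
                return entry.getValue();
            }
        }
        // Step 3: fall back to the default binder.
        return defaultBinder;
    }
}
```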

Custom Binding

Injecting your own argument binding behavior is as simple as registering a bean. The existing binder registry will inject it and include it in the normal processing.

Annotation Binding

A custom annotation can be created to bind consumer arguments. A custom binder can then be created to use that annotation and the PubSubConsumerState to supply a value for the argument. The value may in fact come from anywhere, however for the purposes of this documentation, we will show how you would create an annotation to bind the message publish time.

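import io.micronaut.core.bind.annotation.Bindable;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;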

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
@Bindable // (1)
public @interface MessagePublishTime {
}
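import io.micronaut.core.bind.annotation.Bindable;

import java.lang.annotation.Documented
import java.lang.annotation.ElementType
import java.lang.annotation.Retention
import java.lang.annotation.RetentionPolicy
import java.lang.annotation.Target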

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target([ElementType.PARAMETER])
@Bindable // (1)
@interface MessagePublishTime {
}
import io.micronaut.core.bind.annotation.Bindable

@MustBeDocumented
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.VALUE_PARAMETER)
@Bindable // (1)
annotation class MessagePublishTime
1 The @Bindable annotation is required for the annotation to be considered for binding.
import com.google.protobuf.util.Timestamps;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;

import jakarta.inject.Singleton;

@Singleton // (1)
public class MessagePublishTimeAnnotationBinder implements PubSubAnnotatedArgumentBinder<MessagePublishTime> { // (2)

    private final ConversionService conversionService;

    public MessagePublishTimeAnnotationBinder(ConversionService conversionService) { // (3)
        this.conversionService = conversionService;
    }

    @Override
    public Class<MessagePublishTime> getAnnotationType() {
        return MessagePublishTime.class;
    }

    @Override
    public BindingResult<Object> bind(ArgumentConversionContext<Object> context, PubSubConsumerState source) {
        Long epochMillis = Timestamps.toMillis(source.getPubsubMessage().getPublishTime()); // (4)
        return () -> conversionService.convert(epochMillis, context); // (5)
    }
}
import com.google.protobuf.util.Timestamps
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService

import jakarta.inject.Singleton;

@Singleton // (1)
class MessagePublishTimeAnnotationBinder implements PubSubAnnotatedArgumentBinder<MessagePublishTime> { // (2)

    private final ConversionService conversionService;

    public MessagePublishTimeAnnotationBinder(ConversionService conversionService) { // (3)
        this.conversionService = conversionService;
    }

    @Override
    Class<MessagePublishTime> getAnnotationType() {
        MessagePublishTime
    }

    @Override
    public BindingResult<Object> bind(ArgumentConversionContext<Object> context, PubSubConsumerState source) {
        def epochMillis = Timestamps.toMillis(source.pubsubMessage.publishTime) // (4)
        return { -> conversionService.convert(epochMillis, context) } // (5)
    }
}
import com.google.protobuf.util.Timestamps
import io.micronaut.core.bind.ArgumentBinder
import io.micronaut.core.bind.ArgumentBinder.BindingResult
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import jakarta.inject.Singleton

@Singleton // (1)
class MessagePublishTimeAnnotationBinder(private val conversionService: ConversionService) // (3)
	: PubSubAnnotatedArgumentBinder<MessagePublishTime> // (2)
{

	override fun getAnnotationType(): Class<MessagePublishTime> {
		return MessagePublishTime::class.java
	}

	override fun bind(context: ArgumentConversionContext<Any>, source: PubSubConsumerState): BindingResult<Any> {
		val epochMillis = Timestamps.toMillis(source.pubsubMessage.publishTime) // (4)
		return ArgumentBinder.BindingResult { conversionService.convert(epochMillis, context) } // (5)
	}


}
1 The class is made a bean by annotating with @Singleton.
2 The custom annotation is used as the generic type for the interface.
3 The conversion service is injected into the instance.
4 The message publish time is retrieved from the message state.
5 The value is converted to the argument type. For example this allows the argument to be a String even though the epochMillis is a Long.
You could also return a com.google.protobuf.Timestamp if you register the appropriate type converter with the conversion service, but such an example is outside the scope of this documentation.

The annotation can now be used on the argument in a consumer method.

import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener
public class CustomBindingSubscriber {

    /**
     *
     * @param animal payload
     * @param publishTime message publish time
     */
    public void onMessage(Animal animal, @MessagePublishTime Long publishTime) { // (1)

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener
class CustomBindingSubscriber {

    /**
     *
     * @param animal payload
     * @param publishTime message publish time
     */
    void onMessage(Animal animal, @MessagePublishTime Long publishTime) { // (1)

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener
class CustomBindingSubscriber {
	/**
	 *
	 * @param animal payload
	 * @param publishTime message publish time
	 */
	fun onMessage(animal: Animal, @MessagePublishTime publishTime: Long) { // (1)

	}
}
1 The message publish time will now be passed to the annotated argument
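
Because the binder in this example supplies the publish time as epoch milliseconds, a consumer will often want to turn the Long into a java.time value. A minimal sketch, assuming only the JDK; the PublishTimes class and its method names are hypothetical helpers, not part of the framework:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for working with the epoch-millisecond publish time.
final class PublishTimes {

    private PublishTimes() { }

    // Converts the bound epoch milliseconds into an Instant.
    static Instant toInstant(long publishTimeEpochMillis) {
        return Instant.ofEpochMilli(publishTimeEpochMillis);
    }

    // How long ago the message was published, relative to the supplied clock reading.
    static Duration age(long publishTimeEpochMillis, Instant now) {
        return Duration.between(toInstant(publishTimeEpochMillis), now);
    }
}
```

Within onMessage, PublishTimes.age(publishTime, Instant.now()) would give the delivery delay, which can be useful for logging or for discarding stale messages.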

9.7 Message Serialization/Deserialization (SerDes)

The serialization and deserialization of message bodies is handled through instances of PubSubMessageSerDes. The ser-des (Serializer/Deserializer) is responsible for both serialization and deserialization of Pub/Sub message bodies into the message body types defined in your client and consumer methods.

The ser-des are managed by a PubSubMessageSerDesRegistry. All ser-des beans are injected into the registry in order and are searched when serialization or deserialization is needed. The search is based on the Content-Type defined for the message; the framework default is application/json. If no ser-des can be located, an exception is thrown. You can supply your own ser-des by registering a bean of type PubSubMessageSerDes.
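
The Content-Type based lookup can be pictured with the following sketch. It is a simplified model rather than the real PubSubMessageSerDesRegistry: SerDesSketch is a hypothetical stand-in reduced to the one method the lookup needs, and the first-registration-wins rule is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for PubSubMessageSerDes, reduced to the lookup key.
interface SerDesSketch {
    String supportedType();
}

// Hypothetical, simplified registry keyed by Content-Type.
final class SerDesRegistrySketch {

    private final Map<String, SerDesSketch> byContentType = new LinkedHashMap<>();

    SerDesRegistrySketch(List<SerDesSketch> serDes) {
        for (SerDesSketch candidate : serDes) {
            // Assumption: when two ser-des claim the same type, the first one injected wins.
            byContentType.putIfAbsent(candidate.supportedType(), candidate);
        }
    }

    SerDesSketch find(String contentType) {
        SerDesSketch found = byContentType.get(contentType);
        if (found == null) {
            // Mirrors the documented behavior: an unknown Content-Type is an error.
            throw new IllegalStateException("No ser-des registered for Content-Type " + contentType);
        }
        return found;
    }
}
```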

For example, let’s say you want to implement a ser-des that uses native Java object serialization instead of the JSON serialization provided. First you need to define a custom MIME type for it; we will use application/x.java, following the usual conventions for unregistered MIME types.

This is a fictional example. Java serialization is not ideal, and it is not portable.
import io.micronaut.core.serialize.exceptions.SerializationException;
import io.micronaut.core.type.Argument;

import jakarta.inject.Singleton;
import java.io.*;

@Singleton // (1)
public class JavaMessageSerDes implements PubSubMessageSerDes {

    @Override
    public String supportedType() { // (2)
        return "application/x.java";
    }

    @Override
    public Object deserialize(byte[] data, Argument<?> type) {
        ByteArrayInputStream bin = new ByteArrayInputStream(data);
        Object result = null;
        try {
            ObjectInputStream reader = new ObjectInputStream(bin);
            result = reader.readObject();
        } catch (Exception e) {
            throw new SerializationException("Failed to deserialize object", e);
        }
        return result;
    }

    @Override
    public byte[] serialize(Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream writer = new ObjectOutputStream(baos);
            writer.writeObject(data);
        } catch (IOException e) {
            throw new SerializationException("Failed to serialize object", e);
        }
        return baos.toByteArray();
    }

}
import io.micronaut.core.serialize.exceptions.SerializationException;
import io.micronaut.core.type.Argument;

import jakarta.inject.Singleton;
import java.io.*;

@Singleton // (1)
class JavaMessageSerDes implements PubSubMessageSerDes {

    @Override
    String supportedType() { // (2)
        "application/x.java"
    }

    @Override
    public Object deserialize(byte[] data, Argument<?> type) {
        ByteArrayInputStream bin = new ByteArrayInputStream(data)
        Object result = null
        try {
            ObjectInputStream reader = new ObjectInputStream(bin)
            result = reader.readObject();
        } catch (Exception e) {
            throw new SerializationException("Failed to deserialize object", e)
        }
        return result;
    }

    @Override
    byte[] serialize(Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream()
        try {
            ObjectOutputStream writer = new ObjectOutputStream(baos)
            writer.writeObject(data)
        } catch (IOException e) {
            throw new SerializationException("Failed to serialize object", e)
        }
        return baos.toByteArray()
    }

}
import io.micronaut.core.serialize.exceptions.SerializationException
import io.micronaut.core.type.Argument
import java.io.*
import jakarta.inject.Singleton

@Singleton // (1)
class JavaMessageSerDes : PubSubMessageSerDes {

	override fun supportedType(): String { // (2)
		return "application/x.java"
	}

	override fun deserialize(data: ByteArray, type: Argument<*>): Any {
		val bin = ByteArrayInputStream(data)
		var result: Any? = null
		result = try {
			val reader = ObjectInputStream(bin)
			reader.readObject()
		} catch (e: Exception) {
			throw SerializationException("Failed to deserialize object", e)
		}
		return result
	}

	override fun serialize(data: Any): ByteArray {
		val baos = ByteArrayOutputStream()
		try {
			val writer = ObjectOutputStream(baos)
			writer.writeObject(data)
		} catch (e: IOException) {
			throw SerializationException("Failed to serialize object", e)
		}
		return baos.toByteArray()
	}
}
1 The class is declared as a singleton so it will be registered with the context
2 The supported type defines which Content-Type this implementation is able to handle

To publish messages with this ser-des, set the contentType on @Topic:

@Topic(value = "animals", contentType = "application/x.java")

If the messages arriving at your subscriber already contain a Content-Type header with this type, the framework will pick it up. Alternatively, you can force it on the @Subscription annotation:

@Subscription(value = "animals", contentType = "application/x.java")

9.8 Configuring Thread pools

The framework allows users to configure separate thread pools for Publishers and Subscribers. You can set a reference to a pre-defined executor in the Pub/Sub configuration properties, or create custom configurations for publishers and subscribers. Micronaut supports the definition of custom ExecutorService implementations; see ExecutorConfiguration for the full list of options.

The Pub/Sub client libraries require a ScheduledExecutorService for both Publisher and Subscriber. Make sure your custom executor is of type scheduled, or an error will be thrown.

For example:

Configuring the custom thread pool
micronaut.executors.custom.type=scheduled
micronaut.executors.custom.nThreads=32
micronaut:
    executors:
        custom:
            type: scheduled
            nThreads: 32
[micronaut]
  [micronaut.executors]
    [micronaut.executors.custom]
      type="scheduled"
      nThreads=32
micronaut {
  executors {
    custom {
      type = "scheduled"
      nThreads = 32
    }
  }
}
{
  micronaut {
    executors {
      custom {
        type = "scheduled"
        nThreads = 32
      }
    }
  }
}
{
  "micronaut": {
    "executors": {
      "custom": {
        "type": "scheduled",
        "nThreads": 32
      }
    }
  }
}

If no configuration is supplied, the framework will use the default named scheduled executor.
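
The requirement for a scheduled executor corresponds to a plain Java type relationship: only some ExecutorService implementations are also a ScheduledExecutorService. The sketch below illustrates that distinction with JDK classes only. ExecutorTypeCheck is a hypothetical helper and does not reflect how the framework performs its own validation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical helper: tells whether an executor could satisfy the Pub/Sub
// client libraries' need for a ScheduledExecutorService.
final class ExecutorTypeCheck {

    private ExecutorTypeCheck() { }

    static boolean isScheduled(ExecutorService executor) {
        return executor instanceof ScheduledExecutorService;
    }
}
```

For instance, an executor created with Executors.newScheduledThreadPool(32) passes this check, while one created with Executors.newFixedThreadPool(32) does not.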

9.9 Using Google Cloud Pub/Sub emulator

Google Cloud Pub/Sub has a local emulator to enable developers to test their applications locally with no need to connect to the cloud Pub/Sub service.

The framework automatically switches the TransportChannelProvider to use PUBSUB_EMULATOR_HOST if this variable is set in the environment.

Make sure that you also set GCP_PROJECT_ID to be the same as the project you have configured the emulator to use. Otherwise the framework may pick up the projectId used by the default credentials.

You will need to manually configure topics and subscriptions on the emulator if you want to test locally. You can easily create both using the Pub/Sub REST interface.

For instance, the following curl command would create a topic named micronaut-devices on the test-project project.

curl -XPUT $PUBSUB_EMULATOR_HOST/v1/projects/test-project/topics/micronaut-devices
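
The same setup can be scripted from Java, for example in a test fixture. The sketch below only builds the requests with the JDK HTTP client. EmulatorRequests is a hypothetical helper, and the sketch assumes that PUBSUB_EMULATOR_HOST holds a host:port value without a scheme and that the emulator accepts the same REST resource paths as the hosted service.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds REST requests against the Pub/Sub emulator.
final class EmulatorRequests {

    private EmulatorRequests() { }

    // emulatorHost is the value of PUBSUB_EMULATOR_HOST, for example "localhost:8085".
    static HttpRequest createTopic(String emulatorHost, String project, String topic) {
        URI uri = URI.create("http://" + emulatorHost + "/v1/projects/" + project + "/topics/" + topic);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // The subscription body names the fully qualified topic it is attached to.
    static HttpRequest createSubscription(String emulatorHost, String project,
                                          String subscription, String topic) {
        URI uri = URI.create("http://" + emulatorHost + "/v1/projects/" + project
                + "/subscriptions/" + subscription);
        String body = "{\"topic\": \"projects/" + project + "/topics/" + topic + "\"}";
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

Each request can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) once the emulator is running.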

9.10 Testing Push Subscribers

The Pub/Sub emulator does not provide any built-in support for simulating Push messages. That said, you can test your PushSubscription endpoints simply by using Micronaut’s HttpClient to simulate delivery of Push messages.

Push Messages follow a specified JSON format, which can be constructed and serialized with the PushRequest record.

For example:

@MicronautTest
@Property(name = "spec.name", value = "ContentTypePushSubscriberTest")
@Property(name = "gcp.projectId", value = "test-project")
class ContentTypePushSubscriberSpec {
    @Inject
    JsonMapper jsonMapper;


    @Test
    void testJsonPojo() throws IOException {
        Animal dog = new Animal("dog");

        String encodedData = Base64.getEncoder().encodeToString(jsonMapper.writeValueAsBytes(dog)); // (1)

        PushRequest request = new PushRequest("projects/test-project/subscriptions/animals-push", // (2)
            new PushRequest.PushMessage(new HashMap<>(), encodedData, "1", "2021-02-26T19:13:55.749Z"));

        HttpResponse<?> response = pushClient.toBlocking().exchange(HttpRequest.POST("/push", request)); // (3)

        Assertions.assertEquals(HttpStatus.OK, response.getStatus());

    }

}
@MicronautTest
@Property(name = "spec.name", value = "ContentTypePushSubscriberSpec")
@Property(name = "gcp.projectId", value = "test-project")
class ContentTypePushSubscriberSpec extends Specification {
    @Inject
    JsonMapper jsonMapper


    void "receive pojo message from json"() {
        given:
        Animal dog = new Animal("dog")

        String encodedData = Base64.getEncoder().encodeToString(jsonMapper.writeValueAsBytes(dog)) // (1)

        PushRequest request = new PushRequest("projects/test-project/subscriptions/animals-push", // (2)
                new PushRequest.PushMessage(new HashMap<>(), encodedData, "1", "2021-02-26T19:13:55.749Z"))

        when:
        HttpResponse response = pushClient.toBlocking().exchange(HttpRequest.POST("/push", request)) // (3)

        then:
        response.status == HttpStatus.OK

    }

}
1 The data to be sent must be encoded as a Base64 String
2 The fully qualified subscription name must be specified
3 The message is sent to the /push endpoint
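
For reference, the JSON body that such a request carries can also be assembled by hand. The sketch below is an illustration under assumptions: PushPayloadSketch is a hypothetical helper, the field names follow Google's documented push delivery format, and the arguments are assumed to contain no characters that need JSON escaping.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper that renders a push delivery body as a JSON string.
final class PushPayloadSketch {

    private PushPayloadSketch() { }

    static String pushJson(String subscription, String payload, String messageId, String publishTime) {
        // The message data travels as a Base64 string, as in callout (1) above.
        String data = Base64.getEncoder().encodeToString(payload.getBytes(StandardCharsets.UTF_8));
        return "{"
                + "\"message\":{"
                + "\"attributes\":{},"
                + "\"data\":\"" + data + "\","
                + "\"messageId\":\"" + messageId + "\","
                + "\"publishTime\":\"" + publishTime + "\""
                + "},"
                + "\"subscription\":\"" + subscription + "\""
                + "}";
    }
}
```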

10 Google Cloud Secret Manager Support

Google Cloud Secret Manager is a secure and convenient storage system for API keys, passwords, certificates, and other sensitive data. Applications can also use it to store configuration files and as a secure distributed repository of metadata.

To add support for Google Cloud Secret Manager to an existing project, add the following dependencies to your build.

implementation("io.micronaut.gcp:micronaut-gcp-secret-manager")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-secret-manager</artifactId>
</dependency>

10.1 Distributed Configuration

You can leverage Distributed Configuration and rely on Google Cloud Secret Manager to store your configuration files.

To enable it, add a bootstrap configuration file to src/main/resources/:

bootstrap configuration
micronaut.application.name=hello-world
micronaut.config-client.enabled=true
micronaut:
    application:
        name: hello-world
    config-client:
        enabled: true
[micronaut]
  [micronaut.application]
    name="hello-world"
  [micronaut.config-client]
    enabled=true
micronaut {
  application {
    name = "hello-world"
  }
  configClient {
    enabled = true
  }
}
{
  micronaut {
    application {
      name = "hello-world"
    }
    config-client {
      enabled = true
    }
  }
}
{
  "micronaut": {
    "application": {
      "name": "hello-world"
    },
    "config-client": {
      "enabled": true
    }
  }
}
Make sure you have configured the correct credentials on your project following the Setting up GCP section, and that the service account you designated for your application has the proper rights to read secrets. Follow the official Access Control guide for Secret Manager if you need more information.

Configuration file resolution

It’s important to note that Google Cloud Secret Manager does not support file extensions. When you upload your application configuration file (e.g. application.yml or application.properties) remove the file extension.

Micronaut tries to circumvent this limitation, adopting a convention when fetching configuration files. The following table displays the possible secrets that are fetched by default:

Name                        Description
application                 Configuration shared by all applications
application_${env}          Environment-specific configuration. For instance, if you have gcp and k8s as environments, the framework will look for application_gcp and application_k8s
[APPLICATION_NAME]          Application-specific configuration, for example hello-world
[APPLICATION_NAME]_${env}   Application-specific configuration bound to an environment, for example hello-world_gcp
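
The naming convention in the table above can be expressed as a small function. This is an illustrative sketch only: SecretNameConvention is a hypothetical class, and the order in which names are returned is an assumption rather than the framework's documented lookup order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper listing the secret names fetched by default.
final class SecretNameConvention {

    private SecretNameConvention() { }

    static List<String> candidateSecrets(String applicationName, List<String> environments) {
        List<String> names = new ArrayList<>();
        names.add("application");                    // shared by all applications
        for (String env : environments) {
            names.add("application_" + env);         // environment-specific
        }
        names.add(applicationName);                  // application-specific
        for (String env : environments) {
            names.add(applicationName + "_" + env);  // application and environment
        }
        return names;
    }
}
```

With an application named hello-world and the environments gcp and k8s, the result contains application, application_gcp, application_k8s, hello-world, hello-world_gcp and hello-world_k8s.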

Supported formats

Internally the framework will try to read the files (which lack extensions) in the following order: yaml, json, properties. The first PropertySourceLoader that succeeds in reading the contents of the secret interrupts the chain.

Avoid loading default config files

If, for some reason, you don’t want to fetch these default config files, you can set gcp.secret-manager.default-config-enabled to false:

bootstrap configuration
micronaut.application.name=hello-world
micronaut.config-client.enabled=true
gcp.secret-manager.default-config-enabled=false
micronaut:
    application:
        name: hello-world
    config-client:
        enabled: true
gcp:
    secret-manager:
        default-config-enabled: false
[micronaut]
  [micronaut.application]
    name="hello-world"
  [micronaut.config-client]
    enabled=true
[gcp]
  [gcp.secret-manager]
    default-config-enabled=false
micronaut {
  application {
    name = "hello-world"
  }
  configClient {
    enabled = true
  }
}
gcp {
  secretManager {
    defaultConfigEnabled = false
  }
}
{
  micronaut {
    application {
      name = "hello-world"
    }
    config-client {
      enabled = true
    }
  }
  gcp {
    secret-manager {
      default-config-enabled = false
    }
  }
}
{
  "micronaut": {
    "application": {
      "name": "hello-world"
    },
    "config-client": {
      "enabled": true
    }
  },
  "gcp": {
    "secret-manager": {
      "default-config-enabled": false
    }
  }
}

Loading non default config files

Sometimes you may want to load a secret as a PropertySource even though it does not follow the above convention. You can whitelist the secrets that you would like to be loaded as PropertySources using the following configuration:

bootstrap configuration
micronaut.application.name=hello-world
micronaut.config-client.enabled=true
gcp.secret-manager.custom-configs[0]=my-first-config
gcp.secret-manager.custom-configs[1]=my-second-config
micronaut:
    application:
        name: hello-world
    config-client:
        enabled: true
gcp:
    secret-manager:
      custom-configs:
        - my-first-config
        - my-second-config
[micronaut]
  [micronaut.application]
    name="hello-world"
  [micronaut.config-client]
    enabled=true
[gcp]
  [gcp.secret-manager]
    custom-configs=[
      "my-first-config",
      "my-second-config"
    ]
micronaut {
  application {
    name = "hello-world"
  }
  configClient {
    enabled = true
  }
}
gcp {
  secretManager {
    customConfigs = ["my-first-config", "my-second-config"]
  }
}
{
  micronaut {
    application {
      name = "hello-world"
    }
    config-client {
      enabled = true
    }
  }
  gcp {
    secret-manager {
      custom-configs = ["my-first-config", "my-second-config"]
    }
  }
}
{
  "micronaut": {
    "application": {
      "name": "hello-world"
    },
    "config-client": {
      "enabled": true
    }
  },
  "gcp": {
    "secret-manager": {
      "custom-configs": ["my-first-config", "my-second-config"]
    }
  }
}
The secrets you list in custom-configs must be valid configuration files and not single keys.

Loading key/value pairs into a PropertySource

Google Cloud Secret Manager can also host single keys, each mapping to a single String value. For instance, one could have a DB_PASSWORD secret instead of storing the password in a configuration file. Because Secret Manager does not offer a hierarchical approach, loading all secrets in a project is not a viable solution. Instead, you need to whitelist the keys you would like to pre-fetch, as the following configuration demonstrates:

bootstrap configuration
micronaut.application.name=hello-world
micronaut.config-client.enabled=true
gcp.secret-manager.keys[0]=DB_PASSWORD
gcp.secret-manager.keys[1]=dbUser
micronaut:
    application:
        name: hello-world
    config-client:
        enabled: true
gcp:
    secret-manager:
      keys:
        - DB_PASSWORD
        - dbUser
[micronaut]
  [micronaut.application]
    name="hello-world"
  [micronaut.config-client]
    enabled=true
[gcp]
  [gcp.secret-manager]
    keys=[
      "DB_PASSWORD",
      "dbUser"
    ]
micronaut {
  application {
    name = "hello-world"
  }
  configClient {
    enabled = true
  }
}
gcp {
  secretManager {
    keys = ["DB_PASSWORD", "dbUser"]
  }
}
{
  micronaut {
    application {
      name = "hello-world"
    }
    config-client {
      enabled = true
    }
  }
  gcp {
    secret-manager {
      keys = ["DB_PASSWORD", "dbUser"]
    }
  }
}
{
  "micronaut": {
    "application": {
      "name": "hello-world"
    },
    "config-client": {
      "enabled": true
    }
  },
  "gcp": {
    "secret-manager": {
      "keys": ["DB_PASSWORD", "dbUser"]
    }
  }
}

The framework will load all whitelisted Secret Manager keys and prefix them with sm. The table below shows how the entries can be accessed:

Secret Name     Micronaut resolved property
DB_PASSWORD     {sm.db.password}
dbUser          {sm.db.user}
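
The two rows above suggest a normalization in which underscores and camelCase boundaries become dots and the result is lower-cased. The sketch below reproduces exactly those two examples. It is not the framework's actual conversion code, and other secret names may be handled differently; SecretKeyNames is a hypothetical helper.

```java
import java.util.Locale;

// Hypothetical helper reproducing the two mappings shown in the table.
final class SecretKeyNames {

    private SecretKeyNames() { }

    static String toPropertyName(String secretName) {
        String separated = secretName
                .replaceAll("([a-z0-9])([A-Z])", "$1.$2") // split at camelCase boundaries
                .replace('_', '.');                       // underscores become dots
        return "sm." + separated.toLowerCase(Locale.ROOT);
    }
}
```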

Secret Versioning

Google Cloud Secret Manager supports secret versioning. For the distributed configuration use case, Micronaut will always load the latest version of each secret into a property source. Specifying which version of your application configuration file should be loaded is currently not supported.

10.2 Low-Level Secret Manager Client access

Accessing Secret Manager via client libraries

If you require access to Secret Manager via the Google client Java libraries, Micronaut provides a bean of type SecretManagerServiceClient for injection.

import com.google.cloud.secretmanager.v1.AccessSecretVersionRequest;
import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;

public final class GoogleClientExample {

    private final SecretManagerServiceClient client;

    public GoogleClientExample(SecretManagerServiceClient googleSecretManagerClient) { // (1)
        this.client = googleSecretManagerClient;
    }

    @EventListener
    public void onStartup(StartupEvent event) {
        AccessSecretVersionResponse response = client.accessSecretVersion(AccessSecretVersionRequest
                .newBuilder()
                .setName(SecretVersionName.of("my-cloud-project", "secretName", "latest").toString())
                .build());
        String secret = response.getPayload().getData().toStringUtf8();
    }
}
import com.google.cloud.secretmanager.v1.AccessSecretVersionRequest;
import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretVersionName;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;

class GoogleClientExample {

    private final SecretManagerServiceClient client

    GoogleClientExample(SecretManagerServiceClient googleSecretManagerClient) { // (1)
        this.client = googleSecretManagerClient;
    }

    @EventListener
    void onStartup(StartupEvent event) {
        AccessSecretVersionResponse response = client.accessSecretVersion(AccessSecretVersionRequest
                .newBuilder()
                .setName(SecretVersionName.of("my-cloud-project", "secretName", "latest").toString())
                .build())
        String secret = response.getPayload().getData().toStringUtf8()
    }
}
import com.google.cloud.secretmanager.v1.AccessSecretVersionRequest
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient
import com.google.cloud.secretmanager.v1.SecretVersionName
import io.micronaut.context.event.StartupEvent
import io.micronaut.runtime.event.annotation.EventListener

class GoogleClientExample (private val client: SecretManagerServiceClient) { // (1)
	@EventListener
	fun onStartup(event: StartupEvent) {
		val response = client.accessSecretVersion(AccessSecretVersionRequest
				.newBuilder()
				.setName(SecretVersionName.of("my-cloud-project", "secretName", "latest").toString())
				.build())
		val secret = response.payload.data.toStringUtf8()
	}
}
1 Inject SecretManagerServiceClient
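
The name passed to setName(...) in these examples is a resource path of the form projects/{project}/secrets/{secret}/versions/{version}, so pinning a specific version amounts to replacing latest with a version identifier. A minimal sketch of that format in plain Java follows; SecretVersionNames is a hypothetical helper, and in real code SecretVersionName.of(...) remains the better choice.

```java
// Hypothetical helper that renders a secret version resource name.
final class SecretVersionNames {

    private SecretVersionNames() { }

    static String versionName(String project, String secret, String version) {
        return String.format("projects/%s/secrets/%s/versions/%s", project, secret, version);
    }
}
```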

Micronaut Secret Manager Client

If you would rather not deal with the Google gRPC libraries, you can use the wrapper client that the framework uses internally, SecretManagerClient, which provides a reactive approach:

import io.micronaut.context.event.StartupEvent;
import io.micronaut.gcp.secretmanager.client.SecretManagerClient;
import io.micronaut.gcp.secretmanager.client.VersionedSecret;
import io.micronaut.runtime.event.annotation.EventListener;
import reactor.core.publisher.Mono;

public final class ClientExample {

    private final SecretManagerClient client;

    public ClientExample(SecretManagerClient client) {
        this.client = client;
    }

    @EventListener
    public void onStartup(StartupEvent event) {
        Mono<VersionedSecret> secret = Mono.from(client.getSecret("secretId")); // (1)
        Mono<VersionedSecret> v2 = Mono.from(client.getSecret("secretId", "v2")); //(2)
        Mono<VersionedSecret> fromOtherProject = Mono.from(client.getSecret("secretId", "latest", "another-project-id")); //(3)
    }
}
import io.micronaut.context.event.StartupEvent
import io.micronaut.gcp.secretmanager.client.SecretManagerClient
import io.micronaut.gcp.secretmanager.client.VersionedSecret
import io.micronaut.runtime.event.annotation.EventListener
import reactor.core.publisher.Mono

class ClientExample {

    private final SecretManagerClient client

    ClientExample(SecretManagerClient client) {
        this.client = client
    }

    @EventListener
    void onStartup(StartupEvent event) {
        Mono<VersionedSecret> secret = client.getSecret("secretId") // (1)
        Mono<VersionedSecret> v2 = client.getSecret("secretId", "v2") //(2)
        Mono<VersionedSecret> fromOtherProject = client.getSecret("secretId", "latest", "another-project-id") //(3)
    }
}
import io.micronaut.context.event.StartupEvent
import io.micronaut.gcp.secretmanager.client.SecretManagerClient
import io.micronaut.runtime.event.annotation.EventListener

class ClientExample(private val client: SecretManagerClient) {
	@EventListener
	fun onStartup(event: StartupEvent) {
		val secret = client.getSecret("secretId") // (1)
		val v2 = client.getSecret("secretId", "v2") //(2)
		val fromOtherProject = client.getSecret("secretId", "latest", "another-project-id") //(3)
	}
}
1 Uses the latest version and the current project from GoogleCloudConfiguration
2 Uses current project but overrides version
3 Fetches a secret from a different project
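The three overloads above differ only in which parts of the secret's resource name are defaulted. The following is a minimal sketch of that defaulting, not the SecretManagerClient implementation: the class SecretNameSketch and its resolve methods are hypothetical names introduced for illustration, and the default project ID is assumed to be supplied by the caller. The resource name layout is the one Secret Manager uses for secret versions.

```java
// Hypothetical helper, for illustration only: shows how a secret ID, version and
// project combine into a Secret Manager resource name when some parts are defaulted.
public final class SecretNameSketch {

    // Assumption: in a real application this would be the configured or detected project ID.
    private final String defaultProjectId;

    public SecretNameSketch(String defaultProjectId) {
        this.defaultProjectId = defaultProjectId;
    }

    // (1) Only the secret ID is given: use the "latest" version and the default project.
    public String resolve(String secretId) {
        return resolve(secretId, "latest");
    }

    // (2) Secret ID and version are given: use the default project.
    public String resolve(String secretId, String version) {
        return resolve(secretId, version, defaultProjectId);
    }

    // (3) Everything is given explicitly.
    public String resolve(String secretId, String version, String projectId) {
        return String.format("projects/%s/secrets/%s/versions/%s", projectId, secretId, version);
    }
}
```

For example, with a default project of my-cloud-project, resolve("secretId") yields projects/my-cloud-project/secrets/secretId/versions/latest.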
You can use the client libraries at any point during the application's lifetime. However, it is advisable to load each secret only once, during application startup, because every retrieval is a remote call that adds latency and incurs Secret Manager access costs.
See the guide "Use Google Secret Manager to store MySQL credentials" to learn more.
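One way to follow the advice about loading secrets only once is to put a small cache in front of the remote call. The sketch below is an illustration under stated assumptions rather than part of the Micronaut GCP API: StartupSecretCache is a hypothetical class, and the fetcher function stands in for whatever performs the actual retrieval (for instance, a blocking call built on one of the clients shown earlier).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache, for illustration only: each secret is fetched at most once
// and every later lookup is served from memory.
public final class StartupSecretCache {

    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Assumption: stands in for the remote Secret Manager call.
    private final Function<String, String> fetcher;

    public StartupSecretCache(Function<String, String> fetcher) {
        this.fetcher = fetcher;
    }

    // Returns the cached value, invoking the fetcher only on the first request for a given ID.
    public String get(String secretId) {
        return cache.computeIfAbsent(secretId, fetcher);
    }
}
```

Calling get for every required secret from a startup listener would populate the cache before the application serves traffic. Note that a cache like this never sees a rotated secret until the application restarts, which may or may not be acceptable for a given deployment.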

11 Google Cloud Storage Support

Micronaut provides a high-level, uniform object storage API that works across the major cloud providers: Micronaut Object Storage.

To get started, select the object-storage-gcp feature in Micronaut Launch, or add the following dependency:

implementation("io.micronaut.objectstorage:micronaut-object-storage-gcp")
<dependency>
    <groupId>io.micronaut.objectstorage</groupId>
    <artifactId>micronaut-object-storage-gcp</artifactId>
</dependency>

For more information, check the Micronaut Object Storage GCP support documentation.

12 Guides

See the following list of guides to learn more about working with Google Cloud Platform in the Micronaut Framework:

13 Repository

You can find the source code of this project in this repository: