Micronaut GCP

Provides integration between Micronaut and Google Cloud Platform (GCP)

Version:

1 Introduction

This project provides various extensions to Micronaut to integrate Micronaut with Google Cloud Platform (GCP).

2 Release History

For this project, you can find a list of releases (with release notes) here:

3 Setting up GCP Support

The micronaut-gcp-common module includes basic setup for running applications on Google Cloud.

implementation("io.micronaut.gcp:micronaut-gcp-common:3.3.1.BUILD-SNAPSHOT")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-common</artifactId>
    <version>3.3.1.BUILD-SNAPSHOT</version>
</dependency>

Prerequisites:

  1. You should have a Google Cloud Platform project created

  2. Install gcloud CLI

  3. Configure default project gcloud config set project YOUR_PROJECT_ID

  4. Authenticate with gcloud auth login

  5. Authenticate application default credential with gcloud auth application-default login

It’s strongly recommended that you use a Service Account for your application.

Google Project ID

The module features a base GoogleCloudConfiguration which you can use to configure or retrieve the GCP Project ID:

🔗
Table 1. Configuration Properties for GoogleCloudConfiguration
Property Type Description

gcp.project-id

java.lang.String

Sets the project id to use.

You can inject this bean and use the getProjectId() method to retrieve the configured or detected project ID.

Google Credentials

The module will setup a bean of exposing the com.google.auth.oauth2.GoogleCredentials instance that are either detected from the local environment or configured by GoogleCredentialsConfiguration:

🔗
Table 2. Configuration Properties for GoogleCredentialsConfiguration
Property Type Description

gcp.credentials.scopes

java.util.List

The default scopes to associate with the application to access specific APIs. See <a href="https://developers.google.com/identity/protocols/googlescopes">Google Scopes</a> for a complete list. Leave this empty if you don’t need additional API access.

gcp.credentials.location

java.lang.String

Sets the location to the service account credential key file.

gcp.credentials.encoded-key

java.lang.String

Sets the Base64 encoded service account key content..

4 Google Cloud Logging Support

Google Cloud Logging integration is available via StackdriverJsonLayout that formats log output using Stackdriver structured logging format.

To enable it add the following dependency to your project:

implementation("io.micronaut.gcp:micronaut-gcp-logging:3.3.1.BUILD-SNAPSHOT")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-logging</artifactId>
    <version>3.3.1.BUILD-SNAPSHOT</version>
</dependency>

By default if an application is using a CONSOLE appender Stackdriver log parser will consider the entire payload of the message as a text entry, making searching and correlation with tracing impractical.

Console log output

Logs on the picture above can’t be searched by attributes such as thread and ansi coloring makes regex searching even more challenging.

When enabled the JSON appender allows logs to be easily filtered:

If you combine this module with the Stackdriver Trace module, all logs will also have a traceId field, making possible to correlate traces with log entries.
JSON log output
1 Visual display of log levels
2 Correlated traceId for tracing when you have enabled cloud tracing
3 Simplified output without extra fields or ANSI coloring codes

Configuring logging via console

<configuration>
    <include resource="io/micronaut/gcp/logging/logback-json-appender.xml" /> (1)
    <root level="INFO">
        <appender-ref ref="CONSOLE_JSON" /> (2)
    </root>
</configuration>
1 Import the logback configuration that defines the JsonLayout appender
2 Define your root level as CONSOLE_JSON

Overriding defaults on logging configuration

If you would like to override the fields that are included in the JsonLayout appender, you can declare it on your own logback configuration instead of including the default from logback-json-appender.xml:

<configuration>
    <appender name="CONSOLE_JSON" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="io.micronaut.gcp.logging.StackdriverJsonLayout">
                <projectId>${projectId}</projectId> (1)
                <includeTraceId>true</includeTraceId>
                <includeSpanId>true</includeSpanId>
                <includeLevel>true</includeLevel>
                <includeThreadName>false</includeThreadName>
                <includeMDC>true</includeMDC>
                <includeLoggerName>true</includeLoggerName>
                <includeFormattedMessage>true</includeFormattedMessage>
                <includeExceptionInMessage>true</includeExceptionInMessage>
                <includeContextName>true</includeContextName>
                <includeMessage>false</includeMessage>
                <includeException>false</includeException>
            </layout>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="CONSOLE_JSON" >

        </appender-ref>
    </root>
</configuration>
1 You can override the projectId settings via property. If not defined, the default ServiceOptions.getDefaultProjectId() will be used.

Dynamic appender selection

The JSONLayout comes in hand when using Google Cloud Logging, but when running locally it will make your console logs unreadable. The default logback-json-appender configuration includes both a STDOUT and a CONSOLE_JSON appenders, as well as a dynamic logback property called google_cloud_logging.

You can use that variable to switch your logger appender dynamically.

You logging configuration would look like this:

<configuration>
    <include resource="io/micronaut/gcp/logging/logback-json-appender.xml" />
    <root level="INFO">
        <appender-ref ref="${google_cloud_logging}" /> (1)
    </root>
</configuration>
1 Chooses the appropriate appender depending on the environment.
The environment detection executes a HTTP request to the Google Cloud metadata server. If you rather skip this to improve startup time, just set MICRONAUT_ENVIRONMENTS environment variable or the micronaut.environments System property as described in the reference documentation.

5 Stackdriver Trace

The micronaut-gcp-tracing integrates Micronaut with Cloud Trace from Google Cloud Operations (formerly Stackdriver).

To enable it add the following dependency:

implementation("io.micronaut.gcp:micronaut-gcp-tracing:3.3.1.BUILD-SNAPSHOT")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-tracing</artifactId>
    <version>3.3.1.BUILD-SNAPSHOT</version>
</dependency>

Then enabling Zipkin tracing in your configuration application.yml:

Enabling Stackdriver Trace
tracing:
    zipkin:
        enabled: true
        # [Optional] Set sampling probability to 100% for dev/testing purposes to observe traces.
        #
        # sampler:
        #    probability=1.0

# [Optional] configuration to enable/disable Stackdriver Trace configuration.
# This is defaulted to true.
#
# gcp:
#    tracing:
#        enabled: true

6 Authorizing HTTP Clients

The micronaut-gcp-http-client module can be used to help authorize service-to-service communication. To get started add the following module:

implementation("io.micronaut.gcp:micronaut-gcp-http-client")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-http-client</artifactId>
</dependency>

You should then configure the service accounts as per the documentation on service-to-service communication and the enable the filter for the outgoing URI paths you wish to include the Google-signed OAuth ID token:

gcp:
  http:
    client:
      auth:
        patterns:
          - /foo/**
          - /bar/**

7 Cloud Function Support

Micronaut GCP includes extended support for Google Cloud Function - designed for serverless workloads.

7.1 Simple Functions

Micronaut GCP offers two ways to write cloud functions with Micronaut. The first way is more low level and involves using Micronaut’s built in support for functions. Simply add the following dependency to your classpath:

implementation("io.micronaut.gcp:micronaut-gcp-function")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-function</artifactId>
</dependency>

Then add the Cloud Function API as a compileOnly dependency (provided with Maven):

compileOnly("com.google.cloud.functions:functions-framework-api:1.0.1")
<dependency>
    <groupId>com.google.cloud.functions</groupId>
    <artifactId>functions-framework-api</artifactId>
    <version>1.0.1</version>
    <scope>provided</scope>
</dependency>

Now define a class that implements one of Google Cloud Found’s interfaces, for example com.google.cloud.functions.BackgroundFunction, and extends from io.micronaut.function.executor.FunctionInitializer.

The following is an example of a BackgroundFunction that uses Micronaut and Google Cloud Function:

/*
 * Copyright 2017-2020 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package example.background;

import com.google.cloud.functions.*;
import io.micronaut.gcp.function.GoogleFunctionInitializer;

import javax.inject.*;
import java.util.*;

public class Example extends GoogleFunctionInitializer (1)
        implements BackgroundFunction<PubSubMessage> { (2)

    @Inject LoggingService loggingService; (3)

    @Override
    public void accept(PubSubMessage message, Context context) {
        loggingService.logMessage(message);
    }
}

class PubSubMessage {
    String data;
    Map<String, String> attributes;
    String messageId;
    String publishTime;
}

@Singleton
class LoggingService {

    void logMessage(PubSubMessage message) {
        // log the message
    }
}
/*
 * Copyright 2017-2020 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package example.background

import com.google.cloud.functions.*
import io.micronaut.gcp.function.GoogleFunctionInitializer

import javax.inject.*

class Example extends GoogleFunctionInitializer (1)
        implements BackgroundFunction<PubSubMessage> { (2)

    @Inject LoggingService loggingService (3)

    @Override
    void accept(PubSubMessage message, Context context) {
        loggingService.logMessage(message)
    }
}

class PubSubMessage {
    String data
    Map<String, String> attributes
    String messageId
    String publishTime
}

@Singleton
class LoggingService {

    void logMessage(PubSubMessage message) {
        // log the message
    }
}
/*
 * Copyright 2017-2020 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package example.background

import com.google.cloud.functions.BackgroundFunction
import com.google.cloud.functions.Context
import io.micronaut.gcp.function.GoogleFunctionInitializer
import javax.inject.Inject
import javax.inject.Singleton

class Example : GoogleFunctionInitializer(), (1)
        BackgroundFunction<PubSubMessage> { (2)
    @Inject
    lateinit var loggingService: LoggingService (3)

    override fun accept(message: PubSubMessage, context: Context) {
        loggingService.logMessage(message)
    }
}

class PubSubMessage {
    var data: String? = null
    var attributes: Map<String, String>? = null
    var messageId: String? = null
    var publishTime: String? = null
}

@Singleton
class LoggingService {

    fun logMessage(message: PubSubMessage) {
        // log the message
    }
}
1 The function extends from io.micronaut.function.executor.FunctionInitializer
2 The function implements com.google.cloud.functions.BackgroundFunction
3 Dependency injection can be used on the fields

When you extend from FunctionInitializer the Micronaut ApplicationContext will be initialized and dependency injection will be performed on the function instance. You can use inject any bean using javax.inject.Inject as usual.

Functions require a no argument constructor hence you must use field injection (which requires lateinit in Kotlin) when injecting dependencies into the function itself.

The FunctionInitializer super class provides numerous methods that you can override to customize how the ApplicationContext is built if desired.

Running Functions Locally

Raw functions cannot be executed locally. They can be tested by instantiating the function and inspecting any side effects by providing mock arguments or mocking dependent beans.

Deployment

When deploying the function to Cloud Function you should use the fully qualified name of the function class as the handler reference.

First build the function with:

$ ./gradlew clean shadowJar

Then cd into the build/libs directory (deployment has to be done from the location where the JAR file resides):

$ cd build/libs

To deploy the function make sure you have gcloud CLI then run:

$ gcloud beta functions deploy myfunction --entry-point example.function.Function --runtime java11 --trigger-http

In the example above myfunction refers to the name of your function and can be changed to whatever name you prefer to name your function. example.function.Function refers to the fully qualified name of your function class.

To obtain the trigger URL you can use the following command:

$ YOUR_HTTP_TRIGGER_URL=$(gcloud beta functions describe myfunction --format='value(httpsTrigger.url)')

You can then use this variable to test the function invocation:

$ curl -i $YOUR_HTTP_TRIGGER_URL

7.2 HTTP Functions

It is common to want to take just a slice of a regular Micronaut HTTP server application and deploy it as a function.

Configuration

To facilitate this model, Micronaut GCP includes an additional module that allows you to use regular Micronaut annotations like @Controller and @Get to define your functions that can be deployed to cloud function.

With this model you need to add the micronaut-gcp-function-http dependency to your application:

implementation("io.micronaut.gcp:micronaut-gcp-function-http:3.3.1.BUILD-SNAPSHOT")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-function-http</artifactId>
    <version>3.3.1.BUILD-SNAPSHOT</version>
</dependency>

And define the Google Function API as a development only dependency:

developmentOnly("com.google.cloud.functions:functions-framework-api:1.0.1")
<dependency>
    <groupId>com.google.cloud.functions</groupId>
    <artifactId>functions-framework-api</artifactId>
    <version>1.0.1</version>
    <scope>developmentOnly</scope>
</dependency>

Running Functions Locally

First to run the function locally you should then make the regular Micronaut server a developmentOnly dependency since it is not necessary to include it in the JAR file that will be deployed to Cloud Function:

developmentOnly("io.micronaut:micronaut-http-server-netty")
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-http-server-netty</artifactId>
    <scope>developmentOnly</scope>
</dependency>

You can then use ./gradlew run or ./mvnw compile exec:exec to run the function locally using Micronaut’s Netty-based server.

Alternatively, you could configure the Google Function Framework for Java which includes a Maven plugin, or for Gradle include the following:

Configuring the Function framework in Gradle
configurations {
    invoker
}

dependencies {
    invoker 'com.google.cloud.functions.invoker:java-function-invoker:1.0.0-beta1'
}


task('runFunction', type: JavaExec, dependsOn: classes) {
    main = 'com.google.cloud.functions.invoker.runner.Invoker'
    classpath(configurations.invoker)
    args(
            '--target', 'io.micronaut.gcp.function.http.HttpFunction',
            '--classpath', (configurations.runtimeClasspath + sourceSets.main.output).asPath,
            '--port', 8081

    )
}

With this in place you can run ./gradlew runFunction to run the function locally.

Deployment

When deploying the function to Cloud Function you should use the HttpFunction class as the handler reference.

First build the function with:

$ ./gradlew clean shadowJar

Then cd into the build/libs directory (deployment has to be done from the location where the JAR file resides):

$ cd build/libs

To deploy the function make sure you have gcloud CLI then run:

$ gcloud beta functions deploy myfunction --entry-point io.micronaut.gcp.function.http.HttpFunction --runtime java11 --trigger-http

In the example above myfunction refers to the name of your function and can be changed to whatever name you prefer to name your function.

To obtain the trigger URL you can use the following command:

$ YOUR_HTTP_TRIGGER_URL=$(gcloud beta functions describe myfunction --format='value(httpsTrigger.url)')

You can then use this variable to test the function invocation:

$ curl -i $YOUR_HTTP_TRIGGER_URL/hello/John

8 Google Cloud Pub/Sub Support

8.1 Introduction

This project provides integration between Micronaut and Google Cloud PubSub. It uses the official Google Cloud Pub/Sub client java libraries to create Publisher and Subscribers while keeping a similar programming model for messaging as the ones defined in Micronaut RabbitMQ and Micronaut Kafka.

The project does not attempt to create any of the resources in Google Cloud such as Topics and Subscription. Make sure your project has the correct resources and the Service Account being used has proper permissions.

8.2 Quickstart

To add support for Google Cloud Pub/Sub to an existing project, add the following dependencies to your build.

implementation("io.micronaut.gcp:micronaut-gcp-pubsub")
<dependency>
    <groupId>io.micronaut.gcp</groupId>
    <artifactId>micronaut-gcp-pubsub</artifactId>
</dependency>

Creating a Pub/Sub Publisher with @PubSubClient

To publish messages to Google Cloud Pub/Sub, just define an interface that is annotated with @PubSubClient.

import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;

@PubSubClient (1)
public interface AnimalClient {

    @Topic("animals") (2)
    void send(byte[] data); (3)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;

@PubSubClient (1)
interface AnimalClient {

    @Topic("animals") (2)
    void send(byte[] data) (3)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic

@PubSubClient (1)
interface AnimalClient {

	@Topic("animals") (2)
	fun send(data: ByteArray) (3)

}
1 The @PubSubClient annotation is used to designate this interface as a client
2 The topic animals will be used to publish messages
3 The method accepts one argument that will be used as message body
Make sure your current GCP project has a topic named animals before starting the application.

At compile time Micronaut will create an implementation of the above interface. You can then inject that interface on your business methods via @Inject and use it.

import io.micronaut.gcp.pubsub.support.Animal;
import javax.inject.Singleton;

@Singleton
public final class AnimalService {
    private final AnimalClient animalClient;

    public AnimalService(AnimalClient animalClient) { (1)
        this.animalClient = animalClient;
    }

    public void someBusinessMethod(Animal animal) {
        byte[] serializedBody = serialize(animal);
        animalClient.send(serializedBody);
    }

    private byte[] serialize(Animal animal) { (2)
        return null;
    }
}
import javax.inject.Singleton;

@Singleton
final class AnimalService {
    private final AnimalClient animalClient;

    AnimalService(AnimalClient animalClient) { (1)
        this.animalClient = animalClient
    }

    void someBusinessMethod(Animal animal) {
        byte[] serializedBody = serialize(animal)
        animalClient.send(serializedBody)
    }

    private byte[] serialize(Animal animal) { (2)
        return null
    }
}
import io.micronaut.gcp.pubsub.support.Animal
import javax.inject.Singleton

@Singleton
class AnimalService(private val animalClient: AnimalClient)  { (1)

	fun someBusinessMethod(animal: Animal) {
		val serializedBody = serialize(animal)
		animalClient.send(serializedBody)
	}

	private fun serialize(animal: Animal): ByteArray { (2)
		return ByteArray(0)
	}

}
1 Constructor based injection
2 Your custom serialization logic

Creating a Pub/Sub Subscriber with @PubSubListener

To listen to Pub/Sub messages you can use @PubSubListener annotation on a class to mark it as a message listener.

The following example would listen on a subscription named animals that is configured to be attached to the topic of the previous example.

import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;

@PubSubListener (1)
public class AnimalListener {


    /**
     *
     * @param data raw data
     */
    @Subscription("animals") (2)
    public void onMessage(byte[] data) { (3)
        System.out.println("Message received");
    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;

@PubSubListener (1)
class AnimalListener {

    @Subscription("animals") (2)
    void onMessage(byte[] data) { (3)
        System.out.println("Message received")
    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription

@PubSubListener (1)
class AnimalListener {

	@Subscription("animals") (2)
	fun onMessage(data: ByteArray) { (3)
		println("Message received")
	}

}
1 Designates this class to be a message listener for Pub/Sub messages
2 Messages routed to the animals subscription will be delivered to this method
3 The method has a single argument that contains the serialized body of the Pub/Sub message

8.3 Pub/Sub configuration Properties

You can customize certain aspects of the client. Pub/Sub client libraries leverage a ScheduledExecutorService for both message publishing and consumption.

If not specified the framework will configure the framework default Scheduled executor service to be used for both Publishers and Subscribers. See ExecutorConfiguration for the full list of options.

You can override it at PubSubConfigurationProperties to make a default value for all clients, or you can setup per Topic Publisher, or Subscription listener as discussed further bellow.

Creating a custom executor is presented on the section Configuring Thread pools .

🔗
Table 1. Configuration Properties for PubSubConfigurationProperties
Property Type Description

gcp.pubsub.keep-alive-interval-minutes

int

How often to ping the server to keep the channel alive. Default: 5 minutes.

gcp.pubsub.publishing-executor

java.lang.String

Name of the {@link java.util.concurrent.ScheduledExecutorService} to be used by all {@link com.google.cloud.pubsub.v1.Publisher} instances. Default: "scheduled"

gcp.pubsub.subscribing-executor

java.lang.String

Name of the {@link java.util.concurrent.ScheduledExecutorService} to be used by all {@link com.google.cloud.pubsub.v1.Subscriber} instances. Default: "scheduled"

8.4 Pub/Sub Publishers

Pub/Sub support in micronaut follows the same pattern used for other types of clients such as the HTTP Client. By annotating an interface with @PubSubClient the framework will create an implementation bean that handles communication with Pub/Sub for you.

Topics

In order to publish messages to Pub/Sub you need a method annotated with a @Topic annotation. For each annotated method the framework will create a dedicated Publisher with its own configuration for RetrySettings, BatchSettings and its own Executor. All settings can be overridden via configuration properties and the appropriate configuration can be passed via the configuration attribute of the @Topic annotation.

import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubClient (1)
public interface SimpleClient {

    @Topic("animals")
    void send(PubsubMessage message); (2)

    @Topic("animals")
    void send(byte[] data); (3)

    @Topic("animals")
    void send(Animal animal); (4)
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal


@PubSubClient (1)
interface SimpleClient {

    @Topic("animals")
    void send(PubsubMessage message) (2)

    @Topic("animals")
    void send(byte[] data) (3)

    @Topic("animals")
    void send(Animal animal) (4)
}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal

@PubSubClient (1)
interface SimpleClient {

	@Topic("animals")
	fun send(message: PubsubMessage) (2)

	@Topic("animals")
	fun send(data: ByteArray) (3)

	@Topic("animals")
	fun send(animal: Animal) (4)
}
1 The @PubSubClient enables this interface to be replaced by bean implementation by micronaut.
2 Sending a PubsubMessage object skips any SerDes or contentType headers
3 Sending a byte array, SerDes will be bypassed and bytes will just be copied, but the default content type will still be application/json
4 You can send any domain object as long as a SerDes is configured to handled it. By default application/json is used.
If a body argument cannot be found, an exception will be thrown.

Resource naming

On the previous examples you noticed we used a simple naming for the topic such as animals. Inside Google Cloud however resources are only accessible via their FQN. In Pub/Sub case topics are named as projects/$PROJECT_ID/topics/$TOPIC_NAME. Micronaut integration with GCP will automatically grab the default project id available (please refer to the section Google Project Id ) and convert the simple naming of the resource into a FQN. You can also support a FQN that uses a different project name, that is useful when your project has to publish message to more than the default project configured for the Service Account.

When publishing to a different project, make sure your service account has the proper access to the resource.
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubClient
public interface MultipleProjectClient {

    @Topic("animals")
    void sendUS(Animal animal); (1)

    @Topic("projects/eu-project/topics/animals")
    void sendEU(Animal animal); (2)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubClient
interface MultipleProjectClient {

    @Topic("animals")
    void sendUS(Animal animal) (1)

    @Topic("projects/eu-project/topics/animals")
    void sendEU(Animal animal) (2)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal

@PubSubClient
interface MultipleProjectClient {

	@Topic("animals")
	fun sendUS(animal: Animal) (1)

	@Topic("projects/eu-project/topics/animals")
	fun sendEU(animal: Animal) (2)

}
1 This would use the default project configured for the application.
2 This would override and publish messages to a different project.

8.4.1 Content-Type and message serialization

The contents of a PubSubMessage are always a base64 encoded ByteString. This framework provide a way to create custom Serialization/Deserialization as explained in the section Custom Serialization/Deserialization.

By default any body message argument will be serialized via JsonPubSubMessageSerDes unless specified otherwise via the contentType property of the @Topic annotation.

The rules of message serialization are the following:

  1. If the body type is PubSubMessage then SerDes is bypassed completely and no header is added to the message.

  2. If the body type is byte[] SerDes logic is bypassed, but a Content-Type header of application/json will be added unless overwritten by the @Topic annotation.

  3. For any other type, the type defined by contentType will be used to located the correct PubSubMessageSerDes to handle it, if none is passed application/json will be used.

import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.http.MediaType;

@PubSubClient
public interface CustomSerDesClient {

    @Topic("animals") (1)
    void send(PubsubMessage pubsubMessage);

    @Topic("animals") (2)
    void send(byte[] data);

    @Topic(value = "animals", contentType = MediaType.IMAGE_GIF) (3)
    void sendWithCustomType(byte[] data);

    @Topic("animals") (4)
    void send(Animal animal);

    @Topic(value = "animals", contentType = MediaType.APPLICATION_XML) (5)
    void sendWithCustomType(Animal animal);

}
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.http.MediaType;

@PubSubClient
interface CustomSerDesClient {

    @Topic("animals") (1)
    void send(PubsubMessage pubsubMessage)

    @Topic("animals") (2)
    void send(byte[] data)

    @Topic(value = "animals", contentType = MediaType.IMAGE_GIF) (3)
    void sendWithCustomType(byte[] data)

    @Topic("animals") (4)
    void send(Animal animal)

    @Topic(value = "animals", contentType = MediaType.APPLICATION_XML) (5)
    void sendWithCustomType(Animal animal)

}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.http.MediaType

@PubSubClient
interface CustomSerDesClient {

	@Topic("animals") (1)
	fun send(pubsubMessage: PubsubMessage)

	@Topic("animals")  (2)
	fun send(data: ByteArray)

	@Topic(value = "animals", contentType = MediaType.IMAGE_GIF) (3)
	fun sendWithCustomType(data: ByteArray)

	@Topic("animals") (4)
	fun send(animal: Animal)

	@Topic(value = "animals", contentType = MediaType.APPLICATION_XML) (5)
	fun sendWithCustomType(animal: Animal)

}
1 Using PubsubMessage no SerDes will be used, no headers are added to the message, user is responsible to create the message.
2 Using byte array. No SerDes logic is used, however a Content-Type header in this example is added with value application/json
3 Using byte array and specifying a contentType. SerDes will be bypassed and a Content-Type header with value image/gif will be added.
4 No contentType defined, and JsonPubSubMessageSerDes will be use to serialize the object.
5 Using specific contentType, the framework will look for a PubSubMessageSerDes registered for type application/xml user must provide this bean or an exception is thrown at runtime.

8.4.2 Message Headers

Google Cloud Pub/Sub messages contain a dictionary of message attributes in the form of a Map<String, String>. The framework binds those attributes to a @Header annotation that can be used at the class level or to the method or an argument of the method.

import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.annotation.Header;

@PubSubClient
@Header(name = "application-name", value = "petclinic") (1)
public interface CustomHeadersClient {

    @Header(name = "status", value = "healthy") (2)
    @Topic("animals")
    void sendWithStaticHeaders(Animal animal);

    @Topic("animals")
    void sendWithDynamicHeaders(Animal animal, @Header(name = "code") Integer code); (3)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.annotation.Header;

@PubSubClient
@Header(name = "application-name", value = "petclinic") (1)
 interface CustomHeadersClient {

    @Header(name = "status", value = "healthy") (2)
    @Topic("animals")
    void sendWithStaticHeaders(Animal animal)

    @Topic("animals")
    void sendWithDynamicHeaders(Animal animal, @Header(name = "code") Integer code) (3)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.Header

@PubSubClient
@Header(name = "application-name", value = "petclinic") (1)
interface CustomHeadersClient {

	@Header(name = "status", value = "healthy") (2)
	@Topic("animals")
	fun sendWithStaticHeaders(animal: Animal)

	@Topic("animals")
	fun sendWithDynamicHeaders(animal: Animal, @Header(name = "code") code: Int) (3)

}
1 Headers specified at class level will be added to all annotated methods using @Topic
2 You can pass a static value as part of @Header
3 Values can also be passed via arguments. A ConversionService will try to convert them into a String value.

8.4.3 Publisher properties

Pub/Sub allows each Publisher to have its own configuration for things such as executors or batching settings. This is useful when you need to publish messages to topic that uses different SLAs.

The framework allows you to create configurations and then bind those configurations to each topic annotation using the configuration parameter.

Table 1. Configuration properties for PublisherConfigurationProperties
Property Type Description

gcp.pubsub.publisher.*.executor

java.lang.String

Name of the executor to use. Default: scheduled

gcp.pubsub.publisher.*.retry.total-timeout

org.threeten.bp.Duration

How long the logic should keep trying the remote calluntil it gives up completely. Default 600 seconds

gcp.pubsub.publisher.*.retry.initial-retry-delay

org.threeten.bp.Duration

Delay before the first retry. Default: 100ms

gcp.pubsub.publisher.*.retry.retry-delay-multiplier

double

Controls the change in retry delay. The retry delay of the previous call is multiplied by the RetryDelayMultiplier to calculate the retry delay for the next call. Default: 1.3

gcp.pubsub.publisher.*.retry.max-retry-delay

org.threeten.bp.Duration

Puts a limit on the value of the retry delay, so that the RetryDelayMultiplier can’t increase the retry delay higher than this amount. Default: 60 seconds

gcp.pubsub.publisher.*.retry.max-attempts

int

Defines the maximum number of attempts to perform. Default: 0

gcp.pubsub.publisher.*.retry.jittered

boolean

Determines if the delay time should be randomized. Default: true

gcp.pubsub.publisher.*.retry.initial-rpc-timeout

org.threeten.bp.Duration

Controls the timeout for the initial RPC. Default: 5 seconds

gcp.pubsub.publisher.*.retry.rpc-timeout-multiplier

double

Controls the change in RPC timeout. The timeout of the previous call is multiplied by the RpcTimeoutMultiplier to calculate the timeout for the next call. Default: 1.0

gcp.pubsub.publisher.*.retry.max-rpc-timeout

org.threeten.bp.Duration

Puts a limit on the value of the RPC timeout, so that the RpcTimeoutMultiplier can’t increase the RPC timeout higher than this amount. Default 0

gcp.pubsub.publisher.*.batching.element-count-threshold

java.lang.Long

Set the element count threshold to use for batching. After this many elements are accumulated, they will be wrapped up in a batch and sent. Default: 100

gcp.pubsub.publisher.*.batching.request-byte-threshold

java.lang.Long

Set the request byte threshold to use for batching. After this many bytes are accumulated, the elements will be wrapped up in a batch and sent. Default 1000 (1Kb)

gcp.pubsub.publisher.*.batching.delay-threshold

org.threeten.bp.Duration

Set the delay threshold to use for batching. After this amount of time has elapsed (counting from the first element added), the elements will be wrapped up in a batch and sent. Default 1ms

gcp.pubsub.publisher.*.batching.is-enabled

java.lang.Boolean

Indicate if the batching is enabled. Default : true

gcp.pubsub.publisher.*.flow-control.max-outstanding-element-count

java.lang.Long

Maximum number of outstanding elements to keep in memory before enforcing flow control.

gcp.pubsub.publisher.*.flow-control.max-outstanding-request-bytes

java.lang.Long

Maximum number of outstanding bytes to keep in memory before enforcing flow control.

gcp.pubsub.publisher.*.flow-control.limit-exceeded-behavior

com.google.api.gax.batching.FlowController$LimitExceededBehavior

The behavior of FlowController when the specified limits are exceeded. Defaults to Ignore.

For example suppose you have the following configuration:

gcp:
  pubsub:
    publisher:
      batching:
        executor: batch-executor
      immediate:
        executor: immediate-executor
        batching:
          enabled: false

You can then apply it to individual methods as:

import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubClient
public interface CustomConfigurationClient {

    @Topic(value = "animals", configuration = "batching") (1)
    void batchSend(Animal animal);

    @Topic(value = "animals", configuration = "immediate") (2)
    void send(Animal animal);

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubClient
interface CustomConfigurationClient {

    @Topic(value = "animals", configuration = "batching") (1)
    void batchSend(Animal animal)

    @Topic(value = "animals", configuration = "immediate") (2)
    void send(Animal animal)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal

@PubSubClient
interface CustomConfigurationClient {
	@Topic(value = "animals", configuration = "batching")
	fun batchSend(animal: Animal)  (1)

	@Topic(value = "animals", configuration = "immediate")
	fun send(animal: Animal)  (2)
}
1 The Publisher will be configured using a configuration named batching
2 The Publisher will be configured using a configuration named immediate
FlowControlSettings are actually configured for the BatchingSettings property, due the nature of Google’s Builders the configuration was flattened at PubSubConfigurationProperties level, and it’s injected it into the RetrySettings later.

8.4.4 Retrieving message Ids (broker acknowledge)

All the examples so far have been using void on the method signature. However getting a message acknowledge from the broker is usually required.

Pub/Sub returns a String object that contains the message id that the broker generated. Your methods can also be defined using either String or Single<String> (for reactive support).

When you define your client you can actually choose between a few different method signatures. Depending on your choice you may get the message acknowledge back and control if the method is blocking or reactive.

  1. If your method has void as a return, then a blocking call to publish the message is made and you don’t get the message id returned by the Pub/Sub broker.

  2. If your method return a String then a blocking call to publish the message is made and the message id is returned.

  3. If your method returns Single<String> then it’s a reactive call, and you can subscribe to the publisher to retrieve the message id.

import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.reactivex.Single;

@PubSubClient
public interface CustomReturnClient {

    @Topic("animals")
    void send(Animal animal); (1)

    @Topic("animals")
    String sendWithId(Animal animal); (2)

    @Topic("animals")
    Single<String> sendReactive(Animal animal); (3)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;
import io.reactivex.Single;

@PubSubClient
interface CustomReturnClient {

    @Topic("animals")
    void send(Animal animal) (1)

    @Topic("animals")
    String sendWithId(Animal animal) (2)

    @Topic("animals")
    Single<String> sendReactive(Animal animal) (3)

}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal
import io.reactivex.Single

@PubSubClient
interface CustomReturnClient {

	@Topic("animals")
	fun send(animal: Animal) (1)

	@Topic("animals")
	fun sendWithId(animal: Animal): String (2)

	@Topic("animals")
	fun sendReactive(animal: Animal?): Single<String> (3)

}
1 Blocking call, message id is not returned
2 Blocking call, message id is returned as String
3 Reactive call

8.5 Restricting locations and message ordering

Restricting storage locations

Google Cloud Pub/Sub is a globally distributed event platform. When a client publishes a message, the platform stores the message on the nearest region to the publisher. Sometimes regulations such as GDPR impose restrictions on where customer data can live. When using Micronaut integration you can specify the endpoint of the topic so that Pub/Sub will persist the message data on the specified region. To learn more about this feature visit the resource location restriction page.

import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubClient
public interface LocationClient {
    @Topic(value = "animals", endpoint = "europe-west1-pubsub.googleapis.com:443") (1)
    void send(Animal animal);
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal

@PubSubClient
interface LocationClient {
    @Topic(value = "animals", endpoint = "europe-west1-pubsub.googleapis.com:443") (1)
    void send(Animal animal);
}
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Animal

@PubSubClient
interface LocationClient {
	@Topic(value = "animals", endpoint = "europe-west1-pubsub.googleapis.com:443") (1)
	fun send(animal: Animal?)
}
1 The endpoint attribute enforces that messages sent via this method will be stored on the europe-west1 region.

Message ordering

Google Cloud Pub/Sub supports message ordering if messages are published to a single location, and specify an ordering key. An example of this would be trading orders placed for a specific symbol. If you use the symbol as the ordering key, all messages regardless of how many publishers are guaranteed to be delivered in order.

To enable message ordering for your publishers, use the @OrderingKey in one of the method’s arguments to declare it an ordering key.

import io.micronaut.gcp.pubsub.annotation.OrderingKey;
import io.micronaut.gcp.pubsub.annotation.PubSubClient;
import io.micronaut.gcp.pubsub.annotation.Topic;
import io.micronaut.gcp.pubsub.support.Order;

@PubSubClient
public interface OrderClient {
    @Topic(value = "orders", endpoint = "us-central1-pubsub.googleapis.com:443") (1)
    void send(Order order, @OrderingKey String key); (2)
}
import io.micronaut.gcp.pubsub.annotation.OrderingKey
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Order

@PubSubClient
interface OrderClient {
    @Topic(value = "orders", endpoint = "us-central1-pubsub.googleapis.com:443") (1)
    void send(Order order, @OrderingKey String key) (2)
}
import io.micronaut.gcp.pubsub.annotation.OrderingKey
import io.micronaut.gcp.pubsub.annotation.PubSubClient
import io.micronaut.gcp.pubsub.annotation.Topic
import io.micronaut.gcp.pubsub.support.Order

@PubSubClient
interface OrderClient {
	@Topic(value = "orders", endpoint = "us-central1-pubsub.googleapis.com:443") (1)
	fun send(order: Order, @OrderingKey key: String) (2)
}
1 Ordering only works on regional endpoints, so you must declare an endpoint first
2 Although the Pub/Sub API requires the key to be a String, the framework uses a conversion service to convert other types to String.

Here’s an example of how to use ordering on your clients:

import io.micronaut.gcp.pubsub.support.Order;
import javax.inject.Singleton;

@Singleton
public class OrderService {

    private final OrderClient client;

    public OrderService(OrderClient client) {
        this.client = client;
    }

    public void placeOrder() {
        Order order = new Order(100, "GOOG");
        client.send(order, order.getSymbol());
    }

}
import io.micronaut.gcp.pubsub.support.Order
import javax.inject.Singleton

@Singleton
class OrderService {

    private final OrderClient client

    OrderService(OrderClient client) {
        this.client = client
    }

    void placeOrder() {
        Order order = new Order(100, "GOOG")
        client.send(order, order.getSymbol())
    }

}
import io.micronaut.gcp.pubsub.support.Order
import javax.inject.Singleton

@Singleton
class OrderService(private val client: OrderClient) {
	fun placeOrder() {
		val order = Order(100, "GOOG")
		client.send(order, order.symbol)
	}
}

8.6 Receiving messages via @PubSubListener methods

To start receiving messages you annotate a class with @PubSubListener, the framework will then use AOP to deliver messages to methods annotated with @Subscription.

Subscriptions

All methods annotated with @Subscription will be invoked by the framework. Each annotated method creates an individual Subscriber, that can be configured using the configuration parameter of the @Subscription annotation.

Methods annotated with @Subscription must be unique on your application. If two distinct methods try to subscribe to the same Subscription an error is thrown. This is intended to avoid issues with message Acknowledgement control.
The annotated method must have at least one argument that is bound to the body of the message or an exception is thrown.

Resource naming

Just as described on the Pub/Sub Publisher section, subscriptions examples are also using simple names such as animals. Inside Google Cloud however resources are only accessible via their FQN. A Subscription name follows the pattern: projects/$PROJECT_ID/subscriptions/$SUBSCRIPTION_NAME. Micronaut integration with GCP will automatically grab the default project id available (please refer to the section Google Project Id ) and convert the simple naming of the resource into a FQN. You can also pass a FQN as the subscription name. This is helpful when you need to listen to subscriptions from different projects.

import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener (1)
public class SimpleSubscriber {

    @Subscription("animals") (2)
    public void onMessage(Animal animal) {

    }

    @Subscription("projects/eu-project/subscriptions/animals") (3)
    public void onMessageEU(Animal animal) {

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener (1)
class SimpleSubscriber {

    @Subscription("animals") (2)
    void onMessage(Animal animal) {

    }

    @Subscription("projects/eu-project/subscriptions/animals") (3)
    void onMessageEU(Animal animal) {

    }
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener (1)
class SimpleSubscriber {

	@Subscription("animals") (2)
	fun onMessage(animal: Animal) {

	}

	@Subscription("projects/eu-project/subscriptions/animals") (3)
	fun onMessageEU(animal: Animal) {

	}

}
1 @PubSubListener marks this class to be a Message Listener
2 Methods annotated with @Subscription will receive messages from that subscription
3 You can also use a FQN for the subscription, specially when you need to access a resource on a different project
When publishing to a different project, make sure your service account has the proper access to the resource.

8.6.1 Content-Type and message deserialization

The framework provides a custom serialization/deserialization (SerDes) mechanism for both message producers and message listeners. On the receiving end the rules to deserialize a PubSubMessage are the following:

  1. If the body argument of the method is of PubSubMessage type, SerDes is bypassed and the "raw" message is copied to the argument.

  2. If the body argument of the method is a byte array, SerDes is bypassed and the byte contents of the PubSubMessage are copied to the argument.

  3. If the body argument is a Pojo then the following applies:

    1. The default Content-Type is application/json and the framework will use it if not overridden

    2. If the message contains an attribute Content-Type that value is used

    3. Finally if the @Subscription has a contentType value this value overrides all of the previous values

Automatic SerDes is a nice feature that the framework offers, but sometimes you may need to have access to the PubSubMessage id. This is provided via the @MessageId annotation. Once you annotate an argument of type String with this annotation, the message id will be copied to that argument.

PubSubMessage ids are always of type String for that reason your annotated argument must also be a String.
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener
public class ContentTypeSubscriber {

    @Subscription("raw-subscription") (1)
    void receiveRaw(byte[] data, @MessageId String id) {
    }

    @Subscription("native-subscription") (2)
    void receiveNative(PubsubMessage message) {
    }

    @Subscription("animals") (3)
    void receivePojo(Animal animal, @MessageId String id) {
    }

    @Subscription(value = "animals-legacy", contentType = "application/xml") (4)
    void receiveXML(Animal animal, @MessageId String id) {
    }

}
import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.MessageId;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener
class ContentTypeSubscriber {

    @Subscription("raw-subscription") (1)
    void receiveRaw(byte[] data, @MessageId String id) {
    }

    @Subscription("native-subscription") (2)
    void receiveNative(PubsubMessage message) {
    }

    @Subscription("animals") (3)
    void receivePojo(Animal animal, @MessageId String id) {
    }

    @Subscription(value = "animals-legacy", contentType = "application/xml") (4)
    void receiveXML(Animal animal, @MessageId String id) {
    }

}
import com.google.pubsub.v1.PubsubMessage
import io.micronaut.gcp.pubsub.annotation.MessageId
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener
class ContentTypeSubscriber {
	@Subscription("raw-subscription")
	fun	receiveRaw(data: ByteArray, @MessageId id: String) { (1)
	}

	@Subscription("native-subscription")
	fun  receiveNative(message: PubsubMessage) { (2)
	}

	@Subscription("animals")
	fun receivePojo(animal: Animal, @MessageId id: String) { (3)
	}

	@Subscription(value = "animals-legacy", contentType = "application/xml")
	fun  receiveXML(animal: Animal, @MessageId id: String) { (4)
	}
}
1 Bytes are copied, SerDes is bypassed, message id injected for usage
2 SerDes is bypassed, PubSubMessage object is copied, no need to use @MessageId
3 The framework will try to deserialize this payload. If no Content-Type header is found, will default to application/json
4 Uses a custom SerDes and the framework will find a PubSubMessageSerDes that can handle application/xml

8.6.2 Message Headers

Google Cloud Pub/Sub messages contain a dictionary of message attributes in the form of a Map<String, String>.

The framework binds those attributes to a @Header annotation that can be used at an argument of the method.

A ConversionService is used to try to convert from the String value of the attribute to the target type on the method.

import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.annotation.Header;

@PubSubListener
public class CustomHeaderSubscriber {

    @Subscription("animals")
    public void onMessage(Animal animal, @Header("Content-Type") String contentType, @Header("code") Integer code) { (1)

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.annotation.Header;

@PubSubListener
class CustomHeaderSubscriber {

    @Subscription("animals")
    void onMessage(Animal animal, @Header("Content-Type") String contentType, @Header("code") Integer code) { (1)

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.annotation.Header

@PubSubListener
class CustomHeaderSubscriber {

	@Subscription("animals")
	fun onMessage(animal: Animal,
				  @Header("Content-Type") contentType: String, @Header("code") code: Int) { (1)
	}
}
1 Each annotated argument will be mapped to the corresponding attribute value. The ConversionService will try to convert to the target type

8.6.3 Subscriber properties

Pub/Sub allows each Subscriber to have its own configuration for things such as executors or flow control settings.

The framework allows you to create configurations and then bind those configurations to each @Subscription using the configuration parameter.

Table 1. Configuration properties for SubscriberConfigurationProperties
Property Type Description

gcp.pubsub.subscriber.*.executor

java.lang.String

Name of the executor to use. Default: scheduled

gcp.pubsub.subscriber.*.parallel-pull-count

java.lang.Integer

number of concurrent pulls. Default: 1

gcp.pubsub.subscriber.*.max-ack-extension-period

org.threeten.bp.Duration

Set the maximum period a message ack deadline will be extended. Default: one hour.

gcp.pubsub.subscriber.*.max-duration-per-ack-extension

org.threeten.bp.Duration

Set the upper bound for a single mod ack extention period. Default: one hour.

gcp.pubsub.subscriber.*.flow-control.max-outstanding-element-count

java.lang.Long

Maximum number of outstanding elements to keep in memory before enforcing flow control. Default: 1000

gcp.pubsub.subscriber.*.flow-control.max-outstanding-request-bytes

java.lang.Long

Maximum number of outstanding bytes to keep in memory before enforcing flow control. Default: 100 * 1024 * 1024

gcp.pubsub.subscriber.*.flow-control.limit-exceeded-behavior

com.google.api.gax.batching.FlowController$LimitExceededBehavior

Default: LimitExceededBehavior.Block

Suppose you have the following configuration for a subscriber:

gcp:
  pubsub:
    subscriber:
      custom:
        executor: custom-executor
        parallel-pull-count: 4
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener
public class CustomConfigurationSubscriber {

    @Subscription(value = "animals", configuration = "custom") (1)
    public void onMessage(Animal animal) {

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener
class CustomConfigurationSubscriber {

    @Subscription(value = "animals", configuration = "custom") (1)
    void onMessage(Animal animal) {

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener
class CustomConfigurationSubscriber {

	@Subscription(value = "animals", configuration = "custom") (1)
	fun onMessage(animal: Animal) {

	}

}
1 The Subscriber will be configured using a configuration named custom

8.6.4 Handling message acknowledgement

The message delivery is handled by an IntroductionAdvice class PubSubConsumerAdvice. It invokes the method annotated with @Subscription. By default messages are auto acknowledged if the method returns without exceptions, see the section Consumer ErrorHandling for more information on error handling. It is possible to have manual acknowledgement control by adding an argument of type Acknowledge and manually invoking ack() or nack() methods.

Google Cloud Pub/Sub controls Acknowledgment at the Subscription level, please refer to the Pub/Sub Subscriber documentation for more information.

If you provide an Acknowledge type in your method and forget to invoke ack()/nack() the framework will log a warning message to let you know if you forgot to manually register an acknowledgement.
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.Acknowledgement;

@PubSubListener
public class AcknowledgementSubscriber {

    @Subscription("animals")
    public void onMessage(Animal animal, Acknowledgement acknowledgement) { (1)

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.annotation.Subscription;
import io.micronaut.gcp.pubsub.support.Animal;
import io.micronaut.messaging.Acknowledgement;

@PubSubListener
class AcknowledgementSubscriber {

    @Subscription("animals")
    void onMessage(Animal animal, Acknowledgement acknowledgement) { (1)

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.annotation.Subscription
import io.micronaut.gcp.pubsub.support.Animal
import io.micronaut.messaging.Acknowledgement

@PubSubListener
class AcknowledgementSubscriber {

	@Subscription("animals")
	fun onMessage(animal: Animal, acknowledgement: Acknowledgement) { (1)

	}
}

8.6.5 Consumer error handling

During message handling for listeners errors can happen at the framework or at your method level such as:

  • Problems binding method arguments to the message body

  • Content-Type deserialization issues

  • Acknowledgement

  • Uncaught exceptions at the annotated method

The framework provides a Global Error Handler DefaultPubSubMessageReceiverExceptionHandler, this handler will catch any errors and just log it. This behavior is far from ideal as messages would continue to be redelivered since they are not acknowledged.

There’s two ways you can provide error handling.

The PubSubMessageReceiverException contains references to the originating bean that threw the exception as well as a reference to PubSubConsumerState which has state information regarding the message handling.

import com.google.pubsub.v1.PubsubMessage;
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.bind.PubSubConsumerState;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException;
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener
public class ErrorHandlingSubscriber implements PubSubMessageReceiverExceptionHandler { (1)
    /**
     *
     * @param animal payload
     */
    public void onMessage(Animal animal) {
        throw new RuntimeException("error");
    }

    @Override
    public void handle(PubSubMessageReceiverException exception) { (2)

        Object listener = exception.getListener(); (3)
        PubSubConsumerState state = exception.getState(); (4)
        PubsubMessage originalMessage = state.getPubsubMessage();
        String contentType = state.getContentType();
        //some logic
        state.getAckReplyConsumer().ack(); (5)

    }
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener
class ErrorHandlingSubscriber implements PubSubMessageReceiverExceptionHandler { (1)

    void onMessage(Animal animal) {
        throw new RuntimeException("error");
    }

    @Override
    void handle(PubSubMessageReceiverException exception) { (2)

        def listener = exception.listener(3)
        def state = exception.state (4)
        def originalMessage = state.pubsubMessage
        def contentType = state.contentType
        //some logic
        state.getAckReplyConsumer().ack(); (5)

    }
}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverException
import io.micronaut.gcp.pubsub.exception.PubSubMessageReceiverExceptionHandler
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener
class ErrorHandlingSubscriber : PubSubMessageReceiverExceptionHandler { (1)

	fun onMessage(animal: Animal) {
		throw RuntimeException("error")
	}

	override fun handle(exception: PubSubMessageReceiverException) { (2)

		val listener = exception.listener (3)
		val state = exception.state (4)
		val originalMessage = state.pubsubMessage
		val contentType = state.contentType
		//some logic
		state.ackReplyConsumer.ack() (5)

	}
}
1 Implement PubSubMessageReceiverExceptionHandler to mark this bean as a local error handler
2 Method invoked if any error happens during the processing of this PubSubListener class
3 Reference to the class that originated the exception
4 Contains information related to the subscription
5 Depending on your use case you can ack() of nack() the message

8.6.6 Custom Parameter Binding

Default Binding Functionality

Consumer argument binding is achieved through an ArgumentBinderRegistry that is specific for binding consumers from Pub/Sub messages. The class responsible for this is the PubSubBinderRegistry. The registry supports argument binders that are used based on an annotation applied to the argument or the argument type. All argument binders must implement either PubSubAnnotatedArgumentBinder or PubSubTypeArgumentBinder. The exception to that rule is the PubSubDefaultArgumentBinder which is used when no other binders support a given argument.

When an argument needs bound, the PubSubConsumerState is used as the source of all of the available data. The binder registry follows a small sequence of steps to attempt to find a binder that supports the argument.

  1. Search the annotation based binders for one that matches any annotation on the argument that is annotated with @Bindable.

  2. Search the type based binders for one that matches or is a subclass of the argument type.

  3. Return the default binder.

Custom Binding

To inject your own argument binding behavior, it is as simple as registering a bean. The existing binder registry will inject it and include it in the normal processing.

Annotation Binding

A custom annotation can be created to bind consumer arguments. A custom binder can then be created to use that annotation and the PubSubConsumerState to supply a value for the argument. The value may in fact come from anywhere, however for the purposes of this documentation, we will show how you would create an annotation to bind the message publish time.

import io.micronaut.core.bind.annotation.Bindable;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER})
@Bindable (1)
public @interface MessagePublishTime {
}
import io.micronaut.core.bind.annotation.Bindable;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target([ElementType.PARAMETER])
@Bindable (1)
@interface MessagePublishTime {
}
import io.micronaut.core.bind.annotation.Bindable

@MustBeDocumented
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.VALUE_PARAMETER)
@Bindable (1)
annotation class MessagePublishTime
1 The @Bindable annotation is required for the annotation to be considered for binding.
import com.google.protobuf.util.Timestamps;
import io.micronaut.core.convert.ArgumentConversionContext;
import io.micronaut.core.convert.ConversionService;

import javax.inject.Singleton;

@Singleton (1)
public class MessagePublishTimeAnnotationBinder implements PubSubAnnotatedArgumentBinder<MessagePublishTime> { (2)

    private final ConversionService<?> conversionService;

    public MessagePublishTimeAnnotationBinder(ConversionService<?> conversionService) { (3)
        this.conversionService = conversionService;
    }

    @Override
    public Class<MessagePublishTime> getAnnotationType() {
        return MessagePublishTime.class;
    }

    @Override
    public BindingResult<Object> bind(ArgumentConversionContext<Object> context, PubSubConsumerState source) {
        Long epochMillis = Timestamps.toMillis(source.getPubsubMessage().getPublishTime()); (4)
        return () -> conversionService.convert(epochMillis, context); (5)
    }
}
import com.google.protobuf.util.Timestamps
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService

import javax.inject.Singleton;

@Singleton (1)
class MessagePublishTimeAnnotationBinder implements PubSubAnnotatedArgumentBinder<MessagePublishTime> { (2)

    private final ConversionService<?> conversionService;

    public MessagePublishTimeAnnotationBinder(ConversionService<?> conversionService) { (3)
        this.conversionService = conversionService;
    }

    @Override
    Class<MessagePublishTime> getAnnotationType() {
        MessagePublishTime
    }

    @Override
    public BindingResult<Object> bind(ArgumentConversionContext<Object> context, PubSubConsumerState source) {
        def epochMillis = Timestamps.toMillis(source.pubsubMessage.publishTime) (4)
        return { -> conversionService.convert(epochMillis, context) } (5)
    }
}
import com.google.protobuf.util.Timestamps
import io.micronaut.core.bind.ArgumentBinder
import io.micronaut.core.bind.ArgumentBinder.BindingResult
import io.micronaut.core.convert.ArgumentConversionContext
import io.micronaut.core.convert.ConversionService
import javax.inject.Singleton

@Singleton (1)
class MessagePublishTimeAnnotationBinder(private val conversionService: ConversionService<*>) (3)
	: PubSubAnnotatedArgumentBinder<MessagePublishTime> (2)
{

	override fun getAnnotationType(): Class<MessagePublishTime> {
		return MessagePublishTime::class.java
	}

	override fun bind(context: ArgumentConversionContext<Any>, source: PubSubConsumerState): BindingResult<Any> {
		val epochMillis = Timestamps.toMillis(source.pubsubMessage.publishTime) (4)
		return ArgumentBinder.BindingResult { conversionService.convert(epochMillis, context) } (5)
	}


}
1 The class is made a bean by annotating with @Singleton.
2 The custom annotation is used as the generic type for the interface.
3 The conversion service is injected into the instance.
4 The message publish time is retrieved from the message state.
5 The value is converted to the argument type. For example this allows the argument to be a String even though the epochMillis is a Long.
You could also return a com.google.protobuf.Timestamp if you register the appropriate type converter to the conversion service, but such example is outside of the scope of this documentation.

The annotation can now be used on the argument in a consumer method.

import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener
public class CustomBindingSubscriber {

    /**
     *
     * @param animal payload
     * @param publishTime message publish time
     */
    public void onMessage(Animal animal, @MessagePublishTime Long publishTime) { (1)

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener;
import io.micronaut.gcp.pubsub.support.Animal;

@PubSubListener
class CustomBindingSubscriber {

    /**
     *
     * @param animal payload
     * @param publishTime message publish time
     */
    void onMessage(Animal animal, @MessagePublishTime Long publishTime) { (1)

    }

}
import io.micronaut.gcp.pubsub.annotation.PubSubListener
import io.micronaut.gcp.pubsub.support.Animal

@PubSubListener
class CustomBindingSubscriber {
	/**
	 *
	 * @param animal payload
	 * @param publishTime message publish time
	 */
	fun onMessage(animal: Animal, @MessagePublishTime publishTime: Long) { (1)

	}
}
1 The message publish time will now be passed to the annotated argument

8.7 Message Serialization/Deserialization (SerDes)

The serialization and deserialization of message bodies is handled through instances of PubSubMessageSerDes. The ser-des (Serializer/Deserializer) is responsible for both serialization and deserialization of Pub/Sub message bodies into the message body types defined in your clients and consumers methods.

The ser-des are managed by a PubSubMessageSerDesRegistry. All ser-des beans are injected in order into the registry and then searched for when serialization or deserialization is needed. The search is based on the Content-Type defined for the message, the framework default is application/json. If a ser-des can’t be located an exception is thrown. You can supply your own ser-des by simply registering a bean of type PuvSubMessageSerDes.

For example let’s say you want to implement a ser-des that uses java native object serialization instead of the Json serialization provided. First you need to define a custom mimeType for that, we will use application/x.java, following the best principles for handling mime types.

This is a fictional example, java serialization is not ideal, and its not portable.
import io.micronaut.core.serialize.exceptions.SerializationException;
import io.micronaut.core.type.Argument;

import javax.inject.Singleton;
import java.io.*;

@Singleton (1)
public class JavaMessageSerDes implements PubSubMessageSerDes {

    @Override
    public String supportedType() { (2)
        return "application/x.java";
    }

    @Override
    public Object deserialize(byte[] data, Argument<?> type) {
        ByteArrayInputStream bin = new ByteArrayInputStream(data);
        Object result = null;
        try {
            ObjectInputStream reader = new ObjectInputStream(bin);
            result = reader.readObject();
        } catch (Exception e) {
            throw new SerializationException("Failed to deserialize object", e);
        }
        return result;
    }

    @Override
    public byte[] serialize(Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream writer = new ObjectOutputStream(baos);
            writer.writeObject(data);
        } catch (IOException e) {
            throw new SerializationException("Failed to serialize object", e);
        }
        return baos.toByteArray();
    }

}
import io.micronaut.core.serialize.exceptions.SerializationException;
import io.micronaut.core.type.Argument;

import javax.inject.Singleton;
import java.io.*;

@Singleton (1)
class JavaMessageSerDes implements PubSubMessageSerDes {

    @Override
    String supportedType() { (2)
        "application/x.java"
    }

    @Override
    public Object deserialize(byte[] data, Argument<?> type) {
        ByteArrayInputStream bin = new ByteArrayInputStream(data)
        Object result = null
        try {
            ObjectInputStream reader = new ObjectInputStream(bin)
            result = reader.readObject();
        } catch (Exception e) {
            throw new SerializationException("Failed to deserialize object", e)
        }
        return result;
    }

    @Override
    byte[] serialize(Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream()
        try {
            ObjectOutputStream writer = new ObjectOutputStream(baos)
            writer.writeObject(data)
        } catch (IOException e) {
            throw new SerializationException("Failed to serialize object", e)
        }
        return baos.toByteArray()
    }

}
import io.micronaut.core.serialize.exceptions.SerializationException
import io.micronaut.core.type.Argument
import java.io.*
import javax.inject.Singleton

@Singleton (1)
class JavaMessageSerDes : PubSubMessageSerDes {

	override fun supportedType(): String { (2)
		return "application/x.java"
	}

	override fun deserialize(data: ByteArray, type: Argument<*>): Any {
		val bin = ByteArrayInputStream(data)
		var result: Any? = null
		result = try {
			val reader = ObjectInputStream(bin)
			reader.readObject()
		} catch (e: Exception) {
			throw SerializationException("Failed to deserialize object", e)
		}
		return result
	}

	override fun serialize(data: Any): ByteArray {
		val baos = ByteArrayOutputStream()
		try {
			val writer = ObjectOutputStream(baos)
			writer.writeObject(data)
		} catch (e: IOException) {
			throw SerializationException("Failed to serialize object", e)
		}
		return baos.toByteArray()
	}
}
1 The class is declared as a singleton so it will be registered with the context
2 The contentType defines what kind of messages will this implementation be able to handle

To publish messages with this ser-des set the Content-Type for @Topic

@Topic(value = "animals", contentType = "application/x.java")

If the messages that are arriving on your subscriber already contain a Content-Type header with this type the framework will pick it up, or you can just force it on the @Suibscription annotation too.

@Subscription(value = "animals", contentType = "application/x.java")

8.8 Configuring Thread pools

The framework allows users to configure separate thread pools to be used for Publishers and Subscriber. You can set a reference to a pre-defined executor at Pub/Sub configuration properties, or create custom configurations for publishers and subscribers. Micronaut supports definition of custom ExecutorService implementations, see ExecutorConfiguration for the full list of options.

Pub/Sub client libraries require an ScheduledExecutorService for both Publisher and Subscriber make sure your custom executor is of type scheduled or an error will be thrown.

For example:

Configuring the custom thread pool
micronaut:
    executors:
        custom:
            type: scheduled
            nThreads: 32

If no configuration is supplied, the framework will use the default named scheduled executor.

8.9 Using Google Cloud Pub/Sub emulator

Google Cloud Pub/Sub has a local emulator to enable developers to test their applications locally with no need to connect to the cloud Pub/Sub service.

The framework supports automatic switching of the TransportChannelProvider to use PUBSUB_EMULATOR_HOST if this variable is set on the environment.

Make sure that you also set GCP_PROJECT_ID to be the same as the project you have configured the emulator to use. Otherwise the framework may pick up the projectId used by the default credentials.

You will need to manually configure topics and subscriptions on the emulator if you want to test locally. You can easily create resources using the Pub/Sub REST interface to create topics and subscriptions.

For instance, the following curl command would create a topic named micronaut-devices on the test-project project.

curl -XPUT $PUBSUB_EMULATOR_HOST/v1/projects/test-project/topics/micronaut-devices