Micronaut R2DBC

Integration with Micronaut and R2DBC

Version:

1 Introduction

This module provides integration between Micronaut and R2DBC.

2 Release History

For this project, you can find a list of releases (with release notes) here:

3 Quick Start

The quickest way to get started is to create a new Micronaut application with Micronaut Launch and choose the data-r2dbc, mysql and flyway features. This can also be done via the Micronaut 2.2 and above CLI:

Creating an application with the CLI
# For Maven add: --build maven
$ mn create-app --lang java example --features data-r2dbc,flyway,mysql

Or via curl:

Creating an application with curl
# For Maven add to the URL: &build=maven
$ curl https://launch.micronaut.io/demo.zip?lang=java&features=data-r2dbc,flyway,mysql -o demo.zip && unzip demo.zip -d demo && cd demo

The generated application will use MySQL since we passed the mysql feature adding dependency on the R2DBC driver for MySQL:

runtimeOnly("dev.miku:r2dbc-mysql")
<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <scope>runtime</scope>
</dependency>

And for flyway the JDBC driver:

runtimeOnly("mysql:mysql-connector-java")
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>

To create configurations for other drivers you can select the appropriate feature: oracle, postgres, sqlserver, h2 or mariadb.

Now define a SQL script that creates your initial schema in src/main/resources/db/migration. For example:

Example V1__create-schema.sql
CREATE TABLE book(id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, title VARCHAR(255), pages INT, author_id BIGINT NOT NULL);
CREATE TABLE author(id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255));

You can now configure your application to connect to the database using src/main/resources/application.yml which contains the application configuration:

Example application.yml
flyway: (1)
  datasources:
    default:
      enabled: true
datasources:
  default: (2)
    url: jdbc:mysql://localhost:3306/mydatabase
r2dbc:
  datasources:
    default: (3)
      url: r2dbc:mysql:///mydatabase
1 The Flyway configuration ensures the schema migration is applied. See Micronaut Flyway for more information.
2 The Flyway configuration needs a JDBC datasource configured, this setting configures one. See Micronaut JDBC for more information.
3 The property r2dbc.datasources.default.url is used to configure the default R2DBC ConnectionFactory
The R2DBC ConnectionFactory object can be injected anywhere in your code with dependency injection.

Now define a @MappedEntity that maps to the author table defined in the schema:

package example;

import io.micronaut.data.annotation.*;

@MappedEntity
public class Author {
    @GeneratedValue
    @Id
    private Long id;
    private final String name;

    public Author(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }
}
package example

import io.micronaut.data.annotation.*

@MappedEntity
class Author {
    @GeneratedValue
    @Id
    Long id
    final String name

    Author(String name) {
        this.name = name
    }
}
package example

import io.micronaut.data.annotation.GeneratedValue
import io.micronaut.data.annotation.Id
import io.micronaut.data.annotation.MappedEntity

@MappedEntity
data class Author(val name: String) {
    @GeneratedValue
    @Id
    var id: Long? = null
}

And a repository interface to access the database that extends from ReactiveStreamsRepository:

package example;

import edu.umd.cs.findbugs.annotations.NonNull;
import io.micronaut.data.model.query.builder.sql.Dialect;
import io.micronaut.data.r2dbc.annotation.R2dbcRepository;
import io.micronaut.data.repository.reactive.ReactiveStreamsCrudRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.validation.constraints.NotNull;

@R2dbcRepository(dialect = Dialect.MYSQL) (1)
public interface AuthorRepository extends ReactiveStreamsCrudRepository<Author, Long> {
    @NonNull
    @Override
    Mono<Author> findById(@NonNull @NotNull Long aLong); (2)

    @NonNull
    @Override
    Flux<Author> findAll();
}
package example

import edu.umd.cs.findbugs.annotations.NonNull
import io.micronaut.data.model.query.builder.sql.Dialect
import io.micronaut.data.r2dbc.annotation.R2dbcRepository
import io.micronaut.data.repository.reactive.ReactiveStreamsCrudRepository
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

import javax.validation.constraints.NotNull

@R2dbcRepository(dialect = Dialect.MYSQL) (1)
interface AuthorRepository extends ReactiveStreamsCrudRepository<Author, Long> {
    @NonNull
    @Override
    Mono<Author> findById(@NonNull @NotNull Long aLong) (2)

    @NonNull
    @Override
    Flux<Author> findAll()
}
package example

import io.micronaut.data.model.query.builder.sql.Dialect
import io.micronaut.data.r2dbc.annotation.R2dbcRepository
import io.micronaut.data.repository.reactive.ReactiveStreamsCrudRepository
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import javax.validation.constraints.NotNull

@R2dbcRepository(dialect = Dialect.MYSQL) (1)
interface AuthorRepository : ReactiveStreamsCrudRepository<Author, Long> {
    override fun findById(id: @NotNull Long): Mono<Author> (2)
    override fun findAll(): Flux<Author>
}
1 The @R2dbcRepository annotation can be used to specify the datasource and dialect
2 You can override methods from the super interface to specialize the default Publisher return type with a concrete implementation

You can now inject this interface into controllers and use it to perform R2DBC queries:

package example;

import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Controller("/authors")
public class AuthorController {
    private final AuthorRepository repository;

    public AuthorController(AuthorRepository repository) {
        this.repository = repository;
    }

    @Get
    Flux<Author> all() { (1)
        return repository.findAll();
    }

    @Get("/id")
    Mono<Author> get(Long id) { (2)
        return repository.findById(id);
    }
}
package example

import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

@Controller("/authors")
class AuthorController {
    private final AuthorRepository repository

    AuthorController(AuthorRepository repository) {
        this.repository = repository
    }

    @Get
    Flux<Author> all() { (1)
        return repository.findAll()
    }

    @Get("/id")
    Mono<Author> get(Long id) { (2)
        return repository.findById(id)
    }
}
package example

import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

@Controller("/authors")
class AuthorController(private val repository: AuthorRepository) {
    @Get
    fun all(): Flux<Author> { (1)
        return repository.findAll()
    }

    @Get("/id")
    fun get(id: Long): Mono<Author> { (2)
        return repository.findById(id)
    }
}
1 By returning a reactive type that emits many items you can stream data (either Flowable or Flux)
2 By returning a reactive type that emits a single item you return the entire response (either Single or Mono)

4 Available Drivers

The following drivers are available as of this writing.

H2

R2DBC Driver:

runtimeOnly("io.r2dbc:r2dbc-h2")
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <scope>runtime</scope>
</dependency>

And for Flyway migrations the JDBC driver:

runtimeOnly("com.h2database:h2")
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

MySQL

R2DBC Driver:

runtimeOnly("dev.miku:r2dbc-mysql")
<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <scope>runtime</scope>
</dependency>

And for Flyway migrations the JDBC driver:

runtimeOnly("mysql:mysql-connector-java")
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>

MariaDB

R2DBC Driver:

runtimeOnly("org.mariadb:r2dbc-mariadb:1.0.0")
<dependency>
    <groupId>org.mariadb</groupId>
    <artifactId>r2dbc-mariadb</artifactId>
    <version>1.0.0</version>
    <scope>runtime</scope>
</dependency>

And for Flyway migrations the JDBC driver:

runtimeOnly("org.mariadb.jdbc:mariadb-java-client")
<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <scope>runtime</scope>
</dependency>

Postgresql

R2DBC Driver:

runtimeOnly("io.r2dbc:r2dbc-postgresql")
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <scope>runtime</scope>
</dependency>

And for Flyway migrations the JDBC driver:

runtimeOnly("org.postgresql:postgresql")
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <scope>runtime</scope>
</dependency>

SQL Server

R2DBC Driver:

runtimeOnly("io.r2dbc:r2dbc-mssql")
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-mssql</artifactId>
    <scope>runtime</scope>
</dependency>

And for Flyway migrations the JDBC driver:

runtimeOnly("com.microsoft.sqlserver:mssql-jdbc")
<dependency>
    <groupId>com.microsoft.sqlserver</groupId>
    <artifactId>mssql-jdbc</artifactId>
    <scope>runtime</scope>
</dependency>

5 Micronaut Data R2DBC

The R2DBC module features support for defining reactive Micronaut Data repositories that use the underlying R2DBC driver to automatically implement queries for your at compilation time.

5.1 Writing Queries

To write queries with Micronaut Data R2DBC you can use all the same patterns as defined in the Micronaut Data documentation on querying.

The method pattern is essentially:

finderpattern
Figure 1. Query Method Pattern

The Reactive return type is used to figure out whether the query is for a single result or multiple. If you use for example Single or Reactor’s Mono type a single result will be returned, whilst a Flowable or Reactor Flux will return multiple records.

You can pass a Pageable instance to any query to paginate results, although note that the Page return type is not supported since it is an implicitly blocking API.

5.2 Performing CRUD Operations

The ReactiveStreamsCrudRepository interface exposes methods to perform Create, Read, Update and Delete (CRUD) operations reactively.

To create a new instance you can use the save method:

@Post("/")
Single<Book> create(@Valid Book book) {
    return Single.fromPublisher(bookRepository.save(book));
}
@Post("/")
Single<Book> create(@Valid Book book) {
    return Single.fromPublisher(bookRepository.save(book))
}
@Post("/")
fun create(book: @Valid Book): Single<Book> {
    return Single.fromPublisher(bookRepository.save(book))
}

To read objects you can use findAll or findById:

@Get("/")
Flux<Book> all() {
    return bookRepository.findAll(); (1)
}

@Get("/{id}")
Mono<Book> show(Long id) {
    return bookRepository.findById(id); (2)
}
@Get("/")
Flux<Book> all() {
    return bookRepository.findAll() (1)
}

@Get("/{id}")
Mono<Book> show(Long id) {
    return bookRepository.findById(id) (2)
}
@Get("/")
fun all(): Flux<Book> {
    return bookRepository.findAll() (1)
}

@Get("/{id}")
fun show(id: Long): Mono<Book> {
    return bookRepository.findById(id) (2)
}

To perform an update you can use the update method:

@Put("/{id}")
Single<Book> update(@NotNull Long id, @Valid Book book) {
    return Single.fromPublisher(bookRepository.update(book));
}
@Put("/{id}")
Single<Book> update(@NotNull Long id, @Valid Book book) {
    return Single.fromPublisher(bookRepository.update(book))
}
@Put("/{id}")
fun update(id: Long, book: @Valid Book): Single<Book> {
    return Single.fromPublisher(bookRepository.update(book))
}

Finally to delete an instance you can use the deleteById method:

@Delete("/{id}")
Single<HttpResponse<?>> delete(@NotNull Long id) {
    return Single.fromPublisher(bookRepository.deleteById(id))
            .map(deleted -> deleted > 0 ? HttpResponse.noContent() : HttpResponse.notFound());
}
@Delete("/{id}")
Single<HttpResponse<?>> delete(@NotNull Long id) {
    return Single.fromPublisher(bookRepository.deleteById(id))
            .map(deleted -> deleted > 0 ? HttpResponse.noContent() : HttpResponse.notFound())
}
@Delete("/{id}")
fun delete(id: Long): Single<HttpResponse<*>> {
    return Single.fromPublisher(bookRepository.deleteById(id))
            .map { deleted: Long -> if (deleted > 0) HttpResponse.noContent() else HttpResponse.notFound<Any>() }
}

For more information on the possible write operations, see the documentation on Data Updates in the Micronaut Data documentation which also apply to R2DBC.

5.3 Micronaut Data Transactions with R2DBC

Micronaut Data R2DBC features Reactive transaction management support whereby you can declare javax.transaction.Transactional on your methods and a reactive transaction will be initiated, for example:

package example;

import reactor.core.publisher.Mono;

import javax.inject.Singleton;
import javax.transaction.Transactional;
import java.util.Arrays;

@Singleton
public class AuthorService {
    private final AuthorRepository authorRepository;
    private final BookRepository bookRepository;

    public AuthorService(AuthorRepository authorRepository, BookRepository bookRepository) { (1)
        this.authorRepository = authorRepository;
        this.bookRepository = bookRepository;
    }

    @Transactional (2)
    Mono<Void> setupData() {
        return Mono.from(authorRepository.save(new Author("Stephen King")))
                .flatMapMany((author -> bookRepository.saveAll(Arrays.asList(
                        new Book("The Stand", 1000, author),
                        new Book("The Shining", 400, author)
                ))))
                .then(Mono.from(authorRepository.save(new Author("James Patterson"))))
                .flatMapMany((author ->
                        bookRepository.save(new Book("Along Came a Spider", 300, author))
                )).then();
    }
}
package example

import reactor.core.publisher.Mono

import javax.inject.Singleton
import javax.transaction.Transactional

@Singleton
class AuthorService {
    private final AuthorRepository authorRepository
    private final BookRepository bookRepository

    AuthorService(AuthorRepository authorRepository, BookRepository bookRepository) { (1)
        this.authorRepository = authorRepository
        this.bookRepository = bookRepository
    }

    @Transactional (2)
    Mono<Void> setupData() {
        return Mono.from(authorRepository.save(new Author("Stephen King")))
                .flatMapMany((author -> bookRepository.saveAll([
                        new Book("The Stand", 1000, author),
                        new Book("The Shining", 400, author)
                ])))
                .then(Mono.from(authorRepository.save(new Author("James Patterson"))))
                .flatMapMany((author ->
                        bookRepository.save(new Book("Along Came a Spider", 300, author))
                )).then()
    }
}
package example

import reactor.core.publisher.Mono
import javax.inject.Singleton
import javax.transaction.Transactional

@Singleton
open class AuthorService(
        private val authorRepository: AuthorRepository,
        private val bookRepository: BookRepository) { (1)

    @Transactional (2)
    open fun setupData(): Mono<Void> {
        return Mono.from(authorRepository.save(Author("Stephen King")))
                .flatMapMany { author: Author ->
                    bookRepository.saveAll(listOf(
                            Book("The Stand", 1000, author),
                            Book("The Shining", 400, author)
                    ))
                }
                .then(Mono.from(authorRepository.save(Author("James Patterson"))))
                .flatMapMany { author: Author ->
                    bookRepository.save(Book("Along Came a Spider", 300, author))
                }.then()
    }
}
1 Supporting repositories are injected
2 @Transactional is used to declare a transaction

This same declarative logic can be done programmatically as well by injecting the R2dbcOperations interface:

Mono.from(operations.withTransaction(status ->
    Flux.from(authorRepository.save(new Author("Stephen King")))
            .flatMap((author -> bookRepository.saveAll(Arrays.asList(
                    new Book("The Stand", 1000, author),
                    new Book("The Shining", 400, author)
            ))))
    .thenMany(Flux.from(authorRepository.save(new Author("James Patterson"))))
        .flatMap((author ->
                bookRepository.save(new Book("Along Came a Spider", 300, author))
    )).then()
)).block();
Mono.from(operations.withTransaction(status ->
        Flux.from(authorRepository.save(new Author("Stephen King")))
                .flatMap((author -> bookRepository.saveAll([
                        new Book("The Stand", 1000, author),
                        new Book("The Shining", 400, author)
                ])))
                .thenMany(Flux.from(authorRepository.save(new Author("James Patterson"))))
                .flatMap((author ->
                        bookRepository.save(new Book("Along Came a Spider", 300, author))
                )).then()
)).block()
Mono.from(operations.withTransaction {
    Flux.from(authorRepository.save(Author("Stephen King")))
            .flatMap { author: Author ->
                bookRepository.saveAll(listOf(
                        Book("The Stand", 1000, author),
                        Book("The Shining", 400, author)
                ))
            }
            .thenMany(Flux.from(authorRepository.save(Author("James Patterson"))))
            .flatMap { author: Author -> bookRepository.save(Book("Along Came a Spider", 300, author)) }.then()
}).block()

In the above case the withTransaction method is used to initiate a transaction.

Note however, that transaction management is possibly one of the most challenging areas to get right in reactive programming since you need to propagate the transaction across the reactive flow.

Most R2DBC drivers are implemented in Project Reactor which has the ability to propagate a context across reactive operators and Micronaut Data R2DBC will populate this context and ensure the transaction is re-used if it is found within it.

However, it is still pretty easy for the context to be lost since different libraries that implement Reactive Streams don’t propagate contexts between each other so if you include RxJava or any other reactive operator library it is likely the context will be lost.

To ensure this doesn’t happen it is recommended that you annotate write operations that participate within a transaction as MANDATORY which ensures it is not possible to run these methods without a surrounding transaction present so that if the transaction is somehow lost within the reactive flow it doesn’t cause operations to be run in separate transactions:

@NonNull
@Override
@Transactional(Transactional.TxType.MANDATORY)
<S extends Book> Publisher<S> save(@NonNull @Valid @NotNull S entity);

@NonNull
@Override
@Transactional(Transactional.TxType.MANDATORY)
<S extends Book> Publisher<S> saveAll(@NonNull @Valid @NotNull Iterable<S> entities);
@NonNull
@Override
@Transactional(Transactional.TxType.MANDATORY)
<S extends Book> Publisher<S> save(@NonNull @Valid @NotNull S entity);

@NonNull
@Override
@Transactional(Transactional.TxType.MANDATORY)
<S extends Book> Publisher<S> saveAll(@NonNull @Valid @NotNull Iterable<S> entities);
@Transactional(Transactional.TxType.MANDATORY)
override fun <S : Book?> save(entity: @Valid @NotNull S): Publisher<S>

@Transactional(Transactional.TxType.MANDATORY)
override fun <S : Book?> saveAll(entities: @Valid @NotNull Iterable<S>): Publisher<S>

If the transaction is somehow lost during the reactive flow there are a couple of ways you can solve the problem. One way is to use the withTransaction method of the R2dbcOperations interface to obtain the current ReactiveTransactionStatus, you can then pass this instance into another execution of the withTransaction method or pass it directly as the last argument to any method declared on the repository itself.

An example of the former approach is presented below, using RxJava 2 this time which will cause propagation loss:

Flowable.fromPublisher(operations.withTransaction(status -> (1)
        Flowable.fromPublisher(authorRepository.save(new Author("Michael Crichton")))
                .flatMap((author -> operations.withTransaction(status, (s) -> (2)
                        bookRepository.saveAll(Arrays.asList(
                                new Book("Jurassic Park", 300, author),
                                new Book("Disclosure", 400, author)
                        )))))
)).blockingSubscribe();
Flowable.fromPublisher(operations.withTransaction(status -> (1)
        Flowable.fromPublisher(authorRepository.save(new Author("Michael Crichton")))
                .flatMap((author -> operations.withTransaction(status, (s) -> (2)
                        bookRepository.saveAll([
                                new Book("Jurassic Park", 300, author),
                                new Book("Disclosure", 400, author)
                        ]))))
)).blockingSubscribe()
Flowable.fromPublisher(operations.withTransaction { status: ReactiveTransactionStatus<Connection> ->  (1)
    Flowable.fromPublisher(authorRepository.save(Author("Michael Crichton")))
            .flatMap { author: Author ->
                operations.withTransaction(status) {   (2)
                    bookRepository.saveAll(listOf(
                            Book("Jurassic Park", 300, author),
                            Book("Disclosure", 400, author)
                    ))
                }
            }
}).blockingSubscribe()
1 An outer withTransaction call starts the transaction
2 An inner call ensures the existing transaction is propagated

6 R2DBC Clients

By default R2DBC is not opinonated about clients, therefore you can use the R2DBC ConnectionFactory with any external R2DBC client you choose.

RxJava 2 Client (Experimental)

As part of this project there is an experimental RxJava 2 client which you can add with the following dependency:

implementation("io.micronaut:io.micronaut.r2dbc:micronaut-r2dbc-rxjava2")
<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>io.micronaut.r2dbc</artifactId>
    <version>micronaut-r2dbc-rxjava2</version>
</dependency>

Which allows you to inject a RxConnectionFactory bean that allows you to do basic operations with R2DBC and RxJava 2.

7 Repository

You can find the source code of this project in this repository: