BAEL-2378 Non-blocking Spring Boot with Kotlin Coroutines (#6966)

This commit is contained in:
Alexander Molochko 2019-07-13 12:31:50 +03:00 committed by ashleyfrieze
parent 7909ec9081
commit 328a0d1e47
17 changed files with 1388 additions and 823 deletions

1647
pom.xml

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,2 @@
### Relevant Articles:
- [Non-blocking Spring Boot with Kotlin Coroutines](http://www.baeldung.com/non-blocking-spring-boot-with-kotlin-coroutines)

166
spring-boot-kotlin/pom.xml Normal file
View File

@ -0,0 +1,166 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-boot-kotlin</artifactId>
<name>spring-boot-kotlin</name>
<packaging>jar</packaging>
<description>Demo project showing how to use non-blocking in Kotlin with Spring Boot</description>
<parent>
<artifactId>parent-kotlin</artifactId>
<groupId>com.baeldung</groupId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../parent-kotlin</relativePath>
</parent>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
<version>${kotlinx-coroutines.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactor</artifactId>
<version>${kotlinx-coroutines.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-r2dbc</artifactId>
<version>${r2dbc.version}</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<version>${h2-r2dbc.version}</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
<version>${r2dbc-spi.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-plugin</artifactId>
<configuration>
<args>
<arg>-Xjsr305=strict</arg>
</args>
<compilerPlugins>
<plugin>spring</plugin>
</compilerPlugins>
</configuration>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-allopen</artifactId>
<version>${kotlin.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
<properties>
<kotlin.version>1.3.31</kotlin.version>
<r2dbc.version>1.0.0.M1</r2dbc.version>
<r2dbc-spi.version>1.0.0.M7</r2dbc-spi.version>
<h2-r2dbc.version>1.0.0.BUILD-SNAPSHOT</h2-r2dbc.version>
<kotlinx-coroutines.version>1.2.1</kotlinx-coroutines.version>
<spring-boot.version>2.2.0.M2</spring-boot.version>
</properties>
</project>

View File

@ -0,0 +1,11 @@
package com.baeldung.nonblockingcoroutines
import org.springframework.boot.SpringApplication.run
import org.springframework.boot.autoconfigure.SpringBootApplication
@SpringBootApplication
class SpringApplication
fun main(args: Array<String>) {
run(SpringApplication::class.java, *args)
}

View File

@ -0,0 +1,32 @@
package com.baeldung.nonblockingcoroutines.config
import io.r2dbc.h2.H2ConnectionConfiguration
import io.r2dbc.h2.H2ConnectionFactory
import io.r2dbc.spi.ConnectionFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories
@Configuration
@EnableR2dbcRepositories
class DatastoreConfig : AbstractR2dbcConfiguration() {
@Value("\${spring.datasource.username}")
private val userName: String = ""
@Value("\${spring.datasource.password}")
private val password: String = ""
@Value("\${spring.datasource.dbname}")
private val dbName: String = ""
@Bean
override fun connectionFactory(): ConnectionFactory {
return H2ConnectionFactory(H2ConnectionConfiguration.builder()
.inMemory(dbName)
.username(userName)
.password(password)
.build())
}
}

View File

@ -0,0 +1,19 @@
package com.baeldung.nonblockingcoroutines.config
import com.baeldung.nonblockingcoroutines.handlers.ProductsHandler
import kotlinx.coroutines.FlowPreview
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.server.coRouter
@Configuration
class RouterConfiguration {
@FlowPreview
@Bean
fun productRoutes(productsHandler: ProductsHandler) = coRouter {
GET("/", productsHandler::findAll)
GET("/{id}", productsHandler::findOne)
GET("/{id}/stock", productsHandler::findOneInStock)
}
}

View File

@ -0,0 +1,12 @@
package com.baeldung.nonblockingcoroutines.config
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.client.WebClient
@Configuration
class WebClientConfiguration {
@Bean
fun webClient() = WebClient.builder().baseUrl("http://localhost:8080").build()
}

View File

@ -0,0 +1,49 @@
package com.baeldung.nonblockingcoroutines.controller
import com.baeldung.nonblockingcoroutines.model.Product
import com.baeldung.nonblockingcoroutines.repository.ProductRepository
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.bodyToMono
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
class ProductController {
@Autowired
lateinit var webClient: WebClient
@Autowired
lateinit var productRepository: ProductRepository
@GetMapping("/{id}")
fun findOne(@PathVariable id: Int): Mono<Product> {
return productRepository
.getProductById(id)
}
@GetMapping("/{id}/stock")
fun findOneInStock(@PathVariable id: Int): Mono<ProductStockView> {
val product = productRepository.getProductById(id)
val stockQuantity = webClient.get()
.uri("/stock-service/product/$id/quantity")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono<Int>()
return product.zipWith(stockQuantity) { productInStock, stockQty ->
ProductStockView(productInStock, stockQty)
}
}
@GetMapping("/stock-service/product/{id}/quantity")
fun getStockQuantity(): Mono<Int> {
return Mono.just(2)
}
@GetMapping("/")
fun findAll(): Flux<Product> {
return productRepository.getAllProducts()
}
}

View File

@ -0,0 +1,49 @@
package com.baeldung.nonblockingcoroutines.controller
import com.baeldung.nonblockingcoroutines.model.Product
import com.baeldung.nonblockingcoroutines.repository.ProductRepositoryCoroutines
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.MediaType.APPLICATION_JSON
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBody
import org.springframework.web.reactive.function.client.awaitExchange
class ProductControllerCoroutines {
@Autowired
lateinit var webClient: WebClient
@Autowired
lateinit var productRepository: ProductRepositoryCoroutines
@GetMapping("/{id}")
suspend fun findOne(@PathVariable id: Int): Product? {
return productRepository.getProductById(id)
}
@GetMapping("/{id}/stock")
suspend fun findOneInStock(@PathVariable id: Int): ProductStockView {
val product: Deferred<Product?> = GlobalScope.async {
productRepository.getProductById(id)
}
val quantity: Deferred<Int> = GlobalScope.async {
webClient.get()
.uri("/stock-service/product/$id/quantity")
.accept(APPLICATION_JSON)
.awaitExchange().awaitBody<Int>()
}
return ProductStockView(product.await()!!, quantity.await())
}
@FlowPreview
@GetMapping("/")
fun findAll(): Flow<Product> {
return productRepository.getAllProducts()
}
}

View File

@ -0,0 +1,15 @@
package com.baeldung.nonblockingcoroutines.controller
import com.baeldung.nonblockingcoroutines.model.Product
class ProductStockView(product: Product, var stockQuantity: Int) {
var id: Int = 0
var name: String = ""
var price: Float = 0.0f
init {
this.id = product.id
this.name = product.name
this.price = product.price
}
}

View File

@ -0,0 +1,49 @@
package com.baeldung.nonblockingcoroutines.handlers
import com.baeldung.nonblockingcoroutines.controller.ProductStockView
import com.baeldung.nonblockingcoroutines.model.Product
import com.baeldung.nonblockingcoroutines.repository.ProductRepositoryCoroutines
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.MediaType
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBody
import org.springframework.web.reactive.function.client.awaitExchange
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.bodyAndAwait
import org.springframework.web.reactive.function.server.json
@Component
class ProductsHandler(
@Autowired var webClient: WebClient,
@Autowired var productRepository: ProductRepositoryCoroutines) {
@FlowPreview
suspend fun findAll(request: ServerRequest): ServerResponse =
ServerResponse.ok().json().bodyAndAwait(productRepository.getAllProducts())
suspend fun findOneInStock(request: ServerRequest): ServerResponse {
val id = request.pathVariable("id").toInt()
val product: Deferred<Product?> = GlobalScope.async {
productRepository.getProductById(id)
}
val quantity: Deferred<Int> = GlobalScope.async {
webClient.get()
.uri("/stock-service/product/$id/quantity")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange().awaitBody<Int>()
}
return ServerResponse.ok().json().bodyAndAwait(ProductStockView(product.await()!!, quantity.await()))
}
suspend fun findOne(request: ServerRequest): ServerResponse {
val id = request.pathVariable("id").toInt()
return ServerResponse.ok().json().bodyAndAwait(productRepository.getProductById(id)!!)
}
}

View File

@ -0,0 +1,7 @@
package com.baeldung.nonblockingcoroutines.model
data class Product(
var id: Int = 0,
var name: String = "",
var price: Float = 0.0f
)

View File

@ -0,0 +1,34 @@
package com.baeldung.nonblockingcoroutines.repository
import com.baeldung.nonblockingcoroutines.model.Product
import org.springframework.data.r2dbc.function.DatabaseClient
import org.springframework.stereotype.Repository
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@Repository
class ProductRepository(private val client: DatabaseClient) {
fun getProductById(id: Int): Mono<Product> {
return client.execute().sql("SELECT * FROM products WHERE id = $1")
.bind(0, id)
.`as`(Product::class.java)
.fetch()
.one()
}
fun addNewProduct(name: String, price: Float): Mono<Void> {
return client.execute()
.sql("INSERT INTO products (name, price) VALUES($1, $2)")
.bind(0, name)
.bind(1, price)
.then()
}
fun getAllProducts(): Flux<Product> {
return client.select().from("products")
.`as`(Product::class.java)
.fetch()
.all()
}
}

View File

@ -0,0 +1,40 @@
package com.baeldung.nonblockingcoroutines.repository
import com.baeldung.nonblockingcoroutines.model.Product
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.flow.asFlow
import org.springframework.data.r2dbc.function.DatabaseClient
import org.springframework.stereotype.Repository
@Repository
class ProductRepositoryCoroutines(private val client: DatabaseClient) {
suspend fun getProductById(id: Int): Product? =
client.execute().sql("SELECT * FROM products WHERE id = $1")
.bind(0, id)
.`as`(Product::class.java)
.fetch()
.one()
.awaitFirstOrNull()
suspend fun addNewProduct(name: String, price: Float) =
client.execute()
.sql("INSERT INTO products (name, price) VALUES($1, $2)")
.bind(0, name)
.bind(1, price)
.then()
.awaitFirstOrNull()
@FlowPreview
fun getAllProducts(): Flow<Product> =
client.select()
.from("products")
.`as`(Product::class.java)
.fetch()
.all()
.log()
.asFlow()
}

View File

@ -0,0 +1,8 @@
logging.level.org.springframework.data.r2dbc=DEBUG
logging.level.org.springframework.web.reactive.function.client.ExchangeFunctions=TRACE
spring.http.log-request-details=true
spring.h2.console.enabled=true
spring.datasource.username=sa
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.password=
spring.datasource.dbname=testdb

View File

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@ -0,0 +1,58 @@
package com.baeldung.nonblockingcoroutines
import com.baeldung.nonblockingcoroutines.config.RouterConfiguration
import com.baeldung.nonblockingcoroutines.handlers.ProductsHandler
import com.baeldung.nonblockingcoroutines.model.Product
import com.baeldung.nonblockingcoroutines.repository.ProductRepositoryCoroutines
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.reactive.flow.asFlow
import org.junit.Test
import org.junit.runner.RunWith
import org.mockito.BDDMockito.given
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.security.reactive.ReactiveSecurityAutoConfiguration
import org.springframework.boot.autoconfigure.security.reactive.ReactiveUserDetailsServiceAutoConfiguration
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.test.context.junit4.SpringRunner
import org.springframework.test.web.reactive.server.WebTestClient
import org.springframework.test.web.reactive.server.expectBodyList
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
import org.springframework.test.context.ContextConfiguration
@WebFluxTest(
excludeAutoConfiguration = [ReactiveUserDetailsServiceAutoConfiguration::class, ReactiveSecurityAutoConfiguration::class]
)
@RunWith(SpringRunner::class)
@ContextConfiguration(classes = [ProductsHandler::class, RouterConfiguration::class])
class ProductHandlerTest {
@Autowired
private lateinit var client: WebTestClient
@MockBean
private lateinit var webClient: WebClient
@MockBean
private lateinit var productsRepository: ProductRepositoryCoroutines
@FlowPreview
@Test
public fun `get all products`() {
val productsFlow = Flux.just(
Product(1, "product1", 1000.0F),
Product(2, "product2", 2000.0F),
Product(3, "product3", 3000.0F)
).asFlow()
given(productsRepository.getAllProducts()).willReturn(productsFlow)
client.get()
.uri("/")
.exchange()
.expectStatus()
.isOk
.expectBodyList<Product>()
}
}