Merge pull request #16405 from Michaelin007/webfluxmono

https://jira.baeldung.com/browse/BAEL-7789
This commit is contained in:
Maiklins 2024-04-28 21:22:37 +02:00 committed by GitHub
commit edf3b45366
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 318 additions and 0 deletions

View File

@ -58,6 +58,19 @@
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
@ -141,6 +154,7 @@
<reactor-spring.version>1.0.1.RELEASE</reactor-spring.version>
<jetty-reactive-httpclient.version>1.1.6</jetty-reactive-httpclient.version>
<resilience4j.version>1.7.1</resilience4j.version>
<jmh.version>1.37</jmh.version>
</properties>
</project>

View File

@ -0,0 +1,12 @@
package com.baeldung.webclientretrievevsexchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RetrieveAndExchangeApp {
public static void main(String[] args) {
SpringApplication.run(RetrieveAndExchangeApp.class, args);
}
}

View File

@ -0,0 +1,135 @@
package com.baeldung.webclientretrievevsexchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
public class RetrieveAndExchangeController {
private static final Logger logger = LoggerFactory.getLogger(RetrieveAndExchangeController.class);
WebClient client = WebClient.create("https://jsonplaceholder.typicode.com/users");
@GetMapping("/user/{id}")
Mono<User> retrieveOneUser(@PathVariable int id) {
return client.get()
.uri("/{id}", id)
.retrieve()
.bodyToMono(User.class)
.onErrorResume(Mono::error);
}
@GetMapping("/user-status/{id}")
Mono<User> retrieveOneUserAndHandleErrorBasedOnStatus(@PathVariable int id) {
return client.get()
.uri("/{id}", id)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, response -> Mono.error(new RuntimeException("Client Error: can't fetch user")))
.onStatus(HttpStatusCode::is5xxServerError, response -> Mono.error(new RuntimeException("Server Error: can't fetch user")))
.bodyToMono(User.class);
}
@GetMapping("/user-id/{id}")
Mono<ResponseEntity<User>> retrieveOneUserWithResponseEntity(@PathVariable int id) {
return client.get()
.uri("/{id}", id)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.toEntity(User.class)
.onErrorResume(Mono::error);
}
@GetMapping("/users")
Flux<User> retrieveAllUsers() {
return client.get()
.retrieve()
.bodyToFlux(User.class)
.onErrorResume(Flux::error);
}
@GetMapping("/user/exchange-alter/{id}")
Mono<User> retrieveOneUserWithExchangeAndManipulate(@PathVariable int id) {
return client.get()
.uri("/{id}", id)
.exchangeToMono(res -> res.bodyToMono(User.class))
.map(user -> {
user.setName(user.getName()
.toUpperCase());
user.setId(user.getId() + 100);
return user;
});
}
@GetMapping("/user/exchange-mono/{id}")
Mono<User> retrieveUsersWithExchangeAndError(@PathVariable int id) {
return client.get()
.uri("/{id}", id)
.exchangeToMono(res -> {
if (res.statusCode()
.is2xxSuccessful()) {
return res.bodyToMono(User.class);
} else if (res.statusCode()
.is4xxClientError()) {
return Mono.error(new RuntimeException("Client Error: can't fetch user"));
} else if (res.statusCode()
.is5xxServerError()) {
return Mono.error(new RuntimeException("Server Error: can't fetch user"));
} else {
return res.createError();
}
});
}
@GetMapping("/user/exchange-header/{id}")
Mono<User> retrieveUsersWithExchangeAndHeader(@PathVariable int id) {
return client.get()
.uri("/{id}", id)
.exchangeToMono(res -> {
if (res.statusCode()
.is2xxSuccessful()) {
logger.info("Status code: " + res.headers()
.asHttpHeaders());
logger.info("Content-type" + res.headers()
.contentType());
return res.bodyToMono(User.class);
} else if (res.statusCode()
.is4xxClientError()) {
return Mono.error(new RuntimeException("Client Error: can't fetch user"));
} else if (res.statusCode()
.is5xxServerError()) {
return Mono.error(new RuntimeException("Server Error: can't fetch user"));
} else {
return res.createError();
}
});
}
@GetMapping("/user-exchange")
Flux<User> retrieveAllUserWithExchange(@PathVariable int id) {
return client.get()
.exchangeToFlux(res -> res.bodyToFlux(User.class))
.onErrorResume(Flux::error);
}
@GetMapping("/user-exchange-flux")
Flux<User> retrieveUsersWithExchange() {
return client.get()
.exchangeToFlux(res -> {
if (res.statusCode()
.is2xxSuccessful()) {
return res.bodyToFlux(User.class);
} else {
return Flux.error(new RuntimeException("Error while fetching users"));
}
});
}
}

View File

@ -0,0 +1,26 @@
package com.baeldung.webclientretrievevsexchange;
public class User {
private int id;
private String name;
public User() {
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

View File

@ -0,0 +1,63 @@
package com.baeldung.webclientretrievevsexchange;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Benchmark;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations = 3, time = 10, timeUnit = TimeUnit.MICROSECONDS)
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.MICROSECONDS)
public class RetrieveAndExchangeBenchmark {
private WebClient client;
@Setup
public void setup() {
this.client = WebClient.create("https://jsonplaceholder.typicode.com/users");
}
@Benchmark
public Mono<User> retrieveOneUserUsingRetrieveMethod() {
return client.get()
.uri("/1")
.retrieve()
.bodyToMono(User.class)
.onErrorResume(Mono::error);
}
@Benchmark
public Flux<User> retrieveManyUserUsingRetrieveMethod() {
return client.get()
.retrieve()
.bodyToFlux(User.class)
.onErrorResume(Flux::error);
}
@Benchmark
public Mono<User> retrieveOneUserUsingExchangeToMono() {
return client.get()
.uri("/1")
.exchangeToMono(res -> res.bodyToMono(User.class))
.onErrorResume(Mono::error);
}
@Benchmark
public Flux<User> retrieveManyUserUsingExchangeToFlux() {
return client.get()
.exchangeToFlux(res -> res.bodyToFlux(User.class))
.onErrorResume(Flux::error);
}
}

View File

@ -0,0 +1,68 @@
package com.baeldung.webclientretrievevsexchange;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.test.web.reactive.server.WebTestClient;
@WebFluxTest
class RetrieveAndExchangeIntegrationTest {
@Autowired
private WebTestClient webTestClient;
@Test
void givenFirstUser_whenRetrieveMethodIsUsed_thenReturnOk() throws Exception {
this.webTestClient.get()
.uri("/user/1")
.exchange()
.expectStatus()
.isOk();
}
@Test
void givenFirstUser_whenRetreiveMethodIsUsedWithOnStatusHandler_thenReturnNotFound() throws Exception {
this.webTestClient.get()
.uri("/user-status/100")
.exchange()
.expectStatus()
.is5xxServerError();
}
@Test
void givenFirstUser_whenExchangeMonoMethodIsUsed_thenReturnOk() throws Exception {
this.webTestClient.get()
.uri("/user/exchange-mono/1")
.exchange()
.expectStatus()
.isOk();
}
@Test
void givenAllUsers_whenRetrieveMethodIsUsed_thenReturnOk() throws Exception {
this.webTestClient.get()
.uri("/users")
.exchange()
.expectStatus()
.isOk();
}
@Test
void givenSingleUser_whenResponseBodyIsAltered_thenReturnOk() throws Exception {
this.webTestClient.get()
.uri("/user/exchange-alter/1")
.exchange()
.expectBody()
.json("{\"id\":101,\"name\":\"LEANNE GRAHAM\"}");
}
@Test
void givenAllUsers_whenExchangeFluxMethodIsUsed_thenReturnOk() throws Exception {
this.webTestClient.get()
.uri("/user-exchange-flux")
.exchange()
.expectStatus()
.isOk();
}
}