diff --git a/spring-5-webflux/pom.xml b/spring-5-webflux/pom.xml index 48b5b823fb..766521a80f 100644 --- a/spring-5-webflux/pom.xml +++ b/spring-5-webflux/pom.xml @@ -53,6 +53,11 @@ reactor-test test + + + com.squareup.okhttp3 + mockwebserver + diff --git a/spring-5-webflux/src/main/java/com/baeldung/spring/retry/ExternalConnector.java b/spring-5-webflux/src/main/java/com/baeldung/spring/retry/ExternalConnector.java new file mode 100644 index 0000000000..baace095a7 --- /dev/null +++ b/spring-5-webflux/src/main/java/com/baeldung/spring/retry/ExternalConnector.java @@ -0,0 +1,73 @@ +package com.baeldung.spring.retry; + +import java.time.Duration; + +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; + +import lombok.AllArgsConstructor; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +@Component +@AllArgsConstructor +public class ExternalConnector { + + private static final String PATH_BY_ID = "/data/{id}"; + + private final WebClient webClient; + + public Mono getData(String stockId) { + return webClient.get() + .uri(PATH_BY_ID, stockId) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServiceException("Server error", response.rawStatusCode()))) + .bodyToMono(String.class) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)) + .filter(throwable -> throwable instanceof ServiceException) + .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> { + throw new ServiceException("External Service failed to process after max retries", HttpStatus.SERVICE_UNAVAILABLE.value()); + })); + } + + public Mono getDataWithRetry(String stockId) { + return webClient.get() + .uri(PATH_BY_ID, stockId) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(String.class) + .retryWhen(Retry.max(3)); + } + + public Mono getDataWithRetryFixedDelay(String stockId) { + return webClient.get() + .uri(PATH_BY_ID, stockId) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(String.class) + .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2))); + } + + public Mono getDataWithRetryBackoff(String stockId) { + return webClient.get() + .uri(PATH_BY_ID, stockId) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(String.class) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(2))); + } + + public Mono getDataWithRetryBackoffJitter(String stockId) { + return webClient.get() + .uri(PATH_BY_ID, stockId) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(String.class) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)) + .jitter(1)); + } + +} diff --git a/spring-5-webflux/src/main/java/com/baeldung/spring/retry/ServiceException.java b/spring-5-webflux/src/main/java/com/baeldung/spring/retry/ServiceException.java new file mode 100644 index 0000000000..cbfa71f986 --- /dev/null +++ b/spring-5-webflux/src/main/java/com/baeldung/spring/retry/ServiceException.java @@ -0,0 +1,15 @@ +package com.baeldung.spring.retry; + +public class ServiceException extends RuntimeException { + + private final int statusCode; + + public ServiceException(String message, int statusCode) { + super(message); + this.statusCode = statusCode; + } + + public int getStatusCode() { + return statusCode; + } +} diff --git a/spring-5-webflux/src/main/java/com/baeldung/spring/retry/StockController.java b/spring-5-webflux/src/main/java/com/baeldung/spring/retry/StockController.java new file mode 100644 index 0000000000..b03558d53f --- /dev/null +++ b/spring-5-webflux/src/main/java/com/baeldung/spring/retry/StockController.java @@ -0,0 +1,23 @@ +package com.baeldung.spring.retry; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import lombok.AllArgsConstructor; +import reactor.core.publisher.Mono; + +@AllArgsConstructor +@RestController +@RequestMapping("/stocks/data") +public class StockController { + + private final ExternalConnector externalConnector; + + @GetMapping("/{stockId}") + public Mono getData(@PathVariable String stockId) { + return externalConnector.getData(stockId); + } + +} diff --git a/spring-5-webflux/src/main/java/com/baeldung/spring/retry/StockDataApp.java b/spring-5-webflux/src/main/java/com/baeldung/spring/retry/StockDataApp.java new file mode 100644 index 0000000000..cfd1852f62 --- /dev/null +++ b/spring-5-webflux/src/main/java/com/baeldung/spring/retry/StockDataApp.java @@ -0,0 +1,18 @@ +package com.baeldung.spring.retry; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.web.reactive.function.client.WebClient; + +@SpringBootApplication +public class StockDataApp { + public static void main(String[] args) { + SpringApplication.run(StockDataApp.class, args); + } + + @Bean + public WebClient webClient() { + return WebClient.create(); + } +} diff --git a/spring-5-webflux/src/test/java/com/baeldung/spring/retry/ExternalConnectorIntegrationTest.java b/spring-5-webflux/src/test/java/com/baeldung/spring/retry/ExternalConnectorIntegrationTest.java new file mode 100644 index 0000000000..60d22def74 --- /dev/null +++ b/spring-5-webflux/src/test/java/com/baeldung/spring/retry/ExternalConnectorIntegrationTest.java @@ -0,0 +1,90 @@ +package com.baeldung.spring.retry; + +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; + +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import reactor.test.StepVerifier; + +class ExternalConnectorIntegrationTest { + + private ExternalConnector externalConnector; + + private MockWebServer mockExternalService; + + @BeforeEach + void setup() throws IOException { + externalConnector = new ExternalConnector(WebClient.builder() + .baseUrl("http://localhost:8090") + .build()); + mockExternalService = new MockWebServer(); + mockExternalService.start(8090); + } + + @AfterEach + void tearDown() throws IOException { + mockExternalService.shutdown(); + } + + @Test + void givenExternalServiceReturnsError_whenGettingData_thenRetryAndReturnResponse() throws Exception { + + mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); + mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); + mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); + mockExternalService.enqueue(new MockResponse().setBody("stock data")); + + StepVerifier.create(externalConnector.getData("ABC")) + .expectNextMatches(response -> response.equals("stock data")) + .verifyComplete(); + + verifyNumberOfGetRequests(4); + } + + @Test + void givenExternalServiceReturnsClientError_whenGettingData_thenNoRetry() throws Exception { + + mockExternalService.enqueue(new MockResponse().setResponseCode(UNAUTHORIZED.code())); + + StepVerifier.create(externalConnector.getData("ABC")) + .expectError(WebClientResponseException.class) + .verify(); + + verifyNumberOfGetRequests(1); + } + + @Test + void givenExternalServiceRetryAttemptsExhausted_whenGettingData_thenRetryAndReturnError() throws Exception { + + mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); + mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); + mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); + mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); + + StepVerifier.create(externalConnector.getData("ABC")) + .expectError(ServiceException.class) + .verify(); + + verifyNumberOfGetRequests(4); + } + + private void verifyNumberOfGetRequests(int times) throws Exception { + for (int i = 0; i < times; i++) { + RecordedRequest recordedRequest = mockExternalService.takeRequest(); + assertThat(recordedRequest.getMethod()).isEqualTo("GET"); + assertThat(recordedRequest.getPath()).isEqualTo("/data/ABC"); + } + } + +} diff --git a/spring-5-webflux/src/test/java/com/baeldung/spring/retry/StockControllerIntegrationTest.java b/spring-5-webflux/src/test/java/com/baeldung/spring/retry/StockControllerIntegrationTest.java new file mode 100644 index 0000000000..2ad94f4d6a --- /dev/null +++ b/spring-5-webflux/src/test/java/com/baeldung/spring/retry/StockControllerIntegrationTest.java @@ -0,0 +1,37 @@ +package com.baeldung.spring.retry; + +import static org.mockito.BDDMockito.given; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.reactive.server.WebTestClient; + +import reactor.core.publisher.Mono; + +@RunWith(SpringRunner.class) +@WebFluxTest +public class StockControllerIntegrationTest { + + @Autowired + private WebTestClient webClient; + + @MockBean + private ExternalConnector externalConnector; + + @Test + public void shouldReturnStockData() { + given(externalConnector.getData("ABC")).willReturn(Mono.just("stock data")); + + webClient.get() + .uri("/stocks/data/{id}", "ABC") + .exchange() + .expectStatus() + .isOk() + .expectBody(String.class) + .isEqualTo("stock data"); + } +}