[BAEL-4096] Retry in WebFlux (#10675)
* [BAEL-4096] Retry in WebFlux * [BAEL-4096] Integration test cleanup * [BAEL-4096] Code formatting (Eclipse formatter) * [BAEL-4096] Add methods to show various retry strategies
This commit is contained in:
		
							parent
							
								
									afb8f12bf2
								
							
						
					
					
						commit
						738c6bf6e8
					
				| @ -53,6 +53,11 @@ | ||||
|             <artifactId>reactor-test</artifactId> | ||||
|             <scope>test</scope> | ||||
|         </dependency> | ||||
| 
 | ||||
|         <dependency> | ||||
|             <groupId>com.squareup.okhttp3</groupId> | ||||
|             <artifactId>mockwebserver</artifactId> | ||||
|         </dependency> | ||||
|     </dependencies> | ||||
| 
 | ||||
|     <build> | ||||
|  | ||||
| @ -0,0 +1,73 @@ | ||||
| package com.baeldung.spring.retry; | ||||
| 
 | ||||
| import java.time.Duration; | ||||
| 
 | ||||
| import org.springframework.http.HttpStatus; | ||||
| import org.springframework.http.MediaType; | ||||
| import org.springframework.stereotype.Component; | ||||
| import org.springframework.web.reactive.function.client.WebClient; | ||||
| 
 | ||||
| import lombok.AllArgsConstructor; | ||||
| import reactor.core.publisher.Mono; | ||||
| import reactor.util.retry.Retry; | ||||
| 
 | ||||
| @Component | ||||
| @AllArgsConstructor | ||||
| public class ExternalConnector { | ||||
| 
 | ||||
|     private static final String PATH_BY_ID = "/data/{id}"; | ||||
| 
 | ||||
|     private final WebClient webClient; | ||||
| 
 | ||||
|     public Mono<String> getData(String stockId) { | ||||
|         return webClient.get() | ||||
|             .uri(PATH_BY_ID, stockId) | ||||
|             .accept(MediaType.APPLICATION_JSON) | ||||
|             .retrieve() | ||||
|             .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new ServiceException("Server error", response.rawStatusCode()))) | ||||
|             .bodyToMono(String.class) | ||||
|             .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)) | ||||
|                 .filter(throwable -> throwable instanceof ServiceException) | ||||
|                 .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> { | ||||
|                     throw new ServiceException("External Service failed to process after max retries", HttpStatus.SERVICE_UNAVAILABLE.value()); | ||||
|                 })); | ||||
|     } | ||||
| 
 | ||||
|     public Mono<String> getDataWithRetry(String stockId) { | ||||
|         return webClient.get() | ||||
|             .uri(PATH_BY_ID, stockId) | ||||
|             .accept(MediaType.APPLICATION_JSON) | ||||
|             .retrieve() | ||||
|             .bodyToMono(String.class) | ||||
|             .retryWhen(Retry.max(3)); | ||||
|     } | ||||
| 
 | ||||
|     public Mono<String> getDataWithRetryFixedDelay(String stockId) { | ||||
|         return webClient.get() | ||||
|             .uri(PATH_BY_ID, stockId) | ||||
|             .accept(MediaType.APPLICATION_JSON) | ||||
|             .retrieve() | ||||
|             .bodyToMono(String.class) | ||||
|             .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2))); | ||||
|     } | ||||
| 
 | ||||
|     public Mono<String> getDataWithRetryBackoff(String stockId) { | ||||
|         return webClient.get() | ||||
|             .uri(PATH_BY_ID, stockId) | ||||
|             .accept(MediaType.APPLICATION_JSON) | ||||
|             .retrieve() | ||||
|             .bodyToMono(String.class) | ||||
|             .retryWhen(Retry.backoff(3, Duration.ofSeconds(2))); | ||||
|     } | ||||
| 
 | ||||
|     public Mono<String> getDataWithRetryBackoffJitter(String stockId) { | ||||
|         return webClient.get() | ||||
|             .uri(PATH_BY_ID, stockId) | ||||
|             .accept(MediaType.APPLICATION_JSON) | ||||
|             .retrieve() | ||||
|             .bodyToMono(String.class) | ||||
|             .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)) | ||||
|                 .jitter(1)); | ||||
|     } | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,15 @@ | ||||
| package com.baeldung.spring.retry; | ||||
| 
 | ||||
| public class ServiceException extends RuntimeException { | ||||
| 
 | ||||
|     private final int statusCode; | ||||
| 
 | ||||
|     public ServiceException(String message, int statusCode) { | ||||
|         super(message); | ||||
|         this.statusCode = statusCode; | ||||
|     } | ||||
| 
 | ||||
|     public int getStatusCode() { | ||||
|         return statusCode; | ||||
|     } | ||||
| } | ||||
| @ -0,0 +1,23 @@ | ||||
| package com.baeldung.spring.retry; | ||||
| 
 | ||||
| import org.springframework.web.bind.annotation.GetMapping; | ||||
| import org.springframework.web.bind.annotation.PathVariable; | ||||
| import org.springframework.web.bind.annotation.RequestMapping; | ||||
| import org.springframework.web.bind.annotation.RestController; | ||||
| 
 | ||||
| import lombok.AllArgsConstructor; | ||||
| import reactor.core.publisher.Mono; | ||||
| 
 | ||||
| @AllArgsConstructor | ||||
| @RestController | ||||
| @RequestMapping("/stocks/data") | ||||
| public class StockController { | ||||
| 
 | ||||
|     private final ExternalConnector externalConnector; | ||||
| 
 | ||||
|     @GetMapping("/{stockId}") | ||||
|     public Mono<String> getData(@PathVariable String stockId) { | ||||
|         return externalConnector.getData(stockId); | ||||
|     } | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,18 @@ | ||||
| package com.baeldung.spring.retry; | ||||
| 
 | ||||
| import org.springframework.boot.SpringApplication; | ||||
| import org.springframework.boot.autoconfigure.SpringBootApplication; | ||||
| import org.springframework.context.annotation.Bean; | ||||
| import org.springframework.web.reactive.function.client.WebClient; | ||||
| 
 | ||||
| @SpringBootApplication | ||||
| public class StockDataApp { | ||||
|     public static void main(String[] args) { | ||||
|         SpringApplication.run(StockDataApp.class, args); | ||||
|     } | ||||
| 
 | ||||
|     @Bean | ||||
|     public WebClient webClient() { | ||||
|         return WebClient.create(); | ||||
|     } | ||||
| } | ||||
| @ -0,0 +1,90 @@ | ||||
| package com.baeldung.spring.retry; | ||||
| 
 | ||||
| import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; | ||||
| import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED; | ||||
| import static org.assertj.core.api.Assertions.assertThat; | ||||
| 
 | ||||
| import java.io.IOException; | ||||
| 
 | ||||
| import org.junit.jupiter.api.AfterEach; | ||||
| import org.junit.jupiter.api.BeforeEach; | ||||
| import org.junit.jupiter.api.Test; | ||||
| import org.springframework.web.reactive.function.client.WebClient; | ||||
| import org.springframework.web.reactive.function.client.WebClientResponseException; | ||||
| 
 | ||||
| import okhttp3.mockwebserver.MockResponse; | ||||
| import okhttp3.mockwebserver.MockWebServer; | ||||
| import okhttp3.mockwebserver.RecordedRequest; | ||||
| import reactor.test.StepVerifier; | ||||
| 
 | ||||
| class ExternalConnectorIntegrationTest { | ||||
| 
 | ||||
|     private ExternalConnector externalConnector; | ||||
| 
 | ||||
|     private MockWebServer mockExternalService; | ||||
| 
 | ||||
|     @BeforeEach | ||||
|     void setup() throws IOException { | ||||
|         externalConnector = new ExternalConnector(WebClient.builder() | ||||
|             .baseUrl("http://localhost:8090") | ||||
|             .build()); | ||||
|         mockExternalService = new MockWebServer(); | ||||
|         mockExternalService.start(8090); | ||||
|     } | ||||
| 
 | ||||
|     @AfterEach | ||||
|     void tearDown() throws IOException { | ||||
|         mockExternalService.shutdown(); | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     void givenExternalServiceReturnsError_whenGettingData_thenRetryAndReturnResponse() throws Exception { | ||||
| 
 | ||||
|         mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); | ||||
|         mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); | ||||
|         mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); | ||||
|         mockExternalService.enqueue(new MockResponse().setBody("stock data")); | ||||
| 
 | ||||
|         StepVerifier.create(externalConnector.getData("ABC")) | ||||
|             .expectNextMatches(response -> response.equals("stock data")) | ||||
|             .verifyComplete(); | ||||
| 
 | ||||
|         verifyNumberOfGetRequests(4); | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     void givenExternalServiceReturnsClientError_whenGettingData_thenNoRetry() throws Exception { | ||||
| 
 | ||||
|         mockExternalService.enqueue(new MockResponse().setResponseCode(UNAUTHORIZED.code())); | ||||
| 
 | ||||
|         StepVerifier.create(externalConnector.getData("ABC")) | ||||
|             .expectError(WebClientResponseException.class) | ||||
|             .verify(); | ||||
| 
 | ||||
|         verifyNumberOfGetRequests(1); | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     void givenExternalServiceRetryAttemptsExhausted_whenGettingData_thenRetryAndReturnError() throws Exception { | ||||
| 
 | ||||
|         mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); | ||||
|         mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); | ||||
|         mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); | ||||
|         mockExternalService.enqueue(new MockResponse().setResponseCode(SERVICE_UNAVAILABLE.code())); | ||||
| 
 | ||||
|         StepVerifier.create(externalConnector.getData("ABC")) | ||||
|             .expectError(ServiceException.class) | ||||
|             .verify(); | ||||
| 
 | ||||
|         verifyNumberOfGetRequests(4); | ||||
|     } | ||||
| 
 | ||||
|     private void verifyNumberOfGetRequests(int times) throws Exception { | ||||
|         for (int i = 0; i < times; i++) { | ||||
|             RecordedRequest recordedRequest = mockExternalService.takeRequest(); | ||||
|             assertThat(recordedRequest.getMethod()).isEqualTo("GET"); | ||||
|             assertThat(recordedRequest.getPath()).isEqualTo("/data/ABC"); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
| } | ||||
| @ -0,0 +1,37 @@ | ||||
| package com.baeldung.spring.retry; | ||||
| 
 | ||||
| import static org.mockito.BDDMockito.given; | ||||
| 
 | ||||
| import org.junit.Test; | ||||
| import org.junit.runner.RunWith; | ||||
| import org.springframework.beans.factory.annotation.Autowired; | ||||
| import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest; | ||||
| import org.springframework.boot.test.mock.mockito.MockBean; | ||||
| import org.springframework.test.context.junit4.SpringRunner; | ||||
| import org.springframework.test.web.reactive.server.WebTestClient; | ||||
| 
 | ||||
| import reactor.core.publisher.Mono; | ||||
| 
 | ||||
| @RunWith(SpringRunner.class) | ||||
| @WebFluxTest | ||||
| public class StockControllerIntegrationTest { | ||||
| 
 | ||||
|     @Autowired | ||||
|     private WebTestClient webClient; | ||||
| 
 | ||||
|     @MockBean | ||||
|     private ExternalConnector externalConnector; | ||||
| 
 | ||||
|     @Test | ||||
|     public void shouldReturnStockData() { | ||||
|         given(externalConnector.getData("ABC")).willReturn(Mono.just("stock data")); | ||||
| 
 | ||||
|         webClient.get() | ||||
|             .uri("/stocks/data/{id}", "ABC") | ||||
|             .exchange() | ||||
|             .expectStatus() | ||||
|             .isOk() | ||||
|             .expectBody(String.class) | ||||
|             .isEqualTo("stock data"); | ||||
|     } | ||||
| } | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user