From 1ca3005151d3199475f6867fdf6d2df0c6f620ed Mon Sep 17 00:00:00 2001 From: Mateusz Szablak Date: Thu, 15 Apr 2021 13:57:22 +0200 Subject: [PATCH] BAEL-4839 Set a timeout in Spring 5 WebFlux WebClient --- .../timeout/WebClientTimeoutProvider.java | 89 ++++++++++++ .../WebClientTimeoutIntegrationTest.java | 129 ++++++++++++++++++ 2 files changed, 218 insertions(+) create mode 100644 spring-5-reactive-client/src/main/java/com/baeldung/webclient/timeout/WebClientTimeoutProvider.java create mode 100644 spring-5-reactive-client/src/test/java/com/baeldung/webclient/timeout/WebClientTimeoutIntegrationTest.java diff --git a/spring-5-reactive-client/src/main/java/com/baeldung/webclient/timeout/WebClientTimeoutProvider.java b/spring-5-reactive-client/src/main/java/com/baeldung/webclient/timeout/WebClientTimeoutProvider.java new file mode 100644 index 0000000000..6bb5a2db38 --- /dev/null +++ b/spring-5-reactive-client/src/main/java/com/baeldung/webclient/timeout/WebClientTimeoutProvider.java @@ -0,0 +1,89 @@ +package com.baeldung.webclient.timeout; + +import io.netty.channel.ChannelOption; +import io.netty.channel.epoll.EpollChannelOption; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; +import lombok.experimental.UtilityClass; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; +import reactor.netty.tcp.SslProvider; +import reactor.netty.transport.ProxyProvider; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +@UtilityClass +public class WebClientTimeoutProvider { + + public static WebClient defaultWebClient() { + HttpClient httpClient = HttpClient.create(); + + return buildWebClient(httpClient); + } + + public WebClient responseTimeoutClient() { + HttpClient httpClient = HttpClient.create() + .responseTimeout(Duration.ofSeconds(1)); + + return buildWebClient(httpClient); + } + + public WebClient connectionTimeoutClient() { + HttpClient httpClient = HttpClient.create() + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000); + + return buildWebClient(httpClient); + } + + public WebClient connectionTimeoutWithKeepAliveClient() { + HttpClient httpClient = HttpClient.create() + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(EpollChannelOption.TCP_KEEPIDLE, 300) + .option(EpollChannelOption.TCP_KEEPINTVL, 60) + .option(EpollChannelOption.TCP_KEEPCNT, 8); + + return buildWebClient(httpClient); + } + + public WebClient readWriteTimeoutClient() { + HttpClient httpClient = HttpClient.create() + .doOnConnected(conn -> conn + .addHandler(new ReadTimeoutHandler(5, TimeUnit.SECONDS)) + .addHandler(new WriteTimeoutHandler(5))); + + return buildWebClient(httpClient); + } + + public WebClient sslTimeoutClient() { + HttpClient httpClient = HttpClient.create() + .secure(spec -> spec + .sslContext(SslContextBuilder.forClient()) + .defaultConfiguration(SslProvider.DefaultConfigurationType.TCP) + .handshakeTimeout(Duration.ofSeconds(30)) + .closeNotifyFlushTimeout(Duration.ofSeconds(10)) + .closeNotifyReadTimeout(Duration.ofSeconds(10))); + + return buildWebClient(httpClient); + } + + public WebClient proxyTimeoutClient() { + HttpClient httpClient = HttpClient.create() + .proxy(spec -> spec + .type(ProxyProvider.Proxy.HTTP) + .host("http://proxy") + .port(8080) + .connectTimeoutMillis(3000)); + + return buildWebClient(httpClient); + } + + private WebClient buildWebClient(HttpClient httpClient) { + return WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(httpClient)) + .build(); + } +} diff --git a/spring-5-reactive-client/src/test/java/com/baeldung/webclient/timeout/WebClientTimeoutIntegrationTest.java b/spring-5-reactive-client/src/test/java/com/baeldung/webclient/timeout/WebClientTimeoutIntegrationTest.java new file mode 100644 index 0000000000..d2e009fe6a --- /dev/null +++ b/spring-5-reactive-client/src/test/java/com/baeldung/webclient/timeout/WebClientTimeoutIntegrationTest.java @@ -0,0 +1,129 @@ +package com.baeldung.webclient.timeout; + +import com.github.tomakehurst.wiremock.WireMockServer; +import io.netty.handler.timeout.ReadTimeoutException; +import lombok.val; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.client.WebClientRequestException; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClientRequest; + + +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.configureFor; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class WebClientTimeoutIntegrationTest { + + private WireMockServer wireMockServer; + + @Before + public void setup() { + wireMockServer = new WireMockServer(wireMockConfig().dynamicPort()); + wireMockServer.start(); + configureFor("localhost", wireMockServer.port()); + } + + @After + public void tearDown() { + wireMockServer.stop(); + } + + @AfterEach + public void tearDownEach() { + wireMockServer.resetAll(); + } + + @Test + public void givenResponseTimeoutClientWhenRequestTimeoutThenReadTimeoutException() { + val path = "/response-timeout"; + val delay = Math.toIntExact(Duration.ofSeconds(2).toMillis()); + stubFor(get(urlEqualTo(path)).willReturn(aResponse().withFixedDelay(delay) + .withStatus(HttpStatus.OK.value()))); + + val webClient = WebClientTimeoutProvider.responseTimeoutClient(); + + val ex = assertThrows(RuntimeException.class, () -> + webClient.get() + .uri(wireMockServer.baseUrl() + path) + .exchangeToMono(Mono::just) + .log() + .block()); + assertThat(ex).isInstanceOf(WebClientRequestException.class) + .getCause().isInstanceOf(ReadTimeoutException.class); + } + + @Test + public void givenReadWriteTimeoutClientWhenRequestTimeoutThenReadTimeoutException() { + val path = "/read-write-timeout"; + val delay = Math.toIntExact(Duration.ofSeconds(6).toMillis()); + stubFor(get(urlEqualTo(path)).willReturn(aResponse().withFixedDelay(delay) + .withStatus(HttpStatus.OK.value()))); + + val webClient = WebClientTimeoutProvider.readWriteTimeoutClient(); + + val ex = assertThrows(RuntimeException.class, () -> + webClient.get() + .uri(wireMockServer.baseUrl() + path) + .exchangeToMono(Mono::just) + .log() + .block()); + assertThat(ex).isInstanceOf(WebClientRequestException.class) + .getCause().isInstanceOf(ReadTimeoutException.class); + } + + @Test + public void givenNoTimeoutClientAndReactorTimeoutWhenRequestTimeoutThenTimeoutException() { + val path = "/reactor-timeout"; + val delay = Math.toIntExact(Duration.ofSeconds(5).toMillis()); + stubFor(get(urlEqualTo(path)).willReturn(aResponse().withFixedDelay(delay) + .withStatus(HttpStatus.OK.value()))); + + val webClient = WebClientTimeoutProvider.defaultWebClient(); + + val ex = assertThrows(RuntimeException.class, () -> + webClient.get() + .uri(wireMockServer.baseUrl() + path) + .exchangeToMono(Mono::just) + .timeout(Duration.ofSeconds(1)) + .log() + .block()); + assertThat(ex).hasMessageContaining("Did not observe any item") + .getCause().isInstanceOf(TimeoutException.class); + } + + @Test + public void givenNoTimeoutClientAndTimeoutHttpRequestWhenRequestTimeoutThenReadTimeoutException() { + val path = "/reactor-http-request-timeout"; + val delay = Math.toIntExact(Duration.ofSeconds(5).toMillis()); + stubFor(get(urlEqualTo(path)).willReturn(aResponse().withFixedDelay(delay) + .withStatus(HttpStatus.OK.value()))); + + val webClient = WebClientTimeoutProvider.defaultWebClient(); + + val ex = assertThrows(RuntimeException.class, () -> + webClient.get() + .uri(wireMockServer.baseUrl() + path) + .httpRequest(httpRequest -> { + HttpClientRequest reactorRequest = httpRequest.getNativeRequest(); + reactorRequest.responseTimeout(Duration.ofSeconds(1)); + }) + .exchangeToMono(Mono::just) + .log() + .block()); + assertThat(ex).isInstanceOf(WebClientRequestException.class) + .getCause().isInstanceOf(ReadTimeoutException.class); + } +}