BAEL-4839 Set a timeout in Spring 5 WebFlux WebClient
This commit is contained in:
parent
036fb5ca71
commit
1ca3005151
|
@ -0,0 +1,89 @@
|
||||||
|
package com.baeldung.webclient.timeout;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelOption;
|
||||||
|
import io.netty.channel.epoll.EpollChannelOption;
|
||||||
|
import io.netty.handler.ssl.SslContextBuilder;
|
||||||
|
import io.netty.handler.timeout.ReadTimeoutHandler;
|
||||||
|
import io.netty.handler.timeout.WriteTimeoutHandler;
|
||||||
|
import lombok.experimental.UtilityClass;
|
||||||
|
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
|
||||||
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
import reactor.netty.http.client.HttpClient;
|
||||||
|
import reactor.netty.tcp.SslProvider;
|
||||||
|
import reactor.netty.transport.ProxyProvider;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@UtilityClass
|
||||||
|
public class WebClientTimeoutProvider {
|
||||||
|
|
||||||
|
public static WebClient defaultWebClient() {
|
||||||
|
HttpClient httpClient = HttpClient.create();
|
||||||
|
|
||||||
|
return buildWebClient(httpClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebClient responseTimeoutClient() {
|
||||||
|
HttpClient httpClient = HttpClient.create()
|
||||||
|
.responseTimeout(Duration.ofSeconds(1));
|
||||||
|
|
||||||
|
return buildWebClient(httpClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebClient connectionTimeoutClient() {
|
||||||
|
HttpClient httpClient = HttpClient.create()
|
||||||
|
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
|
||||||
|
|
||||||
|
return buildWebClient(httpClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebClient connectionTimeoutWithKeepAliveClient() {
|
||||||
|
HttpClient httpClient = HttpClient.create()
|
||||||
|
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
|
||||||
|
.option(ChannelOption.SO_KEEPALIVE, true)
|
||||||
|
.option(EpollChannelOption.TCP_KEEPIDLE, 300)
|
||||||
|
.option(EpollChannelOption.TCP_KEEPINTVL, 60)
|
||||||
|
.option(EpollChannelOption.TCP_KEEPCNT, 8);
|
||||||
|
|
||||||
|
return buildWebClient(httpClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebClient readWriteTimeoutClient() {
|
||||||
|
HttpClient httpClient = HttpClient.create()
|
||||||
|
.doOnConnected(conn -> conn
|
||||||
|
.addHandler(new ReadTimeoutHandler(5, TimeUnit.SECONDS))
|
||||||
|
.addHandler(new WriteTimeoutHandler(5)));
|
||||||
|
|
||||||
|
return buildWebClient(httpClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebClient sslTimeoutClient() {
|
||||||
|
HttpClient httpClient = HttpClient.create()
|
||||||
|
.secure(spec -> spec
|
||||||
|
.sslContext(SslContextBuilder.forClient())
|
||||||
|
.defaultConfiguration(SslProvider.DefaultConfigurationType.TCP)
|
||||||
|
.handshakeTimeout(Duration.ofSeconds(30))
|
||||||
|
.closeNotifyFlushTimeout(Duration.ofSeconds(10))
|
||||||
|
.closeNotifyReadTimeout(Duration.ofSeconds(10)));
|
||||||
|
|
||||||
|
return buildWebClient(httpClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
public WebClient proxyTimeoutClient() {
|
||||||
|
HttpClient httpClient = HttpClient.create()
|
||||||
|
.proxy(spec -> spec
|
||||||
|
.type(ProxyProvider.Proxy.HTTP)
|
||||||
|
.host("http://proxy")
|
||||||
|
.port(8080)
|
||||||
|
.connectTimeoutMillis(3000));
|
||||||
|
|
||||||
|
return buildWebClient(httpClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
private WebClient buildWebClient(HttpClient httpClient) {
|
||||||
|
return WebClient.builder()
|
||||||
|
.clientConnector(new ReactorClientHttpConnector(httpClient))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,129 @@
|
||||||
|
package com.baeldung.webclient.timeout;
|
||||||
|
|
||||||
|
import com.github.tomakehurst.wiremock.WireMockServer;
|
||||||
|
import io.netty.handler.timeout.ReadTimeoutException;
|
||||||
|
import lombok.val;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.springframework.http.HttpStatus;
|
||||||
|
import org.springframework.web.reactive.function.client.WebClientRequestException;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.netty.http.client.HttpClientRequest;
|
||||||
|
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
|
||||||
|
import static com.github.tomakehurst.wiremock.client.WireMock.configureFor;
|
||||||
|
import static com.github.tomakehurst.wiremock.client.WireMock.get;
|
||||||
|
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
|
||||||
|
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
|
||||||
|
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
|
||||||
|
public class WebClientTimeoutIntegrationTest {
|
||||||
|
|
||||||
|
private WireMockServer wireMockServer;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
wireMockServer = new WireMockServer(wireMockConfig().dynamicPort());
|
||||||
|
wireMockServer.start();
|
||||||
|
configureFor("localhost", wireMockServer.port());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
wireMockServer.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
public void tearDownEach() {
|
||||||
|
wireMockServer.resetAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenResponseTimeoutClientWhenRequestTimeoutThenReadTimeoutException() {
|
||||||
|
val path = "/response-timeout";
|
||||||
|
val delay = Math.toIntExact(Duration.ofSeconds(2).toMillis());
|
||||||
|
stubFor(get(urlEqualTo(path)).willReturn(aResponse().withFixedDelay(delay)
|
||||||
|
.withStatus(HttpStatus.OK.value())));
|
||||||
|
|
||||||
|
val webClient = WebClientTimeoutProvider.responseTimeoutClient();
|
||||||
|
|
||||||
|
val ex = assertThrows(RuntimeException.class, () ->
|
||||||
|
webClient.get()
|
||||||
|
.uri(wireMockServer.baseUrl() + path)
|
||||||
|
.exchangeToMono(Mono::just)
|
||||||
|
.log()
|
||||||
|
.block());
|
||||||
|
assertThat(ex).isInstanceOf(WebClientRequestException.class)
|
||||||
|
.getCause().isInstanceOf(ReadTimeoutException.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenReadWriteTimeoutClientWhenRequestTimeoutThenReadTimeoutException() {
|
||||||
|
val path = "/read-write-timeout";
|
||||||
|
val delay = Math.toIntExact(Duration.ofSeconds(6).toMillis());
|
||||||
|
stubFor(get(urlEqualTo(path)).willReturn(aResponse().withFixedDelay(delay)
|
||||||
|
.withStatus(HttpStatus.OK.value())));
|
||||||
|
|
||||||
|
val webClient = WebClientTimeoutProvider.readWriteTimeoutClient();
|
||||||
|
|
||||||
|
val ex = assertThrows(RuntimeException.class, () ->
|
||||||
|
webClient.get()
|
||||||
|
.uri(wireMockServer.baseUrl() + path)
|
||||||
|
.exchangeToMono(Mono::just)
|
||||||
|
.log()
|
||||||
|
.block());
|
||||||
|
assertThat(ex).isInstanceOf(WebClientRequestException.class)
|
||||||
|
.getCause().isInstanceOf(ReadTimeoutException.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenNoTimeoutClientAndReactorTimeoutWhenRequestTimeoutThenTimeoutException() {
|
||||||
|
val path = "/reactor-timeout";
|
||||||
|
val delay = Math.toIntExact(Duration.ofSeconds(5).toMillis());
|
||||||
|
stubFor(get(urlEqualTo(path)).willReturn(aResponse().withFixedDelay(delay)
|
||||||
|
.withStatus(HttpStatus.OK.value())));
|
||||||
|
|
||||||
|
val webClient = WebClientTimeoutProvider.defaultWebClient();
|
||||||
|
|
||||||
|
val ex = assertThrows(RuntimeException.class, () ->
|
||||||
|
webClient.get()
|
||||||
|
.uri(wireMockServer.baseUrl() + path)
|
||||||
|
.exchangeToMono(Mono::just)
|
||||||
|
.timeout(Duration.ofSeconds(1))
|
||||||
|
.log()
|
||||||
|
.block());
|
||||||
|
assertThat(ex).hasMessageContaining("Did not observe any item")
|
||||||
|
.getCause().isInstanceOf(TimeoutException.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenNoTimeoutClientAndTimeoutHttpRequestWhenRequestTimeoutThenReadTimeoutException() {
|
||||||
|
val path = "/reactor-http-request-timeout";
|
||||||
|
val delay = Math.toIntExact(Duration.ofSeconds(5).toMillis());
|
||||||
|
stubFor(get(urlEqualTo(path)).willReturn(aResponse().withFixedDelay(delay)
|
||||||
|
.withStatus(HttpStatus.OK.value())));
|
||||||
|
|
||||||
|
val webClient = WebClientTimeoutProvider.defaultWebClient();
|
||||||
|
|
||||||
|
val ex = assertThrows(RuntimeException.class, () ->
|
||||||
|
webClient.get()
|
||||||
|
.uri(wireMockServer.baseUrl() + path)
|
||||||
|
.httpRequest(httpRequest -> {
|
||||||
|
HttpClientRequest reactorRequest = httpRequest.getNativeRequest();
|
||||||
|
reactorRequest.responseTimeout(Duration.ofSeconds(1));
|
||||||
|
})
|
||||||
|
.exchangeToMono(Mono::just)
|
||||||
|
.log()
|
||||||
|
.block());
|
||||||
|
assertThat(ex).isInstanceOf(WebClientRequestException.class)
|
||||||
|
.getCause().isInstanceOf(ReadTimeoutException.class);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue