From b731b47b1b22ac311be664ea8219d7d8e2451697 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Wed, 25 Dec 2019 08:35:48 +0100 Subject: [PATCH] DATAES-719 - Add customization hook for reactive WebClient. Original PR: #363 (cherry picked from commit f7a14c1135189a62675a3345d70ccb1a6ec16af4) --- .../reference/elasticsearch-clients.adoc | 10 ++- .../reactive/DefaultWebClientProvider.java | 88 ++++++++----------- .../client/ClientConfigurationUnitTests.java | 22 +++++ .../DefaultWebClientProviderUnitTests.java | 17 ++++ 4 files changed, 86 insertions(+), 51 deletions(-) diff --git a/src/main/asciidoc/reference/elasticsearch-clients.adoc b/src/main/asciidoc/reference/elasticsearch-clients.adoc index 1d1b8d813..5a9e23801 100644 --- a/src/main/asciidoc/reference/elasticsearch-clients.adoc +++ b/src/main/asciidoc/reference/elasticsearch-clients.adoc @@ -104,8 +104,15 @@ static class Config { @Bean ReactiveElasticsearchClient client() { - ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1> + ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1> .connectedTo("localhost:9200", "localhost:9291") + .withWebClientConfigurer(webClient -> { <2> + ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() + .codecs(configurer -> configurer.defaultCodecs() + .maxInMemorySize(-1)) + .build(); + return webClient.mutate().exchangeStrategies(exchangeStrategies).build(); + }) .build(); return ReactiveRestClients.create(clientConfiguration); @@ -124,6 +131,7 @@ Mono response = client.index(request -> ); ---- <1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL. +<2> when configuring a reactive client, the `withWebClientConfigurer` hook can be used to customize the WebClient. ==== NOTE: The ReactiveClient response, especially for search operations, is bound to the `from` (offset) & `size` (limit) options of the request. diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java index 155ca2d45..e55f3839a 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; +import java.util.function.Function; import org.springframework.http.HttpHeaders; import org.springframework.http.client.reactive.ClientHttpConnector; @@ -45,6 +46,7 @@ class DefaultWebClientProvider implements WebClientProvider { private final Consumer errorListener; private final HttpHeaders headers; private final String pathPrefix; + private final Function webClientConfigurer; /** * Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}. @@ -53,24 +55,28 @@ class DefaultWebClientProvider implements WebClientProvider { * @param connector can be {@literal null}. */ DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector) { - this(scheme, connector, e -> {}, HttpHeaders.EMPTY, null); + this(scheme, connector, e -> {}, HttpHeaders.EMPTY, null, Function.identity()); } /** * Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}. * * @param scheme must not be {@literal null}. - * @param connector can be {@literal null}. - * @param errorListener must not be {@literal null}. - * @param headers must not be {@literal null}. - * @param pathPrefixcan be {@literal null} + * @param connector can be {@literal null}. + * @param errorListener must not be {@literal null}. + * @param headers must not be {@literal null}. + * @param pathPrefix can be {@literal null} + * @param webClientConfigurer must not be {@literal null}. */ private DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector, - Consumer errorListener, HttpHeaders headers, @Nullable String pathPrefix) { + Consumer errorListener, HttpHeaders headers, @Nullable String pathPrefix, + Function webClientConfigurer) { Assert.notNull(scheme, "Scheme must not be null! A common scheme would be 'http'."); - Assert.notNull(errorListener, "ErrorListener must not be null! You may want use a no-op one 'e -> {}' instead."); + Assert.notNull(errorListener, "errorListener must not be null! You may want use a no-op one 'e -> {}' instead."); Assert.notNull(headers, "headers must not be null! Think about using 'HttpHeaders.EMPTY' as an alternative."); + Assert.notNull(webClientConfigurer, + "webClientConfigurer must not be null! You may want use a no-op one 'Function.identity()' instead."); this.cachedClients = new ConcurrentHashMap<>(); this.scheme = scheme; @@ -78,12 +84,9 @@ class DefaultWebClientProvider implements WebClientProvider { this.errorListener = errorListener; this.headers = headers; this.pathPrefix = pathPrefix; + this.webClientConfigurer = webClientConfigurer; } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#get(java.net.InetSocketAddress) - */ @Override public WebClient get(InetSocketAddress endpoint) { @@ -92,19 +95,21 @@ class DefaultWebClientProvider implements WebClientProvider { return this.cachedClients.computeIfAbsent(endpoint, this::createWebClientForSocketAddress); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#getDefaultHeaders() - */ @Override public HttpHeaders getDefaultHeaders() { return headers; } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#withDefaultHeaders(org.springframework.http.HttpHeaders) - */ + @Override + public Consumer getErrorListener() { + return this.errorListener; + } + + @Override + public String getPathPrefix() { + return pathPrefix; + } + @Override public WebClientProvider withDefaultHeaders(HttpHeaders headers) { @@ -114,51 +119,33 @@ class DefaultWebClientProvider implements WebClientProvider { merged.addAll(this.headers); merged.addAll(headers); - return new DefaultWebClientProvider(this.scheme, this.connector, errorListener, merged, this.pathPrefix); + return new DefaultWebClientProvider(scheme, connector, errorListener, merged, pathPrefix, webClientConfigurer); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#getErrorListener() - */ - @Override - public Consumer getErrorListener() { - return this.errorListener; - } - - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#getPathPrefix() - */ - @Override - public String getPathPrefix() { - return pathPrefix; - } - - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#withErrorListener(java.util.function.Consumer) - */ @Override public WebClientProvider withErrorListener(Consumer errorListener) { Assert.notNull(errorListener, "Error listener must not be null."); Consumer listener = this.errorListener.andThen(errorListener); - return new DefaultWebClientProvider(this.scheme, this.connector, listener, this.headers, this.pathPrefix); + return new DefaultWebClientProvider(scheme, this.connector, listener, headers, pathPrefix, webClientConfigurer); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#withPathPrefix(java.lang.String) - */ @Override public WebClientProvider withPathPrefix(String pathPrefix) { Assert.notNull(pathPrefix, "pathPrefix must not be null."); - return new DefaultWebClientProvider(this.scheme, this.connector, this.errorListener, this.headers, pathPrefix); + return new DefaultWebClientProvider(this.scheme, this.connector, this.errorListener, this.headers, pathPrefix, + webClientConfigurer); } + @Override + public WebClientProvider withWebClientConfigurer(Function webClientConfigurer) { + return new DefaultWebClientProvider(scheme, connector, errorListener, headers, pathPrefix, webClientConfigurer); + + } + + protected WebClient createWebClientForSocketAddress(InetSocketAddress socketAddress) { Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders())); @@ -168,7 +155,8 @@ class DefaultWebClientProvider implements WebClientProvider { } String baseUrl = String.format("%s://%s:%d%s", this.scheme, socketAddress.getHostString(), socketAddress.getPort(), - pathPrefix == null ? "" : "/" + pathPrefix); - return builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)).build(); + pathPrefix == null ? "" : '/' + pathPrefix); + WebClient webClient = builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)).build(); + return webClientConfigurer.apply(webClient); } } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java index 81a763cb1..474bb4448 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java @@ -20,12 +20,14 @@ import static org.mockito.Mockito.*; import java.net.InetSocketAddress; import java.time.Duration; +import java.util.function.Function; import javax.net.ssl.SSLContext; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.junit.Test; import org.springframework.http.HttpHeaders; +import org.springframework.web.reactive.function.client.WebClient; /** * Unit tests for {@link ClientConfiguration}. @@ -143,6 +145,26 @@ public class ClientConfigurationUnitTests { assertThat(clientConfiguration.getHostNameVerifier()).contains(NoopHostnameVerifier.INSTANCE); } + @Test // DATAES-719 + public void shouldHaveDefaultWebClientConfigurer() { + ClientConfiguration clientConfiguration = ClientConfiguration.builder() // + .connectedTo("foo", "bar") // + .build(); + + assertThat(clientConfiguration.getWebClientConfigurer()).isEqualTo(Function.identity()); + } + + @Test // DATAES-719 + public void shouldUseConfiguredWebClientConfigurer() { + Function webClientConfigurer = webClient -> webClient; + ClientConfiguration clientConfiguration = ClientConfiguration.builder() // + .connectedTo("foo", "bar") // + .withWebClientConfigurer(webClientConfigurer) // + .build(); + + assertThat(clientConfiguration.getWebClientConfigurer()).isEqualTo(webClientConfigurer); + } + private static String buildBasicAuth(String username, String password) { HttpHeaders headers = new HttpHeaders(); diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProviderUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProviderUnitTests.java index 709f342dc..d3bad482b 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProviderUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProviderUnitTests.java @@ -18,12 +18,15 @@ package org.springframework.data.elasticsearch.client.reactive; import static org.assertj.core.api.Assertions.*; import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import org.junit.Test; import org.springframework.web.reactive.function.client.WebClient; /** * @author Christoph Strobl + * @author Peter-Josef Meisch */ public class DefaultWebClientProviderUnitTests { @@ -40,4 +43,18 @@ public class DefaultWebClientProviderUnitTests { assertThat(shouldBeCachedInstanceOfClient1).isSameAs(client1); assertThat(notClient1ButAnotherInstance).isNotSameAs(client1); } + + @Test // DATAES-719 + public void shouldCallWebClientConfigurer() { + AtomicReference configurerCalled = new AtomicReference<>(false); + Function configurer = webClient -> { + configurerCalled.set(true); + return webClient; + }; + WebClientProvider provider = new DefaultWebClientProvider("http", null).withWebClientConfigurer(configurer); + + provider.get(InetSocketAddress.createUnresolved("localhost", 9200)); + + assertThat(configurerCalled).hasValue(true); + } }