From 6db1aa687132a2d9bc727dd5da0d83f47dd554d8 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sun, 8 Nov 2020 21:59:06 +0100 Subject: [PATCH] DATAES-588 - Add HttpClientConfigCallback for non-reactive setup. Original PR: #548 --- .../reference/elasticsearch-clients.adoc | 50 +++++++++++-------- .../client/ClientConfiguration.java | 18 ++++++- .../client/ClientConfigurationBuilder.java | 13 ++++- .../client/DefaultClientConfiguration.java | 11 +++- .../elasticsearch/client/RestClients.java | 2 + .../client/ClientConfigurationUnitTests.java | 16 ++++++ .../elasticsearch/client/RestClientsTest.java | 49 ++++++++++-------- 7 files changed, 115 insertions(+), 44 deletions(-) diff --git a/src/main/asciidoc/reference/elasticsearch-clients.adoc b/src/main/asciidoc/reference/elasticsearch-clients.adoc index 2f90a4e89..67e9ac88e 100644 --- a/src/main/asciidoc/reference/elasticsearch-clients.adoc +++ b/src/main/asciidoc/reference/elasticsearch-clients.adoc @@ -150,36 +150,46 @@ Client behaviour can be changed via the `ClientConfiguration` that allows to set [source,java] ---- HttpHeaders httpHeaders = new HttpHeaders(); -httpHeaders.add("some-header", "on every request") <1> +httpHeaders.add("some-header", "on every request") <.> ClientConfiguration clientConfiguration = ClientConfiguration.builder() - .connectedTo("localhost:9200", "localhost:9291") <2> - .useSsl() <3> - .withProxy("localhost:8888") <4> - .withPathPrefix("ela") <5> - .withConnectTimeout(Duration.ofSeconds(5)) <6> - .withSocketTimeout(Duration.ofSeconds(3)) <7> - .withDefaultHeaders(defaultHeaders) <8> - .withBasicAuth(username, password) <9> - .withHeaders(() -> { <10> + .connectedTo("localhost:9200", "localhost:9291") <.> + .useSsl() <.> + .withProxy("localhost:8888") <.> + .withPathPrefix("ela") <.> + .withConnectTimeout(Duration.ofSeconds(5)) <.> + .withSocketTimeout(Duration.ofSeconds(3)) <.> + .withDefaultHeaders(defaultHeaders) <.> + .withBasicAuth(username, password) <.> + .withHeaders(() -> { <.> HttpHeaders headers = new HttpHeaders(); headers.add("currentTime", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); return headers; }) + .withWebClientConfigurer(webClient -> { <.> + //... + return webClient; + }) + .withHttpClientConfigurer(clientBuilder -> { <.> + //... + return clientBuilder; + }) . // ... other options .build(); ---- -<1> Define default headers, if they need to be customized -<2> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL. -<3> Optionally enable SSL. -<4> Optionally set a proxy. -<5> Optionally set a path prefix, mostly used when different clusters a behind some reverse proxy. -<6> Set the connection timeout. Default is 10 sec. -<7> Set the socket timeout. Default is 5 sec. -<8> Optionally set headers. -<9> Add basic authentication. -<10> A `Supplier
` function can be specified which is called every time before a request is sent to Elasticsearch - here, as an example, the current time is written in a header. +<.> Define default headers, if they need to be customized +<.> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL. +<.> Optionally enable SSL. +<.> Optionally set a proxy. +<.> Optionally set a path prefix, mostly used when different clusters a behind some reverse proxy. +<.> Set the connection timeout. Default is 10 sec. +<.> Set the socket timeout. Default is 5 sec. +<.> Optionally set headers. +<.> Add basic authentication. +<.> A `Supplier
` function can be specified which is called every time before a request is sent to Elasticsearch - here, as an example, the current time is written in a header. +<.> for reactive setup a function configuring the `WebClient` +<.> for non-reactive setup a function configuring the REST client ==== IMPORTANT: Adding a Header supplier as shown in above example allows to inject headers that may change over the time, like authentication JWT tokens. If this is used in the reactive setup, the supplier function *must not* block! diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java index 3200577e2..e1804f5f6 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java @@ -26,6 +26,7 @@ import java.util.function.Supplier; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; +import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback; import org.springframework.http.HttpHeaders; import org.springframework.lang.Nullable; import org.springframework.web.reactive.function.client.WebClient; @@ -171,6 +172,12 @@ public interface ClientConfiguration { */ Function getWebClientConfigurer(); + /** + * @return the client configuration callback. + * @since 4.2 + */ + HttpClientConfigCallback getHttpClientConfigurer(); + /** * @return the supplier for custom headers. */ @@ -341,13 +348,22 @@ public interface ClientConfiguration { */ TerminalClientConfigurationBuilder withWebClientConfigurer(Function webClientConfigurer); + /** + * Register a {HttpClientConfigCallback} to configure the non-reactive REST client. + * + * @param httpClientConfigurer configuration callback, must not be null. + * @return the {@link TerminalClientConfigurationBuilder}. + * @since 4.2 + */ + TerminalClientConfigurationBuilder withHttpClientConfigurer(HttpClientConfigCallback httpClientConfigurer); + /** * set a supplier for custom headers. This is invoked for every HTTP request to Elasticsearch to retrieve headers * that should be sent with the request. A common use case is passing in authentication headers that may change. *
* Note: When used in a reactive environment, the calling of {@link Supplier#get()} function must not do any * blocking operations. It may return {@literal null}. - * + * * @param headers supplier function for headers, must not be {@literal null} * @return the {@link TerminalClientConfigurationBuilder}. * @since 4.0 diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java index 07c01ec32..1268368df 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; +import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback; import org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithRequiredEndpoint; import org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder; import org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder; @@ -61,6 +62,7 @@ class ClientConfigurationBuilder private @Nullable String proxy; private Function webClientConfigurer = Function.identity(); private Supplier headersSupplier = () -> HttpHeaders.EMPTY; + private HttpClientConfigCallback httpClientConfigurer = httpClientBuilder -> httpClientBuilder; /* * (non-Javadoc) @@ -207,6 +209,15 @@ class ClientConfigurationBuilder return this; } + @Override + public TerminalClientConfigurationBuilder withHttpClientConfigurer(HttpClientConfigCallback httpClientConfigurer) { + + Assert.notNull(httpClientConfigurer, "httpClientConfigurer must not be null"); + + this.httpClientConfigurer = httpClientConfigurer; + return this; + } + @Override public TerminalClientConfigurationBuilder withHeaders(Supplier headers) { @@ -231,7 +242,7 @@ class ClientConfigurationBuilder } return new DefaultClientConfiguration(hosts, headers, useSsl, sslContext, soTimeout, connectTimeout, pathPrefix, - hostnameVerifier, proxy, webClientConfigurer, headersSupplier); + hostnameVerifier, proxy, webClientConfigurer, httpClientConfigurer, headersSupplier); } private static InetSocketAddress parse(String hostAndPort) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java index ab43f15a8..f47a79c2e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java @@ -27,6 +27,7 @@ import java.util.function.Supplier; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; +import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback; import org.springframework.http.HttpHeaders; import org.springframework.lang.Nullable; import org.springframework.web.reactive.function.client.WebClient; @@ -52,12 +53,14 @@ class DefaultClientConfiguration implements ClientConfiguration { private final @Nullable HostnameVerifier hostnameVerifier; private final @Nullable String proxy; private final Function webClientConfigurer; + private final HttpClientConfigCallback httpClientConfigurer; private final Supplier headersSupplier; DefaultClientConfiguration(List hosts, HttpHeaders headers, boolean useSsl, @Nullable SSLContext sslContext, Duration soTimeout, Duration connectTimeout, @Nullable String pathPrefix, @Nullable HostnameVerifier hostnameVerifier, @Nullable String proxy, - Function webClientConfigurer, Supplier headersSupplier) { + Function webClientConfigurer, HttpClientConfigCallback httpClientConfigurer, + Supplier headersSupplier) { this.hosts = Collections.unmodifiableList(new ArrayList<>(hosts)); this.headers = new HttpHeaders(headers); @@ -69,6 +72,7 @@ class DefaultClientConfiguration implements ClientConfiguration { this.hostnameVerifier = hostnameVerifier; this.proxy = proxy; this.webClientConfigurer = webClientConfigurer; + this.httpClientConfigurer = httpClientConfigurer; this.headersSupplier = headersSupplier; } @@ -123,6 +127,11 @@ class DefaultClientConfiguration implements ClientConfiguration { return webClientConfigurer; } + @Override + public HttpClientConfigCallback getHttpClientConfigurer() { + return httpClientConfigurer; + } + @Override public Supplier getHeadersSupplier() { return headersSupplier; diff --git a/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java b/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java index 63f22ffa0..72613dcb8 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java @@ -119,6 +119,8 @@ public final class RestClients { clientConfiguration.getProxy().map(HttpHost::create).ifPresent(clientBuilder::setProxy); + clientBuilder = clientConfiguration.getHttpClientConfigurer().customizeHttpClient(clientBuilder); + return clientBuilder; }); diff --git a/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java index f37da88a1..7b82c3653 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/ClientConfigurationUnitTests.java @@ -25,6 +25,8 @@ import java.util.function.Function; import javax.net.ssl.SSLContext; import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.elasticsearch.client.RestClientBuilder; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.http.HttpHeaders; import org.springframework.web.reactive.function.client.WebClient; @@ -165,6 +167,20 @@ public class ClientConfigurationUnitTests { assertThat(clientConfiguration.getWebClientConfigurer()).isEqualTo(webClientConfigurer); } + @Test // DATAES-588 + @DisplayName("should use configured httpClientConfigurer") + void shouldUseConfiguredHttpClientConfigurer() { + + RestClientBuilder.HttpClientConfigCallback callback = httpClientBuilder -> httpClientBuilder; + + ClientConfiguration clientConfiguration = ClientConfiguration.builder() // + .connectedTo("foo", "bar") // + .withHttpClientConfigurer(callback) // + .build(); + + assertThat(clientConfiguration.getHttpClientConfigurer()).isEqualTo(callback); + } + private static String buildBasicAuth(String username, String password) { HttpHeaders headers = new HttpHeaders(); diff --git a/src/test/java/org/springframework/data/elasticsearch/client/RestClientsTest.java b/src/test/java/org/springframework/data/elasticsearch/client/RestClientsTest.java index 0fcd8340b..d3e167ead 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/RestClientsTest.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/RestClientsTest.java @@ -2,6 +2,7 @@ package org.springframework.data.elasticsearch.client; import static com.github.tomakehurst.wiremock.client.WireMock.*; import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options; +import static org.assertj.core.api.Assertions.*; import java.io.IOException; import java.util.Arrays; @@ -10,6 +11,7 @@ import java.util.function.Consumer; import java.util.stream.Stream; import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.DisplayName; @@ -62,10 +64,10 @@ public class RestClientsTest { }); } - @ParameterizedTest // DATAES-801 + @ParameterizedTest // DATAES-801, DATAES-588 @MethodSource("clientUnderTestFactorySource") - @DisplayName("should set all required headers") - void shouldSetAllRequiredHeaders(ClientUnderTestFactory clientUnderTestFactory) { + @DisplayName("should configure client and set all required headers") + void shouldConfigureClientAndSetAllRequiredHeaders(ClientUnderTestFactory clientUnderTestFactory) { wireMockServer(server -> { WireMock.configureFor(server.port()); @@ -78,6 +80,12 @@ public class RestClientsTest { defaultHeaders.add("def2", "def2-1"); AtomicInteger supplierCount = new AtomicInteger(1); + AtomicInteger clientConfigurerCount = new AtomicInteger(0); + + RestClientBuilder.HttpClientConfigCallback configCallback = httpClientBuilder -> { + clientConfigurerCount.incrementAndGet(); + return httpClientBuilder; + }; ClientConfigurationBuilder configurationBuilder = new ClientConfigurationBuilder(); ClientConfiguration clientConfiguration = configurationBuilder // @@ -89,7 +97,8 @@ public class RestClientsTest { httpHeaders.add("supplied", "val0"); httpHeaders.add("supplied", "val" + supplierCount.getAndIncrement()); return httpHeaders; - }).build(); + }) // + .withHttpClientConfigurer(configCallback).build(); ClientUnderTest clientUnderTest = clientUnderTestFactory.create(clientConfiguration); @@ -97,7 +106,8 @@ public class RestClientsTest { for (int i = 1; i <= 3; i++) { clientUnderTest.ping(); - verify(headRequestedFor(urlEqualTo("/")).withHeader("Authorization", new AnythingPattern()) // + verify(headRequestedFor(urlEqualTo("/")) // + .withHeader("Authorization", new AnythingPattern()) // .withHeader("def1", new EqualToPattern("def1-1")) // .withHeader("def1", new EqualToPattern("def1-2")) // .withHeader("def2", new EqualToPattern("def2-1")) // @@ -105,6 +115,11 @@ public class RestClientsTest { .withHeader("supplied", new EqualToPattern("val" + i)) // ); } + + // clientConfigurer is only used in non-reactive setup + if (!(clientUnderTestFactory instanceof ReactiveElasticsearchClientUnderTestFactory)) { + assertThat(clientConfigurerCount).hasValue(1); + } }); } @@ -148,9 +163,9 @@ public class RestClientsTest { */ interface ClientUnderTest { /** - * Pings the configured server. + * Pings the configured server. Must use a HEAD request to "/". * - * @return + * @return true if successful */ boolean ping() throws Exception; } @@ -182,13 +197,7 @@ public class RestClientsTest { @Override ClientUnderTest create(ClientConfiguration clientConfiguration) { RestHighLevelClient client = RestClients.create(clientConfiguration).rest(); - return new ClientUnderTest() { - - @Override - public boolean ping() throws Exception { - return client.ping(RequestOptions.DEFAULT); - } - }; + return () -> client.ping(RequestOptions.DEFAULT); } } @@ -206,12 +215,7 @@ public class RestClientsTest { @Override ClientUnderTest create(ClientConfiguration clientConfiguration) { ReactiveElasticsearchClient client = ReactiveRestClients.create(clientConfiguration); - return new ClientUnderTest() { - @Override - public boolean ping() throws Exception { - return client.ping().block(); - } - }; + return () -> client.ping().block(); } } @@ -221,6 +225,9 @@ public class RestClientsTest { * @return stream of factories */ static Stream clientUnderTestFactorySource() { - return Stream.of(new RestClientUnderTestFactory(), new ReactiveElasticsearchClientUnderTestFactory()); + return Stream.of( // + new RestClientUnderTestFactory(), // + new ReactiveElasticsearchClientUnderTestFactory() // + ); } }