mirror of
https://github.com/spring-projects/spring-data-elasticsearch.git
synced 2025-05-30 08:42:10 +00:00
DATAES-588 - Add HttpClientConfigCallback for non-reactive setup.
Original PR: #548
This commit is contained in:
parent
6bfeade7a0
commit
6db1aa6871
@ -150,36 +150,46 @@ Client behaviour can be changed via the `ClientConfiguration` that allows to set
|
||||
[source,java]
|
||||
----
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.add("some-header", "on every request") <1>
|
||||
httpHeaders.add("some-header", "on every request") <.>
|
||||
|
||||
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
|
||||
.connectedTo("localhost:9200", "localhost:9291") <2>
|
||||
.useSsl() <3>
|
||||
.withProxy("localhost:8888") <4>
|
||||
.withPathPrefix("ela") <5>
|
||||
.withConnectTimeout(Duration.ofSeconds(5)) <6>
|
||||
.withSocketTimeout(Duration.ofSeconds(3)) <7>
|
||||
.withDefaultHeaders(defaultHeaders) <8>
|
||||
.withBasicAuth(username, password) <9>
|
||||
.withHeaders(() -> { <10>
|
||||
.connectedTo("localhost:9200", "localhost:9291") <.>
|
||||
.useSsl() <.>
|
||||
.withProxy("localhost:8888") <.>
|
||||
.withPathPrefix("ela") <.>
|
||||
.withConnectTimeout(Duration.ofSeconds(5)) <.>
|
||||
.withSocketTimeout(Duration.ofSeconds(3)) <.>
|
||||
.withDefaultHeaders(defaultHeaders) <.>
|
||||
.withBasicAuth(username, password) <.>
|
||||
.withHeaders(() -> { <.>
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.add("currentTime", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
|
||||
return headers;
|
||||
})
|
||||
.withWebClientConfigurer(webClient -> { <.>
|
||||
//...
|
||||
return webClient;
|
||||
})
|
||||
.withHttpClientConfigurer(clientBuilder -> { <.>
|
||||
//...
|
||||
return clientBuilder;
|
||||
})
|
||||
. // ... other options
|
||||
.build();
|
||||
|
||||
----
|
||||
<1> Define default headers, if they need to be customized
|
||||
<2> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL.
|
||||
<3> Optionally enable SSL.
|
||||
<4> Optionally set a proxy.
|
||||
<5> Optionally set a path prefix, mostly used when different clusters a behind some reverse proxy.
|
||||
<6> Set the connection timeout. Default is 10 sec.
|
||||
<7> Set the socket timeout. Default is 5 sec.
|
||||
<8> Optionally set headers.
|
||||
<9> Add basic authentication.
|
||||
<10> A `Supplier<Header>` function can be specified which is called every time before a request is sent to Elasticsearch - here, as an example, the current time is written in a header.
|
||||
<.> Define default headers, if they need to be customized
|
||||
<.> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL.
|
||||
<.> Optionally enable SSL.
|
||||
<.> Optionally set a proxy.
|
||||
<.> Optionally set a path prefix, mostly used when different clusters a behind some reverse proxy.
|
||||
<.> Set the connection timeout. Default is 10 sec.
|
||||
<.> Set the socket timeout. Default is 5 sec.
|
||||
<.> Optionally set headers.
|
||||
<.> Add basic authentication.
|
||||
<.> A `Supplier<Header>` function can be specified which is called every time before a request is sent to Elasticsearch - here, as an example, the current time is written in a header.
|
||||
<.> for reactive setup a function configuring the `WebClient`
|
||||
<.> for non-reactive setup a function configuring the REST client
|
||||
====
|
||||
|
||||
IMPORTANT: Adding a Header supplier as shown in above example allows to inject headers that may change over the time, like authentication JWT tokens. If this is used in the reactive setup, the supplier function *must not* block!
|
||||
|
@ -26,6 +26,7 @@ import java.util.function.Supplier;
|
||||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
@ -171,6 +172,12 @@ public interface ClientConfiguration {
|
||||
*/
|
||||
Function<WebClient, WebClient> getWebClientConfigurer();
|
||||
|
||||
/**
|
||||
* @return the client configuration callback.
|
||||
* @since 4.2
|
||||
*/
|
||||
HttpClientConfigCallback getHttpClientConfigurer();
|
||||
|
||||
/**
|
||||
* @return the supplier for custom headers.
|
||||
*/
|
||||
@ -341,13 +348,22 @@ public interface ClientConfiguration {
|
||||
*/
|
||||
TerminalClientConfigurationBuilder withWebClientConfigurer(Function<WebClient, WebClient> webClientConfigurer);
|
||||
|
||||
/**
|
||||
* Register a {HttpClientConfigCallback} to configure the non-reactive REST client.
|
||||
*
|
||||
* @param httpClientConfigurer configuration callback, must not be null.
|
||||
* @return the {@link TerminalClientConfigurationBuilder}.
|
||||
* @since 4.2
|
||||
*/
|
||||
TerminalClientConfigurationBuilder withHttpClientConfigurer(HttpClientConfigCallback httpClientConfigurer);
|
||||
|
||||
/**
|
||||
* set a supplier for custom headers. This is invoked for every HTTP request to Elasticsearch to retrieve headers
|
||||
* that should be sent with the request. A common use case is passing in authentication headers that may change.
|
||||
* <br/>
|
||||
* Note: When used in a reactive environment, the calling of {@link Supplier#get()} function must not do any
|
||||
* blocking operations. It may return {@literal null}.
|
||||
*
|
||||
*
|
||||
* @param headers supplier function for headers, must not be {@literal null}
|
||||
* @return the {@link TerminalClientConfigurationBuilder}.
|
||||
* @since 4.0
|
||||
|
@ -27,6 +27,7 @@ import java.util.stream.Collectors;
|
||||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
|
||||
import org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithRequiredEndpoint;
|
||||
import org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder;
|
||||
import org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder;
|
||||
@ -61,6 +62,7 @@ class ClientConfigurationBuilder
|
||||
private @Nullable String proxy;
|
||||
private Function<WebClient, WebClient> webClientConfigurer = Function.identity();
|
||||
private Supplier<HttpHeaders> headersSupplier = () -> HttpHeaders.EMPTY;
|
||||
private HttpClientConfigCallback httpClientConfigurer = httpClientBuilder -> httpClientBuilder;
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
@ -207,6 +209,15 @@ class ClientConfigurationBuilder
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TerminalClientConfigurationBuilder withHttpClientConfigurer(HttpClientConfigCallback httpClientConfigurer) {
|
||||
|
||||
Assert.notNull(httpClientConfigurer, "httpClientConfigurer must not be null");
|
||||
|
||||
this.httpClientConfigurer = httpClientConfigurer;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TerminalClientConfigurationBuilder withHeaders(Supplier<HttpHeaders> headers) {
|
||||
|
||||
@ -231,7 +242,7 @@ class ClientConfigurationBuilder
|
||||
}
|
||||
|
||||
return new DefaultClientConfiguration(hosts, headers, useSsl, sslContext, soTimeout, connectTimeout, pathPrefix,
|
||||
hostnameVerifier, proxy, webClientConfigurer, headersSupplier);
|
||||
hostnameVerifier, proxy, webClientConfigurer, httpClientConfigurer, headersSupplier);
|
||||
}
|
||||
|
||||
private static InetSocketAddress parse(String hostAndPort) {
|
||||
|
@ -27,6 +27,7 @@ import java.util.function.Supplier;
|
||||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
@ -52,12 +53,14 @@ class DefaultClientConfiguration implements ClientConfiguration {
|
||||
private final @Nullable HostnameVerifier hostnameVerifier;
|
||||
private final @Nullable String proxy;
|
||||
private final Function<WebClient, WebClient> webClientConfigurer;
|
||||
private final HttpClientConfigCallback httpClientConfigurer;
|
||||
private final Supplier<HttpHeaders> headersSupplier;
|
||||
|
||||
DefaultClientConfiguration(List<InetSocketAddress> hosts, HttpHeaders headers, boolean useSsl,
|
||||
@Nullable SSLContext sslContext, Duration soTimeout, Duration connectTimeout, @Nullable String pathPrefix,
|
||||
@Nullable HostnameVerifier hostnameVerifier, @Nullable String proxy,
|
||||
Function<WebClient, WebClient> webClientConfigurer, Supplier<HttpHeaders> headersSupplier) {
|
||||
Function<WebClient, WebClient> webClientConfigurer, HttpClientConfigCallback httpClientConfigurer,
|
||||
Supplier<HttpHeaders> headersSupplier) {
|
||||
|
||||
this.hosts = Collections.unmodifiableList(new ArrayList<>(hosts));
|
||||
this.headers = new HttpHeaders(headers);
|
||||
@ -69,6 +72,7 @@ class DefaultClientConfiguration implements ClientConfiguration {
|
||||
this.hostnameVerifier = hostnameVerifier;
|
||||
this.proxy = proxy;
|
||||
this.webClientConfigurer = webClientConfigurer;
|
||||
this.httpClientConfigurer = httpClientConfigurer;
|
||||
this.headersSupplier = headersSupplier;
|
||||
}
|
||||
|
||||
@ -123,6 +127,11 @@ class DefaultClientConfiguration implements ClientConfiguration {
|
||||
return webClientConfigurer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpClientConfigCallback getHttpClientConfigurer() {
|
||||
return httpClientConfigurer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supplier<HttpHeaders> getHeadersSupplier() {
|
||||
return headersSupplier;
|
||||
|
@ -119,6 +119,8 @@ public final class RestClients {
|
||||
|
||||
clientConfiguration.getProxy().map(HttpHost::create).ifPresent(clientBuilder::setProxy);
|
||||
|
||||
clientBuilder = clientConfiguration.getHttpClientConfigurer().customizeHttpClient(clientBuilder);
|
||||
|
||||
return clientBuilder;
|
||||
});
|
||||
|
||||
|
@ -25,6 +25,8 @@ import java.util.function.Function;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.apache.http.conn.ssl.NoopHostnameVerifier;
|
||||
import org.elasticsearch.client.RestClientBuilder;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
@ -165,6 +167,20 @@ public class ClientConfigurationUnitTests {
|
||||
assertThat(clientConfiguration.getWebClientConfigurer()).isEqualTo(webClientConfigurer);
|
||||
}
|
||||
|
||||
@Test // DATAES-588
|
||||
@DisplayName("should use configured httpClientConfigurer")
|
||||
void shouldUseConfiguredHttpClientConfigurer() {
|
||||
|
||||
RestClientBuilder.HttpClientConfigCallback callback = httpClientBuilder -> httpClientBuilder;
|
||||
|
||||
ClientConfiguration clientConfiguration = ClientConfiguration.builder() //
|
||||
.connectedTo("foo", "bar") //
|
||||
.withHttpClientConfigurer(callback) //
|
||||
.build();
|
||||
|
||||
assertThat(clientConfiguration.getHttpClientConfigurer()).isEqualTo(callback);
|
||||
}
|
||||
|
||||
private static String buildBasicAuth(String username, String password) {
|
||||
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
|
@ -2,6 +2,7 @@ package org.springframework.data.elasticsearch.client;
|
||||
|
||||
import static com.github.tomakehurst.wiremock.client.WireMock.*;
|
||||
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
@ -10,6 +11,7 @@ import java.util.function.Consumer;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.RestClientBuilder;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
@ -62,10 +64,10 @@ public class RestClientsTest {
|
||||
});
|
||||
}
|
||||
|
||||
@ParameterizedTest // DATAES-801
|
||||
@ParameterizedTest // DATAES-801, DATAES-588
|
||||
@MethodSource("clientUnderTestFactorySource")
|
||||
@DisplayName("should set all required headers")
|
||||
void shouldSetAllRequiredHeaders(ClientUnderTestFactory clientUnderTestFactory) {
|
||||
@DisplayName("should configure client and set all required headers")
|
||||
void shouldConfigureClientAndSetAllRequiredHeaders(ClientUnderTestFactory clientUnderTestFactory) {
|
||||
wireMockServer(server -> {
|
||||
|
||||
WireMock.configureFor(server.port());
|
||||
@ -78,6 +80,12 @@ public class RestClientsTest {
|
||||
defaultHeaders.add("def2", "def2-1");
|
||||
|
||||
AtomicInteger supplierCount = new AtomicInteger(1);
|
||||
AtomicInteger clientConfigurerCount = new AtomicInteger(0);
|
||||
|
||||
RestClientBuilder.HttpClientConfigCallback configCallback = httpClientBuilder -> {
|
||||
clientConfigurerCount.incrementAndGet();
|
||||
return httpClientBuilder;
|
||||
};
|
||||
|
||||
ClientConfigurationBuilder configurationBuilder = new ClientConfigurationBuilder();
|
||||
ClientConfiguration clientConfiguration = configurationBuilder //
|
||||
@ -89,7 +97,8 @@ public class RestClientsTest {
|
||||
httpHeaders.add("supplied", "val0");
|
||||
httpHeaders.add("supplied", "val" + supplierCount.getAndIncrement());
|
||||
return httpHeaders;
|
||||
}).build();
|
||||
}) //
|
||||
.withHttpClientConfigurer(configCallback).build();
|
||||
|
||||
ClientUnderTest clientUnderTest = clientUnderTestFactory.create(clientConfiguration);
|
||||
|
||||
@ -97,7 +106,8 @@ public class RestClientsTest {
|
||||
for (int i = 1; i <= 3; i++) {
|
||||
clientUnderTest.ping();
|
||||
|
||||
verify(headRequestedFor(urlEqualTo("/")).withHeader("Authorization", new AnythingPattern()) //
|
||||
verify(headRequestedFor(urlEqualTo("/")) //
|
||||
.withHeader("Authorization", new AnythingPattern()) //
|
||||
.withHeader("def1", new EqualToPattern("def1-1")) //
|
||||
.withHeader("def1", new EqualToPattern("def1-2")) //
|
||||
.withHeader("def2", new EqualToPattern("def2-1")) //
|
||||
@ -105,6 +115,11 @@ public class RestClientsTest {
|
||||
.withHeader("supplied", new EqualToPattern("val" + i)) //
|
||||
);
|
||||
}
|
||||
|
||||
// clientConfigurer is only used in non-reactive setup
|
||||
if (!(clientUnderTestFactory instanceof ReactiveElasticsearchClientUnderTestFactory)) {
|
||||
assertThat(clientConfigurerCount).hasValue(1);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@ -148,9 +163,9 @@ public class RestClientsTest {
|
||||
*/
|
||||
interface ClientUnderTest {
|
||||
/**
|
||||
* Pings the configured server.
|
||||
* Pings the configured server. Must use a HEAD request to "/".
|
||||
*
|
||||
* @return
|
||||
* @return true if successful
|
||||
*/
|
||||
boolean ping() throws Exception;
|
||||
}
|
||||
@ -182,13 +197,7 @@ public class RestClientsTest {
|
||||
@Override
|
||||
ClientUnderTest create(ClientConfiguration clientConfiguration) {
|
||||
RestHighLevelClient client = RestClients.create(clientConfiguration).rest();
|
||||
return new ClientUnderTest() {
|
||||
|
||||
@Override
|
||||
public boolean ping() throws Exception {
|
||||
return client.ping(RequestOptions.DEFAULT);
|
||||
}
|
||||
};
|
||||
return () -> client.ping(RequestOptions.DEFAULT);
|
||||
}
|
||||
|
||||
}
|
||||
@ -206,12 +215,7 @@ public class RestClientsTest {
|
||||
@Override
|
||||
ClientUnderTest create(ClientConfiguration clientConfiguration) {
|
||||
ReactiveElasticsearchClient client = ReactiveRestClients.create(clientConfiguration);
|
||||
return new ClientUnderTest() {
|
||||
@Override
|
||||
public boolean ping() throws Exception {
|
||||
return client.ping().block();
|
||||
}
|
||||
};
|
||||
return () -> client.ping().block();
|
||||
}
|
||||
}
|
||||
|
||||
@ -221,6 +225,9 @@ public class RestClientsTest {
|
||||
* @return stream of factories
|
||||
*/
|
||||
static Stream<ClientUnderTestFactory> clientUnderTestFactorySource() {
|
||||
return Stream.of(new RestClientUnderTestFactory(), new ReactiveElasticsearchClientUnderTestFactory());
|
||||
return Stream.of( //
|
||||
new RestClientUnderTestFactory(), //
|
||||
new ReactiveElasticsearchClientUnderTestFactory() //
|
||||
);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user