mirror of
https://github.com/spring-projects/spring-data-elasticsearch.git
synced 2025-06-24 04:52:12 +00:00
DATAES-719 - Add customization hook for reactive WebClient.
Original PR: #363
This commit is contained in:
parent
6dfeee3ba9
commit
f7a14c1135
@ -108,8 +108,15 @@ static class Config {
|
|||||||
@Bean
|
@Bean
|
||||||
ReactiveElasticsearchClient client() {
|
ReactiveElasticsearchClient client() {
|
||||||
|
|
||||||
ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1>
|
ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1>
|
||||||
.connectedTo("localhost:9200", "localhost:9291")
|
.connectedTo("localhost:9200", "localhost:9291")
|
||||||
|
.withWebClientConfigurer(webClient -> { <2>
|
||||||
|
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
|
||||||
|
.codecs(configurer -> configurer.defaultCodecs()
|
||||||
|
.maxInMemorySize(-1))
|
||||||
|
.build();
|
||||||
|
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
|
||||||
|
})
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
return ReactiveRestClients.create(clientConfiguration);
|
return ReactiveRestClients.create(clientConfiguration);
|
||||||
@ -128,6 +135,7 @@ Mono<IndexResponse> response = client.index(request ->
|
|||||||
);
|
);
|
||||||
----
|
----
|
||||||
<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL.
|
<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL.
|
||||||
|
<2> when configuring a reactive client, the `withWebClientConfigurer` hook can be used to customize the WebClient.
|
||||||
====
|
====
|
||||||
|
|
||||||
NOTE: The ReactiveClient response, especially for search operations, is bound to the `from` (offset) & `size` (limit) options of the request.
|
NOTE: The ReactiveClient response, especially for search operations, is bound to the `from` (offset) & `size` (limit) options of the request.
|
||||||
|
@ -20,11 +20,13 @@ import java.net.SocketAddress;
|
|||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import javax.net.ssl.HostnameVerifier;
|
import javax.net.ssl.HostnameVerifier;
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
|
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configuration interface exposing common client configuration properties for Elasticsearch clients.
|
* Configuration interface exposing common client configuration properties for Elasticsearch clients.
|
||||||
@ -161,6 +163,11 @@ public interface ClientConfiguration {
|
|||||||
*/
|
*/
|
||||||
Optional<String> getProxy();
|
Optional<String> getProxy();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the function for configuring a WebClient.
|
||||||
|
*/
|
||||||
|
Function<WebClient, WebClient> getWebClientConfigurer();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Christoph Strobl
|
* @author Christoph Strobl
|
||||||
*/
|
*/
|
||||||
@ -314,9 +321,17 @@ public interface ClientConfiguration {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* @param proxy a proxy formatted as String {@literal host:port}.
|
* @param proxy a proxy formatted as String {@literal host:port}.
|
||||||
* @return the {@link MaybeSecureClientConfigurationBuilder}.
|
* @return the {@link TerminalClientConfigurationBuilder}.
|
||||||
*/
|
*/
|
||||||
MaybeSecureClientConfigurationBuilder withProxy(String proxy);
|
TerminalClientConfigurationBuilder withProxy(String proxy);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* set customization hook in case of a reactive configuration
|
||||||
|
*
|
||||||
|
* @param webClientConfigurer function to configure the WebClient
|
||||||
|
* @return the {@link TerminalClientConfigurationBuilder}.
|
||||||
|
*/
|
||||||
|
TerminalClientConfigurationBuilder withWebClientConfigurer(Function<WebClient, WebClient> webClientConfigurer);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build the {@link ClientConfiguration} object.
|
* Build the {@link ClientConfiguration} object.
|
||||||
|
@ -20,6 +20,7 @@ import java.time.Duration;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import javax.net.ssl.HostnameVerifier;
|
import javax.net.ssl.HostnameVerifier;
|
||||||
@ -31,6 +32,7 @@ import org.springframework.data.elasticsearch.client.ClientConfiguration.Termina
|
|||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.lang.Nullable;
|
import org.springframework.lang.Nullable;
|
||||||
import org.springframework.util.Assert;
|
import org.springframework.util.Assert;
|
||||||
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default builder implementation for {@link ClientConfiguration}.
|
* Default builder implementation for {@link ClientConfiguration}.
|
||||||
@ -56,6 +58,7 @@ class ClientConfigurationBuilder
|
|||||||
private String password;
|
private String password;
|
||||||
private String pathPrefix;
|
private String pathPrefix;
|
||||||
private String proxy;
|
private String proxy;
|
||||||
|
private Function<WebClient, WebClient> webClientConfigurer;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (non-Javadoc)
|
* (non-Javadoc)
|
||||||
@ -187,12 +190,20 @@ class ClientConfigurationBuilder
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TerminalClientConfigurationBuilder withPathPrefix(String pathPrefix) {
|
public TerminalClientConfigurationBuilder withPathPrefix(String pathPrefix) {
|
||||||
|
|
||||||
this.pathPrefix = pathPrefix;
|
this.pathPrefix = pathPrefix;
|
||||||
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TerminalClientConfigurationBuilder withWebClientConfigurer(Function<WebClient, WebClient> webClientConfigurer) {
|
||||||
|
|
||||||
|
Assert.notNull(webClientConfigurer, "webClientConfigurer must not be null");
|
||||||
|
|
||||||
|
this.webClientConfigurer = webClientConfigurer;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* (non-Javadoc)
|
* (non-Javadoc)
|
||||||
* @see org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithOptionalDefaultHeaders#build()
|
* @see org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithOptionalDefaultHeaders#build()
|
||||||
@ -208,7 +219,7 @@ class ClientConfigurationBuilder
|
|||||||
}
|
}
|
||||||
|
|
||||||
return new DefaultClientConfiguration(hosts, headers, useSsl, sslContext, soTimeout, connectTimeout, pathPrefix,
|
return new DefaultClientConfiguration(hosts, headers, useSsl, sslContext, soTimeout, connectTimeout, pathPrefix,
|
||||||
hostnameVerifier, proxy);
|
hostnameVerifier, proxy, webClientConfigurer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static InetSocketAddress parse(String hostAndPort) {
|
private static InetSocketAddress parse(String hostAndPort) {
|
||||||
|
@ -21,12 +21,14 @@ import java.util.ArrayList;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import javax.net.ssl.HostnameVerifier;
|
import javax.net.ssl.HostnameVerifier;
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
|
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.lang.Nullable;
|
import org.springframework.lang.Nullable;
|
||||||
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default {@link ClientConfiguration} implementation.
|
* Default {@link ClientConfiguration} implementation.
|
||||||
@ -34,6 +36,7 @@ import org.springframework.lang.Nullable;
|
|||||||
* @author Mark Paluch
|
* @author Mark Paluch
|
||||||
* @author Christoph Strobl
|
* @author Christoph Strobl
|
||||||
* @author Huw Ayling-Miller
|
* @author Huw Ayling-Miller
|
||||||
|
* @author Peter-Josef Meisch
|
||||||
* @since 3.2
|
* @since 3.2
|
||||||
*/
|
*/
|
||||||
class DefaultClientConfiguration implements ClientConfiguration {
|
class DefaultClientConfiguration implements ClientConfiguration {
|
||||||
@ -47,10 +50,11 @@ class DefaultClientConfiguration implements ClientConfiguration {
|
|||||||
private final String pathPrefix;
|
private final String pathPrefix;
|
||||||
private final @Nullable HostnameVerifier hostnameVerifier;
|
private final @Nullable HostnameVerifier hostnameVerifier;
|
||||||
private final String proxy;
|
private final String proxy;
|
||||||
|
private final Function<WebClient, WebClient> webClientConfigurer;
|
||||||
|
|
||||||
DefaultClientConfiguration(List<InetSocketAddress> hosts, HttpHeaders headers, boolean useSsl,
|
DefaultClientConfiguration(List<InetSocketAddress> hosts, HttpHeaders headers, boolean useSsl,
|
||||||
@Nullable SSLContext sslContext, Duration soTimeout, Duration connectTimeout, @Nullable String pathPrefix,
|
@Nullable SSLContext sslContext, Duration soTimeout, Duration connectTimeout, @Nullable String pathPrefix,
|
||||||
@Nullable HostnameVerifier hostnameVerifier, String proxy) {
|
@Nullable HostnameVerifier hostnameVerifier, String proxy, Function<WebClient, WebClient> webClientConfigurer) {
|
||||||
|
|
||||||
this.hosts = Collections.unmodifiableList(new ArrayList<>(hosts));
|
this.hosts = Collections.unmodifiableList(new ArrayList<>(hosts));
|
||||||
this.headers = new HttpHeaders(headers);
|
this.headers = new HttpHeaders(headers);
|
||||||
@ -61,86 +65,56 @@ class DefaultClientConfiguration implements ClientConfiguration {
|
|||||||
this.pathPrefix = pathPrefix;
|
this.pathPrefix = pathPrefix;
|
||||||
this.hostnameVerifier = hostnameVerifier;
|
this.hostnameVerifier = hostnameVerifier;
|
||||||
this.proxy = proxy;
|
this.proxy = proxy;
|
||||||
|
this.webClientConfigurer = webClientConfigurer;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.ClientConfiguration#getEndpoints()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public List<InetSocketAddress> getEndpoints() {
|
public List<InetSocketAddress> getEndpoints() {
|
||||||
return this.hosts;
|
return this.hosts;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.ClientConfiguration#getDefaultHeaders()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public HttpHeaders getDefaultHeaders() {
|
public HttpHeaders getDefaultHeaders() {
|
||||||
return this.headers;
|
return this.headers;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.ClientConfiguration#useSsl()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public boolean useSsl() {
|
public boolean useSsl() {
|
||||||
return this.useSsl;
|
return this.useSsl;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.ClientConfiguration#getSslContext()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<SSLContext> getSslContext() {
|
public Optional<SSLContext> getSslContext() {
|
||||||
return Optional.ofNullable(this.sslContext);
|
return Optional.ofNullable(this.sslContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.ClientConfiguration#getHostNameVerifier()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<HostnameVerifier> getHostNameVerifier() {
|
public Optional<HostnameVerifier> getHostNameVerifier() {
|
||||||
return Optional.ofNullable(this.hostnameVerifier);
|
return Optional.ofNullable(this.hostnameVerifier);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.ClientConfiguration#getConnectTimeout()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public Duration getConnectTimeout() {
|
public Duration getConnectTimeout() {
|
||||||
return this.connectTimeout;
|
return this.connectTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.ClientConfiguration#getSocketTimeout()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public Duration getSocketTimeout() {
|
public Duration getSocketTimeout() {
|
||||||
return this.soTimeout;
|
return this.soTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.ClientConfiguration#getPathPrefix()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public String getPathPrefix() {
|
public String getPathPrefix() {
|
||||||
return this.pathPrefix;
|
return this.pathPrefix;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.ClientConfiguration#getProxy()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<String> getProxy() {
|
public Optional<String> getProxy() {
|
||||||
return Optional.ofNullable(proxy);
|
return Optional.ofNullable(proxy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Function<WebClient, WebClient> getWebClientConfigurer() {
|
||||||
|
return webClientConfigurer != null ? webClientConfigurer : Function.identity();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -226,7 +226,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
|||||||
provider = provider.withPathPrefix(clientConfiguration.getPathPrefix());
|
provider = provider.withPathPrefix(clientConfiguration.getPathPrefix());
|
||||||
}
|
}
|
||||||
|
|
||||||
return provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders());
|
provider = provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders()) //
|
||||||
|
.withWebClientConfigurer(clientConfiguration.getWebClientConfigurer());
|
||||||
|
return provider;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.client.reactive.ClientHttpConnector;
|
import org.springframework.http.client.reactive.ClientHttpConnector;
|
||||||
@ -33,6 +34,7 @@ import org.springframework.web.reactive.function.client.WebClient.Builder;
|
|||||||
* @author Mark Paluch
|
* @author Mark Paluch
|
||||||
* @author Christoph Strobl
|
* @author Christoph Strobl
|
||||||
* @author Huw Ayling-Miller
|
* @author Huw Ayling-Miller
|
||||||
|
* @author Peter-Josef Meisch
|
||||||
* @since 3.2
|
* @since 3.2
|
||||||
*/
|
*/
|
||||||
class DefaultWebClientProvider implements WebClientProvider {
|
class DefaultWebClientProvider implements WebClientProvider {
|
||||||
@ -44,6 +46,7 @@ class DefaultWebClientProvider implements WebClientProvider {
|
|||||||
private final Consumer<Throwable> errorListener;
|
private final Consumer<Throwable> errorListener;
|
||||||
private final HttpHeaders headers;
|
private final HttpHeaders headers;
|
||||||
private final String pathPrefix;
|
private final String pathPrefix;
|
||||||
|
private final Function<WebClient, WebClient> webClientConfigurer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}.
|
* Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}.
|
||||||
@ -52,24 +55,28 @@ class DefaultWebClientProvider implements WebClientProvider {
|
|||||||
* @param connector can be {@literal null}.
|
* @param connector can be {@literal null}.
|
||||||
*/
|
*/
|
||||||
DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector) {
|
DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector) {
|
||||||
this(scheme, connector, e -> {}, HttpHeaders.EMPTY, null);
|
this(scheme, connector, e -> {}, HttpHeaders.EMPTY, null, Function.identity());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}.
|
* Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}.
|
||||||
*
|
*
|
||||||
|
* @param pathPrefixcan be {@literal null}
|
||||||
* @param scheme must not be {@literal null}.
|
* @param scheme must not be {@literal null}.
|
||||||
* @param connector can be {@literal null}.
|
* @param connector can be {@literal null}.
|
||||||
* @param errorListener must not be {@literal null}.
|
* @param errorListener must not be {@literal null}.
|
||||||
* @param headers must not be {@literal null}.
|
* @param headers must not be {@literal null}.
|
||||||
* @param pathPrefixcan be {@literal null}
|
* @param webClientConfigurer must not be {@literal null}.
|
||||||
*/
|
*/
|
||||||
private DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector,
|
private DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector,
|
||||||
Consumer<Throwable> errorListener, HttpHeaders headers, @Nullable String pathPrefix) {
|
Consumer<Throwable> errorListener, HttpHeaders headers, @Nullable String pathPrefix,
|
||||||
|
Function<WebClient, WebClient> webClientConfigurer) {
|
||||||
|
|
||||||
Assert.notNull(scheme, "Scheme must not be null! A common scheme would be 'http'.");
|
Assert.notNull(scheme, "Scheme must not be null! A common scheme would be 'http'.");
|
||||||
Assert.notNull(errorListener, "ErrorListener must not be null! You may want use a no-op one 'e -> {}' instead.");
|
Assert.notNull(errorListener, "errorListener must not be null! You may want use a no-op one 'e -> {}' instead.");
|
||||||
Assert.notNull(headers, "headers must not be null! Think about using 'HttpHeaders.EMPTY' as an alternative.");
|
Assert.notNull(headers, "headers must not be null! Think about using 'HttpHeaders.EMPTY' as an alternative.");
|
||||||
|
Assert.notNull(webClientConfigurer,
|
||||||
|
"webClientConfigurer must not be null! You may want use a no-op one 'Function.identity()' instead.");
|
||||||
|
|
||||||
this.cachedClients = new ConcurrentHashMap<>();
|
this.cachedClients = new ConcurrentHashMap<>();
|
||||||
this.scheme = scheme;
|
this.scheme = scheme;
|
||||||
@ -77,12 +84,9 @@ class DefaultWebClientProvider implements WebClientProvider {
|
|||||||
this.errorListener = errorListener;
|
this.errorListener = errorListener;
|
||||||
this.headers = headers;
|
this.headers = headers;
|
||||||
this.pathPrefix = pathPrefix;
|
this.pathPrefix = pathPrefix;
|
||||||
|
this.webClientConfigurer = webClientConfigurer;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#get(java.net.InetSocketAddress)
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public WebClient get(InetSocketAddress endpoint) {
|
public WebClient get(InetSocketAddress endpoint) {
|
||||||
|
|
||||||
@ -91,19 +95,21 @@ class DefaultWebClientProvider implements WebClientProvider {
|
|||||||
return this.cachedClients.computeIfAbsent(endpoint, this::createWebClientForSocketAddress);
|
return this.cachedClients.computeIfAbsent(endpoint, this::createWebClientForSocketAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#getDefaultHeaders()
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public HttpHeaders getDefaultHeaders() {
|
public HttpHeaders getDefaultHeaders() {
|
||||||
return headers;
|
return headers;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
@Override
|
||||||
* (non-Javadoc)
|
public Consumer<Throwable> getErrorListener() {
|
||||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#withDefaultHeaders(org.springframework.http.HttpHeaders)
|
return this.errorListener;
|
||||||
*/
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getPathPrefix() {
|
||||||
|
return pathPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public WebClientProvider withDefaultHeaders(HttpHeaders headers) {
|
public WebClientProvider withDefaultHeaders(HttpHeaders headers) {
|
||||||
|
|
||||||
@ -113,51 +119,33 @@ class DefaultWebClientProvider implements WebClientProvider {
|
|||||||
merged.addAll(this.headers);
|
merged.addAll(this.headers);
|
||||||
merged.addAll(headers);
|
merged.addAll(headers);
|
||||||
|
|
||||||
return new DefaultWebClientProvider(this.scheme, this.connector, errorListener, merged, this.pathPrefix);
|
return new DefaultWebClientProvider(scheme, connector, errorListener, merged, pathPrefix, webClientConfigurer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#getErrorListener()
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public Consumer<Throwable> getErrorListener() {
|
|
||||||
return this.errorListener;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#getPathPrefix()
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public String getPathPrefix() {
|
|
||||||
return pathPrefix;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#withErrorListener(java.util.function.Consumer)
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public WebClientProvider withErrorListener(Consumer<Throwable> errorListener) {
|
public WebClientProvider withErrorListener(Consumer<Throwable> errorListener) {
|
||||||
|
|
||||||
Assert.notNull(errorListener, "Error listener must not be null.");
|
Assert.notNull(errorListener, "Error listener must not be null.");
|
||||||
|
|
||||||
Consumer<Throwable> listener = this.errorListener.andThen(errorListener);
|
Consumer<Throwable> listener = this.errorListener.andThen(errorListener);
|
||||||
return new DefaultWebClientProvider(this.scheme, this.connector, listener, this.headers, this.pathPrefix);
|
return new DefaultWebClientProvider(scheme, this.connector, listener, headers, pathPrefix, webClientConfigurer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* (non-Javadoc)
|
|
||||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#withPathPrefix(java.lang.String)
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public WebClientProvider withPathPrefix(String pathPrefix) {
|
public WebClientProvider withPathPrefix(String pathPrefix) {
|
||||||
Assert.notNull(pathPrefix, "pathPrefix must not be null.");
|
Assert.notNull(pathPrefix, "pathPrefix must not be null.");
|
||||||
|
|
||||||
return new DefaultWebClientProvider(this.scheme, this.connector, this.errorListener, this.headers, pathPrefix);
|
return new DefaultWebClientProvider(this.scheme, this.connector, this.errorListener, this.headers, pathPrefix,
|
||||||
|
webClientConfigurer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WebClientProvider withWebClientConfigurer(Function<WebClient, WebClient> webClientConfigurer) {
|
||||||
|
return new DefaultWebClientProvider(scheme, connector, errorListener, headers, pathPrefix, webClientConfigurer);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
protected WebClient createWebClientForSocketAddress(InetSocketAddress socketAddress) {
|
protected WebClient createWebClientForSocketAddress(InetSocketAddress socketAddress) {
|
||||||
|
|
||||||
Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders()));
|
Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders()));
|
||||||
@ -167,7 +155,8 @@ class DefaultWebClientProvider implements WebClientProvider {
|
|||||||
}
|
}
|
||||||
|
|
||||||
String baseUrl = String.format("%s://%s:%d%s", this.scheme, socketAddress.getHostString(), socketAddress.getPort(),
|
String baseUrl = String.format("%s://%s:%d%s", this.scheme, socketAddress.getHostString(), socketAddress.getPort(),
|
||||||
pathPrefix == null ? "" : "/" + pathPrefix);
|
pathPrefix == null ? "" : '/' + pathPrefix);
|
||||||
return builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)).build();
|
WebClient webClient = builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)).build();
|
||||||
|
return webClientConfigurer.apply(webClient);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.client.reactive;
|
|||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.client.reactive.ClientHttpConnector;
|
import org.springframework.http.client.reactive.ClientHttpConnector;
|
||||||
@ -35,6 +36,7 @@ import org.springframework.web.reactive.function.client.WebClient;
|
|||||||
* @author Christoph Strobl
|
* @author Christoph Strobl
|
||||||
* @author Mark Paluch
|
* @author Mark Paluch
|
||||||
* @author Huw Ayling-Miller
|
* @author Huw Ayling-Miller
|
||||||
|
* @author Peter-Josef Meisch
|
||||||
* @since 3.2
|
* @since 3.2
|
||||||
*/
|
*/
|
||||||
public interface WebClientProvider {
|
public interface WebClientProvider {
|
||||||
@ -129,4 +131,12 @@ public interface WebClientProvider {
|
|||||||
* @since 4.0
|
* @since 4.0
|
||||||
*/
|
*/
|
||||||
WebClientProvider withPathPrefix(String pathPrefix);
|
WebClientProvider withPathPrefix(String pathPrefix);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new instance of {@link WebClientProvider} calling the given {@link Function} to configure the {@link WebClient}.
|
||||||
|
* @param webClientConfigurer configuration function
|
||||||
|
* @return new instance of {@link WebClientProvider}
|
||||||
|
* @since 4.0
|
||||||
|
*/
|
||||||
|
WebClientProvider withWebClientConfigurer(Function<WebClient, WebClient> webClientConfigurer);
|
||||||
}
|
}
|
||||||
|
@ -20,12 +20,14 @@ import static org.mockito.Mockito.*;
|
|||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
|
|
||||||
import org.apache.http.conn.ssl.NoopHostnameVerifier;
|
import org.apache.http.conn.ssl.NoopHostnameVerifier;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for {@link ClientConfiguration}.
|
* Unit tests for {@link ClientConfiguration}.
|
||||||
@ -143,6 +145,26 @@ public class ClientConfigurationUnitTests {
|
|||||||
assertThat(clientConfiguration.getHostNameVerifier()).contains(NoopHostnameVerifier.INSTANCE);
|
assertThat(clientConfiguration.getHostNameVerifier()).contains(NoopHostnameVerifier.INSTANCE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test // DATAES-719
|
||||||
|
void shouldHaveDefaultWebClientConfigurer() {
|
||||||
|
ClientConfiguration clientConfiguration = ClientConfiguration.builder() //
|
||||||
|
.connectedTo("foo", "bar") //
|
||||||
|
.build();
|
||||||
|
|
||||||
|
assertThat(clientConfiguration.getWebClientConfigurer()).isEqualTo(Function.identity());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test // DATAES-719
|
||||||
|
void shouldUseConfiguredWebClientConfigurer() {
|
||||||
|
Function<WebClient, WebClient> webClientConfigurer = webClient -> webClient;
|
||||||
|
ClientConfiguration clientConfiguration = ClientConfiguration.builder() //
|
||||||
|
.connectedTo("foo", "bar") //
|
||||||
|
.withWebClientConfigurer(webClientConfigurer) //
|
||||||
|
.build();
|
||||||
|
|
||||||
|
assertThat(clientConfiguration.getWebClientConfigurer()).isEqualTo(webClientConfigurer);
|
||||||
|
}
|
||||||
|
|
||||||
private static String buildBasicAuth(String username, String password) {
|
private static String buildBasicAuth(String username, String password) {
|
||||||
|
|
||||||
HttpHeaders headers = new HttpHeaders();
|
HttpHeaders headers = new HttpHeaders();
|
||||||
|
@ -18,12 +18,15 @@ package org.springframework.data.elasticsearch.client.reactive;
|
|||||||
import static org.assertj.core.api.Assertions.*;
|
import static org.assertj.core.api.Assertions.*;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.springframework.web.reactive.function.client.WebClient;
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Christoph Strobl
|
* @author Christoph Strobl
|
||||||
|
* @author Peter-Josef Meisch
|
||||||
*/
|
*/
|
||||||
public class DefaultWebClientProviderUnitTests {
|
public class DefaultWebClientProviderUnitTests {
|
||||||
|
|
||||||
@ -40,4 +43,18 @@ public class DefaultWebClientProviderUnitTests {
|
|||||||
assertThat(shouldBeCachedInstanceOfClient1).isSameAs(client1);
|
assertThat(shouldBeCachedInstanceOfClient1).isSameAs(client1);
|
||||||
assertThat(notClient1ButAnotherInstance).isNotSameAs(client1);
|
assertThat(notClient1ButAnotherInstance).isNotSameAs(client1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test // DATAES-719
|
||||||
|
void shouldCallWebClientConfigurer() {
|
||||||
|
AtomicReference<Boolean> configurerCalled = new AtomicReference<>(false);
|
||||||
|
Function<WebClient, WebClient> configurer = webClient -> {
|
||||||
|
configurerCalled.set(true);
|
||||||
|
return webClient;
|
||||||
|
};
|
||||||
|
WebClientProvider provider = new DefaultWebClientProvider("http", null).withWebClientConfigurer(configurer);
|
||||||
|
|
||||||
|
provider.get(InetSocketAddress.createUnresolved("localhost", 9200));
|
||||||
|
|
||||||
|
assertThat(configurerCalled).hasValue(true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -61,6 +61,7 @@ import org.springframework.web.util.UriBuilder;
|
|||||||
* @author Christoph Strobl
|
* @author Christoph Strobl
|
||||||
* @author Huw Ayling-Miller
|
* @author Huw Ayling-Miller
|
||||||
* @author Henrique Amaral
|
* @author Henrique Amaral
|
||||||
|
* @author Peter-Josef Meisch
|
||||||
*/
|
*/
|
||||||
public class ReactiveMockClientTestsUtils {
|
public class ReactiveMockClientTestsUtils {
|
||||||
|
|
||||||
@ -276,6 +277,11 @@ public class ReactiveMockClientTestsUtils {
|
|||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WebClientProvider withWebClientConfigurer(Function<WebClient, WebClient> webClientConfigurer) {
|
||||||
|
throw new UnsupportedOperationException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
public Send when(String host) {
|
public Send when(String host) {
|
||||||
InetSocketAddress inetSocketAddress = getInetSocketAddress(host);
|
InetSocketAddress inetSocketAddress = getInetSocketAddress(host);
|
||||||
return new CallbackImpl(get(host), headersUriSpecMap.get(inetSocketAddress),
|
return new CallbackImpl(get(host), headersUriSpecMap.get(inetSocketAddress),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user