mirror of
https://github.com/spring-projects/spring-data-elasticsearch.git
synced 2025-06-21 19:42:10 +00:00
DATAES-719 - Add customization hook for reactive WebClient.
Original PR: #363 (cherry picked from commit f7a14c1135189a62675a3345d70ccb1a6ec16af4)
This commit is contained in:
parent
31a391522a
commit
b731b47b1b
@ -104,8 +104,15 @@ static class Config {
|
||||
@Bean
|
||||
ReactiveElasticsearchClient client() {
|
||||
|
||||
ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1>
|
||||
ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1>
|
||||
.connectedTo("localhost:9200", "localhost:9291")
|
||||
.withWebClientConfigurer(webClient -> { <2>
|
||||
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
|
||||
.codecs(configurer -> configurer.defaultCodecs()
|
||||
.maxInMemorySize(-1))
|
||||
.build();
|
||||
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
|
||||
})
|
||||
.build();
|
||||
|
||||
return ReactiveRestClients.create(clientConfiguration);
|
||||
@ -124,6 +131,7 @@ Mono<IndexResponse> response = client.index(request ->
|
||||
);
|
||||
----
|
||||
<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL.
|
||||
<2> when configuring a reactive client, the `withWebClientConfigurer` hook can be used to customize the WebClient.
|
||||
====
|
||||
|
||||
NOTE: The ReactiveClient response, especially for search operations, is bound to the `from` (offset) & `size` (limit) options of the request.
|
||||
|
@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.client.reactive.ClientHttpConnector;
|
||||
@ -45,6 +46,7 @@ class DefaultWebClientProvider implements WebClientProvider {
|
||||
private final Consumer<Throwable> errorListener;
|
||||
private final HttpHeaders headers;
|
||||
private final String pathPrefix;
|
||||
private final Function<WebClient, WebClient> webClientConfigurer;
|
||||
|
||||
/**
|
||||
* Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}.
|
||||
@ -53,24 +55,28 @@ class DefaultWebClientProvider implements WebClientProvider {
|
||||
* @param connector can be {@literal null}.
|
||||
*/
|
||||
DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector) {
|
||||
this(scheme, connector, e -> {}, HttpHeaders.EMPTY, null);
|
||||
this(scheme, connector, e -> {}, HttpHeaders.EMPTY, null, Function.identity());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}.
|
||||
*
|
||||
* @param scheme must not be {@literal null}.
|
||||
* @param connector can be {@literal null}.
|
||||
* @param errorListener must not be {@literal null}.
|
||||
* @param headers must not be {@literal null}.
|
||||
* @param pathPrefixcan be {@literal null}
|
||||
* @param connector can be {@literal null}.
|
||||
* @param errorListener must not be {@literal null}.
|
||||
* @param headers must not be {@literal null}.
|
||||
* @param pathPrefix can be {@literal null}
|
||||
* @param webClientConfigurer must not be {@literal null}.
|
||||
*/
|
||||
private DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector,
|
||||
Consumer<Throwable> errorListener, HttpHeaders headers, @Nullable String pathPrefix) {
|
||||
Consumer<Throwable> errorListener, HttpHeaders headers, @Nullable String pathPrefix,
|
||||
Function<WebClient, WebClient> webClientConfigurer) {
|
||||
|
||||
Assert.notNull(scheme, "Scheme must not be null! A common scheme would be 'http'.");
|
||||
Assert.notNull(errorListener, "ErrorListener must not be null! You may want use a no-op one 'e -> {}' instead.");
|
||||
Assert.notNull(errorListener, "errorListener must not be null! You may want use a no-op one 'e -> {}' instead.");
|
||||
Assert.notNull(headers, "headers must not be null! Think about using 'HttpHeaders.EMPTY' as an alternative.");
|
||||
Assert.notNull(webClientConfigurer,
|
||||
"webClientConfigurer must not be null! You may want use a no-op one 'Function.identity()' instead.");
|
||||
|
||||
this.cachedClients = new ConcurrentHashMap<>();
|
||||
this.scheme = scheme;
|
||||
@ -78,12 +84,9 @@ class DefaultWebClientProvider implements WebClientProvider {
|
||||
this.errorListener = errorListener;
|
||||
this.headers = headers;
|
||||
this.pathPrefix = pathPrefix;
|
||||
this.webClientConfigurer = webClientConfigurer;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#get(java.net.InetSocketAddress)
|
||||
*/
|
||||
@Override
|
||||
public WebClient get(InetSocketAddress endpoint) {
|
||||
|
||||
@ -92,19 +95,21 @@ class DefaultWebClientProvider implements WebClientProvider {
|
||||
return this.cachedClients.computeIfAbsent(endpoint, this::createWebClientForSocketAddress);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#getDefaultHeaders()
|
||||
*/
|
||||
@Override
|
||||
public HttpHeaders getDefaultHeaders() {
|
||||
return headers;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#withDefaultHeaders(org.springframework.http.HttpHeaders)
|
||||
*/
|
||||
@Override
|
||||
public Consumer<Throwable> getErrorListener() {
|
||||
return this.errorListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPathPrefix() {
|
||||
return pathPrefix;
|
||||
}
|
||||
|
||||
@Override
|
||||
public WebClientProvider withDefaultHeaders(HttpHeaders headers) {
|
||||
|
||||
@ -114,51 +119,33 @@ class DefaultWebClientProvider implements WebClientProvider {
|
||||
merged.addAll(this.headers);
|
||||
merged.addAll(headers);
|
||||
|
||||
return new DefaultWebClientProvider(this.scheme, this.connector, errorListener, merged, this.pathPrefix);
|
||||
return new DefaultWebClientProvider(scheme, connector, errorListener, merged, pathPrefix, webClientConfigurer);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#getErrorListener()
|
||||
*/
|
||||
@Override
|
||||
public Consumer<Throwable> getErrorListener() {
|
||||
return this.errorListener;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#getPathPrefix()
|
||||
*/
|
||||
@Override
|
||||
public String getPathPrefix() {
|
||||
return pathPrefix;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#withErrorListener(java.util.function.Consumer)
|
||||
*/
|
||||
@Override
|
||||
public WebClientProvider withErrorListener(Consumer<Throwable> errorListener) {
|
||||
|
||||
Assert.notNull(errorListener, "Error listener must not be null.");
|
||||
|
||||
Consumer<Throwable> listener = this.errorListener.andThen(errorListener);
|
||||
return new DefaultWebClientProvider(this.scheme, this.connector, listener, this.headers, this.pathPrefix);
|
||||
return new DefaultWebClientProvider(scheme, this.connector, listener, headers, pathPrefix, webClientConfigurer);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.elasticsearch.client.reactive.WebClientProvider#withPathPrefix(java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
public WebClientProvider withPathPrefix(String pathPrefix) {
|
||||
Assert.notNull(pathPrefix, "pathPrefix must not be null.");
|
||||
|
||||
return new DefaultWebClientProvider(this.scheme, this.connector, this.errorListener, this.headers, pathPrefix);
|
||||
return new DefaultWebClientProvider(this.scheme, this.connector, this.errorListener, this.headers, pathPrefix,
|
||||
webClientConfigurer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WebClientProvider withWebClientConfigurer(Function<WebClient, WebClient> webClientConfigurer) {
|
||||
return new DefaultWebClientProvider(scheme, connector, errorListener, headers, pathPrefix, webClientConfigurer);
|
||||
|
||||
}
|
||||
|
||||
|
||||
protected WebClient createWebClientForSocketAddress(InetSocketAddress socketAddress) {
|
||||
|
||||
Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders()));
|
||||
@ -168,7 +155,8 @@ class DefaultWebClientProvider implements WebClientProvider {
|
||||
}
|
||||
|
||||
String baseUrl = String.format("%s://%s:%d%s", this.scheme, socketAddress.getHostString(), socketAddress.getPort(),
|
||||
pathPrefix == null ? "" : "/" + pathPrefix);
|
||||
return builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)).build();
|
||||
pathPrefix == null ? "" : '/' + pathPrefix);
|
||||
WebClient webClient = builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)).build();
|
||||
return webClientConfigurer.apply(webClient);
|
||||
}
|
||||
}
|
||||
|
@ -20,12 +20,14 @@ import static org.mockito.Mockito.*;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.time.Duration;
|
||||
import java.util.function.Function;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.apache.http.conn.ssl.NoopHostnameVerifier;
|
||||
import org.junit.Test;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link ClientConfiguration}.
|
||||
@ -143,6 +145,26 @@ public class ClientConfigurationUnitTests {
|
||||
assertThat(clientConfiguration.getHostNameVerifier()).contains(NoopHostnameVerifier.INSTANCE);
|
||||
}
|
||||
|
||||
@Test // DATAES-719
|
||||
public void shouldHaveDefaultWebClientConfigurer() {
|
||||
ClientConfiguration clientConfiguration = ClientConfiguration.builder() //
|
||||
.connectedTo("foo", "bar") //
|
||||
.build();
|
||||
|
||||
assertThat(clientConfiguration.getWebClientConfigurer()).isEqualTo(Function.identity());
|
||||
}
|
||||
|
||||
@Test // DATAES-719
|
||||
public void shouldUseConfiguredWebClientConfigurer() {
|
||||
Function<WebClient, WebClient> webClientConfigurer = webClient -> webClient;
|
||||
ClientConfiguration clientConfiguration = ClientConfiguration.builder() //
|
||||
.connectedTo("foo", "bar") //
|
||||
.withWebClientConfigurer(webClientConfigurer) //
|
||||
.build();
|
||||
|
||||
assertThat(clientConfiguration.getWebClientConfigurer()).isEqualTo(webClientConfigurer);
|
||||
}
|
||||
|
||||
private static String buildBasicAuth(String username, String password) {
|
||||
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
|
@ -18,12 +18,15 @@ package org.springframework.data.elasticsearch.client.reactive;
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
|
||||
/**
|
||||
* @author Christoph Strobl
|
||||
* @author Peter-Josef Meisch
|
||||
*/
|
||||
public class DefaultWebClientProviderUnitTests {
|
||||
|
||||
@ -40,4 +43,18 @@ public class DefaultWebClientProviderUnitTests {
|
||||
assertThat(shouldBeCachedInstanceOfClient1).isSameAs(client1);
|
||||
assertThat(notClient1ButAnotherInstance).isNotSameAs(client1);
|
||||
}
|
||||
|
||||
@Test // DATAES-719
|
||||
public void shouldCallWebClientConfigurer() {
|
||||
AtomicReference<Boolean> configurerCalled = new AtomicReference<>(false);
|
||||
Function<WebClient, WebClient> configurer = webClient -> {
|
||||
configurerCalled.set(true);
|
||||
return webClient;
|
||||
};
|
||||
WebClientProvider provider = new DefaultWebClientProvider("http", null).withWebClientConfigurer(configurer);
|
||||
|
||||
provider.get(InetSocketAddress.createUnresolved("localhost", 9200));
|
||||
|
||||
assertThat(configurerCalled).hasValue(true);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user