mirror of
https://github.com/spring-projects/spring-data-elasticsearch.git
synced 2025-10-12 05:18:55 +00:00
Adjust Rest5Client building by using and exposing the callbacks provided by the Elasticsearch Java client library.
Original Pull Request #3143 Closes #3129 Signed-off-by: Peter-Josef Meisch <pj.meisch@sothawo.com>
This commit is contained in:
parent
f51efa2cad
commit
006cda6de6
@ -389,6 +389,63 @@ ClientConfiguration.builder()
|
|||||||
----
|
----
|
||||||
====
|
====
|
||||||
|
|
||||||
|
[[elasticsearch.clients.configurationcallbacks.connectionconfig]]
|
||||||
|
==== Configuration of the ConnectionConfig used by the low level Elasticsearch `Rest5Client`:
|
||||||
|
|
||||||
|
This callback provides a `org.apache.hc.client5.http.config.ConnectionConfig` to configure the connection that is
|
||||||
|
used by the `Rest5Client`.
|
||||||
|
|
||||||
|
====
|
||||||
|
[source,java]
|
||||||
|
----
|
||||||
|
ClientConfiguration.builder()
|
||||||
|
.connectedTo("localhost:9200", "localhost:9291")
|
||||||
|
.withClientConfigurer(Rest5Clients.ElasticsearchConnectionConfigurationCallback.from(connectionConfigBuilder -> {
|
||||||
|
// configure the connection
|
||||||
|
return connectionConfigBuilder;
|
||||||
|
}))
|
||||||
|
.build();
|
||||||
|
----
|
||||||
|
====
|
||||||
|
|
||||||
|
[[elasticsearch.clients.configurationcallbacks.connectioncmanager]]
|
||||||
|
==== Configuration of the ConnectionManager used by the low level Elasticsearch `Rest5Client`:
|
||||||
|
|
||||||
|
This callback provides a `org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder` to configure the connection manager that is
|
||||||
|
used by the `Rest5Client`.
|
||||||
|
|
||||||
|
====
|
||||||
|
[source,java]
|
||||||
|
----
|
||||||
|
ClientConfiguration.builder()
|
||||||
|
.connectedTo("localhost:9200", "localhost:9291")
|
||||||
|
.withClientConfigurer(Rest5Clients.ElasticsearchConnectionManagerCallback.from(connectionManagerBuilder -> {
|
||||||
|
// configure the connection manager
|
||||||
|
return connectionManagerBuilder;
|
||||||
|
}))
|
||||||
|
.build();
|
||||||
|
----
|
||||||
|
====
|
||||||
|
|
||||||
|
[[elasticsearch.clients.configurationcallbacks.requestconfig]]
|
||||||
|
==== Configuration of the RequestConfig used by the low level Elasticsearch `Rest5Client`:
|
||||||
|
|
||||||
|
This callback provides a `org.apache.hc.client5.http.config.RequestConfig` to configure the RequestConfig that is
|
||||||
|
used by the `Rest5Client`.
|
||||||
|
|
||||||
|
====
|
||||||
|
[source,java]
|
||||||
|
----
|
||||||
|
ClientConfiguration.builder()
|
||||||
|
.connectedTo("localhost:9200", "localhost:9291")
|
||||||
|
.withClientConfigurer(Rest5Clients.ElasticsearchRequestConfigCallback.from(requestConfigBuilder -> {
|
||||||
|
// configure the request config
|
||||||
|
return requestConfigBuilder;
|
||||||
|
}))
|
||||||
|
.build();
|
||||||
|
----
|
||||||
|
====
|
||||||
|
|
||||||
[[elasticsearch.clients.logging]]
|
[[elasticsearch.clients.logging]]
|
||||||
== Client Logging
|
== Client Logging
|
||||||
|
|
||||||
|
@ -13,20 +13,14 @@ import java.net.URISyntaxException;
|
|||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
|
|
||||||
import org.apache.hc.client5.http.config.ConnectionConfig;
|
import org.apache.hc.client5.http.config.ConnectionConfig;
|
||||||
import org.apache.hc.client5.http.config.RequestConfig;
|
import org.apache.hc.client5.http.config.RequestConfig;
|
||||||
import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
|
|
||||||
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
|
|
||||||
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
|
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
|
||||||
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
|
|
||||||
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
|
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
|
||||||
import org.apache.hc.client5.http.impl.routing.DefaultProxyRoutePlanner;
|
import org.apache.hc.client5.http.impl.routing.DefaultProxyRoutePlanner;
|
||||||
import org.apache.hc.core5.http.Header;
|
import org.apache.hc.core5.http.Header;
|
||||||
@ -49,13 +43,11 @@ import org.springframework.util.Assert;
|
|||||||
public final class Rest5Clients {
|
public final class Rest5Clients {
|
||||||
|
|
||||||
// values copied from Rest5ClientBuilder
|
// values copied from Rest5ClientBuilder
|
||||||
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;
|
|
||||||
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000;
|
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000;
|
||||||
public static final int DEFAULT_RESPONSE_TIMEOUT_MILLIS = 0; // meaning infinite
|
public static final int DEFAULT_RESPONSE_TIMEOUT_MILLIS = 0; // meaning infinite
|
||||||
public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10;
|
|
||||||
public static final int DEFAULT_MAX_CONN_TOTAL = 30;
|
|
||||||
|
|
||||||
private Rest5Clients() {}
|
private Rest5Clients() {
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a low level {@link Rest5Client} for the given configuration.
|
* Creates a low level {@link Rest5Client} for the given configuration.
|
||||||
@ -82,11 +74,7 @@ public final class Rest5Clients {
|
|||||||
builder.setDefaultHeaders(toHeaderArray(headers));
|
builder.setDefaultHeaders(toHeaderArray(headers));
|
||||||
}
|
}
|
||||||
|
|
||||||
// we need to provide our own HttpClient, as the Rest5ClientBuilder
|
// RestClientBuilder configuration callbacks from the consumer
|
||||||
// does not provide a callback for configuration the http client as the old RestClientBuilder.
|
|
||||||
var httpClient = createHttpClient(clientConfiguration);
|
|
||||||
builder.setHttpClient(httpClient);
|
|
||||||
|
|
||||||
for (ClientConfiguration.ClientConfigurationCallback<?> clientConfigurationCallback : clientConfiguration
|
for (ClientConfiguration.ClientConfigurationCallback<?> clientConfigurationCallback : clientConfiguration
|
||||||
.getClientConfigurers()) {
|
.getClientConfigurers()) {
|
||||||
if (clientConfigurationCallback instanceof ElasticsearchRest5ClientConfigurationCallback configurationCallback) {
|
if (clientConfigurationCallback instanceof ElasticsearchRest5ClientConfigurationCallback configurationCallback) {
|
||||||
@ -94,6 +82,104 @@ public final class Rest5Clients {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Duration connectTimeout = clientConfiguration.getConnectTimeout();
|
||||||
|
Duration socketTimeout = clientConfiguration.getSocketTimeout();
|
||||||
|
|
||||||
|
builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
|
||||||
|
|
||||||
|
httpAsyncClientBuilder.setUserAgent(VersionInfo.clientVersions());
|
||||||
|
if (clientConfiguration.getProxy().isPresent()) {
|
||||||
|
var proxy = clientConfiguration.getProxy().get();
|
||||||
|
try {
|
||||||
|
var proxyRoutePlanner = new DefaultProxyRoutePlanner(HttpHost.create(proxy));
|
||||||
|
httpAsyncClientBuilder.setRoutePlanner(proxyRoutePlanner);
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
httpAsyncClientBuilder.addRequestInterceptorFirst((request, entity, context) -> {
|
||||||
|
clientConfiguration.getHeadersSupplier().get().forEach((header, values) -> {
|
||||||
|
// The accept and content-type headers are already put on the request, despite this being the first
|
||||||
|
// interceptor.
|
||||||
|
if ("Accept".equalsIgnoreCase(header) || " Content-Type".equalsIgnoreCase(header)) {
|
||||||
|
request.removeHeaders(header);
|
||||||
|
}
|
||||||
|
values.forEach(value -> request.addHeader(header, value));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// add httpclient configurator callbacks provided by the configuration
|
||||||
|
for (ClientConfiguration.ClientConfigurationCallback<?> clientConfigurer : clientConfiguration
|
||||||
|
.getClientConfigurers()) {
|
||||||
|
if (clientConfigurer instanceof ElasticsearchHttpClientConfigurationCallback httpClientConfigurer) {
|
||||||
|
httpAsyncClientBuilder = httpClientConfigurer.configure(httpAsyncClientBuilder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
builder.setConnectionConfigCallback(connectionConfigBuilder -> {
|
||||||
|
|
||||||
|
if (!connectTimeout.isNegative()) {
|
||||||
|
connectionConfigBuilder.setConnectTimeout(
|
||||||
|
Timeout.of(Math.toIntExact(connectTimeout.toMillis()), TimeUnit.MILLISECONDS));
|
||||||
|
}
|
||||||
|
if (!socketTimeout.isNegative()) {
|
||||||
|
var soTimeout = Timeout.of(Math.toIntExact(socketTimeout.toMillis()), TimeUnit.MILLISECONDS);
|
||||||
|
connectionConfigBuilder.setSocketTimeout(soTimeout);
|
||||||
|
} else {
|
||||||
|
connectionConfigBuilder.setSocketTimeout(Timeout.of(DEFAULT_SOCKET_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
// add connectionConfig configurator callbacks provided by the configuration
|
||||||
|
for (ClientConfiguration.ClientConfigurationCallback<?> clientConfigurer : clientConfiguration
|
||||||
|
.getClientConfigurers()) {
|
||||||
|
if (clientConfigurer instanceof ElasticsearchConnectionConfigurationCallback connectionConfigurationCallback) {
|
||||||
|
connectionConfigBuilder = connectionConfigurationCallback.configure(connectionConfigBuilder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
builder.setConnectionManagerCallback(poolingAsyncClientConnectionManagerBuilder -> {
|
||||||
|
|
||||||
|
SSLContext sslContext = null;
|
||||||
|
try {
|
||||||
|
sslContext = clientConfiguration.getCaFingerprint().isPresent()
|
||||||
|
? TransportUtils.sslContextFromCaFingerprint(clientConfiguration.getCaFingerprint().get())
|
||||||
|
: (clientConfiguration.getSslContext().isPresent()
|
||||||
|
? clientConfiguration.getSslContext().get()
|
||||||
|
: SSLContext.getDefault());
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
throw new IllegalStateException("could not create the default ssl context", e);
|
||||||
|
}
|
||||||
|
poolingAsyncClientConnectionManagerBuilder.setTlsStrategy(new BasicClientTlsStrategy(sslContext));
|
||||||
|
|
||||||
|
// add connectionManager configurator callbacks provided by the configuration
|
||||||
|
for (ClientConfiguration.ClientConfigurationCallback<?> clientConfigurer : clientConfiguration
|
||||||
|
.getClientConfigurers()) {
|
||||||
|
if (clientConfigurer instanceof ElasticsearchConnectionManagerCallback connectionManagerCallback) {
|
||||||
|
poolingAsyncClientConnectionManagerBuilder = connectionManagerCallback.configure(poolingAsyncClientConnectionManagerBuilder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
builder.setRequestConfigCallback(requestConfigBuilder -> {
|
||||||
|
|
||||||
|
if (!socketTimeout.isNegative()) {
|
||||||
|
var soTimeout = Timeout.of(Math.toIntExact(socketTimeout.toMillis()), TimeUnit.MILLISECONDS);
|
||||||
|
requestConfigBuilder.setConnectionRequestTimeout(soTimeout);
|
||||||
|
} else {
|
||||||
|
requestConfigBuilder
|
||||||
|
.setConnectionRequestTimeout(Timeout.of(DEFAULT_RESPONSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
|
||||||
|
}
|
||||||
|
// add connectionConfig configurator callbacks provided by the configuration
|
||||||
|
for (ClientConfiguration.ClientConfigurationCallback<?> clientConfigurer : clientConfiguration
|
||||||
|
.getClientConfigurers()) {
|
||||||
|
if (clientConfigurer instanceof ElasticsearchRequestConfigCallback requestConfigCallback) {
|
||||||
|
requestConfigBuilder = requestConfigCallback.configure(requestConfigBuilder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,109 +200,23 @@ public final class Rest5Clients {
|
|||||||
.toList().toArray(new Header[0]);
|
.toList().toArray(new Header[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// the basic logic to create the http client is copied from the Rest5ClientBuilder class, this is taken from the
|
/**
|
||||||
// Elasticsearch code, as there is no public usable instance in that
|
* {@link ClientConfiguration.ClientConfigurationCallback} to configure the Rest5Client client with a
|
||||||
private static CloseableHttpAsyncClient createHttpClient(ClientConfiguration clientConfiguration) {
|
* {@link Rest5ClientBuilder}
|
||||||
|
*
|
||||||
var requestConfigBuilder = RequestConfig.custom();
|
* @since 6.0
|
||||||
var connectionConfigBuilder = ConnectionConfig.custom();
|
|
||||||
|
|
||||||
Duration connectTimeout = clientConfiguration.getConnectTimeout();
|
|
||||||
|
|
||||||
if (!connectTimeout.isNegative()) {
|
|
||||||
connectionConfigBuilder.setConnectTimeout(
|
|
||||||
Timeout.of(Math.toIntExact(connectTimeout.toMillis()), TimeUnit.MILLISECONDS));
|
|
||||||
}
|
|
||||||
|
|
||||||
Duration socketTimeout = clientConfiguration.getSocketTimeout();
|
|
||||||
|
|
||||||
if (!socketTimeout.isNegative()) {
|
|
||||||
var soTimeout = Timeout.of(Math.toIntExact(socketTimeout.toMillis()), TimeUnit.MILLISECONDS);
|
|
||||||
connectionConfigBuilder.setSocketTimeout(soTimeout);
|
|
||||||
requestConfigBuilder.setConnectionRequestTimeout(soTimeout);
|
|
||||||
} else {
|
|
||||||
connectionConfigBuilder.setSocketTimeout(Timeout.of(DEFAULT_SOCKET_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
|
|
||||||
requestConfigBuilder
|
|
||||||
.setConnectionRequestTimeout(Timeout.of(DEFAULT_RESPONSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
SSLContext sslContext = clientConfiguration.getCaFingerprint().isPresent()
|
|
||||||
? TransportUtils.sslContextFromCaFingerprint(clientConfiguration.getCaFingerprint().get())
|
|
||||||
: (clientConfiguration.getSslContext().isPresent()
|
|
||||||
? clientConfiguration.getSslContext().get()
|
|
||||||
: SSLContext.getDefault());
|
|
||||||
|
|
||||||
ConnectionConfig connectionConfig = connectionConfigBuilder.build();
|
|
||||||
|
|
||||||
PoolingAsyncClientConnectionManager defaultConnectionManager = PoolingAsyncClientConnectionManagerBuilder.create()
|
|
||||||
.setDefaultConnectionConfig(connectionConfig)
|
|
||||||
.setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE)
|
|
||||||
.setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL)
|
|
||||||
.setTlsStrategy(new BasicClientTlsStrategy(sslContext))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
var requestConfig = requestConfigBuilder.build();
|
|
||||||
|
|
||||||
var immutableRefToHttpClientBuilder = new Object() {
|
|
||||||
HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create()
|
|
||||||
.setDefaultRequestConfig(requestConfig)
|
|
||||||
.setConnectionManager(defaultConnectionManager)
|
|
||||||
.setUserAgent(VersionInfo.clientVersions())
|
|
||||||
.setTargetAuthenticationStrategy(new DefaultAuthenticationStrategy())
|
|
||||||
.setThreadFactory(new RestClientThreadFactory());
|
|
||||||
};
|
|
||||||
|
|
||||||
clientConfiguration.getProxy().ifPresent(proxy -> {
|
|
||||||
try {
|
|
||||||
var proxyRoutePlanner = new DefaultProxyRoutePlanner(HttpHost.create(proxy));
|
|
||||||
immutableRefToHttpClientBuilder.httpClientBuilder.setRoutePlanner(proxyRoutePlanner);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
immutableRefToHttpClientBuilder.httpClientBuilder.addRequestInterceptorFirst((request, entity, context) -> {
|
|
||||||
clientConfiguration.getHeadersSupplier().get().forEach((header, values) -> {
|
|
||||||
// The accept and content-type headers are already put on the request, despite this being the first
|
|
||||||
// interceptor.
|
|
||||||
if ("Accept".equalsIgnoreCase(header) || " Content-Type".equalsIgnoreCase(header)) {
|
|
||||||
request.removeHeaders(header);
|
|
||||||
}
|
|
||||||
values.forEach(value -> request.addHeader(header, value));
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
for (ClientConfiguration.ClientConfigurationCallback<?> clientConfigurer : clientConfiguration
|
|
||||||
.getClientConfigurers()) {
|
|
||||||
if (clientConfigurer instanceof ElasticsearchHttpClientConfigurationCallback httpClientConfigurer) {
|
|
||||||
immutableRefToHttpClientBuilder.httpClientBuilder = httpClientConfigurer.configure(immutableRefToHttpClientBuilder.httpClientBuilder);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return immutableRefToHttpClientBuilder.httpClientBuilder.build();
|
|
||||||
} catch (NoSuchAlgorithmException e) {
|
|
||||||
throw new IllegalStateException("could not create the default ssl context", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Copied from the Elasticsearch code as this class is not public there.
|
|
||||||
*/
|
*/
|
||||||
private static class RestClientThreadFactory implements ThreadFactory {
|
public interface ElasticsearchRest5ClientConfigurationCallback
|
||||||
private static final AtomicLong CLIENT_THREAD_POOL_ID_GENERATOR = new AtomicLong();
|
extends ClientConfiguration.ClientConfigurationCallback<Rest5ClientBuilder> {
|
||||||
private final long clientThreadPoolId;
|
|
||||||
private final AtomicLong clientThreadId;
|
|
||||||
|
|
||||||
private RestClientThreadFactory() {
|
static ElasticsearchRest5ClientConfigurationCallback from(
|
||||||
this.clientThreadPoolId = CLIENT_THREAD_POOL_ID_GENERATOR.getAndIncrement();
|
Function<Rest5ClientBuilder, Rest5ClientBuilder> rest5ClientBuilderCallback) {
|
||||||
this.clientThreadId = new AtomicLong();
|
|
||||||
|
Assert.notNull(rest5ClientBuilderCallback, "rest5ClientBuilderCallback must not be null");
|
||||||
|
|
||||||
|
return rest5ClientBuilderCallback::apply;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Thread newThread(Runnable runnable) {
|
|
||||||
return new Thread(runnable, String.format(Locale.ROOT, "elasticsearch-rest-client-%d-thread-%d",
|
|
||||||
this.clientThreadPoolId, this.clientThreadId.incrementAndGet()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -238,20 +238,56 @@ public final class Rest5Clients {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link ClientConfiguration.ClientConfigurationCallback} to configure the Rest5Client client with a
|
* {@link org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationCallback} to configure
|
||||||
* {@link Rest5ClientBuilder}
|
* the Elasticsearch Rest5Client's connection with a {@link ConnectionConfig.Builder}
|
||||||
*
|
*
|
||||||
* @since 6.0
|
* @since 6.0
|
||||||
*/
|
*/
|
||||||
public interface ElasticsearchRest5ClientConfigurationCallback
|
public interface ElasticsearchConnectionConfigurationCallback
|
||||||
extends ClientConfiguration.ClientConfigurationCallback<Rest5ClientBuilder> {
|
extends ClientConfiguration.ClientConfigurationCallback<ConnectionConfig.Builder> {
|
||||||
|
|
||||||
static ElasticsearchRest5ClientConfigurationCallback from(
|
static ElasticsearchConnectionConfigurationCallback from(
|
||||||
Function<Rest5ClientBuilder, Rest5ClientBuilder> rest5ClientBuilderCallback) {
|
Function<ConnectionConfig.Builder, ConnectionConfig.Builder> connectionConfigBuilderCallback) {
|
||||||
|
|
||||||
Assert.notNull(rest5ClientBuilderCallback, "rest5ClientBuilderCallback must not be null");
|
Assert.notNull(connectionConfigBuilderCallback, "connectionConfigBuilderCallback must not be null");
|
||||||
|
|
||||||
return rest5ClientBuilderCallback::apply;
|
return connectionConfigBuilderCallback::apply;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationCallback} to configure
|
||||||
|
* the Elasticsearch Rest5Client's connection manager with a {@link PoolingAsyncClientConnectionManagerBuilder}
|
||||||
|
*
|
||||||
|
* @since 6.0
|
||||||
|
*/
|
||||||
|
public interface ElasticsearchConnectionManagerCallback
|
||||||
|
extends ClientConfiguration.ClientConfigurationCallback<PoolingAsyncClientConnectionManagerBuilder> {
|
||||||
|
|
||||||
|
static ElasticsearchConnectionManagerCallback from(
|
||||||
|
Function<PoolingAsyncClientConnectionManagerBuilder, PoolingAsyncClientConnectionManagerBuilder> connectionManagerBuilderCallback) {
|
||||||
|
|
||||||
|
Assert.notNull(connectionManagerBuilderCallback, "connectionManagerBuilderCallback must not be null");
|
||||||
|
|
||||||
|
return connectionManagerBuilderCallback::apply;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationCallback} to configure
|
||||||
|
* the Elasticsearch Rest5Client's connection manager with a {@link RequestConfig.Builder}
|
||||||
|
*
|
||||||
|
* @since 6.0
|
||||||
|
*/
|
||||||
|
public interface ElasticsearchRequestConfigCallback
|
||||||
|
extends ClientConfiguration.ClientConfigurationCallback<RequestConfig.Builder> {
|
||||||
|
|
||||||
|
static ElasticsearchRequestConfigCallback from(
|
||||||
|
Function<RequestConfig.Builder, RequestConfig.Builder> requestConfigBuilderCallback) {
|
||||||
|
|
||||||
|
Assert.notNull(requestConfigBuilderCallback, "requestConfigBuilderCallback must not be null");
|
||||||
|
|
||||||
|
return requestConfigBuilderCallback::apply;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,7 +47,6 @@ import org.springframework.data.elasticsearch.support.HttpHeaders;
|
|||||||
|
|
||||||
import com.github.tomakehurst.wiremock.WireMockServer;
|
import com.github.tomakehurst.wiremock.WireMockServer;
|
||||||
import com.github.tomakehurst.wiremock.client.WireMock;
|
import com.github.tomakehurst.wiremock.client.WireMock;
|
||||||
import com.github.tomakehurst.wiremock.common.ConsoleNotifier;
|
|
||||||
import com.github.tomakehurst.wiremock.matching.AnythingPattern;
|
import com.github.tomakehurst.wiremock.matching.AnythingPattern;
|
||||||
import com.github.tomakehurst.wiremock.matching.EqualToPattern;
|
import com.github.tomakehurst.wiremock.matching.EqualToPattern;
|
||||||
import com.github.tomakehurst.wiremock.stubbing.StubMapping;
|
import com.github.tomakehurst.wiremock.stubbing.StubMapping;
|
||||||
@ -104,8 +103,11 @@ public class RestClientsTest {
|
|||||||
defaultHeaders.add("def2", "def2-1");
|
defaultHeaders.add("def2", "def2-1");
|
||||||
|
|
||||||
AtomicInteger supplierCount = new AtomicInteger(1);
|
AtomicInteger supplierCount = new AtomicInteger(1);
|
||||||
AtomicInteger httpClientConfigurerCount = new AtomicInteger(0);
|
|
||||||
AtomicInteger restClientConfigurerCount = new AtomicInteger(0);
|
AtomicInteger restClientConfigurerCount = new AtomicInteger(0);
|
||||||
|
AtomicInteger httpClientConfigurerCount = new AtomicInteger(0);
|
||||||
|
AtomicInteger connectionConfigurerCount = new AtomicInteger(0);
|
||||||
|
AtomicInteger connectionManagerConfigurerCount = new AtomicInteger(0);
|
||||||
|
AtomicInteger requestConfigurerCount = new AtomicInteger(0);
|
||||||
|
|
||||||
ClientConfigurationBuilder configurationBuilder = new ClientConfigurationBuilder();
|
ClientConfigurationBuilder configurationBuilder = new ClientConfigurationBuilder();
|
||||||
configurationBuilder //
|
configurationBuilder //
|
||||||
@ -120,17 +122,31 @@ public class RestClientsTest {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (clientUnderTestFactory instanceof ELCRest5ClientUnderTestFactory) {
|
if (clientUnderTestFactory instanceof ELCRest5ClientUnderTestFactory) {
|
||||||
|
configurationBuilder.withClientConfigurer(
|
||||||
|
Rest5Clients.ElasticsearchRest5ClientConfigurationCallback.from(rest5ClientBuilder -> {
|
||||||
|
restClientConfigurerCount.incrementAndGet();
|
||||||
|
return rest5ClientBuilder;
|
||||||
|
}));
|
||||||
configurationBuilder.withClientConfigurer(
|
configurationBuilder.withClientConfigurer(
|
||||||
Rest5Clients.ElasticsearchHttpClientConfigurationCallback.from(httpClientBuilder -> {
|
Rest5Clients.ElasticsearchHttpClientConfigurationCallback.from(httpClientBuilder -> {
|
||||||
httpClientConfigurerCount.incrementAndGet();
|
httpClientConfigurerCount.incrementAndGet();
|
||||||
return httpClientBuilder;
|
return httpClientBuilder;
|
||||||
}));
|
}));
|
||||||
configurationBuilder.withClientConfigurer(
|
configurationBuilder.withClientConfigurer(
|
||||||
Rest5Clients.ElasticsearchRest5ClientConfigurationCallback.from(rest5ClientBuilder -> {
|
Rest5Clients.ElasticsearchConnectionConfigurationCallback.from(connectionConfigBuilder -> {
|
||||||
restClientConfigurerCount.incrementAndGet();
|
connectionConfigurerCount.incrementAndGet();
|
||||||
return rest5ClientBuilder;
|
return connectionConfigBuilder;
|
||||||
|
}));
|
||||||
|
configurationBuilder.withClientConfigurer(
|
||||||
|
Rest5Clients.ElasticsearchConnectionManagerCallback.from(connectionManagerBuilder -> {
|
||||||
|
connectionManagerConfigurerCount.incrementAndGet();
|
||||||
|
return connectionManagerBuilder;
|
||||||
|
}));
|
||||||
|
configurationBuilder.withClientConfigurer(
|
||||||
|
Rest5Clients.ElasticsearchRequestConfigCallback.from(requestConfigBuilder -> {
|
||||||
|
requestConfigurerCount.incrementAndGet();
|
||||||
|
return requestConfigBuilder;
|
||||||
}));
|
}));
|
||||||
|
|
||||||
} else if (clientUnderTestFactory instanceof ELCRestClientUnderTestFactory) {
|
} else if (clientUnderTestFactory instanceof ELCRestClientUnderTestFactory) {
|
||||||
configurationBuilder.withClientConfigurer(
|
configurationBuilder.withClientConfigurer(
|
||||||
RestClients.ElasticsearchHttpClientConfigurationCallback.from(httpClientBuilder -> {
|
RestClients.ElasticsearchHttpClientConfigurationCallback.from(httpClientBuilder -> {
|
||||||
@ -177,8 +193,12 @@ public class RestClientsTest {
|
|||||||
;
|
;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assertThat(restClientConfigurerCount).hasValue(clientUnderTestFactory.getExpectedRestClientConfigurerCalls());
|
||||||
assertThat(httpClientConfigurerCount).hasValue(1);
|
assertThat(httpClientConfigurerCount).hasValue(1);
|
||||||
assertThat(restClientConfigurerCount).hasValue(clientUnderTestFactory.getExpectedRestClientConfigCalls());
|
assertThat(connectionConfigurerCount).hasValue(clientUnderTestFactory.getExpectedConnectionConfigurerCalls());
|
||||||
|
assertThat(connectionManagerConfigurerCount)
|
||||||
|
.hasValue(clientUnderTestFactory.getExpectedConnectionManagerConfigurerCalls());
|
||||||
|
assertThat(requestConfigurerCount).hasValue(clientUnderTestFactory.getExpectedRequestConfigurerCalls());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -404,11 +424,23 @@ public class RestClientsTest {
|
|||||||
|
|
||||||
protected abstract String getDisplayName();
|
protected abstract String getDisplayName();
|
||||||
|
|
||||||
protected Integer getExpectedRestClientConfigCalls() {
|
protected Integer getExpectedRestClientConfigurerCalls() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract int getElasticsearchMajorVersion();
|
protected abstract int getElasticsearchMajorVersion();
|
||||||
|
|
||||||
|
public Integer getExpectedConnectionConfigurerCalls() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getExpectedConnectionManagerConfigurerCalls() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getExpectedRequestConfigurerCalls() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -423,7 +455,22 @@ public class RestClientsTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Integer getExpectedRestClientConfigCalls() {
|
protected Integer getExpectedRestClientConfigurerCalls() {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer getExpectedConnectionConfigurerCalls() {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer getExpectedConnectionManagerConfigurerCalls() {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer getExpectedRequestConfigurerCalls() {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -467,7 +514,7 @@ public class RestClientsTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Integer getExpectedRestClientConfigCalls() {
|
protected Integer getExpectedRestClientConfigurerCalls() {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -511,7 +558,7 @@ public class RestClientsTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Integer getExpectedRestClientConfigCalls() {
|
protected Integer getExpectedRestClientConfigurerCalls() {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user