Allow providing a custom ElasticsearchTransport in the configuration classes.

Original Pull Request #2703
Closes #2702
This commit is contained in:
Peter-Josef Meisch 2023-09-17 15:30:06 +02:00 committed by GitHub
parent de87dd42ee
commit bbfb6ff463
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 128 additions and 46 deletions

View File

@ -57,16 +57,18 @@ import org.springframework.util.Assert;
* @author Peter-Josef Meisch * @author Peter-Josef Meisch
* @since 4.4 * @since 4.4
*/ */
@SuppressWarnings("unused")
public final class ElasticsearchClients { public final class ElasticsearchClients {
/** /**
* Name of whose value can be used to correlate log messages for this request. * Name of whose value can be used to correlate log messages for this request.
*/ */
private static final String X_SPRING_DATA_ELASTICSEARCH_CLIENT = "X-SpringDataElasticsearch-Client"; private static final String X_SPRING_DATA_ELASTICSEARCH_CLIENT = "X-SpringDataElasticsearch-Client";
private static final String IMPERATIVE_CLIENT = "imperative"; public static final String IMPERATIVE_CLIENT = "imperative";
private static final String REACTIVE_CLIENT = "reactive"; public static final String REACTIVE_CLIENT = "reactive";
private static final JsonpMapper DEFAULT_JSONP_MAPPER = new JacksonJsonpMapper(); private static final JsonpMapper DEFAULT_JSONP_MAPPER = new JacksonJsonpMapper();
// region reactive client
/** /**
* Creates a new {@link ReactiveElasticsearchClient} * Creates a new {@link ReactiveElasticsearchClient}
* *
@ -131,10 +133,28 @@ public final class ElasticsearchClients {
*/ */
public static ReactiveElasticsearchClient createReactive(RestClient restClient, public static ReactiveElasticsearchClient createReactive(RestClient restClient,
@Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) { @Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) {
return new ReactiveElasticsearchClient(
getElasticsearchTransport(restClient, REACTIVE_CLIENT, transportOptions, jsonpMapper)); Assert.notNull(restClient, "restClient must not be null");
var transport = getElasticsearchTransport(restClient, REACTIVE_CLIENT, transportOptions, jsonpMapper);
return createReactive(transport);
} }
/**
* Creates a new {@link ReactiveElasticsearchClient} that uses the given {@link ElasticsearchTransport}.
*
* @param transport the transport to use
* @return the {@link ElasticsearchClient
*/
public static ReactiveElasticsearchClient createReactive(ElasticsearchTransport transport) {
Assert.notNull(transport, "transport must not be null");
return new ReactiveElasticsearchClient(transport);
}
// endregion
// region imperative client
/** /**
* Creates a new imperative {@link ElasticsearchClient} * Creates a new imperative {@link ElasticsearchClient}
* *
@ -183,8 +203,40 @@ public final class ElasticsearchClients {
ElasticsearchTransport transport = getElasticsearchTransport(restClient, IMPERATIVE_CLIENT, transportOptions, ElasticsearchTransport transport = getElasticsearchTransport(restClient, IMPERATIVE_CLIENT, transportOptions,
jsonpMapper); jsonpMapper);
return createImperative(transport);
}
/**
* Creates a new {@link ElasticsearchClient} that uses the given {@link ElasticsearchTransport}.
*
* @param transport the transport to use
* @return the {@link ElasticsearchClient
*/
public static AutoCloseableElasticsearchClient createImperative(ElasticsearchTransport transport) {
Assert.notNull(transport, "transport must not be null");
return new AutoCloseableElasticsearchClient(transport); return new AutoCloseableElasticsearchClient(transport);
} }
// endregion
// region low level RestClient
private static RestClientOptions.Builder getRestClientOptionsBuilder(@Nullable TransportOptions transportOptions) {
if (transportOptions instanceof RestClientOptions restClientOptions) {
return restClientOptions.toBuilder();
}
var builder = new RestClientOptions.Builder(RequestOptions.DEFAULT.toBuilder());
if (transportOptions != null) {
transportOptions.headers().forEach(header -> builder.addHeader(header.getKey(), header.getValue()));
transportOptions.queryParameters().forEach(builder::setParameter);
builder.onWarnings(transportOptions.onWarnings());
}
return builder;
}
/** /**
* Creates a low level {@link RestClient} for the given configuration. * Creates a low level {@link RestClient} for the given configuration.
@ -256,10 +308,26 @@ public final class ElasticsearchClients {
} }
return builder; return builder;
} }
// endregion
private static ElasticsearchTransport getElasticsearchTransport(RestClient restClient, String clientType, // region Elasticsearch transport
/**
* Creates an {@link ElasticsearchTransport} that will use the given client that additionally is customized with a
* header to contain the clientType
*
* @param restClient the client to use
* @param clientType the client type to pass in each request as header
* @param transportOptions options for the transport
* @param jsonpMapper mapper for the transport
* @return ElasticsearchTransport
*/
public static ElasticsearchTransport getElasticsearchTransport(RestClient restClient, String clientType,
@Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) { @Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) {
Assert.notNull(restClient, "restClient must not be null");
Assert.notNull(clientType, "clientType must not be null");
Assert.notNull(jsonpMapper, "jsonpMapper must not be null");
TransportOptions.Builder transportOptionsBuilder = transportOptions != null ? transportOptions.toBuilder() TransportOptions.Builder transportOptionsBuilder = transportOptions != null ? transportOptions.toBuilder()
: new RestClientOptions(RequestOptions.DEFAULT).toBuilder(); : new RestClientOptions(RequestOptions.DEFAULT).toBuilder();
@ -285,26 +353,10 @@ public final class ElasticsearchClients {
return new RestClientTransport(restClient, jsonpMapper, restClientOptionsBuilder.build()); return new RestClientTransport(restClient, jsonpMapper, restClientOptionsBuilder.build());
} }
// endregion
private static RestClientOptions.Builder getRestClientOptionsBuilder(@Nullable TransportOptions transportOptions) {
if (transportOptions instanceof RestClientOptions restClientOptions) {
return restClientOptions.toBuilder();
}
var builder = new RestClientOptions.Builder(RequestOptions.DEFAULT.toBuilder());
if (transportOptions != null) {
transportOptions.headers().forEach(header -> builder.addHeader(header.getKey(), header.getValue()));
transportOptions.queryParameters().forEach(builder::setParameter);
builder.onWarnings(transportOptions.onWarnings());
}
return builder;
}
private static List<String> formattedHosts(List<InetSocketAddress> hosts, boolean useSsl) { private static List<String> formattedHosts(List<InetSocketAddress> hosts, boolean useSsl) {
return hosts.stream().map(it -> (useSsl ? "https" : "http") + "://" + it.getHostString() + ":" + it.getPort()) return hosts.stream().map(it -> (useSsl ? "https" : "http") + "://" + it.getHostString() + ':' + it.getPort())
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@ -320,13 +372,7 @@ public final class ElasticsearchClients {
* *
* @since 4.4 * @since 4.4
*/ */
private static class CustomHeaderInjector implements HttpRequestInterceptor { private record CustomHeaderInjector(Supplier<HttpHeaders> headersSupplier) implements HttpRequestInterceptor {
public CustomHeaderInjector(Supplier<HttpHeaders> headersSupplier) {
this.headersSupplier = headersSupplier;
}
private final Supplier<HttpHeaders> headersSupplier;
@Override @Override
public void process(HttpRequest request, HttpContext context) { public void process(HttpRequest request, HttpContext context) {

View File

@ -18,12 +18,12 @@ package org.springframework.data.elasticsearch.client.elc;
import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.transport.rest_client.RestClientOptions; import co.elastic.clients.transport.rest_client.RestClientOptions;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClient;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.config.ElasticsearchConfigurationSupport; import org.springframework.data.elasticsearch.config.ElasticsearchConfigurationSupport;
@ -33,7 +33,8 @@ import org.springframework.util.Assert;
/** /**
* Base class for a @{@link org.springframework.context.annotation.Configuration} class to set up the Elasticsearch * Base class for a @{@link org.springframework.context.annotation.Configuration} class to set up the Elasticsearch
* connection using the Elasticsearch Client. * connection using the Elasticsearch Client. This class exposes different parts of the setup as Spring beans. Deriving
* classes must provide the {@link ClientConfiguration} to use.
* *
* @author Peter-Josef Meisch * @author Peter-Josef Meisch
* @since 4.4 * @since 4.4
@ -49,7 +50,7 @@ public abstract class ElasticsearchConfiguration extends ElasticsearchConfigurat
public abstract ClientConfiguration clientConfiguration(); public abstract ClientConfiguration clientConfiguration();
/** /**
* Provides the underlying low level RestClient. * Provides the underlying low level Elasticsearch RestClient.
* *
* @param clientConfiguration configuration for the client, must not be {@literal null} * @param clientConfiguration configuration for the client, must not be {@literal null}
* @return RestClient * @return RestClient
@ -62,19 +63,35 @@ public abstract class ElasticsearchConfiguration extends ElasticsearchConfigurat
return ElasticsearchClients.getRestClient(clientConfiguration); return ElasticsearchClients.getRestClient(clientConfiguration);
} }
/**
* Provides the Elasticsearch transport to be used. The default implementation uses the {@link RestClient} bean and
* the {@link JsonpMapper} bean provided in this class.
*
* @return the {@link ElasticsearchTransport}
* @since 5.2
*/
@Bean
public ElasticsearchTransport elasticsearchTransport(RestClient restClient, JsonpMapper jsonpMapper) {
Assert.notNull(restClient, "restClient must not be null");
Assert.notNull(jsonpMapper, "jsonpMapper must not be null");
return ElasticsearchClients.getElasticsearchTransport(restClient, ElasticsearchClients.IMPERATIVE_CLIENT,
transportOptions(), jsonpMapper);
}
/** /**
* Provides the {@link ElasticsearchClient} to be used. * Provides the {@link ElasticsearchClient} to be used.
* *
* @param restClient the low level RestClient to use * @param transport the {@link ElasticsearchTransport} to use
* @param jsonpMapper the JsonpMapper to use
* @return ElasticsearchClient instance * @return ElasticsearchClient instance
*/ */
@Bean @Bean
public ElasticsearchClient elasticsearchClient(RestClient restClient, JsonpMapper jsonpMapper) { public ElasticsearchClient elasticsearchClient(ElasticsearchTransport transport) {
Assert.notNull(restClient, "restClient must not be null"); Assert.notNull(transport, "transport must not be null");
return ElasticsearchClients.createImperative(restClient, transportOptions(), jsonpMapper); return ElasticsearchClients.createImperative(transport);
} }
/** /**
@ -94,7 +111,7 @@ public abstract class ElasticsearchConfiguration extends ElasticsearchConfigurat
} }
/** /**
* Provides the JsonpMapper bean that is used in the {@link #elasticsearchClient(RestClient, JsonpMapper)} method. * Provides the JsonpMapper bean that is used in the {@link #elasticsearchTransport(RestClient, JsonpMapper)} method.
* *
* @return the {@link JsonpMapper} to use * @return the {@link JsonpMapper} to use
* @since 5.2 * @since 5.2

View File

@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.client.elc;
import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.transport.rest_client.RestClientOptions; import co.elastic.clients.transport.rest_client.RestClientOptions;
@ -31,7 +32,8 @@ import org.springframework.util.Assert;
/** /**
* Base class for a @{@link org.springframework.context.annotation.Configuration} class to set up the Elasticsearch * Base class for a @{@link org.springframework.context.annotation.Configuration} class to set up the Elasticsearch
* connection using the {@link ReactiveElasticsearchClient}. * connection using the {@link ReactiveElasticsearchClient}. This class exposes different parts of the setup as Spring
* beans. Deriving * classes must provide the {@link ClientConfiguration} to use.
* *
* @author Peter-Josef Meisch * @author Peter-Josef Meisch
* @since 4.4 * @since 4.4
@ -60,18 +62,35 @@ public abstract class ReactiveElasticsearchConfiguration extends ElasticsearchCo
return ElasticsearchClients.getRestClient(clientConfiguration); return ElasticsearchClients.getRestClient(clientConfiguration);
} }
/**
* Provides the Elasticsearch transport to be used. The default implementation uses the {@link RestClient} bean and
* the {@link JsonpMapper} bean provided in this class.
*
* @return the {@link ElasticsearchTransport}
* @since 5.2
*/
@Bean
public ElasticsearchTransport elasticsearchTransport(RestClient restClient, JsonpMapper jsonpMapper) {
Assert.notNull(restClient, "restClient must not be null");
Assert.notNull(jsonpMapper, "jsonpMapper must not be null");
return ElasticsearchClients.getElasticsearchTransport(restClient, ElasticsearchClients.REACTIVE_CLIENT,
transportOptions(), jsonpMapper);
}
/** /**
* Provides the {@link ReactiveElasticsearchClient} instance used. * Provides the {@link ReactiveElasticsearchClient} instance used.
* *
* @param restClient the low level RestClient to use * @param transport the ElasticsearchTransport to use
* @return ReactiveElasticsearchClient instance. * @return ReactiveElasticsearchClient instance.
*/ */
@Bean @Bean
public ReactiveElasticsearchClient reactiveElasticsearchClient(RestClient restClient, JsonpMapper jsonpMapper) { public ReactiveElasticsearchClient reactiveElasticsearchClient(ElasticsearchTransport transport) {
Assert.notNull(restClient, "restClient must not be null"); Assert.notNull(transport, "transport must not be null");
return ElasticsearchClients.createReactive(restClient, transportOptions(), jsonpMapper); return ElasticsearchClients.createReactive(transport);
} }
/** /**
@ -91,8 +110,8 @@ public abstract class ReactiveElasticsearchConfiguration extends ElasticsearchCo
} }
/** /**
* Provides the JsonpMapper that is used in the {@link #reactiveElasticsearchClient(RestClient, JsonpMapper)} method * Provides the JsonpMapper that is used in the {@link #elasticsearchTransport(RestClient, JsonpMapper)} method and
* and exposes it as a bean. * exposes it as a bean.
* *
* @return the {@link JsonpMapper} to use * @return the {@link JsonpMapper} to use
* @since 5.2 * @since 5.2