DATAES-504 - Add configuration options for logging and timeouts to ReactiveElasticsearchClient.

We now log HTTP requests and responses with the org.springframework.data.elasticsearch.client.WIRE logger for both, the HighLevelRestClient and our reactive client and associate a logging Id for improved traceability.

Configuration of connection/socket timeouts is now available via the ClientConfiguration.

Along the lines we also aligned entity handling to EntityOperations already common in other modules. EntityOperations centralizes how aspects of entities (versioning, retrieval of index name/index type) are handled.

Original Pull Request: #229
This commit is contained in:
Mark Paluch 2018-11-29 14:26:34 +01:00 committed by Christoph Strobl
parent ba890cb7eb
commit 2dcd1cfbad
15 changed files with 1158 additions and 266 deletions

View File

@ -16,6 +16,8 @@
package org.springframework.data.elasticsearch.client;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
@ -99,6 +101,23 @@ public interface ClientConfiguration {
*/
Optional<SSLContext> getSslContext();
/**
* Returns the {@link java.time.Duration connect timeout}.
*
* @see java.net.Socket#connect(SocketAddress, int)
* @see io.netty.channel.ChannelOption#CONNECT_TIMEOUT_MILLIS
*/
Duration getConnectTimeout();
/**
* Returns the {@link java.time.Duration socket timeout} which is typically applied as SO-timeout/read timeout.
*
* @see java.net.Socket#setSoTimeout(int)
* @see io.netty.handler.timeout.ReadTimeoutHandler
* @see io.netty.handler.timeout.WriteTimeoutHandler
*/
Duration getSocketTimeout();
/**
* @author Christoph Strobl
*/
@ -146,38 +165,59 @@ public interface ClientConfiguration {
/**
* @author Christoph Strobl
*/
interface MaybeSecureClientConfigurationBuilder extends ClientConfigurationBuilderWithOptionalDefaultHeaders {
interface MaybeSecureClientConfigurationBuilder extends TerminalClientConfigurationBuilder {
/**
* Connect via {@literal https} <br />
* <strong>NOTE</strong> You need to leave out the protocol in
* {@link ClientConfigurationBuilderWithRequiredEndpoint#connectedTo(String)}.
*
* @return the {@link ClientConfigurationBuilderWithOptionalDefaultHeaders}.
* @return the {@link TerminalClientConfigurationBuilder}.
*/
ClientConfigurationBuilderWithOptionalDefaultHeaders usingSsl();
TerminalClientConfigurationBuilder usingSsl();
/**
* Connect via {@literal https} using the given {@link SSLContext}.<br />
* <strong>NOTE</strong> You need to leave out the protocol in
* {@link ClientConfigurationBuilderWithRequiredEndpoint#connectedTo(String)}.
*
* @return the {@link ClientConfigurationBuilderWithOptionalDefaultHeaders}.
* @return the {@link TerminalClientConfigurationBuilder}.
*/
ClientConfigurationBuilderWithOptionalDefaultHeaders usingSsl(SSLContext sslContext);
TerminalClientConfigurationBuilder usingSsl(SSLContext sslContext);
}
/**
* @author Christoph Strobl
* @author Mark Paluch
*/
interface ClientConfigurationBuilderWithOptionalDefaultHeaders {
interface TerminalClientConfigurationBuilder {
/**
* @param defaultHeaders must not be {@literal null}.
* @return the {@link ClientConfigurationBuilderWithOptionalDefaultHeaders}
* @return the {@link TerminalClientConfigurationBuilder}
*/
ClientConfigurationBuilderWithOptionalDefaultHeaders withDefaultHeaders(HttpHeaders defaultHeaders);
TerminalClientConfigurationBuilder withDefaultHeaders(HttpHeaders defaultHeaders);
/**
* Configure a {@link java.time.Duration} connect timeout.
*
* @param connectTimeout the timeout to use.
* @return the {@link TerminalClientConfigurationBuilder}
* @see java.net.Socket#connect(SocketAddress, int)
* @see io.netty.channel.ChannelOption#CONNECT_TIMEOUT_MILLIS
*/
TerminalClientConfigurationBuilder withConnectTimeout(Duration connectTimeout);
/**
* Configure a {@link java.time.Duration socket timeout} which is typically applied as SO-timeout/read timeout.
*
* @param soTimeout the timeout to use.
* @return the {@link TerminalClientConfigurationBuilder}
* @see java.net.Socket#setSoTimeout(int)
* @see io.netty.handler.timeout.ReadTimeoutHandler
* @see io.netty.handler.timeout.WriteTimeoutHandler
*/
TerminalClientConfigurationBuilder withSocketTimeout(Duration soTimeout);
/**
* Build the {@link ClientConfiguration} object.

View File

@ -16,6 +16,7 @@
package org.springframework.data.elasticsearch.client;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -23,9 +24,9 @@ import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithOptionalDefaultHeaders;
import org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithRequiredEndpoint;
import org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder;
import org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -44,6 +45,8 @@ class ClientConfigurationBuilder
private HttpHeaders headers = HttpHeaders.EMPTY;
private boolean useSsl;
private @Nullable SSLContext sslContext;
private Duration connectTimeout = Duration.ofSeconds(10);
private Duration soTimeout = Duration.ofSeconds(5);
/*
* (non-Javadoc)
@ -77,7 +80,7 @@ class ClientConfigurationBuilder
* @see org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder#usingSsl()
*/
@Override
public ClientConfigurationBuilderWithOptionalDefaultHeaders usingSsl() {
public TerminalClientConfigurationBuilder usingSsl() {
this.useSsl = true;
return this;
@ -88,7 +91,7 @@ class ClientConfigurationBuilder
* @see org.springframework.data.elasticsearch.client.ClientConfiguration.MaybeSecureClientConfigurationBuilder#usingSsl(javax.net.ssl.SSLContext)
*/
@Override
public ClientConfigurationBuilderWithOptionalDefaultHeaders usingSsl(SSLContext sslContext) {
public TerminalClientConfigurationBuilder usingSsl(SSLContext sslContext) {
Assert.notNull(sslContext, "SSL Context must not be null");
@ -99,10 +102,10 @@ class ClientConfigurationBuilder
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithOptionalDefaultHeaders#withDefaultHeaders(org.springframework.http.HttpHeaders)
* @see org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder#withDefaultHeaders(org.springframework.http.HttpHeaders)
*/
@Override
public ClientConfigurationBuilderWithOptionalDefaultHeaders withDefaultHeaders(HttpHeaders defaultHeaders) {
public TerminalClientConfigurationBuilder withDefaultHeaders(HttpHeaders defaultHeaders) {
Assert.notNull(defaultHeaders, "Default HTTP headers must not be null");
@ -110,13 +113,40 @@ class ClientConfigurationBuilder
return this;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder#withConnectTimeout(java.time.Duration)
*/
@Override
public TerminalClientConfigurationBuilder withConnectTimeout(Duration connectTimeout) {
Assert.notNull(connectTimeout, "I/O timeout must not be null!");
this.connectTimeout = connectTimeout;
return this;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder#withTimeout(java.time.Duration)
*/
@Override
public TerminalClientConfigurationBuilder withSocketTimeout(Duration soTimeout) {
Assert.notNull(soTimeout, "Socket timeout must not be null!");
this.soTimeout = soTimeout;
return this;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.ClientConfiguration.ClientConfigurationBuilderWithOptionalDefaultHeaders#build()
*/
@Override
public ClientConfiguration build() {
return new DefaultClientConfiguration(this.hosts, this.headers, this.useSsl, this.sslContext);
return new DefaultClientConfiguration(this.hosts, this.headers, this.useSsl, this.sslContext, this.soTimeout,
this.connectTimeout);
}
private static InetSocketAddress parse(String hostAndPort) {

View File

@ -0,0 +1,119 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.util.ObjectUtils;
/**
* Logging Utility to log client requests and responses. Logs client requests and responses to Elasticsearch to a
* dedicated logger: {@code org.springframework.data.elasticsearch.client.WIRE} on {@link org.slf4j.event.Level#TRACE}
* level.
*
* @author Mark Paluch
* @since 4.0
*/
public abstract class ClientLogger {
private static final String lineSeparator = System.getProperty("line.separator");
private static final Logger WIRE_LOGGER = LoggerFactory
.getLogger("org.springframework.data.elasticsearch.client.WIRE");
private ClientLogger() {}
/**
* Returns {@literal true} if the logger is enabled.
*
* @return {@literal true} if the logger is enabled.
*/
public static boolean isEnabled() {
return WIRE_LOGGER.isTraceEnabled();
}
/**
* Log an outgoing HTTP request.
*
* @param logId the correlation Id, see {@link #newLogId()}.
* @param method HTTP method
* @param endpoint URI
* @param parameters optional parameters.
*/
public static void logRequest(String logId, String method, String endpoint, Object parameters) {
if (WIRE_LOGGER.isTraceEnabled()) {
WIRE_LOGGER.trace("[{}] Sending request {} {} with parameters: {}", logId, method.toUpperCase(), endpoint,
parameters);
}
}
/**
* Log an outgoing HTTP request with a request body.
*
* @param logId the correlation Id, see {@link #newLogId()}.
* @param method HTTP method
* @param endpoint URI
* @param parameters optional parameters.
* @param body body content supplier.
*/
public static void logRequest(String logId, String method, String endpoint, Object parameters,
Supplier<Object> body) {
if (WIRE_LOGGER.isTraceEnabled()) {
WIRE_LOGGER.trace("[{}] Sending request {} {} with parameters: {}{}Request body: {}", logId, method.toUpperCase(),
endpoint, parameters, lineSeparator, body.get());
}
}
/**
* Log a raw HTTP response without logging the body.
*
* @param logId the correlation Id, see {@link #newLogId()}.
* @param statusCode the HTTP status code.
*/
public static void logRawResponse(String logId, HttpStatus statusCode) {
if (WIRE_LOGGER.isTraceEnabled()) {
WIRE_LOGGER.trace("[{}] Received raw response: ", logId, statusCode);
}
}
/**
* Log a raw HTTP response along with the body.
*
* @param logId the correlation Id, see {@link #newLogId()}.
* @param statusCode the HTTP status code.
* @param body body content.
*/
public static void logResponse(String logId, HttpStatus statusCode, String body) {
if (WIRE_LOGGER.isTraceEnabled()) {
WIRE_LOGGER.trace("[{}] Received response: {}{}Response body: {}", logId, statusCode, lineSeparator, body);
}
}
/**
* Creates a new, unique correlation Id to improve tracing across log events.
*
* @return a new, unique correlation Id.
*/
public static String newLogId() {
return ObjectUtils.getIdentityHexString(new Object());
}
}

View File

@ -16,6 +16,7 @@
package org.springframework.data.elasticsearch.client;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -38,14 +39,18 @@ class DefaultClientConfiguration implements ClientConfiguration {
private final HttpHeaders headers;
private final boolean useSsl;
private final @Nullable SSLContext sslContext;
private final Duration soTimeout;
private final Duration connectTimeout;
DefaultClientConfiguration(List<InetSocketAddress> hosts, HttpHeaders headers, boolean useSsl,
@Nullable SSLContext sslContext) {
@Nullable SSLContext sslContext, Duration soTimeout, Duration connectTimeout) {
this.hosts = Collections.unmodifiableList(new ArrayList<>(hosts));
this.headers = new HttpHeaders(headers);
this.useSsl = useSsl;
this.sslContext = sslContext;
this.soTimeout = soTimeout;
this.connectTimeout = connectTimeout;
}
/*
@ -83,4 +88,23 @@ class DefaultClientConfiguration implements ClientConfiguration {
public Optional<SSLContext> getSslContext() {
return Optional.ofNullable(this.sslContext);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.ClientConfiguration#getConnectTimeout()
*/
@Override
public Duration getConnectTimeout() {
return this.connectTimeout;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.ClientConfiguration#getSocketTimeout()
*/
@Override
public Duration getSocketTimeout() {
return this.soTimeout;
}
}

View File

@ -15,9 +15,11 @@
*/
package org.springframework.data.elasticsearch.client;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@ -25,12 +27,23 @@ import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
/**
@ -43,6 +56,11 @@ import org.springframework.util.Assert;
*/
public final class RestClients {
/**
* Name of whose value can be used to correlate log messages for this request.
*/
private static final String LOG_ID_ATTRIBUTE = RestClients.class.getName() + ".LOG_ID";
private RestClients() {}
/**
@ -70,6 +88,26 @@ public final class RestClients {
Optional<SSLContext> sslContext = clientConfiguration.getSslContext();
sslContext.ifPresent(clientBuilder::setSSLContext);
if (ClientLogger.isEnabled()) {
clientBuilder.addInterceptorLast((HttpRequestInterceptor) LoggingInterceptors.INSTANCE);
clientBuilder.addInterceptorLast((HttpResponseInterceptor) LoggingInterceptors.INSTANCE);
}
Duration connectTimeout = clientConfiguration.getConnectTimeout();
Duration timeout = clientConfiguration.getSocketTimeout();
Builder requestConfigBuilder = RequestConfig.custom();
if (!connectTimeout.isNegative()) {
requestConfigBuilder.setConnectTimeout(Math.toIntExact(connectTimeout.toMillis()));
requestConfigBuilder.setConnectionRequestTimeout(Math.toIntExact(connectTimeout.toMillis()));
}
if (!timeout.isNegative()) {
requestConfigBuilder.setSocketTimeout(Math.toIntExact(timeout.toMillis()));
}
clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
return clientBuilder;
});
@ -108,4 +146,49 @@ public final class RestClients {
rest().close();
}
}
/**
* Logging interceptors for Elasticsearch client logging.
*
* @see ClientLogger
*/
enum LoggingInterceptors implements HttpResponseInterceptor, HttpRequestInterceptor {
INSTANCE;
@Override
public void process(HttpRequest request, HttpContext context) throws IOException {
String logId = (String) context.getAttribute(RestClients.LOG_ID_ATTRIBUTE);
if (logId == null) {
logId = ClientLogger.newLogId();
context.setAttribute(RestClients.LOG_ID_ATTRIBUTE, logId);
}
if (request instanceof HttpEntityEnclosingRequest && ((HttpEntityEnclosingRequest) request).getEntity() != null) {
HttpEntityEnclosingRequest entityRequest = (HttpEntityEnclosingRequest) request;
HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
entity.writeTo(buffer);
if (!entity.isRepeatable()) {
entityRequest.setEntity(new ByteArrayEntity(buffer.toByteArray()));
}
ClientLogger.logRequest(logId, request.getRequestLine().getMethod(), request.getRequestLine().getUri(), "",
() -> new String(buffer.toByteArray()));
} else {
ClientLogger.logRequest(logId, request.getRequestLine().getMethod(), request.getRequestLine().getUri(), "");
}
}
@Override
public void process(HttpResponse response, HttpContext context) {
String logId = (String) context.getAttribute(RestClients.LOG_ID_ATTRIBUTE);
ClientLogger.logRawResponse(logId, HttpStatus.resolve(response.getStatusLine().getStatusCode()));
}
}
}

View File

@ -15,19 +15,25 @@
*/
package org.springframework.data.elasticsearch.client.reactive;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.net.ssl.SSLContext;
@ -64,10 +70,12 @@ import org.elasticsearch.search.SearchHit;
import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ClientLogger;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification;
import org.springframework.data.elasticsearch.client.util.RequestConverters;
import org.springframework.data.util.Lazy;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
@ -77,6 +85,7 @@ import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec;
@ -149,19 +158,37 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
WebClientProvider provider;
TcpClient tcpClient = TcpClient.create();
Duration connectTimeout = clientConfiguration.getConnectTimeout();
Duration timeout = clientConfiguration.getSocketTimeout();
if (!connectTimeout.isNegative()) {
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis()));
}
if (!timeout.isNegative()) {
tcpClient = tcpClient.doOnConnected(connection -> connection //
.addHandlerLast(new ReadTimeoutHandler(timeout.toMillis(), TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(timeout.toMillis(), TimeUnit.MILLISECONDS)));
}
String scheme = "http";
HttpClient httpClient = HttpClient.from(tcpClient);
if (clientConfiguration.useSsl()) {
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(HttpClient.create().secure(sslConfig -> {
httpClient = httpClient.secure(sslConfig -> {
Optional<SSLContext> sslContext = clientConfiguration.getSslContext();
sslContext.ifPresent(it -> sslConfig.sslContext(new JdkSslContext(it, true, ClientAuth.NONE)));
}));
provider = WebClientProvider.create("https", connector);
} else {
provider = WebClientProvider.create("http");
});
scheme = "https";
}
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
provider = WebClientProvider.create(scheme, connector);
return provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders());
}
@ -324,20 +351,29 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
private <AR extends ActionResponse> Flux<AR> sendRequest(Request request, Class<AR> responseType,
HttpHeaders headers) {
return execute(webClient -> sendRequest(webClient, request, headers))
.flatMapMany(response -> readResponseBody(request, response, responseType));
String logId = ClientLogger.newLogId();
return execute(webClient -> sendRequest(webClient, logId, request, headers))
.flatMapMany(response -> readResponseBody(logId, request, response, responseType));
}
private Mono<ClientResponse> sendRequest(WebClient webClient, Request request, HttpHeaders headers) {
private Mono<ClientResponse> sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) {
RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) //
.uri(request.getEndpoint(), request.getParameters()) //
.attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) //
.headers(theHeaders -> theHeaders.addAll(headers));
if (request.getEntity() != null) {
Lazy<String> body = bodyExtractor(request);
ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(),
body::get);
requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue()));
requestBodySpec.body(bodyExtractor(request), String.class);
requestBodySpec.body(Mono.fromSupplier(body::get), String.class);
} else {
ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters());
}
return requestBodySpec //
@ -345,9 +381,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build());
}
private Publisher<String> bodyExtractor(Request request) {
private Lazy<String> bodyExtractor(Request request) {
return Mono.fromSupplier(() -> {
return Lazy.of(() -> {
try {
return EntityUtils.toString(request.getEntity());
@ -357,19 +393,25 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
});
}
private <T> Publisher<? extends T> readResponseBody(Request request, ClientResponse response, Class<T> responseType) {
private <T> Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse response,
Class<T> responseType) {
if (RawActionResponse.class.equals(responseType)) {
ClientLogger.logRawResponse(logId, response.statusCode());
return Mono.just(responseType.cast(RawActionResponse.create(response)));
}
if (response.statusCode().is5xxServerError()) {
ClientLogger.logRawResponse(logId, response.statusCode());
return handleServerError(request, response);
}
return response.body(BodyExtractors.toMono(byte[].class)) //
.map(it -> new String(it, StandardCharsets.UTF_8)) //
.flatMap(content -> doDecode(response, responseType, content));
.doOnNext(it -> {
ClientLogger.logResponse(logId, response.statusCode(), it);
}).flatMap(content -> doDecode(response, responseType, content));
}
private static <T> Mono<T> doDecode(ClientResponse response, Class<T> responseType, String content) {

View File

@ -0,0 +1,608 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import java.util.Map;
import org.springframework.core.convert.ConversionService;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.mapping.IdentifierAccessor;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mapping.model.ConvertingPropertyAccessor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* Common operations performed on an entity in the context of it's mapping metadata.
*
* @author Mark Paluch
* @author Christoph Strobl
* @since 4.0
*/
@RequiredArgsConstructor
class EntityOperations {
public static final String ID_FIELD = "id";
private final @NonNull MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> context;
/**
* Creates a new {@link Entity} for the given bean.
*
* @param entity must not be {@literal null}.
* @return
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public <T> Entity<T> forEntity(T entity) {
Assert.notNull(entity, "Bean must not be null!");
if (entity instanceof Map) {
return new SimpleMappedEntity((Map<String, Object>) entity);
}
return MappedEntity.of(entity, context);
}
/**
* Creates a new {@link AdaptibleEntity} for the given bean and {@link ConversionService}.
*
* @param entity must not be {@literal null}.
* @param conversionService must not be {@literal null}.
* @return
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public <T> AdaptibleEntity<T> forEntity(T entity, ConversionService conversionService) {
Assert.notNull(entity, "Bean must not be null!");
Assert.notNull(conversionService, "ConversionService must not be null!");
if (entity instanceof Map) {
return new SimpleMappedEntity((Map<String, Object>) entity);
}
return AdaptibleMappedEntity.of(entity, context, conversionService);
}
/**
* Determine index name and type name from {@link Entity} with {@code index} and {@code type} overrides. Allows using
* preferred values for index and type if provided, otherwise fall back to index and type defined on entity level.
*
* @param entity the entity to determine the index name. Can be {@literal null} if {@code index} and {@literal type}
* are provided.
* @param index index name override can be {@literal null}.
* @param type index type override can be {@literal null}.
* @return the {@link IndexCoordinates} containing index name and index type.
* @see ElasticsearchPersistentEntity#getIndexName()
* @see ElasticsearchPersistentEntity#getIndexType()
*/
public IndexCoordinates determineIndex(Entity<?> entity, @Nullable String index, @Nullable String type) {
return determineIndex(entity.getPersistentEntity(), index, type);
}
/**
* Determine index name and type name from {@link ElasticsearchPersistentEntity} with {@code index} and {@code type}
* overrides. Allows using preferred values for index and type if provided, otherwise fall back to index and type
* defined on entity level.
*
* @param persistentEntity the entity to determine the index name. Can be {@literal null} if {@code index} and
* {@literal type} are provided.
* @param index index name override can be {@literal null}.
* @param type index type override can be {@literal null}.
* @return the {@link IndexCoordinates} containing index name and index type.
* @see ElasticsearchPersistentEntity#getIndexName()
* @see ElasticsearchPersistentEntity#getIndexType()
*/
public IndexCoordinates determineIndex(ElasticsearchPersistentEntity<?> persistentEntity, @Nullable String index,
@Nullable String type) {
return new IndexCoordinates(indexName(persistentEntity, index), typeName(persistentEntity, type));
}
private static String indexName(@Nullable ElasticsearchPersistentEntity<?> entity, @Nullable String index) {
if (StringUtils.isEmpty(index)) {
Assert.notNull(entity, "Cannot determine index name");
return entity.getIndexName();
}
return index;
}
private static String typeName(@Nullable ElasticsearchPersistentEntity<?> entity, @Nullable String type) {
if (StringUtils.isEmpty(type)) {
Assert.notNull(entity, "Cannot determine index type");
return entity.getIndexType();
}
return type;
}
/**
* A representation of information about an entity.
*
* @author Christoph Strobl
*/
interface Entity<T> {
/**
* Returns the identifier of the entity.
*
* @return the ID value, can be {@literal null}.
*/
@Nullable
Object getId();
/**
* Returns whether the entity is versioned, i.e. if it contains a version property.
*
* @return
*/
default boolean isVersionedEntity() {
return false;
}
/**
* Returns the value of the version if the entity has a version property, {@literal null} otherwise.
*
* @return
*/
@Nullable
Object getVersion();
/**
* Returns the underlying bean.
*
* @return
*/
T getBean();
/**
* Returns whether the entity is considered to be new.
*
* @return
*/
boolean isNew();
/**
* Returns the {@link ElasticsearchPersistentEntity} associated with this entity.
*
* @return can be {@literal null} if this entity is not mapped.
*/
@Nullable
ElasticsearchPersistentEntity<?> getPersistentEntity();
/**
* Returns the required {@link ElasticsearchPersistentEntity}.
*
* @return
* @throws IllegalStateException if no {@link ElasticsearchPersistentEntity} is associated with this entity.
*/
default ElasticsearchPersistentEntity<?> getRequiredPersistentEntity() {
ElasticsearchPersistentEntity<?> persistentEntity = getPersistentEntity();
if (persistentEntity == null) {
throw new IllegalStateException("No ElasticsearchPersistentEntity available for this entity!");
}
return persistentEntity;
}
}
/**
* Information and commands on an entity.
*
* @author Mark Paluch
*/
interface AdaptibleEntity<T> extends Entity<T> {
/**
* Returns whether the entity has a parent.
*
* @return
*/
boolean hasParent();
/**
* Returns the parent Id. Can be {@literal null}.
*
* @return
*/
@Nullable
Object getParentId();
/**
* Populates the identifier of the backing entity if it has an identifier property and there's no identifier
* currently present.
*
* @param id must not be {@literal null}.
* @return
*/
@Nullable
T populateIdIfNecessary(@Nullable Object id);
/**
* Initializes the version property of the of the current entity if available.
*
* @return the entity with the version property updated if available.
*/
T initializeVersionProperty();
/**
* Increments the value of the version property if available.
*
* @return the entity with the version property incremented if available.
*/
T incrementVersion();
/**
* Returns the current version value if the entity has a version property.
*
* @return the current version or {@literal null} in case it's uninitialized or the entity doesn't expose a version
* property.
*/
@Nullable
Number getVersion();
}
/**
* Plain entity without applying further mapping.
*
* @param <T>
*/
@RequiredArgsConstructor
private static class UnmappedEntity<T extends Map<String, Object>> implements AdaptibleEntity<T> {
private final T map;
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getId()
*/
@Override
public Object getId() {
return map.get(ID_FIELD);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#hasParent()
*/
@Override
public boolean hasParent() {
return false;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getParentId()
*/
@Override
public Entity<?> getParentId() {
return null;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#populateIdIfNecessary(java.lang.Object)
*/
@Nullable
@Override
public T populateIdIfNecessary(@Nullable Object id) {
map.put(ID_FIELD, id);
return map;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#initializeVersionProperty()
*/
@Override
public T initializeVersionProperty() {
return map;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getVersion()
*/
@Override
@Nullable
public Number getVersion() {
return null;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#incrementVersion()
*/
@Override
public T incrementVersion() {
return map;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getBean()
*/
@Override
public T getBean() {
return map;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#isNew()
*/
@Override
public boolean isNew() {
return map.get(ID_FIELD) != null;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getPersistentEntity()
*/
@Override
public ElasticsearchPersistentEntity<?> getPersistentEntity() {
return null;
}
}
/**
* Simple mapped entity without an associated {@link ElasticsearchPersistentEntity}.
*
* @param <T>
*/
private static class SimpleMappedEntity<T extends Map<String, Object>> extends UnmappedEntity<T> {
SimpleMappedEntity(T map) {
super(map);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.UnmappedEntity#getId()
*/
@Override
public Object getId() {
return getBean().get(ID_FIELD);
}
}
/**
* Mapped entity with an associated {@link ElasticsearchPersistentEntity}.
*
* @param <T>
*/
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
private static class MappedEntity<T> implements Entity<T> {
private final ElasticsearchPersistentEntity<?> entity;
private final IdentifierAccessor idAccessor;
private final PersistentPropertyAccessor<T> propertyAccessor;
private static <T> MappedEntity<T> of(T bean,
MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> context) {
ElasticsearchPersistentEntity<?> entity = context.getRequiredPersistentEntity(bean.getClass());
IdentifierAccessor identifierAccessor = entity.getIdentifierAccessor(bean);
PersistentPropertyAccessor<T> propertyAccessor = entity.getPropertyAccessor(bean);
return new MappedEntity<>(entity, identifierAccessor, propertyAccessor);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getId()
*/
@Override
public Object getId() {
return idAccessor.getIdentifier();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#isVersionedEntity()
*/
@Override
public boolean isVersionedEntity() {
return entity.hasVersionProperty();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getVersion()
*/
@Override
@Nullable
public Object getVersion() {
return propertyAccessor.getProperty(entity.getRequiredVersionProperty());
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getBean()
*/
@Override
public T getBean() {
return propertyAccessor.getBean();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#isNew()
*/
@Override
public boolean isNew() {
return entity.isNew(propertyAccessor.getBean());
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getPersistentEntity()
*/
@Override
public ElasticsearchPersistentEntity<?> getPersistentEntity() {
return entity;
}
}
private static class AdaptibleMappedEntity<T> extends MappedEntity<T> implements AdaptibleEntity<T> {
private final ElasticsearchPersistentEntity<?> entity;
private final ConvertingPropertyAccessor<T> propertyAccessor;
private final IdentifierAccessor identifierAccessor;
private AdaptibleMappedEntity(ElasticsearchPersistentEntity<?> entity, IdentifierAccessor identifierAccessor,
ConvertingPropertyAccessor<T> propertyAccessor) {
super(entity, identifierAccessor, propertyAccessor);
this.entity = entity;
this.propertyAccessor = propertyAccessor;
this.identifierAccessor = identifierAccessor;
}
static <T> AdaptibleEntity<T> of(T bean,
MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> context,
ConversionService conversionService) {
ElasticsearchPersistentEntity<?> entity = context.getRequiredPersistentEntity(bean.getClass());
IdentifierAccessor identifierAccessor = entity.getIdentifierAccessor(bean);
PersistentPropertyAccessor<T> propertyAccessor = entity.getPropertyAccessor(bean);
return new AdaptibleMappedEntity<>(entity, identifierAccessor,
new ConvertingPropertyAccessor<>(propertyAccessor, conversionService));
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#hasParent()
*/
@Override
public boolean hasParent() {
return getRequiredPersistentEntity().getParentIdProperty() != null;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getParentId()
*/
@Override
public Object getParentId() {
ElasticsearchPersistentProperty parentProperty = getRequiredPersistentEntity().getParentIdProperty();
return propertyAccessor.getProperty(parentProperty);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#populateIdIfNecessary(java.lang.Object)
*/
@Nullable
@Override
public T populateIdIfNecessary(@Nullable Object id) {
if (id == null) {
return null;
}
T bean = propertyAccessor.getBean();
ElasticsearchPersistentProperty idProperty = entity.getIdProperty();
if (idProperty == null) {
return bean;
}
if (identifierAccessor.getIdentifier() != null) {
return bean;
}
propertyAccessor.setProperty(idProperty, id);
return propertyAccessor.getBean();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.MappedEntity#getVersion()
*/
@Override
@Nullable
public Number getVersion() {
ElasticsearchPersistentProperty versionProperty = entity.getRequiredVersionProperty();
return propertyAccessor.getProperty(versionProperty, Number.class);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#initializeVersionProperty()
*/
@Override
public T initializeVersionProperty() {
if (!entity.hasVersionProperty()) {
return propertyAccessor.getBean();
}
ElasticsearchPersistentProperty versionProperty = entity.getRequiredVersionProperty();
propertyAccessor.setProperty(versionProperty, versionProperty.getType().isPrimitive() ? 1 : 0);
return propertyAccessor.getBean();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#incrementVersion()
*/
@Override
public T incrementVersion() {
ElasticsearchPersistentProperty versionProperty = entity.getRequiredVersionProperty();
Number version = getVersion();
Number nextVersion = version == null ? 0 : version.longValue() + 1;
propertyAccessor.setProperty(versionProperty, nextVersion);
return propertyAccessor.getBean();
}
}
/**
* Value object encapsulating index name and index type.
*/
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
static class IndexCoordinates {
private final String indexName;
private final String typeName;
}
}

View File

@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.core;
import static org.elasticsearch.index.VersionType.*;
import lombok.NonNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -24,7 +25,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.elasticsearch.action.delete.DeleteRequest;
@ -48,37 +48,43 @@ import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.elasticsearch.core.EntityOperations.Entity;
import org.springframework.data.elasticsearch.core.EntityOperations.IndexCoordinates;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.mapping.IdentifierAccessor;
import org.springframework.data.mapping.PersistentProperty;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.data.util.ClassTypeInformation;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.http.HttpStatus;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
* @author Christoph Strobl
* @author Mark Paluch
* @since 4.0
*/
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations {
private static final Logger QUERY_LOGGER = LoggerFactory
.getLogger("org.springframework.data.elasticsearch.core.QUERY");
private final ReactiveElasticsearchClient client;
private final ElasticsearchConverter converter;
private final @NonNull MappingContext<? extends ElasticsearchPersistentEntity<?>, ElasticsearchPersistentProperty> mappingContext;
private final ResultsMapper resultMapper;
private final ElasticsearchExceptionTranslator exceptionTranslator;
private final EntityOperations operations;
private @Nullable RefreshPolicy refreshPolicy = RefreshPolicy.IMMEDIATE;
private @Nullable IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed();
@ -91,8 +97,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
this.client = client;
this.converter = converter;
this.mappingContext = converter.getMappingContext();
this.resultMapper = new DefaultResultMapper(converter.getMappingContext());
this.exceptionTranslator = new ElasticsearchExceptionTranslator();
this.operations = new EntityOperations(this.mappingContext);
}
/*
@ -113,26 +121,26 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(entity, "Entity must not be null!");
AdaptableEntity<T> adaptableEntity = ConverterAwareAdaptableEntity.of(entity, converter);
AdaptibleEntity<T> adaptableEntity = operations.forEntity(entity, converter.getConversionService());
return doIndex(entity, adaptableEntity, index, type) //
.map(it -> {
return adaptableEntity.updateIdIfNecessary(it.getId());
return adaptableEntity.populateIdIfNecessary(it.getId());
});
}
private Mono<IndexResponse> doIndex(Object value, AdaptableEntity<?> entity, @Nullable String index,
private Mono<IndexResponse> doIndex(Object value, AdaptibleEntity<?> entity, @Nullable String index,
@Nullable String type) {
return Mono.defer(() -> {
Object id = entity.getId();
String indexToUse = indexName(index, entity);
String typeToUse = typeName(type, entity);
IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type);
IndexRequest request = id != null ? new IndexRequest(indexToUse, typeToUse, id.toString())
: new IndexRequest(indexToUse, typeToUse);
IndexRequest request = id != null
? new IndexRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName(), id.toString())
: new IndexRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName());
try {
request.source(resultMapper.getEntityMapper().mapToString(value), Requests.INDEX_CONTENT_TYPE);
@ -140,7 +148,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
throw new RuntimeException(e);
}
if (entity.isVersioned()) {
if (entity.isVersionedEntity()) {
Object version = entity.getVersion();
if (version != null) {
@ -149,9 +157,9 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
}
}
if (entity.getPersistentEntity().getParentIdProperty() != null) {
if (entity.hasParent()) {
Object parentId = entity.getPropertyValue(entity.getPersistentEntity().getParentIdProperty());
Object parentId = entity.getParentId();
if (parentId != null) {
request.parent(parentId.toString());
}
@ -171,19 +179,18 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(id, "Id must not be null!");
return doFindById(id, BasicElasticsearchEntity.of(entityType, converter), index, type)
return doFindById(id, getPersistentEntity(entityType), index, type)
.map(it -> resultMapper.mapEntity(it, entityType));
}
private Mono<GetResult> doFindById(String id, ElasticsearchEntity<?> entity, @Nullable String index,
private Mono<GetResult> doFindById(String id, ElasticsearchPersistentEntity<?> entity, @Nullable String index,
@Nullable String type) {
return Mono.defer(() -> {
String indexToUse = indexName(index, entity);
String typeToUse = typeName(type, entity);
IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type);
return doFindById(new GetRequest(indexToUse, typeToUse, id));
return doFindById(new GetRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName(), id));
});
}
@ -196,18 +203,17 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(id, "Id must not be null!");
return doExists(id, BasicElasticsearchEntity.of(entityType, converter), index, type);
return doExists(id, getPersistentEntity(entityType), index, type);
}
private Mono<Boolean> doExists(String id, ElasticsearchEntity<?> entity, @Nullable String index,
private Mono<Boolean> doExists(String id, ElasticsearchPersistentEntity<?> entity, @Nullable String index,
@Nullable String type) {
return Mono.defer(() -> {
String indexToUse = indexName(index, entity);
String typeToUse = typeName(type, entity);
IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type);
return doExists(new GetRequest(indexToUse, typeToUse, id));
return doExists(new GetRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName(), id));
});
}
@ -219,24 +225,25 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
public <T> Flux<T> find(Query query, Class<?> entityType, @Nullable String index, @Nullable String type,
Class<T> resultType) {
return doFind(query, BasicElasticsearchEntity.of(entityType, converter), index, type)
return doFind(query, getPersistentEntity(entityType), index, type)
.map(it -> resultMapper.mapEntity(it, resultType));
}
private Flux<SearchHit> doFind(Query query, ElasticsearchEntity<?> entity, @Nullable String index,
private Flux<SearchHit> doFind(Query query, ElasticsearchPersistentEntity<?> entity, @Nullable String index,
@Nullable String type) {
return Flux.defer(() -> {
SearchRequest request = new SearchRequest(indices(query, () -> indexName(index, entity)));
request.types(indexTypes(query, () -> typeName(type, entity)));
IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type);
SearchRequest request = new SearchRequest(indices(query, indexCoordinates::getIndexName));
request.types(indexTypes(query, indexCoordinates::getTypeName));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(mappedQuery(query, entity.getPersistentEntity()));
searchSourceBuilder.query(mappedQuery(query, entity));
// TODO: request.source().postFilter(elasticsearchFilter); -- filter query
searchSourceBuilder.version(entity.isVersioned()); // This has been true by default before
searchSourceBuilder.version(entity.hasVersionProperty()); // This has been true by default before
searchSourceBuilder.trackScores(query.getTrackScores());
if (query.getSourceFilter() != null) {
@ -258,7 +265,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
request.indicesOptions(query.getIndicesOptions());
}
sort(query, entity.getPersistentEntity()).forEach(searchSourceBuilder::sort);
sort(query, entity).forEach(searchSourceBuilder::sort);
if (query.getMinScore() > 0) {
searchSourceBuilder.minScore(query.getMinScore());
@ -290,10 +297,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
@Override
public Mono<String> delete(Object entity, @Nullable String index, @Nullable String type) {
AdaptableEntity<?> elasticsearchEntity = ConverterAwareAdaptableEntity.of(entity, converter);
Entity<?> elasticsearchEntity = operations.forEntity(entity);
return Mono.defer(() -> doDeleteById(entity, ObjectUtils.nullSafeToString(elasticsearchEntity.getId()),
elasticsearchEntity, index, type));
elasticsearchEntity.getPersistentEntity(), index, type));
}
/*
@ -305,19 +312,19 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(id, "Id must not be null!");
return doDeleteById(null, id, BasicElasticsearchEntity.of(entityType, converter), index, type);
return doDeleteById(null, id, getPersistentEntity(entityType), index, type);
}
private Mono<String> doDeleteById(@Nullable Object source, String id, ElasticsearchEntity<?> entity,
private Mono<String> doDeleteById(@Nullable Object source, String id, ElasticsearchPersistentEntity<?> entity,
@Nullable String index, @Nullable String type) {
return Mono.defer(() -> {
String indexToUse = indexName(index, entity);
String typeToUse = typeName(type, entity);
IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type);
return doDelete(prepareDeleteRequest(source, new DeleteRequest(indexToUse, typeToUse, id)));
return doDelete(prepareDeleteRequest(source,
new DeleteRequest(indexCoordinates.getIndexName(), indexCoordinates.getTypeName(), id)));
});
}
@ -330,18 +337,20 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(query, "Query must not be null!");
return doDeleteBy(query, BasicElasticsearchEntity.of(entityType, converter), index, type)
.map(BulkByScrollResponse::getDeleted).publishNext();
return doDeleteBy(query, getPersistentEntity(entityType), index, type).map(BulkByScrollResponse::getDeleted)
.publishNext();
}
private Flux<BulkByScrollResponse> doDeleteBy(Query query, ElasticsearchEntity<?> entity, @Nullable String index,
@Nullable String type) {
private Flux<BulkByScrollResponse> doDeleteBy(Query query, ElasticsearchPersistentEntity<?> entity,
@Nullable String index, @Nullable String type) {
return Flux.defer(() -> {
DeleteByQueryRequest request = new DeleteByQueryRequest(indices(query, () -> indexName(index, entity)));
request.types(indexTypes(query, () -> typeName(type, entity)));
request.setQuery(mappedQuery(query, entity.getPersistentEntity()));
IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type);
DeleteByQueryRequest request = new DeleteByQueryRequest(indices(query, indexCoordinates::getIndexName));
request.types(indexTypes(query, indexCoordinates::getTypeName));
request.setQuery(mappedQuery(query, entity));
return doDeleteBy(prepareDeleteByRequest(request));
});
@ -494,6 +503,11 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
* @return a {@link Flux} emitting the result of the operation.
*/
protected Flux<SearchHit> doFind(SearchRequest request) {
if (QUERY_LOGGER.isDebugEnabled()) {
QUERY_LOGGER.debug("Executing doFind: {}", request);
}
return Flux.from(execute(client -> client.search(request)));
}
@ -529,14 +543,6 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
// private helpers
private static String indexName(@Nullable String index, ElasticsearchEntity<?> entity) {
return StringUtils.isEmpty(index) ? entity.getIndexName() : index;
}
private static String typeName(@Nullable String type, ElasticsearchEntity<?> entity) {
return StringUtils.isEmpty(type) ? entity.getTypeName() : type;
}
private static String[] indices(Query query, Supplier<String> index) {
if (query.getIndices().isEmpty()) {
@ -601,12 +607,13 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
private QueryBuilder mappedFilterQuery(CriteriaQuery query, ElasticsearchPersistentEntity<?> entity) {
// TODO: this is actually strange in the RestTemplate:L378 - need to chack
// TODO: this is actually strange in the RestTemplate:L378 - need to check
return null;
}
private ElasticsearchPersistentEntity<?> lookupPersistentEntity(Class<?> type) {
return converter.getMappingContext().getPersistentEntity(type);
@Nullable
private ElasticsearchPersistentEntity<?> getPersistentEntity(@Nullable Class<?> type) {
return type != null ? mappingContext.getPersistentEntity(type) : null;
}
private Throwable translateException(Throwable throwable) {
@ -617,160 +624,4 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return potentiallyTranslatedException != null ? potentiallyTranslatedException : throwable;
}
/**
* @param <T>
* @author Christoph Strobl
* @since 4.0
*/
protected interface ElasticsearchEntity<T> {
default boolean isIdentifiable() {
return getPersistentEntity().hasVersionProperty();
}
default boolean isVersioned() {
return getPersistentEntity().hasVersionProperty();
}
default ElasticsearchPersistentProperty getIdProperty() {
return getPersistentEntity().getIdProperty();
}
default String getIndexName() {
return getPersistentEntity().getIndexName();
}
default String getTypeName() {
return getPersistentEntity().getIndexType();
}
ElasticsearchPersistentEntity<?> getPersistentEntity();
}
protected interface AdaptableEntity<T> extends ElasticsearchEntity<T> {
PersistentPropertyAccessor<T> getPropertyAccessor();
IdentifierAccessor getIdentifierAccessor();
@Nullable
default Object getId() {
return getIdentifierAccessor().getIdentifier();
}
default Object getVersion() {
return getPropertyAccessor().getProperty(getPersistentEntity().getRequiredVersionProperty());
}
@Nullable
default Object getPropertyValue(PersistentProperty<?> property) {
return getPropertyAccessor().getProperty(property);
}
default T getBean() {
return getPropertyAccessor().getBean();
}
default T updateIdIfNecessary(Object id) {
if (id == null || !getPersistentEntity().hasIdProperty() || getId() != null) {
return getPropertyAccessor().getBean();
}
return updatePropertyValue(getPersistentEntity().getIdProperty(), id);
}
default T updatePropertyValue(PersistentProperty<?> property, @Nullable Object value) {
getPropertyAccessor().setProperty(property, value);
return getPropertyAccessor().getBean();
}
}
protected static class BasicElasticsearchEntity<T> implements ElasticsearchEntity<T> {
final ElasticsearchPersistentEntity<?> entity;
BasicElasticsearchEntity(ElasticsearchPersistentEntity<?> entity) {
this.entity = entity;
}
static <T> BasicElasticsearchEntity<T> of(T bean, ElasticsearchConverter converter) {
return new BasicElasticsearchEntity<>(converter.getMappingContext().getRequiredPersistentEntity(bean.getClass()));
}
static <T> BasicElasticsearchEntity<T> of(Class<T> type, ElasticsearchConverter converter) {
if (Object.class.equals(type)) {
return new BasicElasticsearchEntity<>(new SimpleElasticsearchPersistentEntity<>(ClassTypeInformation.OBJECT));
}
return new BasicElasticsearchEntity<>(converter.getMappingContext().getRequiredPersistentEntity(type));
}
@Override
public ElasticsearchPersistentEntity<?> getPersistentEntity() {
return entity;
}
}
protected static class ConverterAwareAdaptableEntity<T> implements AdaptableEntity<T> {
final ElasticsearchPersistentEntity<?> entity;
final PersistentPropertyAccessor<T> propertyAccessor;
final IdentifierAccessor idAccessor;
final ElasticsearchConverter converter;
ConverterAwareAdaptableEntity(ElasticsearchPersistentEntity<?> entity, IdentifierAccessor idAccessor,
PersistentPropertyAccessor<T> propertyAccessor, ElasticsearchConverter converter) {
this.entity = entity;
this.propertyAccessor = propertyAccessor;
this.idAccessor = idAccessor;
this.converter = converter;
}
static <T> ConverterAwareAdaptableEntity<T> of(T bean, ElasticsearchConverter converter) {
ElasticsearchPersistentEntity<?> entity = converter.getMappingContext()
.getRequiredPersistentEntity(bean.getClass());
IdentifierAccessor idAccessor = entity.getIdentifierAccessor(bean);
PersistentPropertyAccessor<T> propertyAccessor = entity.getPropertyAccessor(bean);
return new ConverterAwareAdaptableEntity<>(entity, idAccessor, propertyAccessor, converter);
}
@Override
public PersistentPropertyAccessor<T> getPropertyAccessor() {
return propertyAccessor;
}
@Override
public IdentifierAccessor getIdentifierAccessor() {
if (entity.getTypeInformation().isMap()) {
return () -> {
Object id = idAccessor.getIdentifier();
if (id != null) {
return id;
}
Map<?, ?> source = (Map<?, ?>) propertyAccessor.getBean();
return source.get("id");
};
}
return idAccessor;
}
@Override
public ElasticsearchPersistentEntity<?> getPersistentEntity() {
return entity;
}
}
}

View File

@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import java.net.InetSocketAddress;
import java.time.Duration;
import javax.net.ssl.SSLContext;
@ -40,7 +41,7 @@ public class ClientConfigurationUnitTests {
assertThat(clientConfiguration.getEndpoints()).containsOnly(InetSocketAddress.createUnresolved("localhost", 9200));
}
@Test // DATAES-488
@Test // DATAES-488, DATAES-504
public void shouldCreateCustomizedConfiguration() {
HttpHeaders headers = new HttpHeaders();
@ -50,15 +51,17 @@ public class ClientConfigurationUnitTests {
.connectedTo("foo", "bar") //
.usingSsl() //
.withDefaultHeaders(headers) //
.build();
.withConnectTimeout(Duration.ofDays(1)).withSocketTimeout(Duration.ofDays(2)).build();
assertThat(clientConfiguration.getEndpoints()).containsOnly(InetSocketAddress.createUnresolved("foo", 9200),
InetSocketAddress.createUnresolved("bar", 9200));
assertThat(clientConfiguration.useSsl()).isTrue();
assertThat(clientConfiguration.getDefaultHeaders().get("foo")).containsOnly("bar");
assertThat(clientConfiguration.getConnectTimeout()).isEqualTo(Duration.ofDays(1));
assertThat(clientConfiguration.getSocketTimeout()).isEqualTo(Duration.ofDays(2));
}
@Test // DATAES-488
@Test // DATAES-488, DATAES-504
public void shouldCreateSslConfiguration() {
SSLContext sslContext = mock(SSLContext.class);
@ -72,5 +75,7 @@ public class ClientConfigurationUnitTests {
InetSocketAddress.createUnresolved("bar", 9200));
assertThat(clientConfiguration.useSsl()).isTrue();
assertThat(clientConfiguration.getSslContext()).contains(sslContext);
assertThat(clientConfiguration.getConnectTimeout()).isEqualTo(Duration.ofSeconds(10));
assertThat(clientConfiguration.getSocketTimeout()).isEqualTo(Duration.ofSeconds(5));
}
}

View File

@ -214,11 +214,13 @@ public class ReactiveMockClientTestsUtils {
Mockito.when(headersUriSpec.uri(any(String.class))).thenReturn(headersUriSpec);
Mockito.when(headersUriSpec.uri(any(), any(Map.class))).thenReturn(headersUriSpec);
Mockito.when(headersUriSpec.headers(any(Consumer.class))).thenReturn(headersUriSpec);
Mockito.when(headersUriSpec.attribute(anyString(), anyString())).thenReturn(headersUriSpec);
RequestBodyUriSpec bodyUriSpec = mock(RequestBodyUriSpec.class);
Mockito.when(webClient.method(any())).thenReturn(bodyUriSpec);
Mockito.when(bodyUriSpec.body(any())).thenReturn(headersUriSpec);
Mockito.when(bodyUriSpec.uri(any(), any(Map.class))).thenReturn(bodyUriSpec);
Mockito.when(bodyUriSpec.attribute(anyString(), anyString())).thenReturn(bodyUriSpec);
Mockito.when(bodyUriSpec.headers(any(Consumer.class))).thenReturn(bodyUriSpec);
ClientResponse response = mock(ClientResponse.class);

View File

@ -21,25 +21,26 @@ import static org.elasticsearch.index.query.QueryBuilders.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.junit.Rule;
import org.springframework.data.elasticsearch.ElasticsearchVersion;
import org.springframework.data.elasticsearch.ElasticsearchVersionRule;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.ElasticsearchVersion;
import org.springframework.data.elasticsearch.ElasticsearchVersionRule;
import org.springframework.data.elasticsearch.TestUtils;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.core.query.Criteria;
@ -53,7 +54,10 @@ import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.StringUtils;
/**
* Integration tests for {@link ReactiveElasticsearchTemplate}.
*
* @author Christoph Strobl
* @author Mark Paluch
* @currentRead Golden Fool - Robin Hobb
*/
@RunWith(SpringRunner.class)
@ -140,7 +144,8 @@ public class ReactiveElasticsearchTemplateTests {
SampleEntity sampleEntity = randomEntity("in another index");
template.save(sampleEntity, ALTERNATE_INDEX).as(StepVerifier::create)//
template.save(sampleEntity, ALTERNATE_INDEX) //
.as(StepVerifier::create)//
.expectNextCount(1)//
.verifyComplete();
@ -154,12 +159,13 @@ public class ReactiveElasticsearchTemplateTests {
@Test // DATAES-504
public void insertShouldAcceptPlainMapStructureAsSource() {
Map<String, Object> map = Collections.singletonMap("foo", "bar");
Map<String, Object> map = new LinkedHashMap<>(Collections.singletonMap("foo", "bar"));
template.save(map, ALTERNATE_INDEX, "singleton-map") //
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
.consumeNextWith(actual -> {
assertThat(map).containsKey("id");
}).verifyComplete();
}
@Test(expected = IllegalArgumentException.class) // DATAES-504
@ -434,7 +440,7 @@ public class ReactiveElasticsearchTemplateTests {
@Test // DATAES-504
@ElasticsearchVersion(asOf = "6.5.0")
public void deleteByQueryShouldReturnZeroIfNothingDeleted() {
public void deleteByQueryShouldReturnZeroIfNothingDeleted() throws Exception {
index(randomEntity("test message"));

View File

@ -6,6 +6,9 @@
</Console>
</Appenders>
<Loggers>
<Logger name="org.springframework.web" level="INFO"/>
<Logger name="org.apache.http" level="INFO"/>
<Logger name="org.apache.http.wire" level="INFO"/>
<Root level="error">
<AppenderRef ref="Console"/>
</Root>

View File

@ -8,6 +8,7 @@
</appender>
<logger name="org.springframework.data.elasticsearch.ElasticsearchVersionRule" level="info" />
<logger name="org.springframework.data.elasticsearch.client.WIRE" level="info"/>
<root level="error">
<appender-ref ref="console"/>

View File

@ -0,0 +1,45 @@
# Elasticsearch plugin descriptor file
# This file must exist as 'plugin-descriptor.properties' inside a plugin.
#
### example plugin for "foo"
#
# foo.zip <-- zip file for the plugin, with this structure:
# |____ <arbitrary name1>.jar <-- classes, resources, dependencies
# |____ <arbitrary nameN>.jar <-- any number of jars
# |____ plugin-descriptor.properties <-- example contents below:
#
# classname=foo.bar.BazPlugin
# description=My cool plugin
# version=6.0
# elasticsearch.version=6.0
# java.version=1.8
#
### mandatory elements for all plugins:
#
# 'description': simple summary of the plugin
description=The Reindex module adds APIs to reindex from one index to another or update documents in place.
#
# 'version': plugin's version
version=6.5.0
#
# 'name': the plugin name
name=reindex
#
# 'classname': the name of the class to load, fully-qualified.
classname=org.elasticsearch.index.reindex.ReindexPlugin
#
# 'java.version': version of java the code is built against
# use the system property java.specification.version
# version string must be a sequence of nonnegative decimal integers
# separated by "."'s and may have leading zeros
java.version=1.8
#
# 'elasticsearch.version': version of elasticsearch compiled against
elasticsearch.version=6.5.0
### optional elements for plugins:
#
# 'extended.plugins': other plugins this plugin extends through SPI
extended.plugins=
#
# 'has.native.controller': whether or not the plugin has a native controller
has.native.controller=false

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
grant {
// reindex opens socket connections using the rest client
permission java.net.SocketPermission "*", "connect";
};
grant codeBase "${codebase.elasticsearch-rest-client}" {
// rest client uses system properties which gets the default proxy
permission java.net.NetPermission "getProxySelector";
};
grant codeBase "${codebase.httpasyncclient}" {
// rest client uses system properties which gets the default proxy
permission java.net.NetPermission "getProxySelector";
};