From 1ea73d2fb9cea31e3701fe6b5b0da10c0caf0fc1 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Wed, 5 Dec 2018 09:56:46 +0100 Subject: [PATCH] DATAES-504 - Update documentation. Update documentation to cover newly added configuration options for the ReactiveElasticsearchClient. Make sure to apply postFilter correctly and set a default limit for unpaged search requests. Also fix some code format issues. --- .../reference/elasticsearch-clients.adoc | 40 ++++++++++++ .../reactive-elasticsearch-operations.adoc | 2 +- .../client/ClientConfiguration.java | 30 +++++++-- .../client/ClientConfigurationBuilder.java | 12 ++-- .../elasticsearch/client/ClientLogger.java | 16 +++-- .../client/DefaultClientConfiguration.java | 7 +- .../elasticsearch/client/RestClients.java | 17 +++-- .../DefaultReactiveElasticsearchClient.java | 25 +++---- .../elasticsearch/core/EntityOperations.java | 65 ++++++++++++------- .../core/ReactiveElasticsearchTemplate.java | 25 ++++--- .../data/elasticsearch/TestUtils.java | 8 ++- .../ReactiveElasticsearchClientTests.java | 14 ++-- .../ReactiveElasticsearchTemplateTests.java | 2 +- ...eactiveElasticsearchTemplateUnitTests.java | 32 +++++++++ 14 files changed, 221 insertions(+), 74 deletions(-) diff --git a/src/main/asciidoc/reference/elasticsearch-clients.adoc b/src/main/asciidoc/reference/elasticsearch-clients.adoc index 72a9a49d5..916e7f702 100644 --- a/src/main/asciidoc/reference/elasticsearch-clients.adoc +++ b/src/main/asciidoc/reference/elasticsearch-clients.adoc @@ -81,3 +81,43 @@ Mono response = client.index(request -> <1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enbale SSL. ==== +NOTE: The ReactiveClient response, especially for search operations, is bound to the `from` (offset) & `size` (limit) options of the request. + +[[elasticsearch.clients.configuration]] +== Client Configuration + +Client behaviour can be changed via the `ClientConfiguration` that allows to set options for SSL, connect and socket timeouts. + +.Client Configuration +==== +[source,java] +---- + +ClientConfiguration clientConfiguration = ClientConfiguration.builder() + .connectedTo("localhost:9200", "localhost:9291") <1> + .withConnectTimeout(Duration.ofSeconds(5)) <2> + .withSocketTimeout(Duration.ofSeconds(3)) <3> + .useSsl() + . // ... other options + .build(); + +---- +<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enbale SSL. +<2> Set the connection timeout. Default is 10 sec. +<3> Set the socket timeout. Default is 5 sec. +==== + +[[elasticsearch.clients.logging]] +== Client Logging + +To see what is actually sent to and received from the server `Request` / `Response` logging on the transport level needs +to be turned on as outlined in the snippet below. + +.Enable transport layer logging +[source,xml] +---- + +---- + +NOTE: The above applies to both the `RestHighLevelClient` and `ReactiveElasticsearchClient` when obtained via `RestClients` +respectively `ReactiveRestClients`. diff --git a/src/main/asciidoc/reference/reactive-elasticsearch-operations.adoc b/src/main/asciidoc/reference/reactive-elasticsearch-operations.adoc index b3056481b..916a04aa1 100644 --- a/src/main/asciidoc/reference/reactive-elasticsearch-operations.adoc +++ b/src/main/asciidoc/reference/reactive-elasticsearch-operations.adoc @@ -38,7 +38,7 @@ public class Config extends AbstractReactiveElasticsearchConfiguration { <1> Configure the client to use. This can be done by `ReactiveRestClients` or directly via `DefaultReactiveElasticsearchClient`. ==== -NOTE: If applicable set default `HttpHeaders` via the `ClientConfiguration` of the `ReactiveElasticsearchClient`. +NOTE: If applicable set default `HttpHeaders` via the `ClientConfiguration` of the `ReactiveElasticsearchClient`. See <>. TIP: If needed the `ReactiveElasticsearchTemplate` can be configured with default `RefreshPolicy` and `IndicesOptions` that get applied to the related requests by overriding the defaults of `refreshPolicy()` and `indicesOptions()`. diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java index 41910a822..1a78dadfb 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfiguration.java @@ -198,26 +198,48 @@ public interface ClientConfiguration { */ TerminalClientConfigurationBuilder withDefaultHeaders(HttpHeaders defaultHeaders); + /** + * Configure the {@literal milliseconds} for the connect timeout. + * + * @param millis the timeout to use. + * @return the {@link TerminalClientConfigurationBuilder} + * @see #withConnectTimeout(Duration) + */ + default TerminalClientConfigurationBuilder withConnectTimeout(long millis) { + return withConnectTimeout(Duration.ofMillis(millis)); + } + /** * Configure a {@link java.time.Duration} connect timeout. * - * @param connectTimeout the timeout to use. + * @param timeout the timeout to use. Must not be {@literal null}. * @return the {@link TerminalClientConfigurationBuilder} * @see java.net.Socket#connect(SocketAddress, int) * @see io.netty.channel.ChannelOption#CONNECT_TIMEOUT_MILLIS */ - TerminalClientConfigurationBuilder withConnectTimeout(Duration connectTimeout); + TerminalClientConfigurationBuilder withConnectTimeout(Duration timeout); + + /** + * Configure the {@literal milliseconds} for the socket timeout. + * + * @param millis the timeout to use. + * @return the {@link TerminalClientConfigurationBuilder} + * @see #withSocketTimeout(Duration) + */ + default TerminalClientConfigurationBuilder withSocketTimeout(long millis) { + return withSocketTimeout(Duration.ofMillis(millis)); + } /** * Configure a {@link java.time.Duration socket timeout} which is typically applied as SO-timeout/read timeout. * - * @param soTimeout the timeout to use. + * @param timeout the timeout to use. Must not be {@literal null}. * @return the {@link TerminalClientConfigurationBuilder} * @see java.net.Socket#setSoTimeout(int) * @see io.netty.handler.timeout.ReadTimeoutHandler * @see io.netty.handler.timeout.WriteTimeoutHandler */ - TerminalClientConfigurationBuilder withSocketTimeout(Duration soTimeout); + TerminalClientConfigurationBuilder withSocketTimeout(Duration timeout); /** * Build the {@link ClientConfiguration} object. diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java index 89b8451fc..7d362d4f3 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientConfigurationBuilder.java @@ -118,11 +118,11 @@ class ClientConfigurationBuilder * @see org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder#withConnectTimeout(java.time.Duration) */ @Override - public TerminalClientConfigurationBuilder withConnectTimeout(Duration connectTimeout) { + public TerminalClientConfigurationBuilder withConnectTimeout(Duration timeout) { - Assert.notNull(connectTimeout, "I/O timeout must not be null!"); + Assert.notNull(timeout, "I/O timeout must not be null!"); - this.connectTimeout = connectTimeout; + this.connectTimeout = timeout; return this; } @@ -131,11 +131,11 @@ class ClientConfigurationBuilder * @see org.springframework.data.elasticsearch.client.ClientConfiguration.TerminalClientConfigurationBuilder#withTimeout(java.time.Duration) */ @Override - public TerminalClientConfigurationBuilder withSocketTimeout(Duration soTimeout) { + public TerminalClientConfigurationBuilder withSocketTimeout(Duration timeout) { - Assert.notNull(soTimeout, "Socket timeout must not be null!"); + Assert.notNull(timeout, "Socket timeout must not be null!"); - this.soTimeout = soTimeout; + this.soTimeout = timeout; return this; } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java b/src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java index ede54037e..7ac2ba93e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/ClientLogger.java @@ -28,6 +28,7 @@ import org.springframework.util.ObjectUtils; * level. * * @author Mark Paluch + * @author Christoph Strobl * @since 4.0 */ public abstract class ClientLogger { @@ -57,7 +58,8 @@ public abstract class ClientLogger { */ public static void logRequest(String logId, String method, String endpoint, Object parameters) { - if (WIRE_LOGGER.isTraceEnabled()) { + if (isEnabled()) { + WIRE_LOGGER.trace("[{}] Sending request {} {} with parameters: {}", logId, method.toUpperCase(), endpoint, parameters); } @@ -75,7 +77,8 @@ public abstract class ClientLogger { public static void logRequest(String logId, String method, String endpoint, Object parameters, Supplier body) { - if (WIRE_LOGGER.isTraceEnabled()) { + if (isEnabled()) { + WIRE_LOGGER.trace("[{}] Sending request {} {} with parameters: {}{}Request body: {}", logId, method.toUpperCase(), endpoint, parameters, lineSeparator, body.get()); } @@ -89,7 +92,7 @@ public abstract class ClientLogger { */ public static void logRawResponse(String logId, HttpStatus statusCode) { - if (WIRE_LOGGER.isTraceEnabled()) { + if (isEnabled()) { WIRE_LOGGER.trace("[{}] Received raw response: ", logId, statusCode); } } @@ -103,7 +106,7 @@ public abstract class ClientLogger { */ public static void logResponse(String logId, HttpStatus statusCode, String body) { - if (WIRE_LOGGER.isTraceEnabled()) { + if (isEnabled()) { WIRE_LOGGER.trace("[{}] Received response: {}{}Response body: {}", logId, statusCode, lineSeparator, body); } } @@ -114,6 +117,11 @@ public abstract class ClientLogger { * @return a new, unique correlation Id. */ public static String newLogId() { + + if (!isEnabled()) { + return "-"; + } + return ObjectUtils.getIdentityHexString(new Object()); } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java index 79899ba51..65393fb2a 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/DefaultClientConfiguration.java @@ -31,6 +31,7 @@ import org.springframework.lang.Nullable; * Default {@link ClientConfiguration} implementation. * * @author Mark Paluch + * @author Christoph Strobl * @since 4.0 */ class DefaultClientConfiguration implements ClientConfiguration { @@ -90,9 +91,9 @@ class DefaultClientConfiguration implements ClientConfiguration { } /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.ClientConfiguration#getConnectTimeout() - */ + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.ClientConfiguration#getConnectTimeout() + */ @Override public Duration getConnectTimeout() { return this.connectTimeout; diff --git a/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java b/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java index 9329eabf4..8608c503b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/RestClients.java @@ -85,19 +85,25 @@ public final class RestClients { } builder.setHttpClientConfigCallback(clientBuilder -> { + Optional sslContext = clientConfiguration.getSslContext(); sslContext.ifPresent(clientBuilder::setSSLContext); if (ClientLogger.isEnabled()) { - clientBuilder.addInterceptorLast((HttpRequestInterceptor) LoggingInterceptors.INSTANCE); - clientBuilder.addInterceptorLast((HttpResponseInterceptor) LoggingInterceptors.INSTANCE); + + HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor(); + + clientBuilder.addInterceptorLast((HttpRequestInterceptor) interceptor); + clientBuilder.addInterceptorLast((HttpResponseInterceptor) interceptor); } Duration connectTimeout = clientConfiguration.getConnectTimeout(); Duration timeout = clientConfiguration.getSocketTimeout(); Builder requestConfigBuilder = RequestConfig.custom(); + if (!connectTimeout.isNegative()) { + requestConfigBuilder.setConnectTimeout(Math.toIntExact(connectTimeout.toMillis())); requestConfigBuilder.setConnectionRequestTimeout(Math.toIntExact(connectTimeout.toMillis())); } @@ -151,16 +157,17 @@ public final class RestClients { * Logging interceptors for Elasticsearch client logging. * * @see ClientLogger + * @since 4.0 */ - enum LoggingInterceptors implements HttpResponseInterceptor, HttpRequestInterceptor { - - INSTANCE; + private static class HttpLoggingInterceptor implements HttpResponseInterceptor, HttpRequestInterceptor { @Override public void process(HttpRequest request, HttpContext context) throws IOException { String logId = (String) context.getAttribute(RestClients.LOG_ID_ATTRIBUTE); + if (logId == null) { + logId = ClientLogger.newLogId(); context.setAttribute(RestClients.LOG_ID_ATTRIBUTE, logId); } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 7e0e620c2..7826837bb 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -156,20 +156,20 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch private static WebClientProvider getWebClientProvider(ClientConfiguration clientConfiguration) { - WebClientProvider provider; + Duration connectTimeout = clientConfiguration.getConnectTimeout(); + Duration soTimeout = clientConfiguration.getSocketTimeout(); TcpClient tcpClient = TcpClient.create(); - Duration connectTimeout = clientConfiguration.getConnectTimeout(); - Duration timeout = clientConfiguration.getSocketTimeout(); if (!connectTimeout.isNegative()) { tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis())); } - if (!timeout.isNegative()) { + if (!soTimeout.isNegative()) { + tcpClient = tcpClient.doOnConnected(connection -> connection // - .addHandlerLast(new ReadTimeoutHandler(timeout.toMillis(), TimeUnit.MILLISECONDS)) - .addHandlerLast(new WriteTimeoutHandler(timeout.toMillis(), TimeUnit.MILLISECONDS))); + .addHandlerLast(new ReadTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS)) + .addHandlerLast(new WriteTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS))); } String scheme = "http"; @@ -187,7 +187,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch } ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient); - provider = WebClientProvider.create(scheme, connector); + WebClientProvider provider = WebClientProvider.create(scheme, connector); return provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders()); } @@ -370,6 +370,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(), body::get); + requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue())); requestBodySpec.body(Mono.fromSupplier(body::get), String.class); } else { @@ -397,21 +398,21 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch Class responseType) { if (RawActionResponse.class.equals(responseType)) { - ClientLogger.logRawResponse(logId, response.statusCode()); + ClientLogger.logRawResponse(logId, response.statusCode()); return Mono.just(responseType.cast(RawActionResponse.create(response))); } if (response.statusCode().is5xxServerError()) { + ClientLogger.logRawResponse(logId, response.statusCode()); return handleServerError(request, response); } return response.body(BodyExtractors.toMono(byte[].class)) // .map(it -> new String(it, StandardCharsets.UTF_8)) // - .doOnNext(it -> { - ClientLogger.logResponse(logId, response.statusCode(), it); - }).flatMap(content -> doDecode(response, responseType, content)); + .doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) // + .flatMap(content -> doDecode(response, responseType, content)); } private static Mono doDecode(ClientResponse response, Class responseType, String content) { @@ -435,7 +436,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value()))); } } - } private static XContentParser createParser(String mediaType, String content) throws IOException { @@ -446,6 +446,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch } private static Publisher handleServerError(Request request, ClientResponse response) { + return Mono.error( new HttpServerErrorException(response.statusCode(), String.format("%s request to %s returned error code %s.", request.getMethod(), request.getEndpoint(), response.statusCode().value()))); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/EntityOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/EntityOperations.java index 8852b93cc..044c16fd1 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/EntityOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/EntityOperations.java @@ -43,7 +43,7 @@ import org.springframework.util.StringUtils; @RequiredArgsConstructor class EntityOperations { - public static final String ID_FIELD = "id"; + private static final String ID_FIELD = "id"; private final @NonNull MappingContext, ElasticsearchPersistentProperty> context; @@ -54,7 +54,7 @@ class EntityOperations { * @return */ @SuppressWarnings({ "unchecked", "rawtypes" }) - public Entity forEntity(T entity) { + Entity forEntity(T entity) { Assert.notNull(entity, "Bean must not be null!"); @@ -73,7 +73,7 @@ class EntityOperations { * @return */ @SuppressWarnings({ "unchecked", "rawtypes" }) - public AdaptibleEntity forEntity(T entity, ConversionService conversionService) { + AdaptibleEntity forEntity(T entity, ConversionService conversionService) { Assert.notNull(entity, "Bean must not be null!"); Assert.notNull(conversionService, "ConversionService must not be null!"); @@ -97,7 +97,7 @@ class EntityOperations { * @see ElasticsearchPersistentEntity#getIndexName() * @see ElasticsearchPersistentEntity#getIndexType() */ - public IndexCoordinates determineIndex(Entity entity, @Nullable String index, @Nullable String type) { + IndexCoordinates determineIndex(Entity entity, @Nullable String index, @Nullable String type) { return determineIndex(entity.getPersistentEntity(), index, type); } @@ -114,7 +114,7 @@ class EntityOperations { * @see ElasticsearchPersistentEntity#getIndexName() * @see ElasticsearchPersistentEntity#getIndexType() */ - public IndexCoordinates determineIndex(ElasticsearchPersistentEntity persistentEntity, @Nullable String index, + IndexCoordinates determineIndex(ElasticsearchPersistentEntity persistentEntity, @Nullable String index, @Nullable String type) { return new IndexCoordinates(indexName(persistentEntity, index), typeName(persistentEntity, type)); } @@ -214,20 +214,21 @@ class EntityOperations { * Information and commands on an entity. * * @author Mark Paluch + * @author Christoph Strobl */ interface AdaptibleEntity extends Entity { /** * Returns whether the entity has a parent. * - * @return + * @return {@literal true} if the entity has a parent that has an {@literal id}. */ boolean hasParent(); /** * Returns the parent Id. Can be {@literal null}. * - * @return + * @return can be {@literal null}. */ @Nullable Object getParentId(); @@ -236,8 +237,8 @@ class EntityOperations { * Populates the identifier of the backing entity if it has an identifier property and there's no identifier * currently present. * - * @param id must not be {@literal null}. - * @return + * @param id can be {@literal null}. + * @return can be {@literal null}. */ @Nullable T populateIdIfNecessary(@Nullable Object id); @@ -267,16 +268,16 @@ class EntityOperations { } /** - * Plain entity without applying further mapping. - * * @param + * @author Christoph Strobl + * @since 4.0 */ @RequiredArgsConstructor - private static class UnmappedEntity> implements AdaptibleEntity { + private static class MapBackedEntity> implements AdaptibleEntity { private final T map; - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getId() */ @@ -285,7 +286,7 @@ class EntityOperations { return map.get(ID_FIELD); } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#hasParent() */ @@ -294,7 +295,7 @@ class EntityOperations { return false; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getParentId() */ @@ -303,7 +304,7 @@ class EntityOperations { return null; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#populateIdIfNecessary(java.lang.Object) */ @@ -316,7 +317,7 @@ class EntityOperations { return map; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#initializeVersionProperty() */ @@ -325,7 +326,7 @@ class EntityOperations { return map; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getVersion() */ @@ -335,7 +336,7 @@ class EntityOperations { return null; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#incrementVersion() */ @@ -344,7 +345,7 @@ class EntityOperations { return map; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getBean() */ @@ -353,7 +354,7 @@ class EntityOperations { return map; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#isNew() */ @@ -362,7 +363,7 @@ class EntityOperations { return map.get(ID_FIELD) != null; } - /* + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getPersistentEntity() */ @@ -372,12 +373,26 @@ class EntityOperations { } } + /** + * Plain entity without applying further mapping. + * + * @param + * @since 4.0 + */ + private static class UnmappedEntity> extends MapBackedEntity { + + UnmappedEntity(T map) { + super(map); + } + } + /** * Simple mapped entity without an associated {@link ElasticsearchPersistentEntity}. * * @param + * @since 4.0 */ - private static class SimpleMappedEntity> extends UnmappedEntity { + private static class SimpleMappedEntity> extends MapBackedEntity { SimpleMappedEntity(T map) { super(map); @@ -471,6 +486,10 @@ class EntityOperations { } } + /** + * @param + * @since 4.0 + */ private static class AdaptibleMappedEntity extends MappedEntity implements AdaptibleEntity { private final ElasticsearchPersistentEntity entity; diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 27c0b1e9f..cf7aadbde 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -62,6 +62,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; import org.springframework.data.elasticsearch.core.query.CriteriaQuery; import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.query.SearchQuery; import org.springframework.data.elasticsearch.core.query.StringQuery; import org.springframework.data.mapping.context.MappingContext; import org.springframework.http.HttpStatus; @@ -240,12 +241,14 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(mappedQuery(query, entity)); - - // TODO: request.source().postFilter(elasticsearchFilter); -- filter query - searchSourceBuilder.version(entity.hasVersionProperty()); // This has been true by default before searchSourceBuilder.trackScores(query.getTrackScores()); + QueryBuilder postFilterQuery = mappedFilterQuery(query, entity); + if (postFilterQuery != null) { + searchSourceBuilder.postFilter(postFilterQuery); + } + if (query.getSourceFilter() != null) { searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes()); } @@ -259,6 +262,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera searchSourceBuilder.from((int) offset); searchSourceBuilder.size(query.getPageable().getPageSize()); + } else { + + searchSourceBuilder.from(0); + searchSourceBuilder.size(10000); // this is the index.max_result_window default value } if (query.getIndicesOptions() != null) { @@ -273,9 +280,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera request.source(searchSourceBuilder); - request = prepareSearchRequest(request); - - return doFind(request); + return doFind(prepareSearchRequest(request)); }); } @@ -605,9 +610,13 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return elasticsearchQuery != null ? elasticsearchQuery : QueryBuilders.matchAllQuery(); } - private QueryBuilder mappedFilterQuery(CriteriaQuery query, ElasticsearchPersistentEntity entity) { + @Nullable + private QueryBuilder mappedFilterQuery(Query query, ElasticsearchPersistentEntity entity) { + + if (query instanceof SearchQuery) { + return ((SearchQuery) query).getFilter(); + } - // TODO: this is actually strange in the RestTemplate:L378 - need to check return null; } diff --git a/src/test/java/org/springframework/data/elasticsearch/TestUtils.java b/src/test/java/org/springframework/data/elasticsearch/TestUtils.java index 6790c9f7a..d8db1e716 100644 --- a/src/test/java/org/springframework/data/elasticsearch/TestUtils.java +++ b/src/test/java/org/springframework/data/elasticsearch/TestUtils.java @@ -18,6 +18,7 @@ package org.springframework.data.elasticsearch; import lombok.SneakyThrows; import java.io.IOException; +import java.time.Duration; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; @@ -41,12 +42,15 @@ public final class TestUtils { private TestUtils() {} + private static final ClientConfiguration CONFIG = ClientConfiguration.builder().connectedToLocalhost() + .withConnectTimeout(Duration.ofSeconds(5)).withSocketTimeout(Duration.ofSeconds(3)).build(); + public static RestHighLevelClient restHighLevelClient() { - return RestClients.create(ClientConfiguration.create("localhost:9200")).rest(); + return RestClients.create(CONFIG).rest(); } public static ReactiveElasticsearchClient reactiveClient() { - return ReactiveRestClients.create(ClientConfiguration.create("localhost:9200")); + return ReactiveRestClients.create(CONFIG); } public static Version serverVersion() { diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java index d7ec06536..28c2a5ff4 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java @@ -17,12 +17,10 @@ package org.springframework.data.elasticsearch.client.reactive; import static org.assertj.core.api.Assertions.*; -import org.junit.Rule; -import org.springframework.data.elasticsearch.ElasticsearchVersion; -import org.springframework.data.elasticsearch.ElasticsearchVersionRule; import reactor.test.StepVerifier; import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -45,10 +43,13 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; +import org.springframework.data.elasticsearch.ElasticsearchVersion; +import org.springframework.data.elasticsearch.ElasticsearchVersionRule; import org.springframework.data.elasticsearch.TestUtils; -import org.springframework.http.HttpHeaders; +import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.lang.Nullable; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; @@ -109,7 +110,10 @@ public class ReactiveElasticsearchClientTests { @Test // DATAES-488 public void pingForUnknownHostShouldReturnFalse() { - DefaultReactiveElasticsearchClient.create(HttpHeaders.EMPTY, "http://localhost:4711").ping() // + DefaultReactiveElasticsearchClient + .create(ClientConfiguration.builder().connectedTo("localhost:4711") + .withConnectTimeout(Duration.ofSeconds(2)).build()) + .ping() // .as(StepVerifier::create) // .expectNext(false) // .verifyComplete(); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java index 699f1c7fe..592381e02 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java @@ -440,7 +440,7 @@ public class ReactiveElasticsearchTemplateTests { @Test // DATAES-504 @ElasticsearchVersion(asOf = "6.5.0") - public void deleteByQueryShouldReturnZeroIfNothingDeleted() throws Exception { + public void deleteByQueryShouldReturnZeroIfNothingDeleted() { index(randomEntity("test message")); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java index c1a31c9ef..95e21f879 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java @@ -19,6 +19,8 @@ import static org.assertj.core.api.Assertions.*; import static org.elasticsearch.action.search.SearchRequest.*; import static org.mockito.Mockito.*; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -118,6 +120,36 @@ public class ReactiveElasticsearchTemplateUnitTests { assertThat(captor.getValue().indicesOptions()).isEqualTo(IndicesOptions.LENIENT_EXPAND_OPEN); } + @Test // DATAES-504 + public void findShouldApplyPaginationIfSet() { + + ArgumentCaptor captor = ArgumentCaptor.forClass(SearchRequest.class); + when(client.search(captor.capture())).thenReturn(Flux.empty()); + + + template.find(new CriteriaQuery(new Criteria("*")).setPageable(PageRequest.of(2, 50)), SampleEntity.class) // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(captor.getValue().source().from()).isEqualTo(100); + assertThat(captor.getValue().source().size()).isEqualTo(50); + } + + @Test // DATAES-504 + public void findShouldApplyDefaultMaxIfPaginationNotSet() { + + ArgumentCaptor captor = ArgumentCaptor.forClass(SearchRequest.class); + when(client.search(captor.capture())).thenReturn(Flux.empty()); + + + template.find(new CriteriaQuery(new Criteria("*")).setPageable(Pageable.unpaged()), SampleEntity.class) // + .as(StepVerifier::create) // + .verifyComplete(); + + assertThat(captor.getValue().source().from()).isEqualTo(0); + assertThat(captor.getValue().source().size()).isEqualTo(10000); + } + @Test // DATAES-504 public void deleteShouldUseDefaultRefreshPolicy() {