diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index f8e9468d5..a89100e89 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -22,10 +22,6 @@ import io.netty.handler.ssl.IdentityCipherSuiteFilter; import io.netty.handler.ssl.JdkSslContext; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; -import org.elasticsearch.client.indices.GetFieldMappingsRequest; -import org.elasticsearch.client.indices.GetFieldMappingsResponse; -import org.elasticsearch.index.reindex.UpdateByQueryRequest; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; @@ -88,6 +84,8 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.GetAliasesResponse; import org.elasticsearch.client.Request; +import org.elasticsearch.client.indices.GetFieldMappingsRequest; +import org.elasticsearch.client.indices.GetFieldMappingsResponse; import org.elasticsearch.client.indices.GetIndexTemplatesRequest; import org.elasticsearch.client.indices.GetIndexTemplatesResponse; import org.elasticsearch.client.indices.IndexTemplatesExistRequest; @@ -100,6 +98,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -116,6 +115,7 @@ import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verif import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices; import org.springframework.data.elasticsearch.client.util.NamedXContents; import org.springframework.data.elasticsearch.client.util.ScrollState; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.util.Lazy; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -687,8 +687,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch } @Override - public Mono getFieldMapping(HttpHeaders headers, GetFieldMappingsRequest getFieldMappingsRequest) { - return sendRequest(getFieldMappingsRequest, requestCreator.getFieldMapping(), GetFieldMappingsResponse.class, headers).next(); + public Mono getFieldMapping(HttpHeaders headers, + GetFieldMappingsRequest getFieldMappingsRequest) { + return sendRequest(getFieldMappingsRequest, requestCreator.getFieldMapping(), GetFieldMappingsResponse.class, + headers).next(); } @Override diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index a87d23f33..270ab61af 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -15,10 +15,6 @@ */ package org.springframework.data.elasticsearch.client.reactive; -import org.elasticsearch.client.indices.GetFieldMappingsRequest; -import org.elasticsearch.client.indices.GetFieldMappingsResponse; -import org.elasticsearch.index.reindex.UpdateByQueryRequest; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -55,6 +51,8 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.GetAliasesResponse; +import org.elasticsearch.client.indices.GetFieldMappingsRequest; +import org.elasticsearch.client.indices.GetFieldMappingsResponse; import org.elasticsearch.client.indices.GetIndexTemplatesRequest; import org.elasticsearch.client.indices.GetIndexTemplatesResponse; import org.elasticsearch.client.indices.IndexTemplatesExistRequest; @@ -62,11 +60,13 @@ import org.elasticsearch.client.indices.PutIndexTemplateRequest; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.suggest.Suggest; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ElasticsearchHost; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.http.HttpHeaders; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -601,12 +601,12 @@ public interface ReactiveElasticsearchClient { /** * Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API. * - * @param consumer never {@literal null}. + * @param consumer must not be {@literal null}. * @see Update By - * * Query API on elastic.co + * * Query API on elastic.co * @return a {@link Mono} emitting operation response. */ - default Mono updateBy(Consumer consumer){ + default Mono updateBy(Consumer consumer) { final UpdateByQueryRequest request = new UpdateByQueryRequest(); consumer.accept(request); @@ -618,10 +618,10 @@ public interface ReactiveElasticsearchClient { * * @param updateRequest must not be {@literal null}. * @see Update By - * * Query API on elastic.co + * * Query API on elastic.co * @return a {@link Mono} emitting operation response. */ - default Mono updateBy(UpdateByQueryRequest updateRequest){ + default Mono updateBy(UpdateByQueryRequest updateRequest) { return updateBy(HttpHeaders.EMPTY, updateRequest); } @@ -631,7 +631,7 @@ public interface ReactiveElasticsearchClient { * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. * @param updateRequest must not be {@literal null}. * @see Update By - * * Query API on elastic.co + * * Query API on elastic.co * @return a {@link Mono} emitting operation response. */ Mono updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest); @@ -1210,9 +1210,9 @@ public interface ReactiveElasticsearchClient { * * @param consumer never {@literal null}. * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index - * does not exist. - * @see Indices - * Flush API on elastic.co + * does not exist. + * @see + * Indices Flush API on elastic.co * @since 4.2 */ default Mono getFieldMapping(Consumer consumer) { @@ -1228,8 +1228,8 @@ public interface ReactiveElasticsearchClient { * @param getFieldMappingsRequest must not be {@literal null}. * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index * does not exist. - * @see Indices - * Flush API on elastic.co + * @see + * Indices Flush API on elastic.co * @since 4.2 */ default Mono getFieldMapping(GetFieldMappingsRequest getFieldMappingsRequest) { @@ -1243,15 +1243,16 @@ public interface ReactiveElasticsearchClient { * @param getFieldMappingsRequest must not be {@literal null}. * @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index * does not exist. - * @see Indices - * Flush API on elastic.co + * @see + * Indices Flush API on elastic.co * @since 4.2 */ - Mono getFieldMapping(HttpHeaders headers, GetFieldMappingsRequest getFieldMappingsRequest); + Mono getFieldMapping(HttpHeaders headers, + GetFieldMappingsRequest getFieldMappingsRequest); /** * Execute the given {@link IndicesAliasesRequest} against the {@literal indices} API. - * + * * @param consumer never {@literal null}. * @return a {@link Mono} signalling operation completion. * @since 4.1 @@ -1264,7 +1265,7 @@ public interface ReactiveElasticsearchClient { /** * Execute the given {@link IndicesAliasesRequest} against the {@literal indices} API. - * + * * @param indicesAliasesRequest must not be {@literal null} * @return a {@link Mono} signalling operation completion. * @since 4.1 @@ -1275,7 +1276,7 @@ public interface ReactiveElasticsearchClient { /** * Execute the given {@link IndicesAliasesRequest} against the {@literal indices} API. - * + * * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. * @param indicesAliasesRequest must not be {@literal null} * @return a {@link Mono} signalling operation completion. diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java index dc774bb39..4d39143fe 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java @@ -92,6 +92,9 @@ public interface RequestCreator { return RequestConverters::deleteByQuery; } + /** + * @since 4.2 + */ default Function updateByQuery() { return RequestConverters::updateByQuery; } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java index 56b30c1cb..bf6e40d40 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java @@ -413,7 +413,7 @@ public class RequestConverters { /** * Creates a count request. - * + * * @param countRequest the search defining the data to be counted * @return Elasticsearch count request * @since 4.0 @@ -538,10 +538,9 @@ public class RequestConverters { } public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) { - String endpoint = endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query"); + String endpoint = endpoint(updateByQueryRequest.indices(), "_update_by_query"); Request request = new Request(HttpMethod.POST.name(), endpoint); - Params params = new Params(request) - .withRouting(updateByQueryRequest.getRouting()) // + Params params = new Params(request).withRouting(updateByQueryRequest.getRouting()) // .withPipeline(updateByQueryRequest.getPipeline()) // .withRefresh(updateByQueryRequest.isRefresh()) // .withTimeout(updateByQueryRequest.getTimeout()) // @@ -552,15 +551,19 @@ public class RequestConverters { if (!updateByQueryRequest.isAbortOnVersionConflict()) { params.putParam("conflicts", "proceed"); } + if (updateByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) { params.putParam("scroll_size", Integer.toString(updateByQueryRequest.getBatchSize())); } + if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) { params.putParam("scroll", updateByQueryRequest.getScrollTime()); } + if (updateByQueryRequest.getMaxDocs() > 0) { params.putParam("max_docs", Integer.toString(updateByQueryRequest.getMaxDocs())); } + request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE)); return request; } @@ -897,13 +900,12 @@ public class RequestConverters { } public static Request getFieldMapping(GetFieldMappingsRequest getFieldMappingsRequest) { - String[] indices = getFieldMappingsRequest.indices() == null ? Strings.EMPTY_ARRAY : getFieldMappingsRequest.indices(); + String[] indices = getFieldMappingsRequest.indices() == null ? Strings.EMPTY_ARRAY + : getFieldMappingsRequest.indices(); String[] fields = getFieldMappingsRequest.fields() == null ? Strings.EMPTY_ARRAY : getFieldMappingsRequest.fields(); - final String endpoint = new EndpointBuilder().addCommaSeparatedPathParts(indices) - .addPathPartAsIs("_mapping").addPathPartAsIs("field") - .addCommaSeparatedPathParts(fields) - .build(); + final String endpoint = new EndpointBuilder().addCommaSeparatedPathParts(indices).addPathPartAsIs("_mapping") + .addPathPartAsIs("field").addCommaSeparatedPathParts(fields).build(); Request request = new Request(HttpMethod.GET.name(), endpoint); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java index fd7c4f2be..2c997e13b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java @@ -22,7 +22,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -373,7 +372,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper return request; } - return request.setRefreshPolicy(refreshPolicy.toRequestRefreshPolicy()); + return request.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(refreshPolicy)); } /** @@ -390,7 +389,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper return requestBuilder; } - return requestBuilder.setRefreshPolicy(refreshPolicy.toRequestRefreshPolicy()); + return requestBuilder.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(refreshPolicy)); } // endregion diff --git a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java index bcb45df7f..b142aad11 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java @@ -18,12 +18,12 @@ package org.springframework.data.elasticsearch.core; import java.util.List; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.DeleteQuery; import org.springframework.data.elasticsearch.core.query.GetQuery; import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.core.routing.RoutingResolver; @@ -208,7 +208,7 @@ public interface DocumentOperations { /** * Bulk update all objects. Will do update. - * + * * @param clazz the entity class * @param queries the queries to execute in bulk * @since 4.1 @@ -304,8 +304,8 @@ public interface DocumentOperations { /** * Update document(s) by query * - * @param updateQuery query defining the update - * @param index the index where to update the records + * @param updateQuery query defining the update, must not be {@literal null} + * @param index the index where to update the records , must not be {@literal null} * @return the update response * @since 4.2 */ diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 7ef335ce3..e4ff8f251 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -48,11 +48,11 @@ import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverte import org.springframework.data.elasticsearch.core.document.DocumentAdapters; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.DeleteQuery; import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.support.SearchHitsUtil; @@ -162,7 +162,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { @Override @Nullable public T get(String id, Class clazz, IndexCoordinates index) { - GetRequest request = requestFactory.getRequest(id,routingResolver.getRouting(), index); + GetRequest request = requestFactory.getRequest(id, routingResolver.getRouting(), index); GetResponse response = execute(client -> client.get(request, RequestOptions.DEFAULT)); DocumentCallback callback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index); @@ -184,7 +184,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { @Override protected boolean doExists(String id, IndexCoordinates index) { - GetRequest request = requestFactory.getRequest(id, routingResolver.getRouting(),index); + GetRequest request = requestFactory.getRequest(id, routingResolver.getRouting(), index); request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE); return execute(client -> client.get(request, RequestOptions.DEFAULT).isExists()); } @@ -225,15 +225,39 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { @Override public UpdateResponse update(UpdateQuery query, IndexCoordinates index) { UpdateRequest request = requestFactory.updateRequest(query, index); - UpdateResponse.Result result = UpdateResponse.Result + + if (query.getRefreshPolicy() == null && getRefreshPolicy() != null) { + request.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(getRefreshPolicy())); + } + + if (query.getRouting() == null && routingResolver.getRouting() != null) { + request.routing(routingResolver.getRouting()); + } + + UpdateResponse.Result result = UpdateResponse.Result .valueOf(execute(client -> client.update(request, RequestOptions.DEFAULT)).getResult().name()); return new UpdateResponse(result); } @Override public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + Assert.notNull(index, "index must not be null"); + final UpdateByQueryRequest updateByQueryRequest = requestFactory.updateByQueryRequest(query, index); - final BulkByScrollResponse bulkByScrollResponse = execute(client -> client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT)); + + if (query.getRefreshPolicy() == null && getRefreshPolicy() != null) { + updateByQueryRequest.setRefresh(getRefreshPolicy() == RefreshPolicy.IMMEDIATE); + } + + + if (query.getRouting() == null && routingResolver.getRouting() != null) { + updateByQueryRequest.setRouting(routingResolver.getRouting()); + } + + final BulkByScrollResponse bulkByScrollResponse = execute( + client -> client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT)); return UpdateByQueryResponse.of(bulkByScrollResponse); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index f6653f2e2..8a68f3ccf 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -249,16 +249,37 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate { @Override public UpdateResponse update(UpdateQuery query, IndexCoordinates index) { + UpdateRequestBuilder updateRequestBuilder = requestFactory.updateRequestBuilderFor(client, query, index); - org.elasticsearch.action.update.UpdateResponse updateResponse = updateRequestBuilder.execute().actionGet(); + + if (query.getRefreshPolicy() == null && getRefreshPolicy() != null) { + updateRequestBuilder.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(getRefreshPolicy())); + } + + if (query.getRouting() == null && routingResolver.getRouting() != null) { + updateRequestBuilder.setRouting(routingResolver.getRouting()); + } + + org.elasticsearch.action.update.UpdateResponse updateResponse = updateRequestBuilder.execute().actionGet(); UpdateResponse.Result result = UpdateResponse.Result.valueOf(updateResponse.getResult().name()); return new UpdateResponse(result); } @Override public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) { - final UpdateByQueryRequestBuilder updateByQueryRequestBuilder = requestFactory.updateByQueryRequestBuilder(client, query, index); - final BulkByScrollResponse bulkByScrollResponse = updateByQueryRequestBuilder.execute().actionGet(); + + Assert.notNull(query, "query must not be null"); + Assert.notNull(index, "index must not be null"); + + final UpdateByQueryRequestBuilder updateByQueryRequestBuilder = requestFactory.updateByQueryRequestBuilder(client, query, index); + + if (query.getRefreshPolicy() == null && getRefreshPolicy() != null) { + updateByQueryRequestBuilder.refresh(getRefreshPolicy() == RefreshPolicy.IMMEDIATE); + } + + // UpdateByQueryRequestBuilder has not parameters to set a routing value + + final BulkByScrollResponse bulkByScrollResponse = updateByQueryRequestBuilder.execute().actionGet(); return UpdateByQueryResponse.of(bulkByScrollResponse); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index a79ecd217..b4214a6a7 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -15,7 +15,6 @@ */ package org.springframework.data.elasticsearch.core; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -26,6 +25,7 @@ import java.util.List; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.util.Assert; @@ -341,8 +341,9 @@ public interface ReactiveDocumentOperations { /** * Update document(s) by query. - * @param updateQuery query defining the update - * @param index the index where to update the records + * + * @param updateQuery query defining the update, must not be {@literal null} + * @param index the index where to update the records, must not be {@literal null} * @return a {@link Mono} emitting the update response * @since 4.2 */ diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index ac52f0156..436ceaee0 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -15,8 +15,6 @@ */ package org.springframework.data.elasticsearch.core; -import org.elasticsearch.index.reindex.UpdateByQueryRequest; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -43,6 +41,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.SuggestBuilder; @@ -75,6 +74,7 @@ import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.core.routing.DefaultRoutingResolver; @@ -82,7 +82,6 @@ import org.springframework.data.elasticsearch.core.routing.RoutingResolver; import org.springframework.data.elasticsearch.support.VersionInfo; import org.springframework.data.mapping.PersistentPropertyAccessor; import org.springframework.data.mapping.callback.ReactiveEntityCallbacks; -import org.springframework.data.mapping.context.MappingContext; import org.springframework.http.HttpStatus; import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; @@ -556,6 +555,15 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return Mono.defer(() -> { UpdateRequest request = requestFactory.updateRequest(updateQuery, index); + + if (updateQuery.getRefreshPolicy() == null && refreshPolicy != null) { + request.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(refreshPolicy)); + } + + if (updateQuery.getRouting() == null && routingResolver.getRouting() != null) { + request.routing(routingResolver.getRouting()); + } + return Mono.from(execute(client -> client.update(request))) .map(response -> new UpdateResponse(UpdateResponse.Result.valueOf(response.getResult().name()))); }); @@ -564,11 +572,21 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera @Override public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) { - Assert.notNull(updateQuery, "UpdateQuery must not be null"); + Assert.notNull(updateQuery, "updateQuery must not be null"); Assert.notNull(index, "Index must not be null"); return Mono.defer(() -> { + final UpdateByQueryRequest request = requestFactory.updateByQueryRequest(updateQuery, index); + + if (updateQuery.getRefreshPolicy() == null && refreshPolicy != null) { + request.setRefresh(refreshPolicy == RefreshPolicy.IMMEDIATE); + } + + if (updateQuery.getRouting() == null && routingResolver.getRouting() != null) { + request.setRouting(routingResolver.getRouting()); + } + return Mono.from(execute(client -> client.updateBy(request))); }); } @@ -681,7 +699,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return request; } - return request.setRefreshPolicy(refreshPolicy.toRequestRefreshPolicy()); + return request.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(refreshPolicy)); } // endregion diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RefreshPolicy.java b/src/main/java/org/springframework/data/elasticsearch/core/RefreshPolicy.java index c1ac99a77..3bb2e189e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/RefreshPolicy.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/RefreshPolicy.java @@ -15,27 +15,12 @@ */ package org.springframework.data.elasticsearch.core; -import org.elasticsearch.action.support.WriteRequest; - /** - * Enum mirroring org.elasticsearch.action.support.WriteRequest.RefreshPolicy to keep Elasticsearch classes out of our - * API. - * + * Enum defining the refresh policy. + * * @author Peter-Josef Meisch * @since 4.2 */ public enum RefreshPolicy { NONE, IMMEDIATE, WAIT_UNTIL; - - WriteRequest.RefreshPolicy toRequestRefreshPolicy() { - switch (this) { - case IMMEDIATE: - return WriteRequest.RefreshPolicy.IMMEDIATE; - case WAIT_UNTIL: - return WriteRequest.RefreshPolicy.WAIT_UNTIL; - case NONE: - default: - return WriteRequest.RefreshPolicy.NONE; - } - } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java index 00df712d3..d11828cf3 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java @@ -46,6 +46,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.client.Client; @@ -1442,8 +1443,8 @@ class RequestFactory { updateRequest.setIfPrimaryTerm(query.getIfPrimaryTerm()); } - if (query.getRefresh() != null) { - updateRequest.setRefreshPolicy(query.getRefresh().name().toLowerCase()); + if (query.getRefreshPolicy() != null) { + updateRequest.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(query.getRefreshPolicy())); } if (query.getRetryOnConflict() != null) { @@ -1516,8 +1517,8 @@ class RequestFactory { updateRequestBuilder.setIfPrimaryTerm(query.getIfPrimaryTerm()); } - if (query.getRefresh() != null) { - updateRequestBuilder.setRefreshPolicy(query.getRefresh().name().toLowerCase()); + if (query.getRefreshPolicy() != null) { + updateRequestBuilder.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(query.getRefreshPolicy())); } if (query.getRetryOnConflict() != null) { @@ -1536,9 +1537,9 @@ class RequestFactory { } public UpdateByQueryRequest updateByQueryRequest(UpdateQuery query, IndexCoordinates index) { + String indexName = index.getIndexName(); final UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName); - updateByQueryRequest.setScript(getScript(query)); if (query.getAbortOnVersionConflict() != null) { @@ -1558,7 +1559,7 @@ class RequestFactory { updateByQueryRequest.setIndicesOptions(queryQuery.getIndicesOptions()); } - if(queryQuery.getScrollTime() != null) { + if (queryQuery.getScrollTime() != null) { updateByQueryRequest.setScroll(TimeValue.timeValueMillis(queryQuery.getScrollTime().toMillis())); } } @@ -1575,8 +1576,8 @@ class RequestFactory { updateByQueryRequest.setPipeline(query.getPipeline()); } - if (query.getRefresh() != null) { - updateByQueryRequest.setRefresh(Boolean.getBoolean(query.getRefresh().name().toLowerCase())); + if (query.getRefreshPolicy() != null) { + updateByQueryRequest.setRefresh(query.getRefreshPolicy() == RefreshPolicy.IMMEDIATE); } if (query.getRequestsPerSecond() != null) { @@ -1606,13 +1607,15 @@ class RequestFactory { return updateByQueryRequest; } - public UpdateByQueryRequestBuilder updateByQueryRequestBuilder(Client client, UpdateQuery query, IndexCoordinates index) { + public UpdateByQueryRequestBuilder updateByQueryRequestBuilder(Client client, UpdateQuery query, + IndexCoordinates index) { + String indexName = index.getIndexName(); - final UpdateByQueryRequestBuilder updateByQueryRequestBuilder = new UpdateByQueryRequestBuilder(client, UpdateByQueryAction.INSTANCE); + final UpdateByQueryRequestBuilder updateByQueryRequestBuilder = new UpdateByQueryRequestBuilder(client, + UpdateByQueryAction.INSTANCE); updateByQueryRequestBuilder.source(indexName); - updateByQueryRequestBuilder.script(getScript(query)); if (query.getAbortOnVersionConflict() != null) { @@ -1625,15 +1628,15 @@ class RequestFactory { if (query.getQuery() != null) { final Query queryQuery = query.getQuery(); - updateByQueryRequestBuilder.filter(getQuery(queryQuery)); if (queryQuery.getIndicesOptions() != null) { updateByQueryRequestBuilder.source().setIndicesOptions(queryQuery.getIndicesOptions()); } - if(queryQuery.getScrollTime() != null) { - updateByQueryRequestBuilder.source().setScroll(TimeValue.timeValueMillis(queryQuery.getScrollTime().toMillis())); + if (queryQuery.getScrollTime() != null) { + updateByQueryRequestBuilder.source() + .setScroll(TimeValue.timeValueMillis(queryQuery.getScrollTime().toMillis())); } } @@ -1649,8 +1652,8 @@ class RequestFactory { updateByQueryRequestBuilder.setPipeline(query.getPipeline()); } - if (query.getRefresh() != null) { - updateByQueryRequestBuilder.refresh(Boolean.getBoolean(query.getRefresh().name().toLowerCase())); + if (query.getRefreshPolicy() != null) { + updateByQueryRequestBuilder.refresh(query.getRefreshPolicy() == RefreshPolicy.IMMEDIATE); } if (query.getRequestsPerSecond() != null) { @@ -1670,7 +1673,8 @@ class RequestFactory { } if (query.getTimeout() != null) { - updateByQueryRequestBuilder.source().setTimeout(TimeValue.parseTimeValue(query.getTimeout(), getClass().getSimpleName() + ".timeout")); + updateByQueryRequestBuilder.source() + .setTimeout(TimeValue.parseTimeValue(query.getTimeout(), getClass().getSimpleName() + ".timeout")); } return updateByQueryRequestBuilder; @@ -1717,6 +1721,17 @@ class RequestFactory { return elasticsearchFilter; } + public static WriteRequest.RefreshPolicy toElasticsearchRefreshPolicy(RefreshPolicy refreshPolicy) { + switch (refreshPolicy) { + case IMMEDIATE: + return WriteRequest.RefreshPolicy.IMMEDIATE; + case WAIT_UNTIL: + return WriteRequest.RefreshPolicy.WAIT_UNTIL; + case NONE: + default: + return WriteRequest.RefreshPolicy.NONE; + } + } // region response stuff /** @@ -1797,7 +1812,8 @@ class RequestFactory { return entity.hasSeqNoPrimaryTermProperty(); } - private org.elasticsearch.script.ScriptType getScriptType(ScriptType scriptType) { + private org.elasticsearch.script.ScriptType getScriptType(@Nullable ScriptType scriptType) { + if (scriptType == null || ScriptType.INLINE.equals(scriptType)) { return org.elasticsearch.script.ScriptType.INLINE; } else { @@ -1808,14 +1824,12 @@ class RequestFactory { @Nullable private Script getScript(UpdateQuery query) { if (ScriptType.STORED.equals(query.getScriptType()) && query.getScriptName() != null) { - final Map params = Optional.ofNullable(query.getParams()) - .orElse(new HashMap<>()); + final Map params = Optional.ofNullable(query.getParams()).orElse(new HashMap<>()); return new Script(getScriptType(ScriptType.STORED), null, query.getScriptName(), params); } - if (ScriptType.INLINE.equals(query.getScriptType()) && query.getScript() != null){ - final Map params = Optional.ofNullable(query.getParams()) - .orElse(new HashMap<>()); + if (ScriptType.INLINE.equals(query.getScriptType()) && query.getScript() != null) { + final Map params = Optional.ofNullable(query.getParams()).orElse(new HashMap<>()); return new Script(getScriptType(ScriptType.INLINE), query.getLang(), query.getScript(), params); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ScriptType.java b/src/main/java/org/springframework/data/elasticsearch/core/ScriptType.java index b0bf11d56..d894635a2 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ScriptType.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ScriptType.java @@ -16,7 +16,7 @@ package org.springframework.data.elasticsearch.core; /** - * Enum mirroring org.elasticsearch.script.ScriptType to keep Elasticsearch classes out of our API. + * Define script types for update queries. * * @author Farid Faoudi * @since 4.2 diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateByQueryResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateByQueryResponse.java index b2e629329..ac7b471cc 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateByQueryResponse.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateByQueryResponse.java @@ -15,15 +15,16 @@ */ package org.springframework.data.elasticsearch.core.query; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.springframework.lang.Nullable; -import java.util.List; -import java.util.stream.Collectors; - /** - * Class mirroring org.elasticsearch.index.reindex.BulkByScrollResponse to keep Elasticsearch classes out of our API. + * Response of an update by query operation. * * @author Farid Faoudi * @since 4.2 @@ -44,8 +45,8 @@ public class UpdateByQueryResponse { private final List failures; private UpdateByQueryResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches, - long versionConflicts, long noops, long bulkRetries, long searchRetries, - @Nullable String reasonCancelled, List failures) { + long versionConflicts, long noops, long bulkRetries, long searchRetries, @Nullable String reasonCancelled, + List failures) { this.took = took; this.timedOut = timedOut; this.total = total; @@ -110,7 +111,8 @@ public class UpdateByQueryResponse { } /** - * The number of documents that were ignored because the script used for the update by query returned a noop value for ctx.op. + * The number of documents that were ignored because the script used for the update by query returned a noop value for + * ctx.op. */ public long getNoops() { return noops; @@ -139,7 +141,8 @@ public class UpdateByQueryResponse { } /** - * All of the bulk failures. Version conflicts are only included if the request sets abortOnVersionConflict to true (the default). + * All of the bulk failures. Version conflicts are only included if the request sets abortOnVersionConflict to true + * (the default). */ public List getFailures() { return failures; @@ -155,40 +158,40 @@ public class UpdateByQueryResponse { } public static UpdateByQueryResponse of(BulkByScrollResponse bulkByScrollResponse) { - final List failures = bulkByScrollResponse.getBulkFailures() - .stream() - .map(Failure::of) - .collect(Collectors.toList()); + final List failures = bulkByScrollResponse.getBulkFailures() // + .stream() // + .map(Failure::of) // + .collect(Collectors.toList()); // - return UpdateByQueryResponse.builder() - .withTook(bulkByScrollResponse.getTook().getMillis()) - .withTimedOut(bulkByScrollResponse.isTimedOut()) - .withTotal(bulkByScrollResponse.getTotal()) - .withUpdated(bulkByScrollResponse.getUpdated()) - .withDeleted(bulkByScrollResponse.getDeleted()) - .withBatches(bulkByScrollResponse.getBatches()) - .withVersionConflicts(bulkByScrollResponse.getVersionConflicts()) - .withNoops(bulkByScrollResponse.getNoops()) - .withBulkRetries(bulkByScrollResponse.getBulkRetries()) - .withSearchRetries(bulkByScrollResponse.getSearchRetries()) - .withReasonCancelled(bulkByScrollResponse.getReasonCancelled()) - .withFailures(failures) - .build(); + return UpdateByQueryResponse.builder() // + .withTook(bulkByScrollResponse.getTook().getMillis()) // + .withTimedOut(bulkByScrollResponse.isTimedOut()) // + .withTotal(bulkByScrollResponse.getTotal()) // + .withUpdated(bulkByScrollResponse.getUpdated()) // + .withDeleted(bulkByScrollResponse.getDeleted()) // + .withBatches(bulkByScrollResponse.getBatches()) // + .withVersionConflicts(bulkByScrollResponse.getVersionConflicts()) // + .withNoops(bulkByScrollResponse.getNoops()) // + .withBulkRetries(bulkByScrollResponse.getBulkRetries()) // + .withSearchRetries(bulkByScrollResponse.getSearchRetries()) // + .withReasonCancelled(bulkByScrollResponse.getReasonCancelled()) // + .withFailures(failures) // + .build(); // } public static class Failure { - private final String index; - private final String type; - private final String id; - private final Exception cause; - private final Integer status; - private final Long seqNo; - private final Long term; - private final Boolean aborted; + @Nullable private final String index; + @Nullable private final String type; + @Nullable private final String id; + @Nullable private final Exception cause; + @Nullable private final Integer status; + @Nullable private final Long seqNo; + @Nullable private final Long term; + @Nullable private final Boolean aborted; - private Failure(String index, String type, String id, Exception cause, Integer status, Long seqNo, Long term, - Boolean aborted) { + private Failure(@Nullable String index, @Nullable String type, @Nullable String id, @Nullable Exception cause, + @Nullable Integer status, @Nullable Long seqNo, @Nullable Long term, @Nullable Boolean aborted) { this.index = index; this.type = type; this.id = id; @@ -199,34 +202,42 @@ public class UpdateByQueryResponse { this.aborted = aborted; } + @Nullable public String getIndex() { return index; } + @Nullable public String getType() { return type; } + @Nullable public String getId() { return id; } + @Nullable public Exception getCause() { return cause; } + @Nullable public Integer getStatus() { return status; } + @Nullable public Long getSeqNo() { return seqNo; } + @Nullable public Long getTerm() { return term; } + @Nullable public Boolean getAborted() { return aborted; } @@ -242,37 +253,37 @@ public class UpdateByQueryResponse { /** * Create a new {@link Failure} from {@link BulkItemResponse.Failure} + * * @param failure {@link BulkItemResponse.Failure} to translate * @return a new {@link Failure} */ public static Failure of(BulkItemResponse.Failure failure) { - return builder() - .withIndex(failure.getIndex()) - .withType(failure.getType()) - .withId(failure.getId()) - .withStatus(failure.getStatus().getStatus()) - .withAborted(failure.isAborted()) - .withCause(failure.getCause()) - .withSeqNo(failure.getSeqNo()) - .withTerm(failure.getTerm()) - .build(); + return builder() // + .withIndex(failure.getIndex()) // + .withType(failure.getType()) // + .withId(failure.getId()) // + .withStatus(failure.getStatus().getStatus()) // + .withAborted(failure.isAborted()) // + .withCause(failure.getCause()) // + .withSeqNo(failure.getSeqNo()) // + .withTerm(failure.getTerm()) // + .build(); // } /** * Builder for {@link Failure} */ public static final class FailureBuilder { - private String index; - private String type; - private String id; - private Exception cause; - private Integer status; - private Long seqNo; - private Long term; - private Boolean aborted; + @Nullable private String index; + @Nullable private String type; + @Nullable private String id; + @Nullable private Exception cause; + @Nullable private Integer status; + @Nullable private Long seqNo; + @Nullable private Long term; + @Nullable private Boolean aborted; - private FailureBuilder() { - } + private FailureBuilder() {} public FailureBuilder withIndex(String index) { this.index = index; @@ -332,10 +343,9 @@ public class UpdateByQueryResponse { private long bulkRetries; private long searchRetries; @Nullable private String reasonCancelled; - private List failures; + private List failures = Collections.emptyList(); - private UpdateByQueryResponseBuilder() { - } + private UpdateByQueryResponseBuilder() {} public UpdateByQueryResponseBuilder withTook(long took) { this.took = took; @@ -398,7 +408,8 @@ public class UpdateByQueryResponse { } public UpdateByQueryResponse build() { - return new UpdateByQueryResponse(took, timedOut, total, updated, deleted, batches, versionConflicts, noops, bulkRetries, searchRetries, reasonCancelled, failures); + return new UpdateByQueryResponse(took, timedOut, total, updated, deleted, batches, versionConflicts, noops, + bulkRetries, searchRetries, reasonCancelled, failures); } } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateQuery.java b/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateQuery.java index be859bce6..324f4a708 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateQuery.java @@ -18,13 +18,14 @@ package org.springframework.data.elasticsearch.core.query; import java.util.List; import java.util.Map; +import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.data.elasticsearch.core.ScriptType; import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.lang.Nullable; /** * Defines an update request. - * + * * @author Rizwan Idrees * @author Mohsin Husen * @author Peter-Josef Meisch @@ -33,35 +34,35 @@ import org.springframework.lang.Nullable; */ public class UpdateQuery { - private String id; - @Nullable private String script; - @Nullable private Map params; - @Nullable private Document document; - @Nullable private Document upsert; - @Nullable private String lang; - @Nullable private String routing; - @Nullable private Boolean scriptedUpsert; - @Nullable private Boolean docAsUpsert; - @Nullable private Boolean fetchSource; - @Nullable private List fetchSourceIncludes; - @Nullable private List fetchSourceExcludes; - @Nullable private Integer ifSeqNo; - @Nullable private Integer ifPrimaryTerm; - @Nullable private Refresh refresh; - @Nullable private Integer retryOnConflict; - @Nullable private String timeout; - @Nullable private String waitForActiveShards; - @Nullable private Query query; - @Nullable private Boolean abortOnVersionConflict; - @Nullable private Integer batchSize; - @Nullable private Integer maxDocs; - @Nullable private Integer maxRetries; - @Nullable private String pipeline; - @Nullable private Float requestsPerSecond; - @Nullable private Boolean shouldStoreResult; - @Nullable private Integer slices; - @Nullable private ScriptType scriptType; - @Nullable private String scriptName; + private final String id; + @Nullable private final String script; + @Nullable private final Map params; + @Nullable private final Document document; + @Nullable private final Document upsert; + @Nullable private final String lang; + @Nullable private final String routing; + @Nullable private final Boolean scriptedUpsert; + @Nullable private final Boolean docAsUpsert; + @Nullable private final Boolean fetchSource; + @Nullable private final List fetchSourceIncludes; + @Nullable private final List fetchSourceExcludes; + @Nullable private final Integer ifSeqNo; + @Nullable private final Integer ifPrimaryTerm; + @Nullable private final RefreshPolicy refreshPolicy; + @Nullable private final Integer retryOnConflict; + @Nullable private final String timeout; + @Nullable private final String waitForActiveShards; + @Nullable private final Query query; + @Nullable private final Boolean abortOnVersionConflict; + @Nullable private final Integer batchSize; + @Nullable private final Integer maxDocs; + @Nullable private final Integer maxRetries; + @Nullable private final String pipeline; + @Nullable private final Float requestsPerSecond; + @Nullable private final Boolean shouldStoreResult; + @Nullable private final Integer slices; + @Nullable private final ScriptType scriptType; + @Nullable private final String scriptName; public static Builder builder(String id) { return new Builder(id); @@ -75,11 +76,12 @@ public class UpdateQuery { @Nullable Document document, @Nullable Document upsert, @Nullable String lang, @Nullable String routing, @Nullable Boolean scriptedUpsert, @Nullable Boolean docAsUpsert, @Nullable Boolean fetchSource, @Nullable List fetchSourceIncludes, @Nullable List fetchSourceExcludes, @Nullable Integer ifSeqNo, - @Nullable Integer ifPrimaryTerm, @Nullable Refresh refresh, @Nullable Integer retryOnConflict, + @Nullable Integer ifPrimaryTerm, @Nullable RefreshPolicy refreshPolicy, @Nullable Integer retryOnConflict, @Nullable String timeout, @Nullable String waitForActiveShards, @Nullable Query query, @Nullable Boolean abortOnVersionConflict, @Nullable Integer batchSize, @Nullable Integer maxDocs, @Nullable Integer maxRetries, @Nullable String pipeline, @Nullable Float requestsPerSecond, - @Nullable Boolean shouldStoreResult, @Nullable Integer slices, @Nullable ScriptType scriptType, @Nullable String scriptName) { + @Nullable Boolean shouldStoreResult, @Nullable Integer slices, @Nullable ScriptType scriptType, + @Nullable String scriptName) { this.id = id; this.script = script; @@ -95,7 +97,7 @@ public class UpdateQuery { this.fetchSourceExcludes = fetchSourceExcludes; this.ifSeqNo = ifSeqNo; this.ifPrimaryTerm = ifPrimaryTerm; - this.refresh = refresh; + this.refreshPolicy = refreshPolicy; this.retryOnConflict = retryOnConflict; this.timeout = timeout; this.waitForActiveShards = waitForActiveShards; @@ -182,8 +184,8 @@ public class UpdateQuery { } @Nullable - public Refresh getRefresh() { - return refresh; + public RefreshPolicy getRefreshPolicy() { + return refreshPolicy; } @Nullable @@ -269,7 +271,7 @@ public class UpdateQuery { @Nullable private Boolean fetchSource; @Nullable private Integer ifSeqNo; @Nullable private Integer ifPrimaryTerm; - @Nullable private Refresh refresh; + @Nullable private RefreshPolicy refreshPolicy; @Nullable private Integer retryOnConflict; @Nullable private String timeout; @Nullable String waitForActiveShards; @@ -350,8 +352,8 @@ public class UpdateQuery { return this; } - public Builder withRefresh(Refresh refresh) { - this.refresh = refresh; + public Builder withRefreshPolicy(RefreshPolicy refreshPolicy) { + this.refreshPolicy = refreshPolicy; return this; } @@ -437,16 +439,9 @@ public class UpdateQuery { } return new UpdateQuery(id, script, params, document, upsert, lang, routing, scriptedUpsert, docAsUpsert, - fetchSource, fetchSourceIncludes, fetchSourceExcludes, ifSeqNo, ifPrimaryTerm, refresh, retryOnConflict, + fetchSource, fetchSourceIncludes, fetchSourceExcludes, ifSeqNo, ifPrimaryTerm, refreshPolicy, retryOnConflict, timeout, waitForActiveShards, query, abortOnVersionConflict, batchSize, maxDocs, maxRetries, pipeline, requestsPerSecond, shouldStoreResult, slices, scriptType, scriptName); } } - - /* - * names will be lowercased on building the query. - */ - public enum Refresh { - True, False, Wait_For - } } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java index 1f50c26f9..b6eb439bc 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java @@ -18,11 +18,6 @@ package org.springframework.data.elasticsearch.client.reactive; import static org.assertj.core.api.Assertions.*; import lombok.SneakyThrows; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; -import org.elasticsearch.index.reindex.UpdateByQueryRequest; -import org.elasticsearch.script.Script; -import org.elasticsearch.script.ScriptType; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -38,6 +33,7 @@ import java.util.stream.IntStream; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; @@ -50,7 +46,10 @@ import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -68,6 +67,7 @@ import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperatio import org.springframework.data.elasticsearch.core.ReactiveIndexOperations; import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration; import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; import org.springframework.http.HttpHeaders; @@ -107,10 +107,8 @@ public class ReactiveElasticsearchClientIntegrationTests { // (Object...) static final Map DOC_SOURCE; - @Autowired - ReactiveElasticsearchClient client; - @Autowired - ReactiveElasticsearchOperations operations; + @Autowired ReactiveElasticsearchClient client; + @Autowired ReactiveElasticsearchOperations operations; static { @@ -473,59 +471,61 @@ public class ReactiveElasticsearchClientIntegrationTests { final String script = "ctx._source['firstname'] = params['newFirstname']"; final Map params = Collections.singletonMap("newFirstname", "arrow"); - final UpdateByQueryRequest request = new UpdateByQueryRequest(INDEX_I) - .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("lastname", "fallstar"))) - .setAbortOnVersionConflict(true) - .setRefresh(true) - .setScript(new Script(ScriptType.INLINE, "painless", script, params)); + final UpdateByQueryRequest request = new UpdateByQueryRequest(INDEX_I) // + .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("lastname", "fallstar"))) // + .setAbortOnVersionConflict(true) // + .setRefresh(true) // + .setScript(new Script(ScriptType.INLINE, "painless", script, params)); // - client.updateBy(request) - .map(UpdateByQueryResponse::getUpdated) - .as(StepVerifier::create) - .expectNext(2L) - .verifyComplete(); + client.updateBy(request) // + .map(UpdateByQueryResponse::getUpdated) // + .as(StepVerifier::create) // + .expectNext(2L) // + .verifyComplete(); // final SearchRequest searchUpdatedRequest = new SearchRequest(INDEX_I) // - .source(new SearchSourceBuilder() - .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("firstname", "arrow")))); + .source(new SearchSourceBuilder() // + .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("firstname", "arrow"))) // + ); - client.search(searchUpdatedRequest) - .collectList() - .map(List::size) - .as(StepVerifier::create) - .expectNext(2) - .verifyComplete(); + client.search(searchUpdatedRequest) // + .collectList() // + .map(List::size) // + .as(StepVerifier::create) // + .expectNext(2) // + .verifyComplete(); // } @Test // #1446 void updateByShouldUpdateExistingDocument() { addSourceDocument().to(INDEX_I); - final String script = "ctx._source['firstname'] = params['newFirstname']"; - final Map params = Collections.singletonMap("newFirstname", "arrow"); + String script = "ctx._source['firstname'] = params['newFirstname']"; + Map params = Collections.singletonMap("newFirstname", "arrow"); - final UpdateByQueryRequest request = new UpdateByQueryRequest(INDEX_I) - .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("lastname", "non_existing_lastname"))) - .setAbortOnVersionConflict(true) - .setRefresh(true) - .setScript(new Script(ScriptType.INLINE, "painless", script, params)); + UpdateByQueryRequest request = new UpdateByQueryRequest(INDEX_I) // + .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("lastname", "non_existing_lastname"))) // + .setAbortOnVersionConflict(true) // + .setRefresh(true) // + .setScript(new Script(ScriptType.INLINE, "painless", script, params)); // - client.updateBy(request) - .map(UpdateByQueryResponse::getUpdated) - .as(StepVerifier::create) - .expectNext(0L) - .verifyComplete(); + client.updateBy(request) // + .map(UpdateByQueryResponse::getUpdated) // + .as(StepVerifier::create) // + .expectNext(0L) // + .verifyComplete(); // - SearchRequest searchUpdatedRequest = new SearchRequest(INDEX_I) // - .source(new SearchSourceBuilder() - .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("firstname", "arrow")))); + SearchRequest searchUpdatedRequest = new SearchRequest(INDEX_I) // // + .source(new SearchSourceBuilder() // + .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("firstname", "arrow"))) // + ); // - client.search(searchUpdatedRequest) - .collectList() - .map(List::size) - .as(StepVerifier::create) - .expectNext(0) - .verifyComplete(); + client.search(searchUpdatedRequest) // + .collectList() // + .map(List::size) // + .as(StepVerifier::create) // + .expectNext(0) // + .verifyComplete(); // } @Test // DATAES-510 @@ -812,19 +812,18 @@ public class ReactiveElasticsearchClientIntegrationTests { Map jsonMap = Collections.singletonMap("properties", properties); - final PutMappingRequest putMappingRequest = new PutMappingRequest(INDEX_I) - .source(jsonMap); + PutMappingRequest putMappingRequest = new PutMappingRequest(INDEX_I).source(jsonMap); client.indices().putMapping(putMappingRequest).block(); client.indices().getFieldMapping(request -> request.indices(INDEX_I).fields("message1", "message2")) - .as(StepVerifier::create) - .consumeNextWith(it -> { + .as(StepVerifier::create).consumeNextWith(it -> { assertThat(it.mappings().get(INDEX_I).keySet().size()).isEqualTo(2); - assertThat(it.mappings().get(INDEX_I).get("message1").sourceAsMap()).isEqualTo(Collections.singletonMap("message1", Collections.singletonMap("type", "text"))); - assertThat(it.mappings().get(INDEX_I).get("message2").sourceAsMap()).isEqualTo(Collections.singletonMap("message2", Collections.singletonMap("type", "keyword"))); - }) - .verifyComplete(); + assertThat(it.mappings().get(INDEX_I).get("message1").sourceAsMap()) + .isEqualTo(Collections.singletonMap("message1", Collections.singletonMap("type", "text"))); + assertThat(it.mappings().get(INDEX_I).get("message2").sourceAsMap()) + .isEqualTo(Collections.singletonMap("message2", Collections.singletonMap("type", "keyword"))); + }).verifyComplete(); } @Test // #1640 @@ -835,24 +834,20 @@ public class ReactiveElasticsearchClientIntegrationTests { Map jsonMap = Collections.singletonMap("properties", Collections.singletonMap("message", Collections.singletonMap("type", "text"))); - final PutMappingRequest putMappingRequest = new PutMappingRequest(INDEX_I) - .source(jsonMap); + PutMappingRequest putMappingRequest = new PutMappingRequest(INDEX_I).source(jsonMap); client.indices().putMapping(putMappingRequest).block(); - client.indices().getFieldMapping(request -> request.indices(INDEX_I).fields("message1")) - .as(StepVerifier::create) + client.indices().getFieldMapping(request -> request.indices(INDEX_I).fields("message1")).as(StepVerifier::create) .consumeNextWith(it -> { assertThat(it.mappings().get(INDEX_I).keySet().size()).isZero(); - }) - .verifyComplete(); + }).verifyComplete(); } @Test // #1640 void getFieldMappingNonExistingIndex() { - client.indices().getFieldMapping(request -> request.indices(INDEX_I).fields("message1")) - .as(StepVerifier::create) + client.indices().getFieldMapping(request -> request.indices(INDEX_I).fields("message1")).as(StepVerifier::create) .verifyError(ElasticsearchStatusException.class); } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplateTests.java index 285f824c7..513629edc 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplateTests.java @@ -40,7 +40,6 @@ import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.json.JSONException; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import org.skyscreamer.jsonassert.JSONAssert; import org.springframework.data.annotation.Id; import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; import org.springframework.data.elasticsearch.annotations.Document; @@ -50,7 +49,6 @@ import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration; -import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; import org.springframework.test.context.ContextConfiguration; /** @@ -105,7 +103,7 @@ public class ElasticsearchRestTemplateTests extends ElasticsearchTemplateTests { .withIfPrimaryTerm(13) // .withScript("script")// .withLang("lang") // - .withRefresh(UpdateQuery.Refresh.Wait_For) // + .withRefreshPolicy(RefreshPolicy.WAIT_UNTIL) // .withRetryOnConflict(7) // .withTimeout("4711s") // .withWaitForActiveShards("all") // @@ -132,41 +130,40 @@ public class ElasticsearchRestTemplateTests extends ElasticsearchTemplateTests { @Test // #1446 void shouldUseAllOptionsFromUpdateByQuery() throws JSONException { - // given - NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()) - .withIndicesOptions(IndicesOptions.lenientExpandOpen()) - .build(); + + NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()) // + .withIndicesOptions(IndicesOptions.lenientExpandOpen()) // + .build(); // searchQuery.setScrollTime(Duration.ofMillis(1000)); - final UpdateQuery updateQuery = UpdateQuery.builder(searchQuery) - .withAbortOnVersionConflict(true) - .withBatchSize(10) - .withMaxDocs(12) - .withMaxRetries(3) - .withPipeline("pipeline") - .withRequestsPerSecond(5F) - .withShouldStoreResult(false) - .withSlices(4) - .withScriptType(ScriptType.INLINE) - .withScript("script") - .withLang("painless") - .build(); + UpdateQuery updateQuery = UpdateQuery.builder(searchQuery) // + .withAbortOnVersionConflict(true) // + .withBatchSize(10) // + .withMaxDocs(12) // + .withMaxRetries(3) // + .withPipeline("pipeline") // + .withRequestsPerSecond(5F) // + .withShouldStoreResult(false) // + .withSlices(4) // + .withScriptType(ScriptType.INLINE) // + .withScript("script") // + .withLang("painless") // + .build(); // - final String expectedSearchRequest = '{' + // + String expectedSearchRequest = '{' + // " \"size\": 10," + // " \"query\": {" + // " \"match_all\": {" + // " \"boost\": 1.0" + // - " }" + - " }" + - '}'; + " }" + " }" + '}'; // when - final UpdateByQueryRequest request = getRequestFactory().updateByQueryRequest(updateQuery, IndexCoordinates.of("index")); + UpdateByQueryRequest request = getRequestFactory().updateByQueryRequest(updateQuery, IndexCoordinates.of("index")); // then assertThat(request).isNotNull(); - assertThat(request.getSearchRequest().indicesOptions()).usingRecursiveComparison().isEqualTo(IndicesOptions.lenientExpandOpen()); + assertThat(request.getSearchRequest().indicesOptions()).usingRecursiveComparison() + .isEqualTo(IndicesOptions.lenientExpandOpen()); assertThat(request.getScrollTime().getMillis()).isEqualTo(1000); assertEquals(request.getSearchRequest().source().toString(), expectedSearchRequest, false); assertThat(request.isAbortOnVersionConflict()).isTrue(); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTransportTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTransportTemplateTests.java index 157ed580e..864d58fe1 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTransportTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTransportTemplateTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.json.JSONException; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -107,18 +108,18 @@ public class ElasticsearchTransportTemplateTests extends ElasticsearchTemplateTe doc.put("message", "test"); org.springframework.data.elasticsearch.core.document.Document document = org.springframework.data.elasticsearch.core.document.Document .from(doc); - UpdateQuery updateQuery = UpdateQuery.builder("1") // - .withDocument(document) // - .withIfSeqNo(42) // - .withIfPrimaryTerm(13) // - .withScript("script")// - .withLang("lang") // - .withRefresh(UpdateQuery.Refresh.Wait_For) // - .withRetryOnConflict(7) // - .withTimeout("4711s") // - .withWaitForActiveShards("all").withFetchSourceIncludes(Collections.singletonList("incl")) // - .withFetchSourceExcludes(Collections.singletonList("excl")) // - .build(); + UpdateQuery updateQuery = UpdateQuery.builder("1") // + .withDocument(document) // + .withIfSeqNo(42) // + .withIfPrimaryTerm(13) // + .withScript("script")// + .withLang("lang") // + .withRefreshPolicy(RefreshPolicy.WAIT_UNTIL) // + .withRetryOnConflict(7) // + .withTimeout("4711s") // + .withWaitForActiveShards("all").withFetchSourceIncludes(Collections.singletonList("incl")) // + .withFetchSourceExcludes(Collections.singletonList("excl")) // + .build(); UpdateRequestBuilder request = getRequestFactory().updateRequestBuilderFor(client, updateQuery, IndexCoordinates.of("index")); @@ -132,7 +133,7 @@ public class ElasticsearchTransportTemplateTests extends ElasticsearchTemplateTe assertThat(request.request().retryOnConflict()).isEqualTo(7); assertThat(request.request().timeout()).isEqualByComparingTo(TimeValue.parseTimeValue("4711s", "test")); assertThat(request.request().waitForActiveShards()).isEqualTo(ActiveShardCount.ALL); - val fetchSourceContext = request.request().fetchSource(); + FetchSourceContext fetchSourceContext = request.request().fetchSource(); assertThat(fetchSourceContext).isNotNull(); assertThat(fetchSourceContext.includes()).containsExactlyInAnyOrder("incl"); assertThat(fetchSourceContext.excludes()).containsExactlyInAnyOrder("excl"); @@ -147,40 +148,42 @@ public class ElasticsearchTransportTemplateTests extends ElasticsearchTemplateTe @Test // #1446 void shouldUseAllOptionsFromUpdateByQuery() throws JSONException { - // given - NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()) - .withIndicesOptions(IndicesOptions.lenientExpandOpen()) - .build(); + + NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()) // + .withIndicesOptions(IndicesOptions.lenientExpandOpen()) // + .build(); // searchQuery.setScrollTime(Duration.ofMillis(1000)); - final UpdateQuery updateQuery = UpdateQuery.builder(searchQuery) - .withAbortOnVersionConflict(true) - .withBatchSize(10) - .withMaxDocs(12) - .withMaxRetries(3) - .withPipeline("pipeline") - .withRequestsPerSecond(5F) - .withShouldStoreResult(false) - .withSlices(4) - .withScriptType(ScriptType.STORED) - .withScriptName("script_name") - .build(); + UpdateQuery updateQuery = UpdateQuery.builder(searchQuery) // + .withAbortOnVersionConflict(true) // + .withBatchSize(10) // + .withMaxDocs(12) // + .withMaxRetries(3) // + .withPipeline("pipeline") // + .withRequestsPerSecond(5F) // + .withShouldStoreResult(false) // + .withSlices(4) // + .withScriptType(ScriptType.STORED) // + .withScriptName("script_name") // + .build(); // - final String expectedSearchRequest = '{' + // + String expectedSearchRequest = '{' + // " \"size\": 10," + // " \"query\": {" + // " \"match_all\": {" + // " \"boost\": 1.0" + // - " }" + - " }" + - '}'; + " }" + // + " }" + // + '}'; // // when - final UpdateByQueryRequestBuilder request = getRequestFactory().updateByQueryRequestBuilder(client, updateQuery, IndexCoordinates.of("index")); + UpdateByQueryRequestBuilder request = getRequestFactory().updateByQueryRequestBuilder(client, updateQuery, + IndexCoordinates.of("index")); // then assertThat(request).isNotNull(); - assertThat(request.request().getSearchRequest().indicesOptions()).usingRecursiveComparison().isEqualTo(IndicesOptions.lenientExpandOpen()); + assertThat(request.request().getSearchRequest().indicesOptions()).usingRecursiveComparison() + .isEqualTo(IndicesOptions.lenientExpandOpen()); assertThat(request.request().getScrollTime().getMillis()).isEqualTo(1000); assertEquals(request.request().getSearchRequest().source().toString(), expectedSearchRequest, false); assertThat(request.request().isAbortOnVersionConflict()).isTrue(); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java index 4ce09bd75..fb467a144 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.main.MainResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryRequest; @@ -93,7 +94,7 @@ public class ReactiveElasticsearchTemplateUnitTests { .as(StepVerifier::create) // .verifyComplete(); - assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.IMMEDIATE.toRequestRefreshPolicy()); + assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(WriteRequest.RefreshPolicy.IMMEDIATE); } @Test // DATAES-504 @@ -108,7 +109,7 @@ public class ReactiveElasticsearchTemplateUnitTests { .as(StepVerifier::create) // .verifyComplete(); - assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.WAIT_UNTIL.toRequestRefreshPolicy()); + assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(WriteRequest.RefreshPolicy.WAIT_UNTIL); } @Test // DATAES-504, DATAES-518 @@ -178,7 +179,7 @@ public class ReactiveElasticsearchTemplateUnitTests { .as(StepVerifier::create) // .verifyComplete(); - assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.IMMEDIATE.toRequestRefreshPolicy()); + assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(WriteRequest.RefreshPolicy.IMMEDIATE); } @Test // DATAES-504 @@ -193,7 +194,7 @@ public class ReactiveElasticsearchTemplateUnitTests { .as(StepVerifier::create) // .verifyComplete(); - assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.WAIT_UNTIL.toRequestRefreshPolicy()); + assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(WriteRequest.RefreshPolicy.WAIT_UNTIL); } @Test // DATAES-504