From 4a0e7cc56e1508b4a55f59ee5593c2dfb7f4d334 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Wed, 26 Jan 2022 21:48:05 +0100 Subject: [PATCH] Polishing --- .../DefaultReactiveElasticsearchClient.java | 11 +- .../reactive/ReactiveElasticsearchClient.java | 11 +- .../client/reactive/RequestCreator.java | 8 +- .../client/util/RequestConverters.java | 7 +- .../core/DocumentOperations.java | 14 +-- .../core/ElasticsearchRestTemplate.java | 15 ++- .../core/ReactiveDocumentOperations.java | 14 +-- .../core/ReactiveElasticsearchTemplate.java | 8 +- .../elasticsearch/core/RequestFactory.java | 99 ++++++++-------- .../elasticsearch/core/ResponseConverter.java | 4 +- .../core/reindex/ReindexRequest.java | 57 +++++---- .../core/reindex/ReindexResponse.java | 30 ++--- .../elasticsearch/core/reindex/Remote.java | 30 ++--- .../core/ElasticsearchTemplateTests.java | 16 ++- ...ElasticsearchTemplateIntegrationTests.java | 25 ++-- .../core/RequestFactoryTests.java | 109 ++++++++---------- 16 files changed, 230 insertions(+), 228 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index cd5d2beb3..89184eca4 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -88,8 +88,8 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.mustache.SearchTemplateRequest; @@ -105,7 +105,6 @@ import org.elasticsearch.xcontent.XContentType; import org.reactivestreams.Publisher; import org.springframework.data.elasticsearch.RestStatusException; import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; -import org.springframework.data.elasticsearch.core.ResponseConverter; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientLogger; import org.springframework.data.elasticsearch.client.ElasticsearchHost; @@ -115,6 +114,7 @@ import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsea import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices; import org.springframework.data.elasticsearch.client.util.NamedXContents; import org.springframework.data.elasticsearch.client.util.ScrollState; +import org.springframework.data.elasticsearch.core.ResponseConverter; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.util.Lazy; import org.springframework.http.HttpHeaders; @@ -150,7 +150,6 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodySpe * @see ClientConfiguration * @see ReactiveRestClients */ -// todo package private after refactoring public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices, Cluster { private final HostProvider hostProvider; @@ -514,14 +513,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch @Override public Mono reindex(HttpHeaders headers, ReindexRequest reindexRequest) { - return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers) - .next(); + return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers).next(); } @Override public Mono submitReindex(HttpHeaders headers, ReindexRequest reindexRequest) { - return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers) - .next() + return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers).next() .map(TaskSubmissionResponse::getTask); } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index b5edb1904..4a28eb005 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -53,8 +53,8 @@ import org.elasticsearch.client.indices.*; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.mustache.SearchTemplateRequest; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.Aggregation; @@ -722,7 +722,7 @@ public interface ReactiveElasticsearchClient { * @return the {@link Mono} emitting the response * @since 4.4 */ - default Mono reindex(Consumer consumer){ + default Mono reindex(Consumer consumer) { ReindexRequest reindexRequest = new ReindexRequest(); consumer.accept(reindexRequest); @@ -736,7 +736,7 @@ public interface ReactiveElasticsearchClient { * @return the {@link Mono} emitting the response * @since 4.4 */ - default Mono reindex(ReindexRequest reindexRequest){ + default Mono reindex(ReindexRequest reindexRequest) { return reindex(HttpHeaders.EMPTY, reindexRequest); } @@ -757,7 +757,7 @@ public interface ReactiveElasticsearchClient { * @return the {@link Mono} emitting the task id * @since 4.4 */ - default Mono submitReindex(Consumer consumer){ + default Mono submitReindex(Consumer consumer) { ReindexRequest reindexRequest = new ReindexRequest(); consumer.accept(reindexRequest); @@ -771,7 +771,7 @@ public interface ReactiveElasticsearchClient { * @return the {@link Mono} emitting the task id * @since 4.4 */ - default Mono submitReindex(ReindexRequest reindexRequest){ + default Mono submitReindex(ReindexRequest reindexRequest) { return submitReindex(HttpHeaders.EMPTY, reindexRequest); } @@ -784,6 +784,7 @@ public interface ReactiveElasticsearchClient { * @since 4.4 */ Mono submitReindex(HttpHeaders headers, ReindexRequest reindexRequest); + /** * Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}. * {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java index 0d1b5ecec..7571e32bd 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java @@ -294,10 +294,14 @@ public interface RequestCreator { /** * @since 4.4 */ - default Function reindex() { return RequestConverters::reindex; } + default Function reindex() { + return RequestConverters::reindex; + } /** * @since 4.4 */ - default Function submitReindex() { return RequestConverters::submitReindex; } + default Function submitReindex() { + return RequestConverters::submitReindex; + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java index 82e9df984..c1c930a7f 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java @@ -547,14 +547,17 @@ public class RequestConverters { .withTimeout(reindexRequest.getTimeout()).withWaitForActiveShards(reindexRequest.getWaitForActiveShards()) .withRequestsPerSecond(reindexRequest.getRequestsPerSecond()); - if(reindexRequest.getDestination().isRequireAlias()){ + if (reindexRequest.getDestination().isRequireAlias()) { params.putParam("require_alias", Boolean.TRUE.toString()); } + if (reindexRequest.getScrollTime() != null) { params.putParam("scroll", reindexRequest.getScrollTime()); } + params.putParam("slices", Integer.toString(reindexRequest.getSlices())); - if(reindexRequest.getMaxDocs() > -1){ + + if (reindexRequest.getMaxDocs() > -1) { params.putParam("max_docs", Integer.toString(reindexRequest.getMaxDocs())); } request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE)); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java index fb66a4b99..ced39843e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java @@ -18,8 +18,6 @@ package org.springframework.data.elasticsearch.core; import java.util.Collection; import java.util.List; -import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; -import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; @@ -27,6 +25,8 @@ import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.routing.RoutingResolver; import org.springframework.lang.Nullable; @@ -327,10 +327,9 @@ public interface DocumentOperations { ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); /** - * Copies documents from a source to a destination. - * The source can be any existing index, alias, or data stream. The destination must differ from the source. - * For example, you cannot reindex a data stream into itself. - * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * Copies documents from a source to a destination. The source can be any existing index, alias, or data stream. The + * destination must differ from the source. For example, you cannot reindex a data stream into itself. (@see + * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) * * @param reindexRequest reindex request parameters * @return the reindex response @@ -339,8 +338,7 @@ public interface DocumentOperations { ReindexResponse reindex(ReindexRequest reindexRequest); /** - * Submits a reindex task. - * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * Submits a reindex task. (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) * * @param reindexRequest reindex request parameters * @return the task id diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index b355e25bd..7bc4c2aba 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -61,9 +61,7 @@ import org.springframework.data.elasticsearch.core.cluster.ClusterOperations; import org.springframework.data.elasticsearch.core.cluster.ElasticsearchClusterOperations; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.document.DocumentAdapters; -import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; -import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; @@ -73,6 +71,8 @@ import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilde import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -265,7 +265,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { Assert.notNull(query, "query must not be null"); Assert.notNull(index, "index must not be null"); - final UpdateByQueryRequest updateByQueryRequest = requestFactory.updateByQueryRequest(query, index); + UpdateByQueryRequest updateByQueryRequest = requestFactory.updateByQueryRequest(query, index); if (query.getRefreshPolicy() == null && getRefreshPolicy() != null) { updateByQueryRequest.setRefresh(getRefreshPolicy() == RefreshPolicy.IMMEDIATE); @@ -285,8 +285,8 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); - final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); - final BulkByScrollResponse bulkByScrollResponse = execute( + org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + BulkByScrollResponse bulkByScrollResponse = execute( client -> client.reindex(reindexRequest, RequestOptions.DEFAULT)); return ResponseConverter.reindexResponseOf(bulkByScrollResponse); } @@ -295,9 +295,8 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { public String submitReindex(ReindexRequest postReindexRequest) { Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); - final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); - return execute( - client -> client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask()); + org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + return execute(client -> client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask()); } public List doBulkOperation(List queries, BulkOptions bulkOptions, diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index 59a2e5675..ffbb0fc2a 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -15,8 +15,6 @@ */ package org.springframework.data.elasticsearch.core; -import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; -import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -30,6 +28,8 @@ import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.util.Assert; /** @@ -307,10 +307,9 @@ public interface ReactiveDocumentOperations { Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); /** - * Copies documents from a source to a destination. - * The source can be any existing index, alias, or data stream. The destination must differ from the source. - * For example, you cannot reindex a data stream into itself. - * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * Copies documents from a source to a destination. The source can be any existing index, alias, or data stream. The + * destination must differ from the source. For example, you cannot reindex a data stream into itself. (@see + * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) * * @param reindexRequest reindex request parameters * @return a {@link Mono} emitting the reindex response @@ -319,8 +318,7 @@ public interface ReactiveDocumentOperations { Mono reindex(ReindexRequest reindexRequest); /** - * Submits a reindex task. - * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * Submits a reindex task. (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) * * @param reindexRequest reindex request parameters * @return a {@link Mono} emitting the {@literal task} id. diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index d3bf9ad60..47d703f1a 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -70,8 +70,6 @@ import org.springframework.data.elasticsearch.core.event.ReactiveAfterConvertCal import org.springframework.data.elasticsearch.core.event.ReactiveAfterLoadCallback; import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback; import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback; -import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; -import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; @@ -84,6 +82,8 @@ import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.routing.DefaultRoutingResolver; import org.springframework.data.elasticsearch.core.routing.RoutingResolver; import org.springframework.data.elasticsearch.core.suggest.response.Suggest; @@ -618,7 +618,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); return Mono.defer(() -> { - final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); return Mono.from(execute(client -> client.reindex(reindexRequest))).map(ResponseConverter::reindexResponseOf); }); } @@ -629,7 +629,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); return Mono.defer(() -> { - final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); return Mono.from(execute(client -> client.submitReindex(reindexRequest))); }); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java index bf67ace9c..333ab6ea3 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java @@ -92,17 +92,17 @@ import org.springframework.data.elasticsearch.core.index.AliasActions; import org.springframework.data.elasticsearch.core.index.DeleteTemplateRequest; import org.springframework.data.elasticsearch.core.index.ExistsTemplateRequest; import org.springframework.data.elasticsearch.core.index.GetTemplateRequest; -import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import org.springframework.data.elasticsearch.core.index.PutTemplateRequest; -import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Source; -import org.springframework.data.elasticsearch.core.reindex.Remote; -import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Dest; -import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Slice; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.elasticsearch.core.query.RescorerQuery.ScoreMode; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Dest; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Slice; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Source; +import org.springframework.data.elasticsearch.core.reindex.Remote; import org.springframework.data.mapping.context.MappingContext; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -396,108 +396,117 @@ class RequestFactory { /** * @since 4.4 */ - public org.elasticsearch.index.reindex.ReindexRequest reindexRequest(ReindexRequest reindexRequest){ + public org.elasticsearch.index.reindex.ReindexRequest reindexRequest(ReindexRequest reindexRequest) { final org.elasticsearch.index.reindex.ReindexRequest request = new org.elasticsearch.index.reindex.ReindexRequest(); - if(reindexRequest.getConflicts() != null){ - request.setConflicts(reindexRequest.getConflicts().name().toLowerCase(Locale.ROOT)); + + if (reindexRequest.getConflicts() != null) { + request.setConflicts(reindexRequest.getConflicts().getEsName()); } - if(reindexRequest.getMaxDocs() != null){ + + if (reindexRequest.getMaxDocs() != null) { request.setMaxDocs(reindexRequest.getMaxDocs()); } // region source build final Source source = reindexRequest.getSource(); request.setSourceIndices(source.getIndexes().getIndexNames()); + // source query will build from RemoteInfo if remote exist - if(source.getQuery() != null && source.getRemote() == null){ + if (source.getQuery() != null && source.getRemote() == null) { request.setSourceQuery(getQuery(source.getQuery())); } - if(source.getSize() != null){ + + if (source.getSize() != null) { request.setSourceBatchSize(source.getSize()); } - if(source.getRemote() != null){ + if (source.getRemote() != null) { Remote remote = source.getRemote(); - QueryBuilder queryBuilder = source.getQuery() == null ? QueryBuilders.matchAllQuery() : getQuery(source.getQuery()); + QueryBuilder queryBuilder = source.getQuery() == null ? QueryBuilders.matchAllQuery() + : getQuery(source.getQuery()); BytesReference query; try { XContentBuilder builder = XContentBuilder.builder(QUERY_CONTENT_TYPE).prettyPrint(); query = BytesReference.bytes(queryBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS)); } catch (IOException e) { - throw new IllegalArgumentException("an IOException occurs while building the source query content",e); + throw new IllegalArgumentException("Error parsing the source query content", e); } - request.setRemoteInfo(new RemoteInfo( - remote.getScheme(), - remote.getHost(), - remote.getPort(), - remote.getPathPrefix(), - query, - remote.getUsername(), - remote.getPassword(), - Collections.emptyMap(), - remote.getSocketTimeout() == null ? DEFAULT_SOCKET_TIMEOUT : timeValueSeconds(remote.getSocketTimeout().getSeconds()), - remote.getConnectTimeout() == null ? DEFAULT_CONNECT_TIMEOUT : timeValueSeconds(remote.getConnectTimeout().getSeconds()) - )); + request.setRemoteInfo(new RemoteInfo( // + remote.getScheme(), // + remote.getHost(), // + remote.getPort(), // + remote.getPathPrefix(), // + query, // + remote.getUsername(), // + remote.getPassword(), // + Collections.emptyMap(), // + remote.getSocketTimeout() == null ? DEFAULT_SOCKET_TIMEOUT + : timeValueSeconds(remote.getSocketTimeout().getSeconds()), // + remote.getConnectTimeout() == null ? DEFAULT_CONNECT_TIMEOUT + : timeValueSeconds(remote.getConnectTimeout().getSeconds()))); // } final Slice slice = source.getSlice(); - if(slice != null){ + if (slice != null) { request.getSearchRequest().source().slice(new SliceBuilder(slice.getId(), slice.getMax())); } + final SourceFilter sourceFilter = source.getSourceFilter(); - if(sourceFilter != null){ + if (sourceFilter != null) { request.getSearchRequest().source().fetchSource(sourceFilter.getIncludes(), sourceFilter.getExcludes()); } // endregion // region dest build final Dest dest = reindexRequest.getDest(); - request.setDestIndex(dest.getIndex().getIndexName()) - .setDestRouting(dest.getRouting()) + request.setDestIndex(dest.getIndex().getIndexName()).setDestRouting(dest.getRouting()) .setDestPipeline(dest.getPipeline()); final org.springframework.data.elasticsearch.annotations.Document.VersionType versionType = dest.getVersionType(); - if(versionType != null){ + if (versionType != null) { request.setDestVersionType(VersionType.fromString(versionType.name().toLowerCase(Locale.ROOT))); } + final IndexQuery.OpType opType = dest.getOpType(); - if(opType != null){ + if (opType != null) { request.setDestOpType(opType.name().toLowerCase(Locale.ROOT)); } // endregion // region script build final ReindexRequest.Script script = reindexRequest.getScript(); - if(script != null){ - request.setScript(new Script(DEFAULT_SCRIPT_TYPE, - script.getLang(), - script.getSource(), - Collections.emptyMap() - )); + if (script != null) { + request.setScript(new Script(DEFAULT_SCRIPT_TYPE, script.getLang(), script.getSource(), Collections.emptyMap())); } // endregion // region query parameters build final Duration timeout = reindexRequest.getTimeout(); - if(timeout != null){ + if (timeout != null) { request.setTimeout(timeValueSeconds(timeout.getSeconds())); } - if(reindexRequest.getRefresh() != null){ + + if (reindexRequest.getRefresh() != null) { request.setRefresh(reindexRequest.getRefresh()); } - if(reindexRequest.getRequireAlias() != null){ + + if (reindexRequest.getRequireAlias() != null) { request.setRequireAlias(reindexRequest.getRequireAlias()); } - if(reindexRequest.getRequestsPerSecond() != null){ + + if (reindexRequest.getRequestsPerSecond() != null) { request.setRequestsPerSecond(reindexRequest.getRequestsPerSecond()); } + final Duration scroll = reindexRequest.getScroll(); - if(scroll != null){ + if (scroll != null) { request.setScroll(timeValueSeconds(scroll.getSeconds())); } - if(reindexRequest.getWaitForActiveShards() != null){ + + if (reindexRequest.getWaitForActiveShards() != null) { request.setWaitForActiveShards(ActiveShardCount.parseString(reindexRequest.getWaitForActiveShards())); } - if(reindexRequest.getSlices() != null){ + + if (reindexRequest.getSlices() != null) { request.setSlices(reindexRequest.getSlices()); } // endregion diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java index 8efe62224..01095b8ed 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java @@ -45,8 +45,8 @@ import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.data.elasticsearch.core.index.AliasData; import org.springframework.data.elasticsearch.core.index.Settings; import org.springframework.data.elasticsearch.core.index.TemplateData; -import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -392,7 +392,7 @@ public class ResponseConverter { /** * @since 4.4 */ - public static ReindexResponse reindexResponseOf(BulkByScrollResponse bulkByScrollResponse){ + public static ReindexResponse reindexResponseOf(BulkByScrollResponse bulkByScrollResponse) { final List failures = bulkByScrollResponse.getBulkFailures() // .stream() // .map(ResponseConverter::reindexResponseFailureOf) // diff --git a/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java index a9e95de15..b387f0d6e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,8 @@ */ package org.springframework.data.elasticsearch.core.reindex; +import java.time.Duration; + import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.IndexQuery; @@ -23,11 +25,9 @@ import org.springframework.data.elasticsearch.core.query.SourceFilter; import org.springframework.lang.Nullable; import org.springframework.util.Assert; -import java.time.Duration; - /** - * Request to reindex some documents from one index to another. - * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * Request to reindex some documents from one index to another. (@see + * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) * * @author Sijia Liu * @since 4.4 @@ -50,7 +50,10 @@ public class ReindexRequest { @Nullable private final Duration scroll; @Nullable private final Integer slices; - private ReindexRequest(Source source, Dest dest, @Nullable Integer maxDocs, @Nullable Conflicts conflicts, @Nullable Script script, @Nullable Duration timeout, @Nullable Boolean requireAlias, @Nullable Boolean refresh, @Nullable String waitForActiveShards, @Nullable Integer requestsPerSecond, @Nullable Duration scroll, @Nullable Integer slices) { + private ReindexRequest(Source source, Dest dest, @Nullable Integer maxDocs, @Nullable Conflicts conflicts, + @Nullable Script script, @Nullable Duration timeout, @Nullable Boolean requireAlias, @Nullable Boolean refresh, + @Nullable String waitForActiveShards, @Nullable Integer requestsPerSecond, @Nullable Duration scroll, + @Nullable Integer slices) { Assert.notNull(source, "source must not be null"); Assert.notNull(dest, "dest must not be null"); @@ -132,7 +135,18 @@ public class ReindexRequest { } public enum Conflicts { - PROCEED, ABORT + PROCEED("proceed"), ABORT("abort"); + + // value used in Elasticsearch + private final String esName; + + Conflicts(String esName) { + this.esName = esName; + } + + public String getEsName() { + return esName; + } } public static class Source { @@ -143,7 +157,7 @@ public class ReindexRequest { @Nullable private Integer size; @Nullable private SourceFilter sourceFilter; - private Source(IndexCoordinates indexes){ + private Source(IndexCoordinates indexes) { Assert.notNull(indexes, "indexes must not be null"); this.indexes = indexes; @@ -281,7 +295,7 @@ public class ReindexRequest { this.dest = new Dest(destIndex); } - // region setter + // region setter public ReindexRequestBuilder withMaxDocs(@Nullable Integer maxDocs) { this.maxDocs = maxDocs; @@ -298,7 +312,7 @@ public class ReindexRequest { return this; } - public ReindexRequestBuilder withSourceSlice(int id, int max){ + public ReindexRequestBuilder withSourceSlice(int id, int max) { this.source.slice = new Slice(id, max); return this; } @@ -313,17 +327,17 @@ public class ReindexRequest { return this; } - public ReindexRequestBuilder withSourceSourceFilter(SourceFilter sourceFilter){ + public ReindexRequestBuilder withSourceSourceFilter(SourceFilter sourceFilter) { this.source.sourceFilter = sourceFilter; return this; } - public ReindexRequestBuilder withDestPipeline(String pipelineName){ + public ReindexRequestBuilder withDestPipeline(String pipelineName) { this.dest.pipeline = pipelineName; return this; } - public ReindexRequestBuilder withDestRouting(String routing){ + public ReindexRequestBuilder withDestRouting(String routing) { this.dest.routing = routing; return this; } @@ -343,44 +357,45 @@ public class ReindexRequest { return this; } - public ReindexRequestBuilder withTimeout(Duration timeout){ + public ReindexRequestBuilder withTimeout(Duration timeout) { this.timeout = timeout; return this; } - public ReindexRequestBuilder withRequireAlias(boolean requireAlias){ + public ReindexRequestBuilder withRequireAlias(boolean requireAlias) { this.requireAlias = requireAlias; return this; } - public ReindexRequestBuilder withRefresh(boolean refresh){ + public ReindexRequestBuilder withRefresh(boolean refresh) { this.refresh = refresh; return this; } - public ReindexRequestBuilder withWaitForActiveShards(String waitForActiveShards){ + public ReindexRequestBuilder withWaitForActiveShards(String waitForActiveShards) { this.waitForActiveShards = waitForActiveShards; return this; } - public ReindexRequestBuilder withRequestsPerSecond(int requestsPerSecond){ + public ReindexRequestBuilder withRequestsPerSecond(int requestsPerSecond) { this.requestsPerSecond = requestsPerSecond; return this; } - public ReindexRequestBuilder withScroll(Duration scroll){ + public ReindexRequestBuilder withScroll(Duration scroll) { this.scroll = scroll; return this; } - public ReindexRequestBuilder withSlices(int slices){ + public ReindexRequestBuilder withSlices(int slices) { this.slices = slices; return this; } // endregion public ReindexRequest build() { - return new ReindexRequest(source, dest, maxDocs, conflicts, script, timeout, requireAlias, refresh, waitForActiveShards, requestsPerSecond, scroll, slices); + return new ReindexRequest(source, dest, maxDocs, conflicts, script, timeout, requireAlias, refresh, + waitForActiveShards, requestsPerSecond, scroll, slices); } } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java index 05be38c65..2f3f1e345 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,14 +15,14 @@ */ package org.springframework.data.elasticsearch.core.reindex; -import org.springframework.lang.Nullable; - import java.util.Collections; import java.util.List; +import org.springframework.lang.Nullable; + /** - * Response of reindex request. - * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-api-response-body) + * Response of reindex request. (@see + * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-api-response-body) * * @author Sijia Liu * @since 4.4 @@ -45,8 +45,8 @@ public class ReindexResponse { private final List failures; private ReindexResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches, - long versionConflicts, long noops, long bulkRetries, long searchRetries, - long throttledMillis, double requestsPerSecond, long throttledUntilMillis, List failures) { + long versionConflicts, long noops, long bulkRetries, long searchRetries, long throttledMillis, + double requestsPerSecond, long throttledUntilMillis, List failures) { this.took = took; this.timedOut = timedOut; this.total = total; @@ -142,16 +142,16 @@ public class ReindexResponse { } /** - * The number of requests per second effectively executed during the reindex. + * The number of requests per second effectively executed during the reindex. */ public double getRequestsPerSecond() { return requestsPerSecond; } /** - * This field should always be equal to zero in a _reindex response. - * It only has meaning when using the Task API, where it indicates the next time (in milliseconds since epoch) - * a throttled request will be executed again in order to conform to requests_per_second. + * This field should always be equal to zero in a _reindex response. It only has meaning when using the Task API, + * where it indicates the next time (in milliseconds since epoch) a throttled request will be executed again in order + * to conform to requests_per_second. */ public long getThrottledUntilMillis() { return throttledUntilMillis; @@ -186,7 +186,7 @@ public class ReindexResponse { @Nullable private final Boolean aborted; private Failure(@Nullable String index, @Nullable String type, @Nullable String id, @Nullable Exception cause, - @Nullable Integer status, @Nullable Long seqNo, @Nullable Long term, @Nullable Boolean aborted) { + @Nullable Integer status, @Nullable Long seqNo, @Nullable Long term, @Nullable Boolean aborted) { this.index = index; this.type = type; this.id = id; @@ -375,17 +375,17 @@ public class ReindexResponse { return this; } - public ReindexResponseBuilder withThrottledMillis(long throttledMillis){ + public ReindexResponseBuilder withThrottledMillis(long throttledMillis) { this.throttledMillis = throttledMillis; return this; } - public ReindexResponseBuilder withRequestsPerSecond(double requestsPerSecond){ + public ReindexResponseBuilder withRequestsPerSecond(double requestsPerSecond) { this.requestsPerSecond = requestsPerSecond; return this; } - public ReindexResponseBuilder withThrottledUntilMillis(long throttledUntilMillis){ + public ReindexResponseBuilder withThrottledUntilMillis(long throttledUntilMillis) { this.throttledUntilMillis = throttledUntilMillis; return this; } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java index 1b11037c2..7444294c4 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,14 +15,13 @@ */ package org.springframework.data.elasticsearch.core.reindex; +import java.time.Duration; + import org.springframework.lang.Nullable; import org.springframework.util.Assert; -import java.time.Duration; - /** - * Remote info - * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#source) + * Remote info (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#source) * * @author Sijia Liu * @since 4.4 @@ -38,7 +37,8 @@ public class Remote { @Nullable private final Duration socketTimeout; @Nullable private final Duration connectTimeout; - private Remote(String scheme, String host, int port, @Nullable String pathPrefix, @Nullable String username, @Nullable String password, @Nullable Duration socketTimeout, @Nullable Duration connectTimeout) { + private Remote(String scheme, String host, int port, @Nullable String pathPrefix, @Nullable String username, + @Nullable String password, @Nullable Duration socketTimeout, @Nullable Duration connectTimeout) { Assert.notNull(scheme, "scheme must not be null"); Assert.notNull(host, "host must not be null"); @@ -90,11 +90,11 @@ public class Remote { return pathPrefix; } - public static RemoteBuilder builder(String scheme, String host, int port){ + public static RemoteBuilder builder(String scheme, String host, int port) { return new RemoteBuilder(scheme, host, port); } - public static class RemoteBuilder{ + public static class RemoteBuilder { private final String scheme; private final String host; private final int port; @@ -110,33 +110,33 @@ public class Remote { this.port = port; } - public RemoteBuilder withPathPrefix(String pathPrefix){ + public RemoteBuilder withPathPrefix(String pathPrefix) { this.pathPrefix = pathPrefix; return this; } - public RemoteBuilder withUsername(String username){ + public RemoteBuilder withUsername(String username) { this.username = username; return this; } - public RemoteBuilder withPassword(String password){ + public RemoteBuilder withPassword(String password) { this.password = password; return this; } - public RemoteBuilder withSocketTimeout(Duration socketTimeout){ + public RemoteBuilder withSocketTimeout(Duration socketTimeout) { this.socketTimeout = socketTimeout; return this; } - public RemoteBuilder withConnectTimeout(Duration connectTimeout){ + public RemoteBuilder withConnectTimeout(Duration connectTimeout) { this.connectTimeout = connectTimeout; return this; } - public Remote build(){ - return new Remote(scheme, host, port , pathPrefix, username, password, socketTimeout, connectTimeout); + public Remote build() { + return new Remote(scheme, host, port, pathPrefix, username, password, socketTimeout, connectTimeout); } } } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java index e56a15b9b..ed5b5208f 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java @@ -83,11 +83,6 @@ import org.springframework.data.elasticsearch.annotations.JoinTypeRelations; import org.springframework.data.elasticsearch.annotations.MultiField; import org.springframework.data.elasticsearch.annotations.ScriptedField; import org.springframework.data.elasticsearch.annotations.Setting; -import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; -import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; -import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; -import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; -import org.springframework.data.elasticsearch.core.query.ScriptField; import org.springframework.data.elasticsearch.core.document.Explanation; import org.springframework.data.elasticsearch.core.geo.GeoPoint; import org.springframework.data.elasticsearch.core.index.AliasAction; @@ -97,6 +92,8 @@ import org.springframework.data.elasticsearch.core.join.JoinField; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.elasticsearch.core.query.RescorerQuery.ScoreMode; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; import org.springframework.data.elasticsearch.utils.IndexNameProvider; import org.springframework.data.util.StreamUtils; @@ -3660,16 +3657,17 @@ public abstract class ElasticsearchTemplateTests { String destIndexName = indexNameProvider.indexName(); operations.indexOps(IndexCoordinates.of(destIndexName)).create(); - final ReindexRequest reindexRequest = ReindexRequest.builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) - .withRefresh(true).build(); + final ReindexRequest reindexRequest = ReindexRequest + .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)).withRefresh(true).build(); final ReindexResponse reindex = operations.reindex(reindexRequest); NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); + assertThat(reindex.getTotal()).isEqualTo(1); assertThat(operations.count(searchQuery, IndexCoordinates.of(destIndexName))).isEqualTo(1); } @Test // #1529 - void shouldWorkSubmitReindexTask(){ + void shouldWorkSubmitReindexTask() { String sourceIndexName = indexNameProvider.indexName(); indexNameProvider.increment(); String destIndexName = indexNameProvider.indexName(); @@ -3677,7 +3675,7 @@ public abstract class ElasticsearchTemplateTests { final ReindexRequest reindexRequest = ReindexRequest .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)).build(); String task = operations.submitReindex(reindexRequest); - // Maybe there should be a task api to detect whether the task exists + assertThat(task).isNotBlank(); } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java index 121bc9cde..730ffc668 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java @@ -19,9 +19,7 @@ import static java.util.Collections.*; import static org.assertj.core.api.Assertions.*; import static org.elasticsearch.index.query.QueryBuilders.*; import static org.springframework.data.elasticsearch.annotations.FieldType.*; -import static org.springframework.data.elasticsearch.utils.IdGenerator.*; -import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -80,6 +78,7 @@ import org.springframework.data.elasticsearch.core.index.AliasActions; import org.springframework.data.elasticsearch.core.index.AliasData; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.*; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration; import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; import org.springframework.data.elasticsearch.utils.IndexNameProvider; @@ -1201,32 +1200,24 @@ public class ReactiveElasticsearchTemplateIntegrationTests { String destIndexName = indexNameProvider.indexName(); operations.indexOps(IndexCoordinates.of(destIndexName)).create(); final ReindexRequest reindexRequest = ReindexRequest - .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) - .withRefresh(true) - .build(); - operations.reindex(reindexRequest) - .as(StepVerifier::create) + .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)).withRefresh(true).build(); + operations.reindex(reindexRequest).as(StepVerifier::create) .consumeNextWith(postReindexResponse -> assertThat(postReindexResponse.getTotal()).isEqualTo(1L)) .verifyComplete(); operations.count(operations.matchAllQuery(), SampleEntity.class, IndexCoordinates.of(destIndexName)) - .as(StepVerifier::create) - .expectNext(1L) - .verifyComplete(); + .as(StepVerifier::create).expectNext(1L).verifyComplete(); } @Test // #1529 - void shouldWorkSubmitReindexTask(){ + void shouldWorkSubmitReindexTask() { String sourceIndexName = indexNameProvider.indexName(); indexNameProvider.increment(); String destIndexName = indexNameProvider.indexName(); operations.indexOps(IndexCoordinates.of(destIndexName)).create(); final ReindexRequest reindexRequest = ReindexRequest - .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) - .build(); - operations.submitReindex(reindexRequest) - .as(StepVerifier::create) - .consumeNextWith(task -> assertThat(task).isNotBlank()) - .verifyComplete(); + .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)).build(); + operations.submitReindex(reindexRequest).as(StepVerifier::create) + .consumeNextWith(task -> assertThat(task).isNotBlank()).verifyComplete(); } // endregion diff --git a/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java b/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java index e327d1c35..772d02522 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java @@ -560,56 +560,46 @@ class RequestFactoryTests { @Test // #1529 void shouldCreateReindexRequest() throws IOException, JSONException { - final String expected = "{\n" + - " \"source\":{\n" + - " \"remote\":{\n" + - " \"username\":\"admin\",\n" + - " \"password\":\"password\",\n" + - " \"host\":\"http://localhost:9200/elasticsearch\",\n" + - " \"socket_timeout\":\"30s\",\n" + - " \"connect_timeout\":\"30s\"\n" + - " },\n" + - " \"index\":[\"source_1\",\"source_2\"],\n" + - " \"size\":5,\n" + - " \"query\":{\"match_all\":{}},\n" + - " \"_source\":{\"includes\":[\"name\"],\"excludes\":[]},\n" + - " \"slice\":{\"id\":1,\"max\":20}\n" + - " },\n" + - " \"dest\":{\n" + - " \"index\":\"destination\",\n" + - " \"routing\":\"routing\",\n" + - " \"op_type\":\"create\",\n" + - " \"pipeline\":\"pipeline\",\n" + - " \"version_type\":\"external\"\n" + - " },\n" + - " \"max_docs\":10,\n" + - " \"script\":{\"source\":\"Math.max(1,2)\",\"lang\":\"java\"},\n" + - " \"conflicts\":\"proceed\"\n" + + final String expected = "{\n" + // + " \"source\":{\n" + // + " \"remote\":{\n" + // + " \"username\":\"admin\",\n" + // + " \"password\":\"password\",\n" + // + " \"host\":\"http://localhost:9200/elasticsearch\",\n" + // + " \"socket_timeout\":\"30s\",\n" + // + " \"connect_timeout\":\"30s\"\n" + // + " },\n" + // + " \"index\":[\"source_1\",\"source_2\"],\n" + // + " \"size\":5,\n" + // + " \"query\":{\"match_all\":{}},\n" + // + " \"_source\":{\"includes\":[\"name\"],\"excludes\":[]},\n" + // + " \"slice\":{\"id\":1,\"max\":20}\n" + // + " },\n" + // + " \"dest\":{\n" + // + " \"index\":\"destination\",\n" + // + " \"routing\":\"routing\",\n" + // + " \"op_type\":\"create\",\n" + // + " \"pipeline\":\"pipeline\",\n" + // + " \"version_type\":\"external\"\n" + // + " },\n" + // + " \"max_docs\":10,\n" + // + " \"script\":{\"source\":\"Math.max(1,2)\",\"lang\":\"java\"},\n" + // + " \"conflicts\":\"proceed\"\n" + // "}"; - Remote remote = Remote.builder("http", "localhost",9200) - .withPathPrefix("elasticsearch") - .withUsername("admin") - .withPassword("password") - .withConnectTimeout(Duration.ofSeconds(30)) - .withSocketTimeout(Duration.ofSeconds(30)).build(); - - ReindexRequest reindexRequest = ReindexRequest.builder(IndexCoordinates.of("source_1", "source_2"), - IndexCoordinates.of("destination")) - .withConflicts(ReindexRequest.Conflicts.PROCEED) - .withMaxDocs(10) - .withSourceQuery(new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build()) - .withSourceSize(5) - .withSourceSourceFilter(new FetchSourceFilterBuilder().withIncludes("name").build()) - .withSourceRemote(remote) - .withSourceSlice(1,20) - .withDestOpType(IndexQuery.OpType.CREATE) - .withDestVersionType(Document.VersionType.EXTERNAL) - .withDestPipeline("pipeline") - .withDestRouting("routing") - .withScript("Math.max(1,2)", "java") + Remote remote = Remote.builder("http", "localhost", 9200).withPathPrefix("elasticsearch").withUsername("admin") + .withPassword("password").withConnectTimeout(Duration.ofSeconds(30)).withSocketTimeout(Duration.ofSeconds(30)) .build(); + ReindexRequest reindexRequest = ReindexRequest + .builder(IndexCoordinates.of("source_1", "source_2"), IndexCoordinates.of("destination")) + .withConflicts(ReindexRequest.Conflicts.PROCEED).withMaxDocs(10) + .withSourceQuery(new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build()).withSourceSize(5) + .withSourceSourceFilter(new FetchSourceFilterBuilder().withIncludes("name").build()).withSourceRemote(remote) + .withSourceSlice(1, 20).withDestOpType(IndexQuery.OpType.CREATE) + .withDestVersionType(Document.VersionType.EXTERNAL).withDestPipeline("pipeline").withDestRouting("routing") + .withScript("Math.max(1,2)", "java").build(); + final String json = requestToString(requestFactory.reindexRequest(reindexRequest)); assertEquals(expected, json, false); @@ -617,22 +607,21 @@ class RequestFactoryTests { @Test void shouldAllowSourceQueryForReindexWithoutRemote() throws IOException, JSONException { - final String expected = "{\n" + - " \"source\":{\n" + - " \"index\":[\"source\"],\n" + - " \"query\":{\"match_all\":{}}\n" + - " },\n" + - " \"dest\":{\n" + - " \"index\":\"destination\",\n" + - " \"op_type\":\"index\",\n" + - " \"version_type\":\"internal\"\n" + - " }\n" + + final String expected = "{\n" + // + " \"source\":{\n" + // + " \"index\":[\"source\"],\n" + // + " \"query\":{\"match_all\":{}}\n" + // + " },\n" + // + " \"dest\":{\n" + // + " \"index\":\"destination\",\n" + // + " \"op_type\":\"index\",\n" + // + " \"version_type\":\"internal\"\n" + // + " }\n" + // "}"; - ReindexRequest reindexRequest = ReindexRequest.builder(IndexCoordinates.of("source"), - IndexCoordinates.of("destination")) - .withSourceQuery(new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build()) - .build(); + ReindexRequest reindexRequest = ReindexRequest + .builder(IndexCoordinates.of("source"), IndexCoordinates.of("destination")) + .withSourceQuery(new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build()).build(); final String json = requestToString(requestFactory.reindexRequest(reindexRequest));