Polishing

This commit is contained in:
Peter-Josef Meisch 2022-01-26 21:48:05 +01:00
parent c5db583048
commit 4a0e7cc56e
No known key found for this signature in database
GPG Key ID: DE108246970C7708
16 changed files with 230 additions and 228 deletions

View File

@ -88,8 +88,8 @@ import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.mustache.SearchTemplateRequest; import org.elasticsearch.script.mustache.SearchTemplateRequest;
@ -105,7 +105,6 @@ import org.elasticsearch.xcontent.XContentType;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.RestStatusException; import org.springframework.data.elasticsearch.RestStatusException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.core.ResponseConverter;
import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ClientLogger; import org.springframework.data.elasticsearch.client.ClientLogger;
import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.ElasticsearchHost;
@ -115,6 +114,7 @@ import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsea
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices;
import org.springframework.data.elasticsearch.client.util.NamedXContents; import org.springframework.data.elasticsearch.client.util.NamedXContents;
import org.springframework.data.elasticsearch.client.util.ScrollState; import org.springframework.data.elasticsearch.client.util.ScrollState;
import org.springframework.data.elasticsearch.core.ResponseConverter;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.util.Lazy; import org.springframework.data.util.Lazy;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
@ -150,7 +150,6 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodySpe
* @see ClientConfiguration * @see ClientConfiguration
* @see ReactiveRestClients * @see ReactiveRestClients
*/ */
// todo package private after refactoring
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices, Cluster { public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices, Cluster {
private final HostProvider<?> hostProvider; private final HostProvider<?> hostProvider;
@ -514,14 +513,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
@Override @Override
public Mono<BulkByScrollResponse> reindex(HttpHeaders headers, ReindexRequest reindexRequest) { public Mono<BulkByScrollResponse> reindex(HttpHeaders headers, ReindexRequest reindexRequest) {
return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers) return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers).next();
.next();
} }
@Override @Override
public Mono<String> submitReindex(HttpHeaders headers, ReindexRequest reindexRequest) { public Mono<String> submitReindex(HttpHeaders headers, ReindexRequest reindexRequest) {
return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers) return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers).next()
.next()
.map(TaskSubmissionResponse::getTask); .map(TaskSubmissionResponse::getTask);
} }

View File

@ -53,8 +53,8 @@ import org.elasticsearch.client.indices.*;
import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.mustache.SearchTemplateRequest; import org.elasticsearch.script.mustache.SearchTemplateRequest;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregation;
@ -722,7 +722,7 @@ public interface ReactiveElasticsearchClient {
* @return the {@link Mono} emitting the response * @return the {@link Mono} emitting the response
* @since 4.4 * @since 4.4
*/ */
default Mono<BulkByScrollResponse> reindex(Consumer<ReindexRequest> consumer){ default Mono<BulkByScrollResponse> reindex(Consumer<ReindexRequest> consumer) {
ReindexRequest reindexRequest = new ReindexRequest(); ReindexRequest reindexRequest = new ReindexRequest();
consumer.accept(reindexRequest); consumer.accept(reindexRequest);
@ -736,7 +736,7 @@ public interface ReactiveElasticsearchClient {
* @return the {@link Mono} emitting the response * @return the {@link Mono} emitting the response
* @since 4.4 * @since 4.4
*/ */
default Mono<BulkByScrollResponse> reindex(ReindexRequest reindexRequest){ default Mono<BulkByScrollResponse> reindex(ReindexRequest reindexRequest) {
return reindex(HttpHeaders.EMPTY, reindexRequest); return reindex(HttpHeaders.EMPTY, reindexRequest);
} }
@ -757,7 +757,7 @@ public interface ReactiveElasticsearchClient {
* @return the {@link Mono} emitting the task id * @return the {@link Mono} emitting the task id
* @since 4.4 * @since 4.4
*/ */
default Mono<String> submitReindex(Consumer<ReindexRequest> consumer){ default Mono<String> submitReindex(Consumer<ReindexRequest> consumer) {
ReindexRequest reindexRequest = new ReindexRequest(); ReindexRequest reindexRequest = new ReindexRequest();
consumer.accept(reindexRequest); consumer.accept(reindexRequest);
@ -771,7 +771,7 @@ public interface ReactiveElasticsearchClient {
* @return the {@link Mono} emitting the task id * @return the {@link Mono} emitting the task id
* @since 4.4 * @since 4.4
*/ */
default Mono<String> submitReindex(ReindexRequest reindexRequest){ default Mono<String> submitReindex(ReindexRequest reindexRequest) {
return submitReindex(HttpHeaders.EMPTY, reindexRequest); return submitReindex(HttpHeaders.EMPTY, reindexRequest);
} }
@ -784,6 +784,7 @@ public interface ReactiveElasticsearchClient {
* @since 4.4 * @since 4.4
*/ */
Mono<String> submitReindex(HttpHeaders headers, ReindexRequest reindexRequest); Mono<String> submitReindex(HttpHeaders headers, ReindexRequest reindexRequest);
/** /**
* Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}. * Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}.
* {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and * {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and

View File

@ -294,10 +294,14 @@ public interface RequestCreator {
/** /**
* @since 4.4 * @since 4.4
*/ */
default Function<ReindexRequest, Request> reindex() { return RequestConverters::reindex; } default Function<ReindexRequest, Request> reindex() {
return RequestConverters::reindex;
}
/** /**
* @since 4.4 * @since 4.4
*/ */
default Function<ReindexRequest, Request> submitReindex() { return RequestConverters::submitReindex; } default Function<ReindexRequest, Request> submitReindex() {
return RequestConverters::submitReindex;
}
} }

View File

@ -547,14 +547,17 @@ public class RequestConverters {
.withTimeout(reindexRequest.getTimeout()).withWaitForActiveShards(reindexRequest.getWaitForActiveShards()) .withTimeout(reindexRequest.getTimeout()).withWaitForActiveShards(reindexRequest.getWaitForActiveShards())
.withRequestsPerSecond(reindexRequest.getRequestsPerSecond()); .withRequestsPerSecond(reindexRequest.getRequestsPerSecond());
if(reindexRequest.getDestination().isRequireAlias()){ if (reindexRequest.getDestination().isRequireAlias()) {
params.putParam("require_alias", Boolean.TRUE.toString()); params.putParam("require_alias", Boolean.TRUE.toString());
} }
if (reindexRequest.getScrollTime() != null) { if (reindexRequest.getScrollTime() != null) {
params.putParam("scroll", reindexRequest.getScrollTime()); params.putParam("scroll", reindexRequest.getScrollTime());
} }
params.putParam("slices", Integer.toString(reindexRequest.getSlices())); params.putParam("slices", Integer.toString(reindexRequest.getSlices()));
if(reindexRequest.getMaxDocs() > -1){
if (reindexRequest.getMaxDocs() > -1) {
params.putParam("max_docs", Integer.toString(reindexRequest.getMaxDocs())); params.putParam("max_docs", Integer.toString(reindexRequest.getMaxDocs()));
} }
request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE)); request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE));

View File

@ -18,8 +18,6 @@ package org.springframework.data.elasticsearch.core;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
@ -27,6 +25,8 @@ import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.routing.RoutingResolver; import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
@ -327,10 +327,9 @@ public interface DocumentOperations {
ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
/** /**
* Copies documents from a source to a destination. * Copies documents from a source to a destination. The source can be any existing index, alias, or data stream. The
* The source can be any existing index, alias, or data stream. The destination must differ from the source. * destination must differ from the source. For example, you cannot reindex a data stream into itself. (@see
* For example, you cannot reindex a data stream into itself. * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
* *
* @param reindexRequest reindex request parameters * @param reindexRequest reindex request parameters
* @return the reindex response * @return the reindex response
@ -339,8 +338,7 @@ public interface DocumentOperations {
ReindexResponse reindex(ReindexRequest reindexRequest); ReindexResponse reindex(ReindexRequest reindexRequest);
/** /**
* Submits a reindex task. * Submits a reindex task. (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
* *
* @param reindexRequest reindex request parameters * @param reindexRequest reindex request parameters
* @return the task id * @return the task id

View File

@ -61,9 +61,7 @@ import org.springframework.data.elasticsearch.core.cluster.ClusterOperations;
import org.springframework.data.elasticsearch.core.cluster.ElasticsearchClusterOperations; import org.springframework.data.elasticsearch.core.cluster.ElasticsearchClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.DocumentAdapters; import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
@ -73,6 +71,8 @@ import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilde
import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -265,7 +265,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
Assert.notNull(query, "query must not be null"); Assert.notNull(query, "query must not be null");
Assert.notNull(index, "index must not be null"); Assert.notNull(index, "index must not be null");
final UpdateByQueryRequest updateByQueryRequest = requestFactory.updateByQueryRequest(query, index); UpdateByQueryRequest updateByQueryRequest = requestFactory.updateByQueryRequest(query, index);
if (query.getRefreshPolicy() == null && getRefreshPolicy() != null) { if (query.getRefreshPolicy() == null && getRefreshPolicy() != null) {
updateByQueryRequest.setRefresh(getRefreshPolicy() == RefreshPolicy.IMMEDIATE); updateByQueryRequest.setRefresh(getRefreshPolicy() == RefreshPolicy.IMMEDIATE);
@ -285,8 +285,8 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); Assert.notNull(postReindexRequest, "postReindexRequest must not be null");
final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
final BulkByScrollResponse bulkByScrollResponse = execute( BulkByScrollResponse bulkByScrollResponse = execute(
client -> client.reindex(reindexRequest, RequestOptions.DEFAULT)); client -> client.reindex(reindexRequest, RequestOptions.DEFAULT));
return ResponseConverter.reindexResponseOf(bulkByScrollResponse); return ResponseConverter.reindexResponseOf(bulkByScrollResponse);
} }
@ -295,9 +295,8 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
public String submitReindex(ReindexRequest postReindexRequest) { public String submitReindex(ReindexRequest postReindexRequest) {
Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); Assert.notNull(postReindexRequest, "postReindexRequest must not be null");
final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
return execute( return execute(client -> client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask());
client -> client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask());
} }
public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions, public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,

View File

@ -15,8 +15,6 @@
*/ */
package org.springframework.data.elasticsearch.core; package org.springframework.data.elasticsearch.core;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -30,6 +28,8 @@ import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
@ -307,10 +307,9 @@ public interface ReactiveDocumentOperations {
Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
/** /**
* Copies documents from a source to a destination. * Copies documents from a source to a destination. The source can be any existing index, alias, or data stream. The
* The source can be any existing index, alias, or data stream. The destination must differ from the source. * destination must differ from the source. For example, you cannot reindex a data stream into itself. (@see
* For example, you cannot reindex a data stream into itself. * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
* *
* @param reindexRequest reindex request parameters * @param reindexRequest reindex request parameters
* @return a {@link Mono} emitting the reindex response * @return a {@link Mono} emitting the reindex response
@ -319,8 +318,7 @@ public interface ReactiveDocumentOperations {
Mono<ReindexResponse> reindex(ReindexRequest reindexRequest); Mono<ReindexResponse> reindex(ReindexRequest reindexRequest);
/** /**
* Submits a reindex task. * Submits a reindex task. (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
* *
* @param reindexRequest reindex request parameters * @param reindexRequest reindex request parameters
* @return a {@link Mono} emitting the {@literal task} id. * @return a {@link Mono} emitting the {@literal task} id.

View File

@ -70,8 +70,6 @@ import org.springframework.data.elasticsearch.core.event.ReactiveAfterConvertCal
import org.springframework.data.elasticsearch.core.event.ReactiveAfterLoadCallback; import org.springframework.data.elasticsearch.core.event.ReactiveAfterLoadCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback; import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback; import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
@ -84,6 +82,8 @@ import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm; import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.routing.DefaultRoutingResolver; import org.springframework.data.elasticsearch.core.routing.DefaultRoutingResolver;
import org.springframework.data.elasticsearch.core.routing.RoutingResolver; import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
import org.springframework.data.elasticsearch.core.suggest.response.Suggest; import org.springframework.data.elasticsearch.core.suggest.response.Suggest;
@ -618,7 +618,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); Assert.notNull(postReindexRequest, "postReindexRequest must not be null");
return Mono.defer(() -> { return Mono.defer(() -> {
final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
return Mono.from(execute(client -> client.reindex(reindexRequest))).map(ResponseConverter::reindexResponseOf); return Mono.from(execute(client -> client.reindex(reindexRequest))).map(ResponseConverter::reindexResponseOf);
}); });
} }
@ -629,7 +629,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); Assert.notNull(postReindexRequest, "postReindexRequest must not be null");
return Mono.defer(() -> { return Mono.defer(() -> {
final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
return Mono.from(execute(client -> client.submitReindex(reindexRequest))); return Mono.from(execute(client -> client.submitReindex(reindexRequest)));
}); });
} }

View File

@ -92,17 +92,17 @@ import org.springframework.data.elasticsearch.core.index.AliasActions;
import org.springframework.data.elasticsearch.core.index.DeleteTemplateRequest; import org.springframework.data.elasticsearch.core.index.DeleteTemplateRequest;
import org.springframework.data.elasticsearch.core.index.ExistsTemplateRequest; import org.springframework.data.elasticsearch.core.index.ExistsTemplateRequest;
import org.springframework.data.elasticsearch.core.index.GetTemplateRequest; import org.springframework.data.elasticsearch.core.index.GetTemplateRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.index.PutTemplateRequest; import org.springframework.data.elasticsearch.core.index.PutTemplateRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Source;
import org.springframework.data.elasticsearch.core.reindex.Remote;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Dest;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Slice;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.core.query.RescorerQuery.ScoreMode; import org.springframework.data.elasticsearch.core.query.RescorerQuery.ScoreMode;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Dest;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Slice;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Source;
import org.springframework.data.elasticsearch.core.reindex.Remote;
import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.mapping.context.MappingContext;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -396,108 +396,117 @@ class RequestFactory {
/** /**
* @since 4.4 * @since 4.4
*/ */
public org.elasticsearch.index.reindex.ReindexRequest reindexRequest(ReindexRequest reindexRequest){ public org.elasticsearch.index.reindex.ReindexRequest reindexRequest(ReindexRequest reindexRequest) {
final org.elasticsearch.index.reindex.ReindexRequest request = new org.elasticsearch.index.reindex.ReindexRequest(); final org.elasticsearch.index.reindex.ReindexRequest request = new org.elasticsearch.index.reindex.ReindexRequest();
if(reindexRequest.getConflicts() != null){
request.setConflicts(reindexRequest.getConflicts().name().toLowerCase(Locale.ROOT)); if (reindexRequest.getConflicts() != null) {
request.setConflicts(reindexRequest.getConflicts().getEsName());
} }
if(reindexRequest.getMaxDocs() != null){
if (reindexRequest.getMaxDocs() != null) {
request.setMaxDocs(reindexRequest.getMaxDocs()); request.setMaxDocs(reindexRequest.getMaxDocs());
} }
// region source build // region source build
final Source source = reindexRequest.getSource(); final Source source = reindexRequest.getSource();
request.setSourceIndices(source.getIndexes().getIndexNames()); request.setSourceIndices(source.getIndexes().getIndexNames());
// source query will build from RemoteInfo if remote exist // source query will build from RemoteInfo if remote exist
if(source.getQuery() != null && source.getRemote() == null){ if (source.getQuery() != null && source.getRemote() == null) {
request.setSourceQuery(getQuery(source.getQuery())); request.setSourceQuery(getQuery(source.getQuery()));
} }
if(source.getSize() != null){
if (source.getSize() != null) {
request.setSourceBatchSize(source.getSize()); request.setSourceBatchSize(source.getSize());
} }
if(source.getRemote() != null){ if (source.getRemote() != null) {
Remote remote = source.getRemote(); Remote remote = source.getRemote();
QueryBuilder queryBuilder = source.getQuery() == null ? QueryBuilders.matchAllQuery() : getQuery(source.getQuery()); QueryBuilder queryBuilder = source.getQuery() == null ? QueryBuilders.matchAllQuery()
: getQuery(source.getQuery());
BytesReference query; BytesReference query;
try { try {
XContentBuilder builder = XContentBuilder.builder(QUERY_CONTENT_TYPE).prettyPrint(); XContentBuilder builder = XContentBuilder.builder(QUERY_CONTENT_TYPE).prettyPrint();
query = BytesReference.bytes(queryBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS)); query = BytesReference.bytes(queryBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS));
} catch (IOException e) { } catch (IOException e) {
throw new IllegalArgumentException("an IOException occurs while building the source query content",e); throw new IllegalArgumentException("Error parsing the source query content", e);
} }
request.setRemoteInfo(new RemoteInfo( request.setRemoteInfo(new RemoteInfo( //
remote.getScheme(), remote.getScheme(), //
remote.getHost(), remote.getHost(), //
remote.getPort(), remote.getPort(), //
remote.getPathPrefix(), remote.getPathPrefix(), //
query, query, //
remote.getUsername(), remote.getUsername(), //
remote.getPassword(), remote.getPassword(), //
Collections.emptyMap(), Collections.emptyMap(), //
remote.getSocketTimeout() == null ? DEFAULT_SOCKET_TIMEOUT : timeValueSeconds(remote.getSocketTimeout().getSeconds()), remote.getSocketTimeout() == null ? DEFAULT_SOCKET_TIMEOUT
remote.getConnectTimeout() == null ? DEFAULT_CONNECT_TIMEOUT : timeValueSeconds(remote.getConnectTimeout().getSeconds()) : timeValueSeconds(remote.getSocketTimeout().getSeconds()), //
)); remote.getConnectTimeout() == null ? DEFAULT_CONNECT_TIMEOUT
: timeValueSeconds(remote.getConnectTimeout().getSeconds()))); //
} }
final Slice slice = source.getSlice(); final Slice slice = source.getSlice();
if(slice != null){ if (slice != null) {
request.getSearchRequest().source().slice(new SliceBuilder(slice.getId(), slice.getMax())); request.getSearchRequest().source().slice(new SliceBuilder(slice.getId(), slice.getMax()));
} }
final SourceFilter sourceFilter = source.getSourceFilter(); final SourceFilter sourceFilter = source.getSourceFilter();
if(sourceFilter != null){ if (sourceFilter != null) {
request.getSearchRequest().source().fetchSource(sourceFilter.getIncludes(), sourceFilter.getExcludes()); request.getSearchRequest().source().fetchSource(sourceFilter.getIncludes(), sourceFilter.getExcludes());
} }
// endregion // endregion
// region dest build // region dest build
final Dest dest = reindexRequest.getDest(); final Dest dest = reindexRequest.getDest();
request.setDestIndex(dest.getIndex().getIndexName()) request.setDestIndex(dest.getIndex().getIndexName()).setDestRouting(dest.getRouting())
.setDestRouting(dest.getRouting())
.setDestPipeline(dest.getPipeline()); .setDestPipeline(dest.getPipeline());
final org.springframework.data.elasticsearch.annotations.Document.VersionType versionType = dest.getVersionType(); final org.springframework.data.elasticsearch.annotations.Document.VersionType versionType = dest.getVersionType();
if(versionType != null){ if (versionType != null) {
request.setDestVersionType(VersionType.fromString(versionType.name().toLowerCase(Locale.ROOT))); request.setDestVersionType(VersionType.fromString(versionType.name().toLowerCase(Locale.ROOT)));
} }
final IndexQuery.OpType opType = dest.getOpType(); final IndexQuery.OpType opType = dest.getOpType();
if(opType != null){ if (opType != null) {
request.setDestOpType(opType.name().toLowerCase(Locale.ROOT)); request.setDestOpType(opType.name().toLowerCase(Locale.ROOT));
} }
// endregion // endregion
// region script build // region script build
final ReindexRequest.Script script = reindexRequest.getScript(); final ReindexRequest.Script script = reindexRequest.getScript();
if(script != null){ if (script != null) {
request.setScript(new Script(DEFAULT_SCRIPT_TYPE, request.setScript(new Script(DEFAULT_SCRIPT_TYPE, script.getLang(), script.getSource(), Collections.emptyMap()));
script.getLang(),
script.getSource(),
Collections.emptyMap()
));
} }
// endregion // endregion
// region query parameters build // region query parameters build
final Duration timeout = reindexRequest.getTimeout(); final Duration timeout = reindexRequest.getTimeout();
if(timeout != null){ if (timeout != null) {
request.setTimeout(timeValueSeconds(timeout.getSeconds())); request.setTimeout(timeValueSeconds(timeout.getSeconds()));
} }
if(reindexRequest.getRefresh() != null){
if (reindexRequest.getRefresh() != null) {
request.setRefresh(reindexRequest.getRefresh()); request.setRefresh(reindexRequest.getRefresh());
} }
if(reindexRequest.getRequireAlias() != null){
if (reindexRequest.getRequireAlias() != null) {
request.setRequireAlias(reindexRequest.getRequireAlias()); request.setRequireAlias(reindexRequest.getRequireAlias());
} }
if(reindexRequest.getRequestsPerSecond() != null){
if (reindexRequest.getRequestsPerSecond() != null) {
request.setRequestsPerSecond(reindexRequest.getRequestsPerSecond()); request.setRequestsPerSecond(reindexRequest.getRequestsPerSecond());
} }
final Duration scroll = reindexRequest.getScroll(); final Duration scroll = reindexRequest.getScroll();
if(scroll != null){ if (scroll != null) {
request.setScroll(timeValueSeconds(scroll.getSeconds())); request.setScroll(timeValueSeconds(scroll.getSeconds()));
} }
if(reindexRequest.getWaitForActiveShards() != null){
if (reindexRequest.getWaitForActiveShards() != null) {
request.setWaitForActiveShards(ActiveShardCount.parseString(reindexRequest.getWaitForActiveShards())); request.setWaitForActiveShards(ActiveShardCount.parseString(reindexRequest.getWaitForActiveShards()));
} }
if(reindexRequest.getSlices() != null){
if (reindexRequest.getSlices() != null) {
request.setSlices(reindexRequest.getSlices()); request.setSlices(reindexRequest.getSlices());
} }
// endregion // endregion

View File

@ -45,8 +45,8 @@ import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.index.AliasData; import org.springframework.data.elasticsearch.core.index.AliasData;
import org.springframework.data.elasticsearch.core.index.Settings; import org.springframework.data.elasticsearch.core.index.Settings;
import org.springframework.data.elasticsearch.core.index.TemplateData; import org.springframework.data.elasticsearch.core.index.TemplateData;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -392,7 +392,7 @@ public class ResponseConverter {
/** /**
* @since 4.4 * @since 4.4
*/ */
public static ReindexResponse reindexResponseOf(BulkByScrollResponse bulkByScrollResponse){ public static ReindexResponse reindexResponseOf(BulkByScrollResponse bulkByScrollResponse) {
final List<ReindexResponse.Failure> failures = bulkByScrollResponse.getBulkFailures() // final List<ReindexResponse.Failure> failures = bulkByScrollResponse.getBulkFailures() //
.stream() // .stream() //
.map(ResponseConverter::reindexResponseFailureOf) // .map(ResponseConverter::reindexResponseFailureOf) //

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2019-2022 the original author or authors. * Copyright 2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -15,6 +15,8 @@
*/ */
package org.springframework.data.elasticsearch.core.reindex; package org.springframework.data.elasticsearch.core.reindex;
import java.time.Duration;
import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.IndexQuery;
@ -23,11 +25,9 @@ import org.springframework.data.elasticsearch.core.query.SourceFilter;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.time.Duration;
/** /**
* Request to reindex some documents from one index to another. * Request to reindex some documents from one index to another. (@see
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
* *
* @author Sijia Liu * @author Sijia Liu
* @since 4.4 * @since 4.4
@ -50,7 +50,10 @@ public class ReindexRequest {
@Nullable private final Duration scroll; @Nullable private final Duration scroll;
@Nullable private final Integer slices; @Nullable private final Integer slices;
private ReindexRequest(Source source, Dest dest, @Nullable Integer maxDocs, @Nullable Conflicts conflicts, @Nullable Script script, @Nullable Duration timeout, @Nullable Boolean requireAlias, @Nullable Boolean refresh, @Nullable String waitForActiveShards, @Nullable Integer requestsPerSecond, @Nullable Duration scroll, @Nullable Integer slices) { private ReindexRequest(Source source, Dest dest, @Nullable Integer maxDocs, @Nullable Conflicts conflicts,
@Nullable Script script, @Nullable Duration timeout, @Nullable Boolean requireAlias, @Nullable Boolean refresh,
@Nullable String waitForActiveShards, @Nullable Integer requestsPerSecond, @Nullable Duration scroll,
@Nullable Integer slices) {
Assert.notNull(source, "source must not be null"); Assert.notNull(source, "source must not be null");
Assert.notNull(dest, "dest must not be null"); Assert.notNull(dest, "dest must not be null");
@ -132,7 +135,18 @@ public class ReindexRequest {
} }
public enum Conflicts { public enum Conflicts {
PROCEED, ABORT PROCEED("proceed"), ABORT("abort");
// value used in Elasticsearch
private final String esName;
Conflicts(String esName) {
this.esName = esName;
}
public String getEsName() {
return esName;
}
} }
public static class Source { public static class Source {
@ -143,7 +157,7 @@ public class ReindexRequest {
@Nullable private Integer size; @Nullable private Integer size;
@Nullable private SourceFilter sourceFilter; @Nullable private SourceFilter sourceFilter;
private Source(IndexCoordinates indexes){ private Source(IndexCoordinates indexes) {
Assert.notNull(indexes, "indexes must not be null"); Assert.notNull(indexes, "indexes must not be null");
this.indexes = indexes; this.indexes = indexes;
@ -281,7 +295,7 @@ public class ReindexRequest {
this.dest = new Dest(destIndex); this.dest = new Dest(destIndex);
} }
// region setter // region setter
public ReindexRequestBuilder withMaxDocs(@Nullable Integer maxDocs) { public ReindexRequestBuilder withMaxDocs(@Nullable Integer maxDocs) {
this.maxDocs = maxDocs; this.maxDocs = maxDocs;
@ -298,7 +312,7 @@ public class ReindexRequest {
return this; return this;
} }
public ReindexRequestBuilder withSourceSlice(int id, int max){ public ReindexRequestBuilder withSourceSlice(int id, int max) {
this.source.slice = new Slice(id, max); this.source.slice = new Slice(id, max);
return this; return this;
} }
@ -313,17 +327,17 @@ public class ReindexRequest {
return this; return this;
} }
public ReindexRequestBuilder withSourceSourceFilter(SourceFilter sourceFilter){ public ReindexRequestBuilder withSourceSourceFilter(SourceFilter sourceFilter) {
this.source.sourceFilter = sourceFilter; this.source.sourceFilter = sourceFilter;
return this; return this;
} }
public ReindexRequestBuilder withDestPipeline(String pipelineName){ public ReindexRequestBuilder withDestPipeline(String pipelineName) {
this.dest.pipeline = pipelineName; this.dest.pipeline = pipelineName;
return this; return this;
} }
public ReindexRequestBuilder withDestRouting(String routing){ public ReindexRequestBuilder withDestRouting(String routing) {
this.dest.routing = routing; this.dest.routing = routing;
return this; return this;
} }
@ -343,44 +357,45 @@ public class ReindexRequest {
return this; return this;
} }
public ReindexRequestBuilder withTimeout(Duration timeout){ public ReindexRequestBuilder withTimeout(Duration timeout) {
this.timeout = timeout; this.timeout = timeout;
return this; return this;
} }
public ReindexRequestBuilder withRequireAlias(boolean requireAlias){ public ReindexRequestBuilder withRequireAlias(boolean requireAlias) {
this.requireAlias = requireAlias; this.requireAlias = requireAlias;
return this; return this;
} }
public ReindexRequestBuilder withRefresh(boolean refresh){ public ReindexRequestBuilder withRefresh(boolean refresh) {
this.refresh = refresh; this.refresh = refresh;
return this; return this;
} }
public ReindexRequestBuilder withWaitForActiveShards(String waitForActiveShards){ public ReindexRequestBuilder withWaitForActiveShards(String waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards; this.waitForActiveShards = waitForActiveShards;
return this; return this;
} }
public ReindexRequestBuilder withRequestsPerSecond(int requestsPerSecond){ public ReindexRequestBuilder withRequestsPerSecond(int requestsPerSecond) {
this.requestsPerSecond = requestsPerSecond; this.requestsPerSecond = requestsPerSecond;
return this; return this;
} }
public ReindexRequestBuilder withScroll(Duration scroll){ public ReindexRequestBuilder withScroll(Duration scroll) {
this.scroll = scroll; this.scroll = scroll;
return this; return this;
} }
public ReindexRequestBuilder withSlices(int slices){ public ReindexRequestBuilder withSlices(int slices) {
this.slices = slices; this.slices = slices;
return this; return this;
} }
// endregion // endregion
public ReindexRequest build() { public ReindexRequest build() {
return new ReindexRequest(source, dest, maxDocs, conflicts, script, timeout, requireAlias, refresh, waitForActiveShards, requestsPerSecond, scroll, slices); return new ReindexRequest(source, dest, maxDocs, conflicts, script, timeout, requireAlias, refresh,
waitForActiveShards, requestsPerSecond, scroll, slices);
} }
} }
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2019-2022 the original author or authors. * Copyright 2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -15,14 +15,14 @@
*/ */
package org.springframework.data.elasticsearch.core.reindex; package org.springframework.data.elasticsearch.core.reindex;
import org.springframework.lang.Nullable;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.springframework.lang.Nullable;
/** /**
* Response of reindex request. * Response of reindex request. (@see
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-api-response-body) * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-api-response-body)
* *
* @author Sijia Liu * @author Sijia Liu
* @since 4.4 * @since 4.4
@ -45,8 +45,8 @@ public class ReindexResponse {
private final List<Failure> failures; private final List<Failure> failures;
private ReindexResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches, private ReindexResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches,
long versionConflicts, long noops, long bulkRetries, long searchRetries, long versionConflicts, long noops, long bulkRetries, long searchRetries, long throttledMillis,
long throttledMillis, double requestsPerSecond, long throttledUntilMillis, List<Failure> failures) { double requestsPerSecond, long throttledUntilMillis, List<Failure> failures) {
this.took = took; this.took = took;
this.timedOut = timedOut; this.timedOut = timedOut;
this.total = total; this.total = total;
@ -142,16 +142,16 @@ public class ReindexResponse {
} }
/** /**
* The number of requests per second effectively executed during the reindex. * The number of requests per second effectively executed during the reindex.
*/ */
public double getRequestsPerSecond() { public double getRequestsPerSecond() {
return requestsPerSecond; return requestsPerSecond;
} }
/** /**
* This field should always be equal to zero in a _reindex response. * This field should always be equal to zero in a _reindex response. It only has meaning when using the Task API,
* It only has meaning when using the Task API, where it indicates the next time (in milliseconds since epoch) * where it indicates the next time (in milliseconds since epoch) a throttled request will be executed again in order
* a throttled request will be executed again in order to conform to requests_per_second. * to conform to requests_per_second.
*/ */
public long getThrottledUntilMillis() { public long getThrottledUntilMillis() {
return throttledUntilMillis; return throttledUntilMillis;
@ -186,7 +186,7 @@ public class ReindexResponse {
@Nullable private final Boolean aborted; @Nullable private final Boolean aborted;
private Failure(@Nullable String index, @Nullable String type, @Nullable String id, @Nullable Exception cause, private Failure(@Nullable String index, @Nullable String type, @Nullable String id, @Nullable Exception cause,
@Nullable Integer status, @Nullable Long seqNo, @Nullable Long term, @Nullable Boolean aborted) { @Nullable Integer status, @Nullable Long seqNo, @Nullable Long term, @Nullable Boolean aborted) {
this.index = index; this.index = index;
this.type = type; this.type = type;
this.id = id; this.id = id;
@ -375,17 +375,17 @@ public class ReindexResponse {
return this; return this;
} }
public ReindexResponseBuilder withThrottledMillis(long throttledMillis){ public ReindexResponseBuilder withThrottledMillis(long throttledMillis) {
this.throttledMillis = throttledMillis; this.throttledMillis = throttledMillis;
return this; return this;
} }
public ReindexResponseBuilder withRequestsPerSecond(double requestsPerSecond){ public ReindexResponseBuilder withRequestsPerSecond(double requestsPerSecond) {
this.requestsPerSecond = requestsPerSecond; this.requestsPerSecond = requestsPerSecond;
return this; return this;
} }
public ReindexResponseBuilder withThrottledUntilMillis(long throttledUntilMillis){ public ReindexResponseBuilder withThrottledUntilMillis(long throttledUntilMillis) {
this.throttledUntilMillis = throttledUntilMillis; this.throttledUntilMillis = throttledUntilMillis;
return this; return this;
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2019-2022 the original author or authors. * Copyright 2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -15,14 +15,13 @@
*/ */
package org.springframework.data.elasticsearch.core.reindex; package org.springframework.data.elasticsearch.core.reindex;
import java.time.Duration;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.time.Duration;
/** /**
* Remote info * Remote info (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#source)
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#source)
* *
* @author Sijia Liu * @author Sijia Liu
* @since 4.4 * @since 4.4
@ -38,7 +37,8 @@ public class Remote {
@Nullable private final Duration socketTimeout; @Nullable private final Duration socketTimeout;
@Nullable private final Duration connectTimeout; @Nullable private final Duration connectTimeout;
private Remote(String scheme, String host, int port, @Nullable String pathPrefix, @Nullable String username, @Nullable String password, @Nullable Duration socketTimeout, @Nullable Duration connectTimeout) { private Remote(String scheme, String host, int port, @Nullable String pathPrefix, @Nullable String username,
@Nullable String password, @Nullable Duration socketTimeout, @Nullable Duration connectTimeout) {
Assert.notNull(scheme, "scheme must not be null"); Assert.notNull(scheme, "scheme must not be null");
Assert.notNull(host, "host must not be null"); Assert.notNull(host, "host must not be null");
@ -90,11 +90,11 @@ public class Remote {
return pathPrefix; return pathPrefix;
} }
public static RemoteBuilder builder(String scheme, String host, int port){ public static RemoteBuilder builder(String scheme, String host, int port) {
return new RemoteBuilder(scheme, host, port); return new RemoteBuilder(scheme, host, port);
} }
public static class RemoteBuilder{ public static class RemoteBuilder {
private final String scheme; private final String scheme;
private final String host; private final String host;
private final int port; private final int port;
@ -110,33 +110,33 @@ public class Remote {
this.port = port; this.port = port;
} }
public RemoteBuilder withPathPrefix(String pathPrefix){ public RemoteBuilder withPathPrefix(String pathPrefix) {
this.pathPrefix = pathPrefix; this.pathPrefix = pathPrefix;
return this; return this;
} }
public RemoteBuilder withUsername(String username){ public RemoteBuilder withUsername(String username) {
this.username = username; this.username = username;
return this; return this;
} }
public RemoteBuilder withPassword(String password){ public RemoteBuilder withPassword(String password) {
this.password = password; this.password = password;
return this; return this;
} }
public RemoteBuilder withSocketTimeout(Duration socketTimeout){ public RemoteBuilder withSocketTimeout(Duration socketTimeout) {
this.socketTimeout = socketTimeout; this.socketTimeout = socketTimeout;
return this; return this;
} }
public RemoteBuilder withConnectTimeout(Duration connectTimeout){ public RemoteBuilder withConnectTimeout(Duration connectTimeout) {
this.connectTimeout = connectTimeout; this.connectTimeout = connectTimeout;
return this; return this;
} }
public Remote build(){ public Remote build() {
return new Remote(scheme, host, port , pathPrefix, username, password, socketTimeout, connectTimeout); return new Remote(scheme, host, port, pathPrefix, username, password, socketTimeout, connectTimeout);
} }
} }
} }

View File

@ -83,11 +83,6 @@ import org.springframework.data.elasticsearch.annotations.JoinTypeRelations;
import org.springframework.data.elasticsearch.annotations.MultiField; import org.springframework.data.elasticsearch.annotations.MultiField;
import org.springframework.data.elasticsearch.annotations.ScriptedField; import org.springframework.data.elasticsearch.annotations.ScriptedField;
import org.springframework.data.elasticsearch.annotations.Setting; import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.ScriptField;
import org.springframework.data.elasticsearch.core.document.Explanation; import org.springframework.data.elasticsearch.core.document.Explanation;
import org.springframework.data.elasticsearch.core.geo.GeoPoint; import org.springframework.data.elasticsearch.core.geo.GeoPoint;
import org.springframework.data.elasticsearch.core.index.AliasAction; import org.springframework.data.elasticsearch.core.index.AliasAction;
@ -97,6 +92,8 @@ import org.springframework.data.elasticsearch.core.join.JoinField;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.core.query.RescorerQuery.ScoreMode; import org.springframework.data.elasticsearch.core.query.RescorerQuery.ScoreMode;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.data.elasticsearch.utils.IndexNameProvider; import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.data.util.StreamUtils; import org.springframework.data.util.StreamUtils;
@ -3660,16 +3657,17 @@ public abstract class ElasticsearchTemplateTests {
String destIndexName = indexNameProvider.indexName(); String destIndexName = indexNameProvider.indexName();
operations.indexOps(IndexCoordinates.of(destIndexName)).create(); operations.indexOps(IndexCoordinates.of(destIndexName)).create();
final ReindexRequest reindexRequest = ReindexRequest.builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) final ReindexRequest reindexRequest = ReindexRequest
.withRefresh(true).build(); .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)).withRefresh(true).build();
final ReindexResponse reindex = operations.reindex(reindexRequest); final ReindexResponse reindex = operations.reindex(reindexRequest);
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
assertThat(reindex.getTotal()).isEqualTo(1); assertThat(reindex.getTotal()).isEqualTo(1);
assertThat(operations.count(searchQuery, IndexCoordinates.of(destIndexName))).isEqualTo(1); assertThat(operations.count(searchQuery, IndexCoordinates.of(destIndexName))).isEqualTo(1);
} }
@Test // #1529 @Test // #1529
void shouldWorkSubmitReindexTask(){ void shouldWorkSubmitReindexTask() {
String sourceIndexName = indexNameProvider.indexName(); String sourceIndexName = indexNameProvider.indexName();
indexNameProvider.increment(); indexNameProvider.increment();
String destIndexName = indexNameProvider.indexName(); String destIndexName = indexNameProvider.indexName();
@ -3677,7 +3675,7 @@ public abstract class ElasticsearchTemplateTests {
final ReindexRequest reindexRequest = ReindexRequest final ReindexRequest reindexRequest = ReindexRequest
.builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)).build(); .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)).build();
String task = operations.submitReindex(reindexRequest); String task = operations.submitReindex(reindexRequest);
// Maybe there should be a task api to detect whether the task exists
assertThat(task).isNotBlank(); assertThat(task).isNotBlank();
} }

View File

@ -19,9 +19,7 @@ import static java.util.Collections.*;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.*;
import static org.elasticsearch.index.query.QueryBuilders.*; import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.springframework.data.elasticsearch.annotations.FieldType.*; import static org.springframework.data.elasticsearch.annotations.FieldType.*;
import static org.springframework.data.elasticsearch.utils.IdGenerator.*;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
@ -80,6 +78,7 @@ import org.springframework.data.elasticsearch.core.index.AliasActions;
import org.springframework.data.elasticsearch.core.index.AliasData; import org.springframework.data.elasticsearch.core.index.AliasData;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration; import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.data.elasticsearch.utils.IndexNameProvider; import org.springframework.data.elasticsearch.utils.IndexNameProvider;
@ -1201,32 +1200,24 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
String destIndexName = indexNameProvider.indexName(); String destIndexName = indexNameProvider.indexName();
operations.indexOps(IndexCoordinates.of(destIndexName)).create(); operations.indexOps(IndexCoordinates.of(destIndexName)).create();
final ReindexRequest reindexRequest = ReindexRequest final ReindexRequest reindexRequest = ReindexRequest
.builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)).withRefresh(true).build();
.withRefresh(true) operations.reindex(reindexRequest).as(StepVerifier::create)
.build();
operations.reindex(reindexRequest)
.as(StepVerifier::create)
.consumeNextWith(postReindexResponse -> assertThat(postReindexResponse.getTotal()).isEqualTo(1L)) .consumeNextWith(postReindexResponse -> assertThat(postReindexResponse.getTotal()).isEqualTo(1L))
.verifyComplete(); .verifyComplete();
operations.count(operations.matchAllQuery(), SampleEntity.class, IndexCoordinates.of(destIndexName)) operations.count(operations.matchAllQuery(), SampleEntity.class, IndexCoordinates.of(destIndexName))
.as(StepVerifier::create) .as(StepVerifier::create).expectNext(1L).verifyComplete();
.expectNext(1L)
.verifyComplete();
} }
@Test // #1529 @Test // #1529
void shouldWorkSubmitReindexTask(){ void shouldWorkSubmitReindexTask() {
String sourceIndexName = indexNameProvider.indexName(); String sourceIndexName = indexNameProvider.indexName();
indexNameProvider.increment(); indexNameProvider.increment();
String destIndexName = indexNameProvider.indexName(); String destIndexName = indexNameProvider.indexName();
operations.indexOps(IndexCoordinates.of(destIndexName)).create(); operations.indexOps(IndexCoordinates.of(destIndexName)).create();
final ReindexRequest reindexRequest = ReindexRequest final ReindexRequest reindexRequest = ReindexRequest
.builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)).build();
.build(); operations.submitReindex(reindexRequest).as(StepVerifier::create)
operations.submitReindex(reindexRequest) .consumeNextWith(task -> assertThat(task).isNotBlank()).verifyComplete();
.as(StepVerifier::create)
.consumeNextWith(task -> assertThat(task).isNotBlank())
.verifyComplete();
} }
// endregion // endregion

View File

@ -560,56 +560,46 @@ class RequestFactoryTests {
@Test // #1529 @Test // #1529
void shouldCreateReindexRequest() throws IOException, JSONException { void shouldCreateReindexRequest() throws IOException, JSONException {
final String expected = "{\n" + final String expected = "{\n" + //
" \"source\":{\n" + " \"source\":{\n" + //
" \"remote\":{\n" + " \"remote\":{\n" + //
" \"username\":\"admin\",\n" + " \"username\":\"admin\",\n" + //
" \"password\":\"password\",\n" + " \"password\":\"password\",\n" + //
" \"host\":\"http://localhost:9200/elasticsearch\",\n" + " \"host\":\"http://localhost:9200/elasticsearch\",\n" + //
" \"socket_timeout\":\"30s\",\n" + " \"socket_timeout\":\"30s\",\n" + //
" \"connect_timeout\":\"30s\"\n" + " \"connect_timeout\":\"30s\"\n" + //
" },\n" + " },\n" + //
" \"index\":[\"source_1\",\"source_2\"],\n" + " \"index\":[\"source_1\",\"source_2\"],\n" + //
" \"size\":5,\n" + " \"size\":5,\n" + //
" \"query\":{\"match_all\":{}},\n" + " \"query\":{\"match_all\":{}},\n" + //
" \"_source\":{\"includes\":[\"name\"],\"excludes\":[]},\n" + " \"_source\":{\"includes\":[\"name\"],\"excludes\":[]},\n" + //
" \"slice\":{\"id\":1,\"max\":20}\n" + " \"slice\":{\"id\":1,\"max\":20}\n" + //
" },\n" + " },\n" + //
" \"dest\":{\n" + " \"dest\":{\n" + //
" \"index\":\"destination\",\n" + " \"index\":\"destination\",\n" + //
" \"routing\":\"routing\",\n" + " \"routing\":\"routing\",\n" + //
" \"op_type\":\"create\",\n" + " \"op_type\":\"create\",\n" + //
" \"pipeline\":\"pipeline\",\n" + " \"pipeline\":\"pipeline\",\n" + //
" \"version_type\":\"external\"\n" + " \"version_type\":\"external\"\n" + //
" },\n" + " },\n" + //
" \"max_docs\":10,\n" + " \"max_docs\":10,\n" + //
" \"script\":{\"source\":\"Math.max(1,2)\",\"lang\":\"java\"},\n" + " \"script\":{\"source\":\"Math.max(1,2)\",\"lang\":\"java\"},\n" + //
" \"conflicts\":\"proceed\"\n" + " \"conflicts\":\"proceed\"\n" + //
"}"; "}";
Remote remote = Remote.builder("http", "localhost",9200) Remote remote = Remote.builder("http", "localhost", 9200).withPathPrefix("elasticsearch").withUsername("admin")
.withPathPrefix("elasticsearch") .withPassword("password").withConnectTimeout(Duration.ofSeconds(30)).withSocketTimeout(Duration.ofSeconds(30))
.withUsername("admin")
.withPassword("password")
.withConnectTimeout(Duration.ofSeconds(30))
.withSocketTimeout(Duration.ofSeconds(30)).build();
ReindexRequest reindexRequest = ReindexRequest.builder(IndexCoordinates.of("source_1", "source_2"),
IndexCoordinates.of("destination"))
.withConflicts(ReindexRequest.Conflicts.PROCEED)
.withMaxDocs(10)
.withSourceQuery(new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build())
.withSourceSize(5)
.withSourceSourceFilter(new FetchSourceFilterBuilder().withIncludes("name").build())
.withSourceRemote(remote)
.withSourceSlice(1,20)
.withDestOpType(IndexQuery.OpType.CREATE)
.withDestVersionType(Document.VersionType.EXTERNAL)
.withDestPipeline("pipeline")
.withDestRouting("routing")
.withScript("Math.max(1,2)", "java")
.build(); .build();
ReindexRequest reindexRequest = ReindexRequest
.builder(IndexCoordinates.of("source_1", "source_2"), IndexCoordinates.of("destination"))
.withConflicts(ReindexRequest.Conflicts.PROCEED).withMaxDocs(10)
.withSourceQuery(new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build()).withSourceSize(5)
.withSourceSourceFilter(new FetchSourceFilterBuilder().withIncludes("name").build()).withSourceRemote(remote)
.withSourceSlice(1, 20).withDestOpType(IndexQuery.OpType.CREATE)
.withDestVersionType(Document.VersionType.EXTERNAL).withDestPipeline("pipeline").withDestRouting("routing")
.withScript("Math.max(1,2)", "java").build();
final String json = requestToString(requestFactory.reindexRequest(reindexRequest)); final String json = requestToString(requestFactory.reindexRequest(reindexRequest));
assertEquals(expected, json, false); assertEquals(expected, json, false);
@ -617,22 +607,21 @@ class RequestFactoryTests {
@Test @Test
void shouldAllowSourceQueryForReindexWithoutRemote() throws IOException, JSONException { void shouldAllowSourceQueryForReindexWithoutRemote() throws IOException, JSONException {
final String expected = "{\n" + final String expected = "{\n" + //
" \"source\":{\n" + " \"source\":{\n" + //
" \"index\":[\"source\"],\n" + " \"index\":[\"source\"],\n" + //
" \"query\":{\"match_all\":{}}\n" + " \"query\":{\"match_all\":{}}\n" + //
" },\n" + " },\n" + //
" \"dest\":{\n" + " \"dest\":{\n" + //
" \"index\":\"destination\",\n" + " \"index\":\"destination\",\n" + //
" \"op_type\":\"index\",\n" + " \"op_type\":\"index\",\n" + //
" \"version_type\":\"internal\"\n" + " \"version_type\":\"internal\"\n" + //
" }\n" + " }\n" + //
"}"; "}";
ReindexRequest reindexRequest = ReindexRequest.builder(IndexCoordinates.of("source"), ReindexRequest reindexRequest = ReindexRequest
IndexCoordinates.of("destination")) .builder(IndexCoordinates.of("source"), IndexCoordinates.of("destination"))
.withSourceQuery(new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build()) .withSourceQuery(new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build()).build();
.build();
final String json = requestToString(requestFactory.reindexRequest(reindexRequest)); final String json = requestToString(requestFactory.reindexRequest(reindexRequest));