mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-25 17:38:44 +00:00
Move delete by query helpers into core (#22810)
This moves the building blocks for delete by query into core. This should enabled two thigns: 1. Plugins other than reindex to implement "bulk by scroll" style operations. 2. Plugins to directly call delete by query. Those plugins should be careful to make sure that task cancellation still works, but this should be possible. Notes: 1. I've mostly just moved classes and moved around tests methods. 2. I haven't been super careful about cohesion between these core classes and reindex. They are quite interconnected because I wanted to make the change as mechanical as possible. Closes #22616
This commit is contained in:
parent
eb4562d7a5
commit
2e48fb8294
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
@ -31,6 +31,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.bulk.Retry;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.client.ParentTaskAssigningClient;
|
||||
@ -47,7 +48,6 @@ import org.elasticsearch.index.mapper.RoutingFieldMapper;
|
||||
import org.elasticsearch.index.mapper.SourceFieldMapper;
|
||||
import org.elasticsearch.index.mapper.TypeFieldMapper;
|
||||
import org.elasticsearch.index.mapper.VersionFieldMapper;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.script.CompiledScript;
|
||||
import org.elasticsearch.script.ExecutableScript;
|
||||
import org.elasticsearch.script.Script;
|
||||
@ -77,8 +77,8 @@ import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.unmodifiableList;
|
||||
import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff;
|
||||
import static org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
|
||||
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
|
||||
import static org.elasticsearch.rest.RestStatus.CONFLICT;
|
||||
import static org.elasticsearch.search.sort.SortBuilders.fieldSort;
|
||||
|
||||
@ -103,7 +103,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
private final Set<String> destinationIndices = Collections.newSetFromMap(new ConcurrentHashMap<>());
|
||||
|
||||
private final ParentTaskAssigningClient client;
|
||||
private final ActionListener<BulkIndexByScrollResponse> listener;
|
||||
private final ActionListener<BulkByScrollResponse> listener;
|
||||
private final Retry bulkRetry;
|
||||
private final ScrollableHitSource scrollSource;
|
||||
|
||||
@ -116,7 +116,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
|
||||
public AbstractAsyncBulkByScrollAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState,
|
||||
ActionListener<BulkIndexByScrollResponse> listener) {
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
this.task = task;
|
||||
this.logger = logger;
|
||||
this.client = client;
|
||||
@ -143,8 +143,10 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
|
||||
/**
|
||||
* Build the {@link BiFunction} to apply to all {@link RequestWrapper}.
|
||||
*
|
||||
* Public for testings....
|
||||
*/
|
||||
protected BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
||||
public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
||||
// The default script applier executes a no-op
|
||||
return (request, searchHit) -> request;
|
||||
}
|
||||
@ -215,9 +217,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
/**
|
||||
* Build the response for reindex actions.
|
||||
*/
|
||||
protected BulkIndexByScrollResponse buildResponse(TimeValue took, List<BulkItemResponse.Failure> indexingFailures,
|
||||
protected BulkByScrollResponse buildResponse(TimeValue took, List<BulkItemResponse.Failure> indexingFailures,
|
||||
List<SearchFailure> searchFailures, boolean timedOut) {
|
||||
return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
|
||||
return new BulkByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -452,7 +454,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
*
|
||||
* @param failure if non null then the request failed catastrophically with this exception
|
||||
*/
|
||||
void finishHim(Exception failure) {
|
||||
protected void finishHim(Exception failure) {
|
||||
finishHim(failure, emptyList(), emptyList(), false);
|
||||
}
|
||||
|
||||
@ -463,7 +465,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
* @param searchFailures any search failures accumulated during the request
|
||||
* @param timedOut have any of the sub-requests timed out?
|
||||
*/
|
||||
void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
|
||||
protected void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
|
||||
scrollSource.close();
|
||||
if (failure == null) {
|
||||
listener.onResponse(
|
||||
@ -498,7 +500,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
/**
|
||||
* Wrapper for the {@link DocWriteRequest} that are used in this action class.
|
||||
*/
|
||||
interface RequestWrapper<Self extends DocWriteRequest<Self>> {
|
||||
public interface RequestWrapper<Self extends DocWriteRequest<Self>> {
|
||||
|
||||
void setIndex(String index);
|
||||
|
||||
@ -628,7 +630,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
/**
|
||||
* Wraps a {@link IndexRequest} in a {@link RequestWrapper}
|
||||
*/
|
||||
static RequestWrapper<IndexRequest> wrap(IndexRequest request) {
|
||||
public static RequestWrapper<IndexRequest> wrap(IndexRequest request) {
|
||||
return new IndexRequestWrapper(request);
|
||||
}
|
||||
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
@ -355,7 +355,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
|
||||
/**
|
||||
* Build a new request for a slice of the parent request.
|
||||
*/
|
||||
abstract Self forSlice(TaskId slicingTask, SearchRequest slice);
|
||||
protected abstract Self forSlice(TaskId slicingTask, SearchRequest slice);
|
||||
|
||||
/**
|
||||
* Setup a clone of this request with the information needed to process a slice of it.
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
@ -31,11 +31,11 @@ import org.elasticsearch.index.query.QueryBuilder;
|
||||
public abstract class AbstractBulkByScrollRequestBuilder<
|
||||
Request extends AbstractBulkByScrollRequest<Request>,
|
||||
Self extends AbstractBulkByScrollRequestBuilder<Request, Self>>
|
||||
extends ActionRequestBuilder<Request, BulkIndexByScrollResponse, Self> {
|
||||
extends ActionRequestBuilder<Request, BulkByScrollResponse, Self> {
|
||||
private final SearchRequestBuilder source;
|
||||
|
||||
protected AbstractBulkByScrollRequestBuilder(ElasticsearchClient client,
|
||||
Action<Request, BulkIndexByScrollResponse, Self> action, SearchRequestBuilder source, Request request) {
|
||||
Action<Request, BulkByScrollResponse, Self> action, SearchRequestBuilder source, Request request) {
|
||||
super(client, action, request);
|
||||
this.source = source;
|
||||
}
|
@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.client.ParentTaskAssigningClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
/**
|
||||
* Implementation of delete-by-query using scrolling and bulk.
|
||||
*/
|
||||
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest> {
|
||||
public AsyncDeleteByQueryAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService, ClusterState clusterState,
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean needsSourceDocumentVersions() {
|
||||
/*
|
||||
* We always need the version of the source document so we can report a version conflict if we try to delete it and it has been
|
||||
* changed.
|
||||
*/
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accept(ScrollableHitSource.Hit doc) {
|
||||
// Delete-by-query does not require the source to delete a document
|
||||
// and the default implementation checks for it
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RequestWrapper<DeleteRequest> buildRequest(ScrollableHitSource.Hit doc) {
|
||||
DeleteRequest delete = new DeleteRequest();
|
||||
delete.index(doc.getIndex());
|
||||
delete.type(doc.getType());
|
||||
delete.id(doc.getId());
|
||||
delete.version(doc.getVersion());
|
||||
return wrap(delete);
|
||||
}
|
||||
|
||||
/**
|
||||
* Overrides the parent's implementation is much more Update/Reindex oriented and so also copies things like timestamp/ttl which we
|
||||
* don't care for a deletion.
|
||||
*/
|
||||
@Override
|
||||
protected RequestWrapper<?> copyMetadata(RequestWrapper<?> request, ScrollableHitSource.Hit doc) {
|
||||
request.setParent(doc.getParent());
|
||||
request.setRouting(doc.getRouting());
|
||||
return request;
|
||||
}
|
||||
|
||||
}
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
@ -32,18 +32,18 @@ import org.elasticsearch.tasks.TaskManager;
|
||||
/**
|
||||
* Helps parallelize reindex requests using sliced scrolls.
|
||||
*/
|
||||
public class ReindexParallelizationHelper {
|
||||
private ReindexParallelizationHelper() {}
|
||||
public class BulkByScrollParallelizationHelper {
|
||||
private BulkByScrollParallelizationHelper() {}
|
||||
|
||||
public static <
|
||||
Request extends AbstractBulkByScrollRequest<Request>
|
||||
> void startSlices(Client client, TaskManager taskManager, Action<Request, BulkIndexByScrollResponse, ?> action,
|
||||
String localNodeId, ParentBulkByScrollTask task, Request request, ActionListener<BulkIndexByScrollResponse> listener) {
|
||||
> void startSlices(Client client, TaskManager taskManager, Action<Request, BulkByScrollResponse, ?> action,
|
||||
String localNodeId, ParentBulkByScrollTask task, Request request, ActionListener<BulkByScrollResponse> listener) {
|
||||
TaskId parentTaskId = new TaskId(localNodeId, task.getId());
|
||||
for (final SearchRequest slice : sliceIntoSubRequests(request.getSearchRequest(), UidFieldMapper.NAME, request.getSlices())) {
|
||||
// TODO move the request to the correct node. maybe here or somehow do it as part of startup for reindex in general....
|
||||
Request requestForSlice = request.forSlice(parentTaskId, slice);
|
||||
ActionListener<BulkIndexByScrollResponse> sliceListener = ActionListener.wrap(
|
||||
ActionListener<BulkByScrollResponse> sliceListener = ActionListener.wrap(
|
||||
r -> task.onSliceResponse(listener, slice.source().slice().getId(), r),
|
||||
e -> task.onSliceFailure(listener, slice.source().slice().getId(), e));
|
||||
client.execute(action, requestForSlice, sliceListener);
|
@ -17,17 +17,17 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@ -41,17 +41,17 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
|
||||
/**
|
||||
* Response used for actions that index many documents using a scroll request.
|
||||
*/
|
||||
public class BulkIndexByScrollResponse extends ActionResponse implements ToXContent {
|
||||
public class BulkByScrollResponse extends ActionResponse implements ToXContent {
|
||||
private TimeValue took;
|
||||
private BulkByScrollTask.Status status;
|
||||
private List<Failure> bulkFailures;
|
||||
private List<SearchFailure> searchFailures;
|
||||
private boolean timedOut;
|
||||
|
||||
public BulkIndexByScrollResponse() {
|
||||
public BulkByScrollResponse() {
|
||||
}
|
||||
|
||||
public BulkIndexByScrollResponse(TimeValue took, BulkByScrollTask.Status status, List<Failure> bulkFailures,
|
||||
public BulkByScrollResponse(TimeValue took, BulkByScrollTask.Status status, List<Failure> bulkFailures,
|
||||
List<SearchFailure> searchFailures, boolean timedOut) {
|
||||
this.took = took;
|
||||
this.status = requireNonNull(status, "Null status not supported");
|
||||
@ -60,12 +60,12 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
|
||||
this.timedOut = timedOut;
|
||||
}
|
||||
|
||||
public BulkIndexByScrollResponse(Iterable<BulkIndexByScrollResponse> toMerge, @Nullable String reasonCancelled) {
|
||||
public BulkByScrollResponse(Iterable<BulkByScrollResponse> toMerge, @Nullable String reasonCancelled) {
|
||||
long mergedTook = 0;
|
||||
List<BulkByScrollTask.StatusOrException> statuses = new ArrayList<>();
|
||||
bulkFailures = new ArrayList<>();
|
||||
searchFailures = new ArrayList<>();
|
||||
for (BulkIndexByScrollResponse response : toMerge) {
|
||||
for (BulkByScrollResponse response : toMerge) {
|
||||
mergedTook = max(mergedTook, response.getTook().nanos());
|
||||
statuses.add(new BulkByScrollTask.StatusOrException(response.status));
|
||||
bulkFailures.addAll(response.getBulkFailures());
|
||||
@ -80,7 +80,7 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
|
||||
return took;
|
||||
}
|
||||
|
||||
protected BulkByScrollTask.Status getStatus() {
|
||||
public BulkByScrollTask.Status getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
@ -51,15 +51,14 @@ public abstract class BulkByScrollTask extends CancellableTask {
|
||||
|
||||
/**
|
||||
* The number of sub-slices that are still running. {@link WorkingBulkByScrollTask} will always have 0 and
|
||||
* {@link ParentBulkByScrollTask} will return the number of waiting tasks. Used by {@link TransportRethrottleAction} to decide how to
|
||||
* perform the rethrottling.
|
||||
* {@link ParentBulkByScrollTask} will return the number of waiting tasks. Used to decide how to perform rethrottling.
|
||||
*/
|
||||
abstract int runningSliceSubTasks();
|
||||
public abstract int runningSliceSubTasks();
|
||||
|
||||
/**
|
||||
* Apply the {@code newRequestsPerSecond}.
|
||||
*/
|
||||
abstract void rethrottle(float newRequestsPerSecond);
|
||||
public abstract void rethrottle(float newRequestsPerSecond);
|
||||
|
||||
/*
|
||||
* Overridden to force children to return compatible status.
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
@ -79,7 +79,7 @@ public class DeleteByQueryRequest extends AbstractBulkByScrollRequest<DeleteByQu
|
||||
}
|
||||
|
||||
@Override
|
||||
DeleteByQueryRequest forSlice(TaskId slicingTask, SearchRequest slice) {
|
||||
protected DeleteByQueryRequest forSlice(TaskId slicingTask, SearchRequest slice) {
|
||||
return doForSlice(new DeleteByQueryRequest(slice, false), slicingTask);
|
||||
}
|
||||
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
@ -35,12 +35,12 @@ import static java.util.Collections.unmodifiableList;
|
||||
/**
|
||||
* Task for parent bulk by scroll requests that have sub-workers.
|
||||
*/
|
||||
class ParentBulkByScrollTask extends BulkByScrollTask {
|
||||
public class ParentBulkByScrollTask extends BulkByScrollTask {
|
||||
/**
|
||||
* Holds the responses as they come back. This uses {@link Tuple} as an "Either" style holder where only the response or the exception
|
||||
* is set.
|
||||
*/
|
||||
private final AtomicArray<Tuple<BulkIndexByScrollResponse, Exception>> results;
|
||||
private final AtomicArray<Tuple<BulkByScrollResponse, Exception>> results;
|
||||
private final AtomicInteger counter;
|
||||
|
||||
public ParentBulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId, int slices) {
|
||||
@ -50,7 +50,7 @@ class ParentBulkByScrollTask extends BulkByScrollTask {
|
||||
}
|
||||
|
||||
@Override
|
||||
void rethrottle(float newRequestsPerSecond) {
|
||||
public void rethrottle(float newRequestsPerSecond) {
|
||||
// Nothing to do because all rethrottling is done on slice sub tasks.
|
||||
}
|
||||
|
||||
@ -63,7 +63,7 @@ class ParentBulkByScrollTask extends BulkByScrollTask {
|
||||
}
|
||||
|
||||
@Override
|
||||
int runningSliceSubTasks() {
|
||||
public int runningSliceSubTasks() {
|
||||
return counter.get();
|
||||
}
|
||||
|
||||
@ -82,7 +82,7 @@ class ParentBulkByScrollTask extends BulkByScrollTask {
|
||||
}
|
||||
|
||||
private void addResultsToList(List<StatusOrException> sliceStatuses) {
|
||||
for (AtomicArray.Entry<Tuple<BulkIndexByScrollResponse, Exception>> t : results.asList()) {
|
||||
for (AtomicArray.Entry<Tuple<BulkByScrollResponse, Exception>> t : results.asList()) {
|
||||
if (t.value != null) {
|
||||
if (t.value.v1() != null) {
|
||||
sliceStatuses.set(t.index, new StatusOrException(t.value.v1().getStatus()));
|
||||
@ -96,7 +96,7 @@ class ParentBulkByScrollTask extends BulkByScrollTask {
|
||||
/**
|
||||
* Record a response from a slice and respond to the listener if the request is finished.
|
||||
*/
|
||||
void onSliceResponse(ActionListener<BulkIndexByScrollResponse> listener, int sliceId, BulkIndexByScrollResponse response) {
|
||||
public void onSliceResponse(ActionListener<BulkByScrollResponse> listener, int sliceId, BulkByScrollResponse response) {
|
||||
results.setOnce(sliceId, new Tuple<>(response, null));
|
||||
/* If the request isn't finished we could automatically rethrottle the sub-requests here but we would only want to do that if we
|
||||
* were fairly sure they had a while left to go. */
|
||||
@ -106,19 +106,19 @@ class ParentBulkByScrollTask extends BulkByScrollTask {
|
||||
/**
|
||||
* Record a failure from a slice and respond to the listener if the request is finished.
|
||||
*/
|
||||
void onSliceFailure(ActionListener<BulkIndexByScrollResponse> listener, int sliceId, Exception e) {
|
||||
void onSliceFailure(ActionListener<BulkByScrollResponse> listener, int sliceId, Exception e) {
|
||||
results.setOnce(sliceId, new Tuple<>(null, e));
|
||||
recordSliceCompletionAndRespondIfAllDone(listener);
|
||||
// TODO cancel when a slice fails?
|
||||
}
|
||||
|
||||
private void recordSliceCompletionAndRespondIfAllDone(ActionListener<BulkIndexByScrollResponse> listener) {
|
||||
private void recordSliceCompletionAndRespondIfAllDone(ActionListener<BulkByScrollResponse> listener) {
|
||||
if (counter.decrementAndGet() != 0) {
|
||||
return;
|
||||
}
|
||||
List<BulkIndexByScrollResponse> responses = new ArrayList<>(results.length());
|
||||
List<BulkByScrollResponse> responses = new ArrayList<>(results.length());
|
||||
Exception exception = null;
|
||||
for (AtomicArray.Entry<Tuple<BulkIndexByScrollResponse, Exception>> t : results.asList()) {
|
||||
for (AtomicArray.Entry<Tuple<BulkByScrollResponse, Exception>> t : results.asList()) {
|
||||
if (t.value.v1() == null) {
|
||||
assert t.value.v2() != null : "exception shouldn't be null if value is null";
|
||||
if (exception == null) {
|
||||
@ -132,7 +132,7 @@ class ParentBulkByScrollTask extends BulkByScrollTask {
|
||||
}
|
||||
}
|
||||
if (exception == null) {
|
||||
listener.onResponse(new BulkIndexByScrollResponse(responses, getReasonCancelled()));
|
||||
listener.onResponse(new BulkByScrollResponse(responses, getReasonCancelled()));
|
||||
} else {
|
||||
listener.onFailure(exception);
|
||||
}
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
@ -32,7 +32,6 @@ import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
@ -201,8 +200,7 @@ public abstract class ScrollableHitSource implements Closeable {
|
||||
}
|
||||
|
||||
/**
|
||||
* An implementation of {@linkplain Hit} that uses getters and setters. Primarily used for testing and {@link RemoteScrollableHitSource}
|
||||
* .
|
||||
* An implementation of {@linkplain Hit} that uses getters and setters.
|
||||
*/
|
||||
public static class BasicHit implements Hit {
|
||||
private final String index;
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
/**
|
||||
* Implemented by {@link BulkByScrollTask} and {@link BulkByScrollTask.Status} to consistently implement
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
@ -95,7 +95,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
|
||||
}
|
||||
|
||||
@Override
|
||||
int runningSliceSubTasks() {
|
||||
public int runningSliceSubTasks() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -162,7 +162,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
|
||||
bulkRetries.incrementAndGet();
|
||||
}
|
||||
|
||||
void countSearchRetry() {
|
||||
public void countSearchRetry() {
|
||||
searchRetries.incrementAndGet();
|
||||
}
|
||||
|
||||
@ -209,7 +209,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
|
||||
}
|
||||
|
||||
@Override
|
||||
void rethrottle(float newRequestsPerSecond) {
|
||||
public void rethrottle(float newRequestsPerSecond) {
|
||||
synchronized (delayedPrepareBulkRequestReference) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}]: Rethrottling to [{}] requests per second", getId(), newRequestsPerSecond);
|
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Infrastructure for actions that modify documents based on the results of a scrolling query.
|
||||
*/
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
@ -34,6 +34,13 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractAsyncBulkByScrollAction;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequest;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource;
|
||||
import org.elasticsearch.action.bulk.byscroll.WorkingBulkByScrollTask;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.Hit;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
@ -63,8 +70,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.internal.InternalSearchHit;
|
||||
@ -120,7 +125,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
private MyMockClient client;
|
||||
private DummyAbstractBulkByScrollRequest testRequest;
|
||||
private SearchRequest firstSearchRequest;
|
||||
private PlainActionFuture<BulkIndexByScrollResponse> listener;
|
||||
private PlainActionFuture<BulkByScrollResponse> listener;
|
||||
private String scrollId;
|
||||
private TaskManager taskManager;
|
||||
private WorkingBulkByScrollTask testTask;
|
||||
@ -335,7 +340,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(false, singletonList(shardFailure), 0,
|
||||
emptyList(), null);
|
||||
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
|
||||
BulkIndexByScrollResponse response = listener.get();
|
||||
BulkByScrollResponse response = listener.get();
|
||||
assertThat(response.getBulkFailures(), empty());
|
||||
assertThat(response.getSearchFailures(), contains(shardFailure));
|
||||
assertFalse(response.isTimedOut());
|
||||
@ -349,7 +354,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
public void testSearchTimeoutsAbortRequest() throws Exception {
|
||||
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(true, emptyList(), 0, emptyList(), null);
|
||||
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
|
||||
BulkIndexByScrollResponse response = listener.get();
|
||||
BulkByScrollResponse response = listener.get();
|
||||
assertThat(response.getBulkFailures(), empty());
|
||||
assertThat(response.getSearchFailures(), empty());
|
||||
assertTrue(response.isTimedOut());
|
||||
@ -366,7 +371,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]
|
||||
{new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, failure)}, randomLong());
|
||||
action.onBulkResponse(timeValueNanos(System.nanoTime()), bulkResponse);
|
||||
BulkIndexByScrollResponse response = listener.get();
|
||||
BulkByScrollResponse response = listener.get();
|
||||
assertThat(response.getBulkFailures(), contains(failure));
|
||||
assertThat(response.getSearchFailures(), empty());
|
||||
assertNull(response.getReasonCancelled());
|
||||
@ -492,7 +497,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
}
|
||||
action.sendBulkRequest(timeValueNanos(System.nanoTime()), request);
|
||||
if (failWithRejection) {
|
||||
BulkIndexByScrollResponse response = listener.get();
|
||||
BulkByScrollResponse response = listener.get();
|
||||
assertThat(response.getBulkFailures(), hasSize(1));
|
||||
assertEquals(response.getBulkFailures().get(0).getStatus(), RestStatus.TOO_MANY_REQUESTS);
|
||||
assertThat(response.getSearchFailures(), empty());
|
||||
@ -690,7 +695,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
}
|
||||
|
||||
@Override
|
||||
DummyAbstractBulkByScrollRequest forSlice(TaskId slicingTask, SearchRequest slice) {
|
||||
protected DummyAbstractBulkByScrollRequest forSlice(TaskId slicingTask, SearchRequest slice) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.index.mapper.UidFieldMapper;
|
||||
@ -27,11 +27,11 @@ import org.elasticsearch.test.ESTestCase;
|
||||
import java.io.IOException;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static org.elasticsearch.index.reindex.ReindexParallelizationHelper.sliceIntoSubRequests;
|
||||
import static org.elasticsearch.action.bulk.byscroll.BulkByScrollParallelizationHelper.sliceIntoSubRequests;
|
||||
import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest;
|
||||
import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchSourceBuilder;
|
||||
|
||||
public class ReindexParallelizationHelperTests extends ESTestCase {
|
||||
public class BulkByScrollParallelizationHelperTests extends ESTestCase {
|
||||
public void testSliceIntoSubRequests() throws IOException {
|
||||
SearchRequest searchRequest = randomSearchRequest(() -> randomSearchSourceBuilder(
|
||||
() -> null,
|
@ -0,0 +1,98 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.apache.lucene.util.TestUtil.randomSimpleString;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
|
||||
|
||||
public class BulkByScrollResponseTests extends ESTestCase {
|
||||
|
||||
public void testRountTrip() throws IOException {
|
||||
BulkByScrollResponse response = new BulkByScrollResponse(timeValueMillis(randomNonNegativeLong()),
|
||||
BulkByScrollTaskStatusTests.randomStatus(), randomIndexingFailures(), randomSearchFailures(), randomBoolean());
|
||||
BulkByScrollResponse tripped = new BulkByScrollResponse();
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
response.writeTo(out);
|
||||
try (StreamInput in = out.bytes().streamInput()) {
|
||||
tripped.readFrom(in);
|
||||
}
|
||||
}
|
||||
assertResponseEquals(response, tripped);
|
||||
}
|
||||
|
||||
private List<Failure> randomIndexingFailures() {
|
||||
return usually() ? emptyList()
|
||||
: singletonList(new Failure(randomSimpleString(random()), randomSimpleString(random()),
|
||||
randomSimpleString(random()), new IllegalArgumentException("test")));
|
||||
}
|
||||
|
||||
private List<SearchFailure> randomSearchFailures() {
|
||||
if (randomBoolean()) {
|
||||
return emptyList();
|
||||
}
|
||||
String index = null;
|
||||
Integer shardId = null;
|
||||
String nodeId = null;
|
||||
if (randomBoolean()) {
|
||||
index = randomAsciiOfLength(5);
|
||||
shardId = randomInt();
|
||||
nodeId = usually() ? randomAsciiOfLength(5) : null;
|
||||
}
|
||||
return singletonList(new SearchFailure(new ElasticsearchException("foo"), index, shardId, nodeId));
|
||||
}
|
||||
|
||||
private void assertResponseEquals(BulkByScrollResponse expected, BulkByScrollResponse actual) {
|
||||
assertEquals(expected.getTook(), actual.getTook());
|
||||
BulkByScrollTaskStatusTests.assertTaskStatusEquals(Version.CURRENT, expected.getStatus(), actual.getStatus());
|
||||
assertEquals(expected.getBulkFailures().size(), actual.getBulkFailures().size());
|
||||
for (int i = 0; i < expected.getBulkFailures().size(); i++) {
|
||||
Failure expectedFailure = expected.getBulkFailures().get(i);
|
||||
Failure actualFailure = actual.getBulkFailures().get(i);
|
||||
assertEquals(expectedFailure.getIndex(), actualFailure.getIndex());
|
||||
assertEquals(expectedFailure.getType(), actualFailure.getType());
|
||||
assertEquals(expectedFailure.getId(), actualFailure.getId());
|
||||
assertEquals(expectedFailure.getMessage(), actualFailure.getMessage());
|
||||
assertEquals(expectedFailure.getStatus(), actualFailure.getStatus());
|
||||
}
|
||||
assertEquals(expected.getSearchFailures().size(), actual.getSearchFailures().size());
|
||||
for (int i = 0; i < expected.getSearchFailures().size(); i++) {
|
||||
SearchFailure expectedFailure = expected.getSearchFailures().get(i);
|
||||
SearchFailure actualFailure = actual.getSearchFailures().get(i);
|
||||
assertEquals(expectedFailure.getIndex(), actualFailure.getIndex());
|
||||
assertEquals(expectedFailure.getShardId(), actualFailure.getShardId());
|
||||
assertEquals(expectedFailure.getNodeId(), actualFailure.getNodeId());
|
||||
assertEquals(expectedFailure.getReason().getClass(), actualFailure.getReason().getClass());
|
||||
assertEquals(expectedFailure.getReason().getMessage(), actualFailure.getReason().getMessage());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,131 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Randomness;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static java.lang.Math.abs;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.apache.lucene.util.TestUtil.randomSimpleString;
|
||||
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
public class BulkByScrollTaskStatusTests extends ESTestCase {
|
||||
public void testBulkByTaskStatus() throws IOException {
|
||||
BulkByScrollTask.Status status = randomStatus();
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
status.writeTo(out);
|
||||
BulkByScrollTask.Status tripped = new BulkByScrollTask.Status(out.bytes().streamInput());
|
||||
assertTaskStatusEquals(out.getVersion(), status, tripped);
|
||||
|
||||
// Also check round tripping pre-5.1 which is the first version to support parallelized scroll
|
||||
out = new BytesStreamOutput();
|
||||
out.setVersion(Version.V_5_0_0_rc1); // This can be V_5_0_0
|
||||
status.writeTo(out);
|
||||
StreamInput in = out.bytes().streamInput();
|
||||
in.setVersion(Version.V_5_0_0_rc1);
|
||||
tripped = new BulkByScrollTask.Status(in);
|
||||
assertTaskStatusEquals(Version.V_5_0_0_rc1, status, tripped);
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that two task statuses are equal after serialization.
|
||||
* @param version the version at which expected was serialized
|
||||
*/
|
||||
public static void assertTaskStatusEquals(Version version, BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) {
|
||||
assertEquals(expected.getTotal(), actual.getTotal());
|
||||
assertEquals(expected.getUpdated(), actual.getUpdated());
|
||||
assertEquals(expected.getCreated(), actual.getCreated());
|
||||
assertEquals(expected.getDeleted(), actual.getDeleted());
|
||||
assertEquals(expected.getBatches(), actual.getBatches());
|
||||
assertEquals(expected.getVersionConflicts(), actual.getVersionConflicts());
|
||||
assertEquals(expected.getNoops(), actual.getNoops());
|
||||
assertEquals(expected.getBulkRetries(), actual.getBulkRetries());
|
||||
assertEquals(expected.getSearchRetries(), actual.getSearchRetries());
|
||||
assertEquals(expected.getThrottled(), actual.getThrottled());
|
||||
assertEquals(expected.getRequestsPerSecond(), actual.getRequestsPerSecond(), 0f);
|
||||
assertEquals(expected.getReasonCancelled(), actual.getReasonCancelled());
|
||||
assertEquals(expected.getThrottledUntil(), actual.getThrottledUntil());
|
||||
if (version.onOrAfter(Version.V_5_1_1_UNRELEASED)) {
|
||||
assertThat(actual.getSliceStatuses(), hasSize(expected.getSliceStatuses().size()));
|
||||
for (int i = 0; i < expected.getSliceStatuses().size(); i++) {
|
||||
BulkByScrollTask.StatusOrException sliceStatus = expected.getSliceStatuses().get(i);
|
||||
if (sliceStatus == null) {
|
||||
assertNull(actual.getSliceStatuses().get(i));
|
||||
} else if (sliceStatus.getException() == null) {
|
||||
assertNull(actual.getSliceStatuses().get(i).getException());
|
||||
assertTaskStatusEquals(version, sliceStatus.getStatus(), actual.getSliceStatuses().get(i).getStatus());
|
||||
} else {
|
||||
assertNull(actual.getSliceStatuses().get(i).getStatus());
|
||||
// Just check the message because we're not testing exception serialization in general here.
|
||||
assertEquals(sliceStatus.getException().getMessage(), actual.getSliceStatuses().get(i).getException().getMessage());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
assertEquals(emptyList(), actual.getSliceStatuses());
|
||||
}
|
||||
}
|
||||
|
||||
public static BulkByScrollTask.Status randomStatus() {
|
||||
if (randomBoolean()) {
|
||||
return randomWorkingStatus(null);
|
||||
}
|
||||
boolean canHaveNullStatues = randomBoolean();
|
||||
List<BulkByScrollTask.StatusOrException> statuses = IntStream.range(0, between(0, 10))
|
||||
.mapToObj(i -> {
|
||||
if (canHaveNullStatues && LuceneTestCase.rarely()) {
|
||||
return null;
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
return new BulkByScrollTask.StatusOrException(new ElasticsearchException(randomAsciiOfLength(5)));
|
||||
}
|
||||
return new BulkByScrollTask.StatusOrException(randomWorkingStatus(i));
|
||||
})
|
||||
.collect(toList());
|
||||
return new BulkByScrollTask.Status(statuses, randomBoolean() ? "test" : null);
|
||||
}
|
||||
|
||||
private static BulkByScrollTask.Status randomWorkingStatus(Integer sliceId) {
|
||||
// These all should be believably small because we sum them if we have multiple workers
|
||||
int total = between(0, 10000000);
|
||||
int updated = between(0, total);
|
||||
int created = between(0, total - updated);
|
||||
int deleted = between(0, total - updated - created);
|
||||
int noops = total - updated - created - deleted;
|
||||
int batches = between(0, 10000);
|
||||
long versionConflicts = between(0, total);
|
||||
long bulkRetries = between(0, 10000000);
|
||||
long searchRetries = between(0, 100000);
|
||||
return new BulkByScrollTask.Status(sliceId, total, updated, created, deleted, batches, versionConflicts, noops, bulkRetries,
|
||||
searchRetries, parseTimeValue(randomPositiveTimeValue(), "test"), abs(Randomness.get().nextFloat()),
|
||||
randomBoolean() ? null : randomSimpleString(Randomness.get()), parseTimeValue(randomPositiveTimeValue(), "test"));
|
||||
}
|
||||
}
|
@ -17,8 +17,9 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
@ -17,8 +17,10 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestTestCase;
|
||||
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
|
@ -17,20 +17,21 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
|
||||
import static org.elasticsearch.index.reindex.TransportRethrottleActionTests.captureResponse;
|
||||
import static org.elasticsearch.index.reindex.TransportRethrottleActionTests.neverCalled;
|
||||
import static org.mockito.Mockito.atMost;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class ParentBulkByScrollTaskTests extends ESTestCase {
|
||||
private int slices;
|
||||
@ -88,9 +89,9 @@ public class ParentBulkByScrollTaskTests extends ESTestCase {
|
||||
sliceStatuses.set(slice, new BulkByScrollTask.StatusOrException(sliceStatus));
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<BulkIndexByScrollResponse> listener = slice < slices - 1 ? neverCalled() : mock(ActionListener.class);
|
||||
ActionListener<BulkByScrollResponse> listener = slice < slices - 1 ? neverCalled() : mock(ActionListener.class);
|
||||
task.onSliceResponse(listener, slice,
|
||||
new BulkIndexByScrollResponse(timeValueMillis(10), sliceStatus, emptyList(), emptyList(), false));
|
||||
new BulkByScrollResponse(timeValueMillis(10), sliceStatus, emptyList(), emptyList(), false));
|
||||
|
||||
status = task.getStatus();
|
||||
assertEquals(total, status.getTotal());
|
||||
@ -104,7 +105,7 @@ public class ParentBulkByScrollTaskTests extends ESTestCase {
|
||||
|
||||
if (slice == slices - 1) {
|
||||
// The whole thing succeeded so we should have got the success
|
||||
status = captureResponse(BulkIndexByScrollResponse.class, listener).getStatus();
|
||||
status = captureResponse(BulkByScrollResponse.class, listener).getStatus();
|
||||
assertEquals(total, status.getTotal());
|
||||
assertEquals(created, status.getCreated());
|
||||
assertEquals(updated, status.getUpdated());
|
||||
@ -117,5 +118,31 @@ public class ParentBulkByScrollTaskTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
private <T> ActionListener<T> neverCalled() {
|
||||
return new ActionListener<T>() {
|
||||
@Override
|
||||
public void onResponse(T response) {
|
||||
throw new RuntimeException("Expected no interactions but got [" + response + "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
throw new RuntimeException("Expected no interations but was received a failure", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
private <T> T captureResponse(Class<T> responseClass, ActionListener<T> listener) {
|
||||
ArgumentCaptor<Exception> failure = ArgumentCaptor.forClass(Exception.class);
|
||||
// Rethrow any failures just so we get a nice exception if there were any. We don't expect any though.
|
||||
verify(listener, atMost(1)).onFailure(failure.capture());
|
||||
if (false == failure.getAllValues().isEmpty()) {
|
||||
throw new AssertionError(failure.getValue());
|
||||
}
|
||||
ArgumentCaptor<T> response = ArgumentCaptor.forClass(responseClass);
|
||||
verify(listener).onResponse(response.capture());
|
||||
return response.getValue();
|
||||
}
|
||||
|
||||
}
|
@ -17,8 +17,10 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask;
|
||||
import org.elasticsearch.action.bulk.byscroll.WorkingBulkByScrollTask;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.tasks.TaskId;
|
@ -21,6 +21,9 @@ package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.GenericAction;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequest;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -38,7 +41,7 @@ import java.util.Map;
|
||||
|
||||
public abstract class AbstractBaseReindexRestHandler<
|
||||
Request extends AbstractBulkByScrollRequest<Request>,
|
||||
A extends GenericAction<Request, BulkIndexByScrollResponse>
|
||||
A extends GenericAction<Request, BulkByScrollResponse>
|
||||
> extends BaseRestHandler {
|
||||
|
||||
private final A action;
|
||||
|
@ -21,6 +21,8 @@ package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.action.GenericAction;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequest;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
@ -33,14 +35,14 @@ import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
|
||||
import static org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
|
||||
|
||||
/**
|
||||
* Rest handler for reindex actions that accepts a search request like Update-By-Query or Delete-By-Query
|
||||
*/
|
||||
public abstract class AbstractBulkByQueryRestHandler<
|
||||
Request extends AbstractBulkByScrollRequest<Request>,
|
||||
A extends GenericAction<Request, BulkIndexByScrollResponse>> extends AbstractBaseReindexRestHandler<Request, A> {
|
||||
A extends GenericAction<Request, BulkByScrollResponse>> extends AbstractBaseReindexRestHandler<Request, A> {
|
||||
|
||||
protected AbstractBulkByQueryRestHandler(Settings settings, A action) {
|
||||
super(settings, action);
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -20,6 +20,8 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.script.Script;
|
||||
@ -30,7 +32,7 @@ public abstract class AbstractBulkIndexByScrollRequestBuilder<
|
||||
extends AbstractBulkByScrollRequestBuilder<Request, Self> {
|
||||
|
||||
protected AbstractBulkIndexByScrollRequestBuilder(ElasticsearchClient client,
|
||||
Action<Request, BulkIndexByScrollResponse, Self> action, SearchRequestBuilder search, Request request) {
|
||||
Action<Request, BulkByScrollResponse, Self> action, SearchRequestBuilder search, Request request) {
|
||||
super(client, action, search, request);
|
||||
}
|
||||
|
||||
|
@ -21,9 +21,10 @@ package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.rest.BytesRestResponse;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestResponse;
|
||||
@ -35,7 +36,7 @@ import java.util.Map;
|
||||
/**
|
||||
* RestBuilderListener that returns higher than 200 status if there are any failures and allows to set XContent.Params.
|
||||
*/
|
||||
public class BulkIndexByScrollResponseContentListener extends RestBuilderListener<BulkIndexByScrollResponse> {
|
||||
public class BulkIndexByScrollResponseContentListener extends RestBuilderListener<BulkByScrollResponse> {
|
||||
|
||||
private final Map<String, String> params;
|
||||
|
||||
@ -45,14 +46,14 @@ public class BulkIndexByScrollResponseContentListener extends RestBuilderListene
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestResponse buildResponse(BulkIndexByScrollResponse response, XContentBuilder builder) throws Exception {
|
||||
public RestResponse buildResponse(BulkByScrollResponse response, XContentBuilder builder) throws Exception {
|
||||
builder.startObject();
|
||||
response.toXContent(builder, new ToXContent.DelegatingMapParams(params, channel.request()));
|
||||
builder.endObject();
|
||||
return new BytesRestResponse(getStatus(response), builder);
|
||||
}
|
||||
|
||||
private RestStatus getStatus(BulkIndexByScrollResponse response) {
|
||||
private RestStatus getStatus(BulkByScrollResponse response) {
|
||||
/*
|
||||
* Return the highest numbered rest status under the assumption that higher numbered statuses are "more error" and thus more
|
||||
* interesting to the user.
|
||||
|
@ -20,9 +20,11 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class DeleteByQueryAction extends Action<DeleteByQueryRequest, BulkIndexByScrollResponse, DeleteByQueryRequestBuilder> {
|
||||
public class DeleteByQueryAction extends Action<DeleteByQueryRequest, BulkByScrollResponse, DeleteByQueryRequestBuilder> {
|
||||
|
||||
public static final DeleteByQueryAction INSTANCE = new DeleteByQueryAction();
|
||||
public static final String NAME = "indices:data/write/delete/byquery";
|
||||
@ -37,7 +39,7 @@ public class DeleteByQueryAction extends Action<DeleteByQueryRequest, BulkIndexB
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkIndexByScrollResponse newResponse() {
|
||||
return new BulkIndexByScrollResponse();
|
||||
public BulkByScrollResponse newResponse() {
|
||||
return new BulkByScrollResponse();
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,9 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
@ -28,12 +31,12 @@ public class DeleteByQueryRequestBuilder extends
|
||||
AbstractBulkByScrollRequestBuilder<DeleteByQueryRequest, DeleteByQueryRequestBuilder> {
|
||||
|
||||
public DeleteByQueryRequestBuilder(ElasticsearchClient client,
|
||||
Action<DeleteByQueryRequest, BulkIndexByScrollResponse, DeleteByQueryRequestBuilder> action) {
|
||||
Action<DeleteByQueryRequest, BulkByScrollResponse, DeleteByQueryRequestBuilder> action) {
|
||||
this(client, action, new SearchRequestBuilder(client, SearchAction.INSTANCE));
|
||||
}
|
||||
|
||||
private DeleteByQueryRequestBuilder(ElasticsearchClient client,
|
||||
Action<DeleteByQueryRequest, BulkIndexByScrollResponse, DeleteByQueryRequestBuilder> action,
|
||||
Action<DeleteByQueryRequest, BulkByScrollResponse, DeleteByQueryRequestBuilder> action,
|
||||
SearchRequestBuilder search) {
|
||||
super(client, action, search, new DeleteByQueryRequest(search.request()));
|
||||
}
|
||||
|
@ -20,9 +20,10 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class ReindexAction extends Action<ReindexRequest, BulkIndexByScrollResponse, ReindexRequestBuilder> {
|
||||
public class ReindexAction extends Action<ReindexRequest, BulkByScrollResponse, ReindexRequestBuilder> {
|
||||
public static final ReindexAction INSTANCE = new ReindexAction();
|
||||
public static final String NAME = "indices:data/write/reindex";
|
||||
|
||||
@ -36,7 +37,7 @@ public class ReindexAction extends Action<ReindexRequest, BulkIndexByScrollRespo
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkIndexByScrollResponse newResponse() {
|
||||
return new BulkIndexByScrollResponse();
|
||||
public BulkByScrollResponse newResponse() {
|
||||
return new BulkByScrollResponse();
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
|
@ -128,7 +128,7 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
|
||||
}
|
||||
|
||||
@Override
|
||||
ReindexRequest forSlice(TaskId slicingTask, SearchRequest slice) {
|
||||
protected ReindexRequest forSlice(TaskId slicingTask, SearchRequest slice) {
|
||||
ReindexRequest sliced = doForSlice(new ReindexRequest(slice, destination, false), slicingTask);
|
||||
sliced.setRemoteInfo(remoteInfo);
|
||||
return sliced;
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.index.IndexAction;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
@ -32,13 +33,13 @@ public class ReindexRequestBuilder extends
|
||||
private final IndexRequestBuilder destination;
|
||||
|
||||
public ReindexRequestBuilder(ElasticsearchClient client,
|
||||
Action<ReindexRequest, BulkIndexByScrollResponse, ReindexRequestBuilder> action) {
|
||||
Action<ReindexRequest, BulkByScrollResponse, ReindexRequestBuilder> action) {
|
||||
this(client, action, new SearchRequestBuilder(client, SearchAction.INSTANCE),
|
||||
new IndexRequestBuilder(client, IndexAction.INSTANCE));
|
||||
}
|
||||
|
||||
private ReindexRequestBuilder(ElasticsearchClient client,
|
||||
Action<ReindexRequest, BulkIndexByScrollResponse, ReindexRequestBuilder> action,
|
||||
Action<ReindexRequest, BulkByScrollResponse, ReindexRequestBuilder> action,
|
||||
SearchRequestBuilder search, IndexRequestBuilder destination) {
|
||||
super(client, action, search, new ReindexRequest(search.request(), destination.request()));
|
||||
this.destination = destination;
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -19,9 +19,13 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.bulk.byscroll.AsyncDeleteByQueryAction;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollParallelizationHelper;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
|
||||
import org.elasticsearch.action.bulk.byscroll.ParentBulkByScrollTask;
|
||||
import org.elasticsearch.action.bulk.byscroll.WorkingBulkByScrollTask;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.client.Client;
|
||||
@ -36,7 +40,7 @@ import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteByQueryRequest, BulkIndexByScrollResponse> {
|
||||
public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteByQueryRequest, BulkByScrollResponse> {
|
||||
private final Client client;
|
||||
private final ScriptService scriptService;
|
||||
private final ClusterService clusterService;
|
||||
@ -52,68 +56,20 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
|
||||
public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
|
||||
if (request.getSlices() > 1) {
|
||||
ReindexParallelizationHelper.startSlices(client, taskManager, DeleteByQueryAction.INSTANCE, clusterService.localNode().getId(),
|
||||
(ParentBulkByScrollTask) task, request, listener);
|
||||
BulkByScrollParallelizationHelper.startSlices(client, taskManager, DeleteByQueryAction.INSTANCE,
|
||||
clusterService.localNode().getId(), (ParentBulkByScrollTask) task, request, listener);
|
||||
} else {
|
||||
ClusterState state = clusterService.state();
|
||||
ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
|
||||
new AsyncDeleteBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state,
|
||||
new AsyncDeleteByQueryAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state,
|
||||
listener).start();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(DeleteByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
|
||||
protected void doExecute(DeleteByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
|
||||
throw new UnsupportedOperationException("task required");
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of delete-by-query using scrolling and bulk.
|
||||
*/
|
||||
static class AsyncDeleteBySearchAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest> {
|
||||
public AsyncDeleteBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService, ClusterState clusterState,
|
||||
ActionListener<BulkIndexByScrollResponse> listener) {
|
||||
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean needsSourceDocumentVersions() {
|
||||
/*
|
||||
* We always need the version of the source document so we can report a version conflict if we try to delete it and it has been
|
||||
* changed.
|
||||
*/
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accept(ScrollableHitSource.Hit doc) {
|
||||
// Delete-by-query does not require the source to delete a document
|
||||
// and the default implementation checks for it
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RequestWrapper<DeleteRequest> buildRequest(ScrollableHitSource.Hit doc) {
|
||||
DeleteRequest delete = new DeleteRequest();
|
||||
delete.index(doc.getIndex());
|
||||
delete.type(doc.getType());
|
||||
delete.id(doc.getId());
|
||||
delete.version(doc.getVersion());
|
||||
return wrap(delete);
|
||||
}
|
||||
|
||||
/**
|
||||
* Overrides the parent's implementation is much more Update/Reindex oriented and so also copies things like timestamp/ttl which we
|
||||
* don't care for a deletion.
|
||||
*/
|
||||
@Override
|
||||
protected RequestWrapper<?> copyMetadata(RequestWrapper<?> request, ScrollableHitSource.Hit doc) {
|
||||
request.setParent(doc.getParent());
|
||||
request.setRouting(doc.getRouting());
|
||||
return request;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -37,6 +37,13 @@ import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractAsyncBulkByScrollAction;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollParallelizationHelper;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.ParentBulkByScrollTask;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.action.bulk.byscroll.WorkingBulkByScrollTask;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
@ -57,7 +64,6 @@ import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.mapper.VersionFieldMapper;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.index.reindex.remote.RemoteInfo;
|
||||
import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
|
||||
import org.elasticsearch.script.Script;
|
||||
@ -79,7 +85,7 @@ import static java.util.Collections.synchronizedList;
|
||||
import static java.util.Objects.requireNonNull;
|
||||
import static org.elasticsearch.index.VersionType.INTERNAL;
|
||||
|
||||
public class TransportReindexAction extends HandledTransportAction<ReindexRequest, BulkIndexByScrollResponse> {
|
||||
public class TransportReindexAction extends HandledTransportAction<ReindexRequest, BulkByScrollResponse> {
|
||||
public static final Setting<List<String>> REMOTE_CLUSTER_WHITELIST =
|
||||
Setting.listSetting("reindex.remote.whitelist", emptyList(), Function.identity(), Property.NodeScope);
|
||||
|
||||
@ -103,9 +109,9 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
|
||||
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
|
||||
if (request.getSlices() > 1) {
|
||||
ReindexParallelizationHelper.startSlices(client, taskManager, ReindexAction.INSTANCE, clusterService.localNode().getId(),
|
||||
BulkByScrollParallelizationHelper.startSlices(client, taskManager, ReindexAction.INSTANCE, clusterService.localNode().getId(),
|
||||
(ParentBulkByScrollTask) task, request, listener);
|
||||
} else {
|
||||
checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo());
|
||||
@ -119,7 +125,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(ReindexRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
|
||||
protected void doExecute(ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
|
||||
throw new UnsupportedOperationException("task required");
|
||||
}
|
||||
|
||||
@ -241,7 +247,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
||||
|
||||
public AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState,
|
||||
ActionListener<BulkIndexByScrollResponse> listener) {
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
|
||||
}
|
||||
|
||||
@ -267,7 +273,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
||||
}
|
||||
|
||||
@Override
|
||||
void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
|
||||
protected void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
|
||||
super.finishHim(failure, indexingFailures, searchFailures, timedOut);
|
||||
// A little extra paranoia so we log something if we leave any threads running
|
||||
for (Thread thread : createdThreads) {
|
||||
@ -279,7 +285,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
||||
public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
||||
Script script = mainRequest.getScript();
|
||||
if (script != null) {
|
||||
return new ReindexScriptApplier(task, scriptService, script, script.getParams());
|
||||
|
@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.FailedNodeException;
|
||||
import org.elasticsearch.action.TaskOperationFailure;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -21,6 +21,12 @@ package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractAsyncBulkByScrollAction;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.ParentBulkByScrollTask;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollParallelizationHelper;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource;
|
||||
import org.elasticsearch.action.bulk.byscroll.WorkingBulkByScrollTask;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
@ -46,7 +52,7 @@ import org.elasticsearch.transport.TransportService;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateByQueryRequest, BulkIndexByScrollResponse> {
|
||||
public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateByQueryRequest, BulkByScrollResponse> {
|
||||
private final Client client;
|
||||
private final ScriptService scriptService;
|
||||
private final ClusterService clusterService;
|
||||
@ -63,10 +69,10 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
|
||||
protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
|
||||
if (request.getSlices() > 1) {
|
||||
ReindexParallelizationHelper.startSlices(client, taskManager, UpdateByQueryAction.INSTANCE, clusterService.localNode().getId(),
|
||||
(ParentBulkByScrollTask) task, request, listener);
|
||||
BulkByScrollParallelizationHelper.startSlices(client, taskManager, UpdateByQueryAction.INSTANCE,
|
||||
clusterService.localNode().getId(), (ParentBulkByScrollTask) task, request, listener);
|
||||
} else {
|
||||
ClusterState state = clusterService.state();
|
||||
ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
|
||||
@ -76,7 +82,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
|
||||
protected void doExecute(UpdateByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
|
||||
throw new UnsupportedOperationException("task required");
|
||||
}
|
||||
|
||||
@ -86,7 +92,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
|
||||
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<UpdateByQueryRequest> {
|
||||
public AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
|
||||
ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState,
|
||||
ActionListener<BulkIndexByScrollResponse> listener) {
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
|
||||
}
|
||||
|
||||
@ -100,7 +106,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
||||
public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
|
||||
Script script = mainRequest.getScript();
|
||||
if (script != null) {
|
||||
return new UpdateByQueryScriptApplier(task, scriptService, script, script.getParams());
|
||||
|
@ -20,10 +20,11 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class UpdateByQueryAction extends
|
||||
Action<UpdateByQueryRequest, BulkIndexByScrollResponse, UpdateByQueryRequestBuilder> {
|
||||
Action<UpdateByQueryRequest, BulkByScrollResponse, UpdateByQueryRequestBuilder> {
|
||||
public static final UpdateByQueryAction INSTANCE = new UpdateByQueryAction();
|
||||
public static final String NAME = "indices:data/write/update/byquery";
|
||||
|
||||
@ -37,7 +38,7 @@ public class UpdateByQueryAction extends
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkIndexByScrollResponse newResponse() {
|
||||
return new BulkIndexByScrollResponse();
|
||||
public BulkByScrollResponse newResponse() {
|
||||
return new BulkByScrollResponse();
|
||||
}
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest<Updat
|
||||
}
|
||||
|
||||
@Override
|
||||
UpdateByQueryRequest forSlice(TaskId slicingTask, SearchRequest slice) {
|
||||
protected UpdateByQueryRequest forSlice(TaskId slicingTask, SearchRequest slice) {
|
||||
UpdateByQueryRequest request = doForSlice(new UpdateByQueryRequest(slice, false), slicingTask);
|
||||
request.setPipeline(pipeline);
|
||||
return request;
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
@ -28,12 +29,12 @@ public class UpdateByQueryRequestBuilder extends
|
||||
AbstractBulkIndexByScrollRequestBuilder<UpdateByQueryRequest, UpdateByQueryRequestBuilder> {
|
||||
|
||||
public UpdateByQueryRequestBuilder(ElasticsearchClient client,
|
||||
Action<UpdateByQueryRequest, BulkIndexByScrollResponse, UpdateByQueryRequestBuilder> action) {
|
||||
Action<UpdateByQueryRequest, BulkByScrollResponse, UpdateByQueryRequestBuilder> action) {
|
||||
this(client, action, new SearchRequestBuilder(client, SearchAction.INSTANCE));
|
||||
}
|
||||
|
||||
private UpdateByQueryRequestBuilder(ElasticsearchClient client,
|
||||
Action<UpdateByQueryRequest, BulkIndexByScrollResponse, UpdateByQueryRequestBuilder> action,
|
||||
Action<UpdateByQueryRequest, BulkByScrollResponse, UpdateByQueryRequestBuilder> action,
|
||||
SearchRequestBuilder search) {
|
||||
super(client, action, search, new UpdateByQueryRequest(search.request()));
|
||||
}
|
||||
|
@ -20,6 +20,10 @@
|
||||
package org.elasticsearch.index.reindex.remote;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.BasicHit;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.Hit;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.Response;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.ParsingException;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
@ -30,10 +34,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentLocation;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.BasicHit;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.Response;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
@ -29,6 +29,7 @@ import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.client.ResponseListener;
|
||||
@ -43,7 +44,6 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
|
@ -20,10 +20,14 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractAsyncBulkByScrollAction;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractAsyncBulkByScrollAction.OpType;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractAsyncBulkByScrollAction.RequestWrapper;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractAsyncBulkByScrollActionTestCase;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.index.reindex.AbstractAsyncBulkByScrollAction.OpType;
|
||||
import org.elasticsearch.index.reindex.AbstractAsyncBulkByScrollAction.RequestWrapper;
|
||||
import org.elasticsearch.script.CompiledScript;
|
||||
import org.elasticsearch.script.ExecutableScript;
|
||||
import org.elasticsearch.script.Script;
|
||||
@ -42,7 +46,7 @@ import static org.mockito.Mockito.when;
|
||||
|
||||
public abstract class AbstractAsyncBulkByScrollActionScriptTestCase<
|
||||
Request extends AbstractBulkIndexByScrollRequest<Request>,
|
||||
Response extends BulkIndexByScrollResponse>
|
||||
Response extends BulkByScrollResponse>
|
||||
extends AbstractAsyncBulkByScrollActionTestCase<Request, Response> {
|
||||
|
||||
private static final Script EMPTY_SCRIPT = new Script("");
|
||||
|
@ -20,8 +20,10 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -35,7 +37,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
|
||||
public class BulkIndexByScrollResponseTests extends ESTestCase {
|
||||
public void testMergeConstructor() {
|
||||
int mergeCount = between(2, 10);
|
||||
List<BulkIndexByScrollResponse> responses = new ArrayList<>(mergeCount);
|
||||
List<BulkByScrollResponse> responses = new ArrayList<>(mergeCount);
|
||||
int took = between(1000, 10000);
|
||||
int tookIndex = between(0, mergeCount - 1);
|
||||
List<BulkItemResponse.Failure> allBulkFailures = new ArrayList<>();
|
||||
@ -59,10 +61,10 @@ public class BulkIndexByScrollResponseTests extends ESTestCase {
|
||||
allSearchFailures.addAll(searchFailures);
|
||||
boolean thisTimedOut = rarely();
|
||||
timedOut |= thisTimedOut;
|
||||
responses.add(new BulkIndexByScrollResponse(thisTook, status, bulkFailures, searchFailures, thisTimedOut));
|
||||
responses.add(new BulkByScrollResponse(thisTook, status, bulkFailures, searchFailures, thisTimedOut));
|
||||
}
|
||||
|
||||
BulkIndexByScrollResponse merged = new BulkIndexByScrollResponse(responses, reasonCancelled);
|
||||
BulkByScrollResponse merged = new BulkByScrollResponse(responses, reasonCancelled);
|
||||
|
||||
assertEquals(timeValueMillis(took), merged.getTook());
|
||||
assertEquals(allBulkFailures, merged.getBulkFailures());
|
||||
|
@ -22,6 +22,9 @@ package org.elasticsearch.index.reindex;
|
||||
import org.elasticsearch.action.ListenableActionFuture;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask;
|
||||
import org.elasticsearch.action.ingest.DeletePipelineRequest;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
@ -52,7 +55,7 @@ import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
/**
|
||||
* Test that you can actually cancel a reindex/update-by-query/delete-by-query request and all the plumbing works. Doesn't test all of the
|
||||
* different cancellation places - that is the responsibility of {@link AsyncBulkByScrollActionTests} which have more precise control to
|
||||
* different cancellation places - that is the responsibility of AsyncBulkByScrollActionTests which have more precise control to
|
||||
* simulate failures but do not exercise important portion of the stack like transport and task management.
|
||||
*/
|
||||
public class CancelTests extends ReindexTestCase {
|
||||
@ -104,7 +107,7 @@ public class CancelTests extends ReindexTestCase {
|
||||
ALLOWED_OPERATIONS.release(numModifiedDocs - builder.request().getSlices());
|
||||
|
||||
// Now execute the reindex action...
|
||||
ListenableActionFuture<? extends BulkIndexByScrollResponse> future = builder.execute();
|
||||
ListenableActionFuture<? extends BulkByScrollResponse> future = builder.execute();
|
||||
|
||||
/* ... and waits for the indexing operation listeners to block. It is important to realize that some of the workers might have
|
||||
* exhausted their slice while others might have quite a bit left to work on. We can't control that. */
|
||||
@ -156,7 +159,7 @@ public class CancelTests extends ReindexTestCase {
|
||||
});
|
||||
|
||||
// And check the status of the response
|
||||
BulkIndexByScrollResponse response = future.get();
|
||||
BulkByScrollResponse response = future.get();
|
||||
assertThat(response.getReasonCancelled(), equalTo("by user request"));
|
||||
assertThat(response.getBulkFailures(), emptyIterable());
|
||||
assertThat(response.getSearchFailures(), emptyIterable());
|
||||
@ -256,7 +259,7 @@ public class CancelTests extends ReindexTestCase {
|
||||
* Used to check the result of the cancel test.
|
||||
*/
|
||||
private interface CancelAssertion {
|
||||
void assertThat(BulkIndexByScrollResponse response, int total, int modified);
|
||||
void assertThat(BulkByScrollResponse response, int total, int modified);
|
||||
}
|
||||
|
||||
public static class ReindexCancellationPlugin extends Plugin {
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.index.query.MatchQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
@ -96,7 +97,7 @@ public class DeleteByQueryConcurrentTests extends ReindexTestCase {
|
||||
try {
|
||||
start.await();
|
||||
|
||||
BulkIndexByScrollResponse response = deleteByQuery().source("test").filter(query).refresh(true).get();
|
||||
BulkByScrollResponse response = deleteByQuery().source("test").filter(query).refresh(true).get();
|
||||
// Some deletions might fail due to version conflict, but
|
||||
// what matters here is the total of successful deletions
|
||||
deleted.addAndGet(response.getDeleted());
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -111,7 +112,7 @@ public class ReindexBasicTests extends ReindexTestCase {
|
||||
// Use a small batch size so we have to use more than one batch
|
||||
copy.source().setSize(5);
|
||||
copy.size(half); // The real "size" of the request.
|
||||
BulkIndexByScrollResponse response = copy.get();
|
||||
BulkByScrollResponse response = copy.get();
|
||||
assertThat(response, matcher().created(lessThanOrEqualTo((long) half)).slices(hasSize(workers)));
|
||||
assertHitCount(client().prepareSearch("dest").setTypes("half").setSize(0).get(), response.getCreated());
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -56,7 +57,7 @@ public class ReindexFailureTests extends ReindexTestCase {
|
||||
*/
|
||||
copy.source().setSize(1);
|
||||
|
||||
BulkIndexByScrollResponse response = copy.get();
|
||||
BulkByScrollResponse response = copy.get();
|
||||
assertThat(response, matcher()
|
||||
.batches(1)
|
||||
.failures(both(greaterThan(0)).and(lessThanOrEqualTo(maximumNumberOfShards()))));
|
||||
@ -76,7 +77,7 @@ public class ReindexFailureTests extends ReindexTestCase {
|
||||
// CREATE will cause the conflict to prevent the write.
|
||||
copy.destination().setOpType(CREATE);
|
||||
|
||||
BulkIndexByScrollResponse response = copy.get();
|
||||
BulkByScrollResponse response = copy.get();
|
||||
assertThat(response, matcher().batches(1).versionConflicts(1).failures(1).created(99));
|
||||
for (Failure failure: response.getBulkFailures()) {
|
||||
assertThat(failure.getMessage(), containsString("VersionConflictEngineException[[test]["));
|
||||
@ -98,7 +99,7 @@ public class ReindexFailureTests extends ReindexTestCase {
|
||||
indexDocs(100);
|
||||
ReindexRequestBuilder copy = reindex().source("source").destination("dest");
|
||||
copy.source().setSize(10);
|
||||
Future<BulkIndexByScrollResponse> response = copy.execute();
|
||||
Future<BulkByScrollResponse> response = copy.execute();
|
||||
client().admin().indices().prepareDelete("source").get();
|
||||
|
||||
try {
|
||||
|
@ -19,13 +19,17 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractAsyncBulkByScrollAction;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractAsyncBulkByScrollActionMetadataTestCase;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.Hit;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
|
||||
/**
|
||||
* Index-by-search test for ttl, timestamp, and routing.
|
||||
*/
|
||||
public class ReindexMetadataTests extends AbstractAsyncBulkByScrollActionMetadataTestCase<ReindexRequest, BulkIndexByScrollResponse> {
|
||||
public class ReindexMetadataTests extends AbstractAsyncBulkByScrollActionMetadataTestCase<ReindexRequest, BulkByScrollResponse> {
|
||||
public void testRoutingCopiedByDefault() throws Exception {
|
||||
IndexRequest index = new IndexRequest();
|
||||
action().copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
|
||||
@ -33,44 +37,61 @@ public class ReindexMetadataTests extends AbstractAsyncBulkByScrollActionMetadat
|
||||
}
|
||||
|
||||
public void testRoutingCopiedIfRequested() throws Exception {
|
||||
TransportReindexAction.AsyncIndexBySearchAction action = action();
|
||||
action.mainRequest.getDestination().routing("keep");
|
||||
TestAction action = action();
|
||||
action.mainRequest().getDestination().routing("keep");
|
||||
IndexRequest index = new IndexRequest();
|
||||
action.copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
|
||||
assertEquals("foo", index.routing());
|
||||
}
|
||||
|
||||
public void testRoutingDiscardedIfRequested() throws Exception {
|
||||
TransportReindexAction.AsyncIndexBySearchAction action = action();
|
||||
action.mainRequest.getDestination().routing("discard");
|
||||
TestAction action = action();
|
||||
action.mainRequest().getDestination().routing("discard");
|
||||
IndexRequest index = new IndexRequest();
|
||||
action.copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
|
||||
assertEquals(null, index.routing());
|
||||
}
|
||||
|
||||
public void testRoutingSetIfRequested() throws Exception {
|
||||
TransportReindexAction.AsyncIndexBySearchAction action = action();
|
||||
action.mainRequest.getDestination().routing("=cat");
|
||||
TestAction action = action();
|
||||
action.mainRequest().getDestination().routing("=cat");
|
||||
IndexRequest index = new IndexRequest();
|
||||
action.copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
|
||||
assertEquals("cat", index.routing());
|
||||
}
|
||||
|
||||
public void testRoutingSetIfWithDegenerateValue() throws Exception {
|
||||
TransportReindexAction.AsyncIndexBySearchAction action = action();
|
||||
action.mainRequest.getDestination().routing("==]");
|
||||
TestAction action = action();
|
||||
action.mainRequest().getDestination().routing("==]");
|
||||
IndexRequest index = new IndexRequest();
|
||||
action.copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
|
||||
assertEquals("=]", index.routing());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TransportReindexAction.AsyncIndexBySearchAction action() {
|
||||
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request(), null, null, listener());
|
||||
protected TestAction action() {
|
||||
return new TestAction();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReindexRequest request() {
|
||||
return new ReindexRequest(new SearchRequest(), new IndexRequest());
|
||||
}
|
||||
|
||||
private class TestAction extends TransportReindexAction.AsyncIndexBySearchAction {
|
||||
public TestAction() {
|
||||
super(ReindexMetadataTests.this.task, ReindexMetadataTests.this.logger, null, ReindexMetadataTests.this.threadPool, request(),
|
||||
null, null, listener());
|
||||
}
|
||||
|
||||
public ReindexRequest mainRequest() {
|
||||
return this.mainRequest;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractAsyncBulkByScrollAction.RequestWrapper<?> copyMetadata(AbstractAsyncBulkByScrollAction.RequestWrapper<?> request,
|
||||
Hit doc) {
|
||||
return super.copyMetadata(request, doc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestTestCase;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
@ -31,7 +32,7 @@ import static org.hamcrest.Matchers.containsString;
|
||||
/**
|
||||
* Tests index-by-search with a script modifying the documents.
|
||||
*/
|
||||
public class ReindexScriptTests extends AbstractAsyncBulkByScrollActionScriptTestCase<ReindexRequest, BulkIndexByScrollResponse> {
|
||||
public class ReindexScriptTests extends AbstractAsyncBulkByScrollActionScriptTestCase<ReindexRequest, BulkByScrollResponse> {
|
||||
|
||||
public void testSetIndex() throws Exception {
|
||||
Object dest = randomFrom(new Object[] {234, 234L, "pancake"});
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkIndexByScrollResponseMatcher;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
|
||||
import static org.elasticsearch.action.DocWriteRequest.OpType.CREATE;
|
||||
@ -88,7 +89,7 @@ public class ReindexVersioningTests extends ReindexTestCase {
|
||||
/**
|
||||
* Perform a reindex with EXTERNAL versioning which has "refresh" semantics.
|
||||
*/
|
||||
private BulkIndexByScrollResponse reindexExternal() {
|
||||
private BulkByScrollResponse reindexExternal() {
|
||||
ReindexRequestBuilder reindex = reindex().source("source").destination("dest").abortOnVersionConflict(false);
|
||||
reindex.destination().setVersionType(EXTERNAL);
|
||||
return reindex.get();
|
||||
@ -97,7 +98,7 @@ public class ReindexVersioningTests extends ReindexTestCase {
|
||||
/**
|
||||
* Perform a reindex with INTERNAL versioning which has "overwrite" semantics.
|
||||
*/
|
||||
private BulkIndexByScrollResponse reindexInternal() {
|
||||
private BulkByScrollResponse reindexInternal() {
|
||||
ReindexRequestBuilder reindex = reindex().source("source").destination("dest").abortOnVersionConflict(false);
|
||||
reindex.destination().setVersionType(INTERNAL);
|
||||
return reindex.get();
|
||||
@ -106,7 +107,7 @@ public class ReindexVersioningTests extends ReindexTestCase {
|
||||
/**
|
||||
* Perform a reindex with CREATE OpType which has "create" semantics.
|
||||
*/
|
||||
private BulkIndexByScrollResponse reindexCreate() {
|
||||
private BulkByScrollResponse reindexCreate() {
|
||||
ReindexRequestBuilder reindex = reindex().source("source").destination("dest").abortOnVersionConflict(false);
|
||||
reindex.destination().setOpType(CREATE);
|
||||
return reindex.get();
|
||||
|
@ -22,6 +22,9 @@ package org.elasticsearch.index.reindex;
|
||||
import org.elasticsearch.action.ListenableActionFuture;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
|
||||
@ -82,7 +85,7 @@ public class RethrottleTests extends ReindexTestCase {
|
||||
// Start a request that will never finish unless we rethrottle it
|
||||
request.setRequestsPerSecond(.000001f); // Throttle "forever"
|
||||
request.source().setSize(1); // Make sure we use multiple batches
|
||||
ListenableActionFuture<? extends BulkIndexByScrollResponse> responseListener = request.execute();
|
||||
ListenableActionFuture<? extends BulkByScrollResponse> responseListener = request.execute();
|
||||
|
||||
TaskGroup taskGroupToRethrottle = findTaskToRethrottle(actionName, request.request().getSlices());
|
||||
TaskId taskToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId();
|
||||
@ -151,7 +154,7 @@ public class RethrottleTests extends ReindexTestCase {
|
||||
}
|
||||
|
||||
// Now the response should come back quickly because we've rethrottled the request
|
||||
BulkIndexByScrollResponse response = responseListener.get();
|
||||
BulkByScrollResponse response = responseListener.get();
|
||||
assertThat("Entire request completed in a single batch. This may invalidate the test as throttling is done between batches.",
|
||||
response.getBatches(), greaterThanOrEqualTo(request.request().getSlices()));
|
||||
}
|
||||
|
@ -26,6 +26,10 @@ import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.bulk.Retry;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkIndexByScrollResponseMatcher;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -143,7 +147,7 @@ public class RetryTests extends ESSingleNodeTestCase {
|
||||
request.source().setSize(DOC_COUNT / randomIntBetween(2, 10));
|
||||
|
||||
logger.info("Starting request");
|
||||
ListenableActionFuture<BulkIndexByScrollResponse> responseListener = request.execute();
|
||||
ListenableActionFuture<BulkByScrollResponse> responseListener = request.execute();
|
||||
|
||||
try {
|
||||
logger.info("Waiting for search rejections on the initial search");
|
||||
@ -170,13 +174,13 @@ public class RetryTests extends ESSingleNodeTestCase {
|
||||
scrollBlock.await();
|
||||
|
||||
logger.info("Waiting for the request to finish");
|
||||
BulkIndexByScrollResponse response = responseListener.get();
|
||||
BulkByScrollResponse response = responseListener.get();
|
||||
assertThat(response, matcher);
|
||||
assertThat(response.getBulkRetries(), greaterThan(0L));
|
||||
assertThat(response.getSearchRetries(), greaterThan(initialSearchRejections));
|
||||
} finally {
|
||||
// Fetch the response just in case we blew up half way through. This will make sure the failure is thrown up to the top level.
|
||||
BulkIndexByScrollResponse response = responseListener.get();
|
||||
BulkByScrollResponse response = responseListener.get();
|
||||
assertThat(response.getSearchFailures(), empty());
|
||||
assertThat(response.getBulkFailures(), empty());
|
||||
}
|
||||
|
@ -19,9 +19,9 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequest;
|
||||
import org.elasticsearch.action.bulk.byscroll.DeleteByQueryRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
@ -31,7 +31,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
||||
import org.elasticsearch.index.reindex.remote.RemoteInfo;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptType;
|
||||
@ -41,18 +40,10 @@ import org.elasticsearch.test.ESTestCase;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static java.lang.Math.abs;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.apache.lucene.util.TestUtil.randomSimpleString;
|
||||
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
/**
|
||||
* Round trip tests for all Streamable things declared in this plugin.
|
||||
@ -199,39 +190,6 @@ public class RoundTripTests extends ESTestCase {
|
||||
assertEquals(request.getRequestsPerSecond(), tripped.getRequestsPerSecond(), 0d);
|
||||
}
|
||||
|
||||
public void testBulkByTaskStatus() throws IOException {
|
||||
BulkByScrollTask.Status status = randomStatus();
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
status.writeTo(out);
|
||||
BulkByScrollTask.Status tripped = new BulkByScrollTask.Status(out.bytes().streamInput());
|
||||
assertTaskStatusEquals(out.getVersion(), status, tripped);
|
||||
|
||||
// Also check round tripping pre-5.1 which is the first version to support parallelized scroll
|
||||
out = new BytesStreamOutput();
|
||||
out.setVersion(Version.V_5_0_0_rc1); // This can be V_5_0_0
|
||||
status.writeTo(out);
|
||||
StreamInput in = out.bytes().streamInput();
|
||||
in.setVersion(Version.V_5_0_0_rc1);
|
||||
tripped = new BulkByScrollTask.Status(in);
|
||||
assertTaskStatusEquals(Version.V_5_0_0_rc1, status, tripped);
|
||||
}
|
||||
|
||||
public void testReindexResponse() throws IOException {
|
||||
BulkIndexByScrollResponse response = new BulkIndexByScrollResponse(timeValueMillis(randomNonNegativeLong()), randomStatus(),
|
||||
randomIndexingFailures(), randomSearchFailures(), randomBoolean());
|
||||
BulkIndexByScrollResponse tripped = new BulkIndexByScrollResponse();
|
||||
roundTrip(response, tripped);
|
||||
assertResponseEquals(response, tripped);
|
||||
}
|
||||
|
||||
public void testBulkIndexByScrollResponse() throws IOException {
|
||||
BulkIndexByScrollResponse response = new BulkIndexByScrollResponse(timeValueMillis(randomNonNegativeLong()), randomStatus(),
|
||||
randomIndexingFailures(), randomSearchFailures(), randomBoolean());
|
||||
BulkIndexByScrollResponse tripped = new BulkIndexByScrollResponse();
|
||||
roundTrip(response, tripped);
|
||||
assertResponseEquals(response, tripped);
|
||||
}
|
||||
|
||||
public void testRethrottleRequest() throws IOException {
|
||||
RethrottleRequest request = new RethrottleRequest();
|
||||
request.setRequestsPerSecond((float) randomDoubleBetween(0, Float.POSITIVE_INFINITY, false));
|
||||
@ -247,62 +205,6 @@ public class RoundTripTests extends ESTestCase {
|
||||
assertEquals(request.getTaskId(), tripped.getTaskId());
|
||||
}
|
||||
|
||||
private BulkByScrollTask.Status randomStatus() {
|
||||
if (randomBoolean()) {
|
||||
return randomWorkingStatus(null);
|
||||
}
|
||||
boolean canHaveNullStatues = randomBoolean();
|
||||
List<BulkByScrollTask.StatusOrException> statuses = IntStream.range(0, between(0, 10))
|
||||
.mapToObj(i -> {
|
||||
if (canHaveNullStatues && rarely()) {
|
||||
return null;
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
return new BulkByScrollTask.StatusOrException(new ElasticsearchException(randomAsciiOfLength(5)));
|
||||
}
|
||||
return new BulkByScrollTask.StatusOrException(randomWorkingStatus(i));
|
||||
})
|
||||
.collect(toList());
|
||||
return new BulkByScrollTask.Status(statuses, randomBoolean() ? "test" : null);
|
||||
}
|
||||
|
||||
private BulkByScrollTask.Status randomWorkingStatus(Integer sliceId) {
|
||||
// These all should be believably small because we sum them if we have multiple workers
|
||||
int total = between(0, 10000000);
|
||||
int updated = between(0, total);
|
||||
int created = between(0, total - updated);
|
||||
int deleted = between(0, total - updated - created);
|
||||
int noops = total - updated - created - deleted;
|
||||
int batches = between(0, 10000);
|
||||
long versionConflicts = between(0, total);
|
||||
long bulkRetries = between(0, 10000000);
|
||||
long searchRetries = between(0, 100000);
|
||||
return new BulkByScrollTask.Status(sliceId, total, updated, created, deleted, batches, versionConflicts, noops, bulkRetries,
|
||||
searchRetries, parseTimeValue(randomPositiveTimeValue(), "test"), abs(random().nextFloat()),
|
||||
randomBoolean() ? null : randomSimpleString(random()), parseTimeValue(randomPositiveTimeValue(), "test"));
|
||||
}
|
||||
|
||||
private List<Failure> randomIndexingFailures() {
|
||||
return usually() ? emptyList()
|
||||
: singletonList(new Failure(randomSimpleString(random()), randomSimpleString(random()),
|
||||
randomSimpleString(random()), new IllegalArgumentException("test")));
|
||||
}
|
||||
|
||||
private List<SearchFailure> randomSearchFailures() {
|
||||
if (randomBoolean()) {
|
||||
return emptyList();
|
||||
}
|
||||
String index = null;
|
||||
Integer shardId = null;
|
||||
String nodeId = null;
|
||||
if (randomBoolean()) {
|
||||
index = randomAsciiOfLength(5);
|
||||
shardId = randomInt();
|
||||
nodeId = usually() ? randomAsciiOfLength(5) : null;
|
||||
}
|
||||
return singletonList(new SearchFailure(new ElasticsearchException("foo"), index, shardId, nodeId));
|
||||
}
|
||||
|
||||
private void roundTrip(Streamable example, Streamable empty) throws IOException {
|
||||
roundTrip(Version.CURRENT, example, empty);
|
||||
}
|
||||
@ -324,64 +226,4 @@ public class RoundTripTests extends ESTestCase {
|
||||
|
||||
return new Script(type, lang, idOrCode, params);
|
||||
}
|
||||
|
||||
private void assertResponseEquals(BulkIndexByScrollResponse expected, BulkIndexByScrollResponse actual) {
|
||||
assertEquals(expected.getTook(), actual.getTook());
|
||||
assertTaskStatusEquals(Version.CURRENT, expected.getStatus(), actual.getStatus());
|
||||
assertEquals(expected.getBulkFailures().size(), actual.getBulkFailures().size());
|
||||
for (int i = 0; i < expected.getBulkFailures().size(); i++) {
|
||||
Failure expectedFailure = expected.getBulkFailures().get(i);
|
||||
Failure actualFailure = actual.getBulkFailures().get(i);
|
||||
assertEquals(expectedFailure.getIndex(), actualFailure.getIndex());
|
||||
assertEquals(expectedFailure.getType(), actualFailure.getType());
|
||||
assertEquals(expectedFailure.getId(), actualFailure.getId());
|
||||
assertEquals(expectedFailure.getMessage(), actualFailure.getMessage());
|
||||
assertEquals(expectedFailure.getStatus(), actualFailure.getStatus());
|
||||
}
|
||||
assertEquals(expected.getSearchFailures().size(), actual.getSearchFailures().size());
|
||||
for (int i = 0; i < expected.getSearchFailures().size(); i++) {
|
||||
SearchFailure expectedFailure = expected.getSearchFailures().get(i);
|
||||
SearchFailure actualFailure = actual.getSearchFailures().get(i);
|
||||
assertEquals(expectedFailure.getIndex(), actualFailure.getIndex());
|
||||
assertEquals(expectedFailure.getShardId(), actualFailure.getShardId());
|
||||
assertEquals(expectedFailure.getNodeId(), actualFailure.getNodeId());
|
||||
assertEquals(expectedFailure.getReason().getClass(), actualFailure.getReason().getClass());
|
||||
assertEquals(expectedFailure.getReason().getMessage(), actualFailure.getReason().getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void assertTaskStatusEquals(Version version, BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) {
|
||||
assertEquals(expected.getTotal(), actual.getTotal());
|
||||
assertEquals(expected.getUpdated(), actual.getUpdated());
|
||||
assertEquals(expected.getCreated(), actual.getCreated());
|
||||
assertEquals(expected.getDeleted(), actual.getDeleted());
|
||||
assertEquals(expected.getBatches(), actual.getBatches());
|
||||
assertEquals(expected.getVersionConflicts(), actual.getVersionConflicts());
|
||||
assertEquals(expected.getNoops(), actual.getNoops());
|
||||
assertEquals(expected.getBulkRetries(), actual.getBulkRetries());
|
||||
assertEquals(expected.getSearchRetries(), actual.getSearchRetries());
|
||||
assertEquals(expected.getThrottled(), actual.getThrottled());
|
||||
assertEquals(expected.getRequestsPerSecond(), actual.getRequestsPerSecond(), 0f);
|
||||
assertEquals(expected.getReasonCancelled(), actual.getReasonCancelled());
|
||||
assertEquals(expected.getThrottledUntil(), actual.getThrottledUntil());
|
||||
if (version.onOrAfter(Version.V_5_1_1_UNRELEASED)) {
|
||||
assertThat(actual.getSliceStatuses(), hasSize(expected.getSliceStatuses().size()));
|
||||
for (int i = 0; i < expected.getSliceStatuses().size(); i++) {
|
||||
BulkByScrollTask.StatusOrException sliceStatus = expected.getSliceStatuses().get(i);
|
||||
if (sliceStatus == null) {
|
||||
assertNull(actual.getSliceStatuses().get(i));
|
||||
} else if (sliceStatus.getException() == null) {
|
||||
assertNull(actual.getSliceStatuses().get(i).getException());
|
||||
assertTaskStatusEquals(version, sliceStatus.getStatus(), actual.getSliceStatuses().get(i).getStatus());
|
||||
} else {
|
||||
assertNull(actual.getSliceStatuses().get(i).getStatus());
|
||||
// Just check the message because we're not testing exception serialization in general here.
|
||||
assertEquals(sliceStatus.getException().getMessage(), actual.getSliceStatuses().get(i).getException().getMessage());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
assertEquals(emptyList(), actual.getSliceStatuses());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,9 @@ import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.FailedNodeException;
|
||||
import org.elasticsearch.action.TaskOperationFailure;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask;
|
||||
import org.elasticsearch.action.bulk.byscroll.ParentBulkByScrollTask;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
@ -114,7 +117,7 @@ public class TransportRethrottleActionTests extends ESTestCase {
|
||||
for (int i = 0; i < succeeded; i++) {
|
||||
BulkByScrollTask.Status status = believeableCompletedStatus(i);
|
||||
task.onSliceResponse(neverCalled(), i,
|
||||
new BulkIndexByScrollResponse(timeValueMillis(10), status, emptyList(), emptyList(), false));
|
||||
new BulkByScrollResponse(timeValueMillis(10), status, emptyList(), emptyList(), false));
|
||||
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
|
||||
}
|
||||
List<TaskInfo> tasks = new ArrayList<>();
|
||||
@ -132,12 +135,12 @@ public class TransportRethrottleActionTests extends ESTestCase {
|
||||
List<BulkByScrollTask.StatusOrException> sliceStatuses = new ArrayList<>(slices);
|
||||
for (int i = 0; i < slices; i++) {
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<BulkIndexByScrollResponse> listener = i < slices - 1 ? neverCalled() : mock(ActionListener.class);
|
||||
ActionListener<BulkByScrollResponse> listener = i < slices - 1 ? neverCalled() : mock(ActionListener.class);
|
||||
BulkByScrollTask.Status status = believeableCompletedStatus(i);
|
||||
task.onSliceResponse(listener, i, new BulkIndexByScrollResponse(timeValueMillis(10), status, emptyList(), emptyList(), false));
|
||||
task.onSliceResponse(listener, i, new BulkByScrollResponse(timeValueMillis(10), status, emptyList(), emptyList(), false));
|
||||
if (i == slices - 1) {
|
||||
// The whole thing succeeded so we should have got the success
|
||||
captureResponse(BulkIndexByScrollResponse.class, listener).getStatus();
|
||||
captureResponse(BulkByScrollResponse.class, listener).getStatus();
|
||||
}
|
||||
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
|
||||
}
|
||||
@ -182,7 +185,7 @@ public class TransportRethrottleActionTests extends ESTestCase {
|
||||
return new BulkByScrollTask.Status(sliceId, 10, 10, 0, 0, 0, 0, 0, 0, 0, timeValueMillis(0), 0, null, timeValueMillis(0));
|
||||
}
|
||||
|
||||
static <T> ActionListener<T> neverCalled() {
|
||||
private <T> ActionListener<T> neverCalled() {
|
||||
return new ActionListener<T>() {
|
||||
@Override
|
||||
public void onResponse(T response) {
|
||||
@ -196,7 +199,7 @@ public class TransportRethrottleActionTests extends ESTestCase {
|
||||
};
|
||||
}
|
||||
|
||||
static <T> T captureResponse(Class<T> responseClass, ActionListener<T> listener) {
|
||||
private <T> T captureResponse(Class<T> responseClass, ActionListener<T> listener) {
|
||||
ArgumentCaptor<Exception> failure = ArgumentCaptor.forClass(Exception.class);
|
||||
// Rethrow any failures just so we get a nice exception if there were any. We don't expect any though.
|
||||
verify(listener, atMost(1)).onFailure(failure.capture());
|
||||
|
@ -19,11 +19,15 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractAsyncBulkByScrollAction;
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractAsyncBulkByScrollActionMetadataTestCase;
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.Hit;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
|
||||
public class UpdateByQueryMetadataTests
|
||||
extends AbstractAsyncBulkByScrollActionMetadataTestCase<UpdateByQueryRequest, BulkIndexByScrollResponse> {
|
||||
extends AbstractAsyncBulkByScrollActionMetadataTestCase<UpdateByQueryRequest, BulkByScrollResponse> {
|
||||
public void testRoutingIsCopied() throws Exception {
|
||||
IndexRequest index = new IndexRequest();
|
||||
action().copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
|
||||
@ -31,12 +35,25 @@ public class UpdateByQueryMetadataTests
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action() {
|
||||
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request(), null, null, listener());
|
||||
protected TestAction action() {
|
||||
return new TestAction();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected UpdateByQueryRequest request() {
|
||||
return new UpdateByQueryRequest(new SearchRequest());
|
||||
}
|
||||
|
||||
private class TestAction extends TransportUpdateByQueryAction.AsyncIndexBySearchAction {
|
||||
public TestAction() {
|
||||
super(UpdateByQueryMetadataTests.this.task, UpdateByQueryMetadataTests.this.logger, null,
|
||||
UpdateByQueryMetadataTests.this.threadPool, request(), null, null, listener());
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractAsyncBulkByScrollAction.RequestWrapper<?> copyMetadata(AbstractAsyncBulkByScrollAction.RequestWrapper<?> request,
|
||||
Hit doc) {
|
||||
return super.copyMetadata(request, doc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestTestCase;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.script.Script;
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
@ -49,7 +50,7 @@ public class UpdateByQueryWhileModifyingTests extends ReindexTestCase {
|
||||
Thread updater = new Thread(() -> {
|
||||
while (keepUpdating.get()) {
|
||||
try {
|
||||
BulkIndexByScrollResponse response = updateByQuery().source("test").refresh(true).abortOnVersionConflict(false).get();
|
||||
BulkByScrollResponse response = updateByQuery().source("test").refresh(true).abortOnVersionConflict(false).get();
|
||||
assertThat(response, matcher().updated(either(equalTo(0L)).or(equalTo(1L)))
|
||||
.versionConflicts(either(equalTo(0L)).or(equalTo(1L))));
|
||||
} catch (Exception e) {
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
|
||||
@ -28,7 +29,7 @@ import java.util.Map;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
||||
public class UpdateByQueryWithScriptTests
|
||||
extends AbstractAsyncBulkByScrollActionScriptTestCase<UpdateByQueryRequest, BulkIndexByScrollResponse> {
|
||||
extends AbstractAsyncBulkByScrollActionScriptTestCase<UpdateByQueryRequest, BulkByScrollResponse> {
|
||||
|
||||
public void testModifyingCtxNotAllowed() {
|
||||
/*
|
||||
|
@ -40,6 +40,7 @@ import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.byscroll.ScrollableHitSource.Response;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.client.HeapBufferedAsyncResponseConsumer;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
@ -51,7 +52,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource.Response;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -17,11 +17,11 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
public abstract class AbstractAsyncBulkByScrollActionMetadataTestCase<
|
||||
Request extends AbstractBulkIndexByScrollRequest<Request>,
|
||||
Response extends BulkIndexByScrollResponse>
|
||||
Request extends AbstractBulkByScrollRequest<Request>,
|
||||
Response extends BulkByScrollResponse>
|
||||
extends AbstractAsyncBulkByScrollActionTestCase<Request, Response> {
|
||||
|
||||
protected ScrollableHitSource.BasicHit doc() {
|
@ -17,7 +17,7 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
@ -28,8 +28,8 @@ import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
public abstract class AbstractAsyncBulkByScrollActionTestCase<
|
||||
Request extends AbstractBulkIndexByScrollRequest<Request>,
|
||||
Response extends BulkIndexByScrollResponse>
|
||||
Request extends AbstractBulkByScrollRequest<Request>,
|
||||
Response extends BulkByScrollResponse>
|
||||
extends ESTestCase {
|
||||
protected ThreadPool threadPool;
|
||||
protected WorkingBulkByScrollTask task;
|
@ -17,8 +17,9 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.tasks.TaskId;
|
@ -17,8 +17,9 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
package org.elasticsearch.action.bulk.byscroll;
|
||||
|
||||
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
|
||||
import org.hamcrest.Description;
|
||||
import org.hamcrest.Matcher;
|
||||
import org.hamcrest.TypeSafeMatcher;
|
||||
@ -29,7 +30,7 @@ import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class BulkIndexByScrollResponseMatcher extends TypeSafeMatcher<BulkIndexByScrollResponse> {
|
||||
public class BulkIndexByScrollResponseMatcher extends TypeSafeMatcher<BulkByScrollResponse> {
|
||||
|
||||
private Matcher<Long> createdMatcher = equalTo(0L);
|
||||
private Matcher<Long> updatedMatcher = equalTo(0L);
|
||||
@ -130,7 +131,7 @@ public class BulkIndexByScrollResponseMatcher extends TypeSafeMatcher<BulkIndexB
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean matchesSafely(BulkIndexByScrollResponse item) {
|
||||
protected boolean matchesSafely(BulkByScrollResponse item) {
|
||||
return updatedMatcher.matches(item.getUpdated()) &&
|
||||
createdMatcher.matches(item.getCreated()) &&
|
||||
deletedMatcher.matches(item.getDeleted()) &&
|
Loading…
x
Reference in New Issue
Block a user