From 7e3cd6a01931a95bfad785490f3319ed79f258f4 Mon Sep 17 00:00:00 2001 From: Andy Bristol Date: Fri, 11 Aug 2017 08:25:25 -0700 Subject: [PATCH] reindex: automatically choose the number of slices (#26030) In reindex APIs, when using the `slices` parameter to choose the number of slices, adds the option to specify `slices` as "auto" which will choose a reasonable number of slices. It uses the number of shards in the source index, up to a ceiling. If there is more than one source index, it uses the smallest number of shards among them. This gives users an easy way to use slicing in these APIs without having to make decisions about how to configure it, as it provides a good-enough configuration for them out of the box. This may become the default behavior for these APIs in the future. --- .../reindex/AbstractBulkByScrollRequest.java | 51 +++---- .../AbstractBulkByScrollRequestBuilder.java | 4 +- .../AbstractBulkIndexByScrollRequest.java | 4 +- .../index/reindex/BulkByScrollResponse.java | 2 +- .../index/reindex/BulkByScrollTask.java | 136 ++++++++++++++++-- .../index/reindex/DeleteByQueryRequest.java | 4 +- ....java => LeaderBulkByScrollTaskState.java} | 90 ++++++------ .../index/reindex/ReindexRequest.java | 8 +- .../index/reindex/SuccessfullyProcessed.java | 2 +- .../index/reindex/UpdateByQueryRequest.java | 4 +- ....java => WorkerBulkByScrollTaskState.java} | 103 ++++++------- .../AbstractBulkByScrollRequestTestCase.java | 13 +- ... => LeaderBulkByScrollTaskStateTests.java} | 12 +- .../index/reindex/ReindexRequestTests.java | 8 +- ... => WorkerBulkByScrollTaskStateTests.java} | 63 ++++---- .../search/slice/SliceBuilderTests.java | 12 +- docs/reference/docs/delete-by-query.asciidoc | 54 ++++--- docs/reference/docs/reindex.asciidoc | 49 ++++--- docs/reference/docs/update-by-query.asciidoc | 49 ++++--- .../AbstractAsyncBulkByScrollAction.java | 51 ++++--- .../AbstractBaseReindexRestHandler.java | 33 ++++- .../reindex/AsyncDeleteByQueryAction.java | 2 +- .../BulkByScrollParallelizationHelper.java | 106 ++++++++++++-- .../reindex/TransportDeleteByQueryAction.java | 18 +-- .../index/reindex/TransportReindexAction.java | 40 +++--- .../reindex/TransportRethrottleAction.java | 53 +++++-- .../reindex/TransportUpdateByQueryAction.java | 30 ++-- .../reindex/AsyncBulkByScrollActionTests.java | 17 ++- .../reindex/DeleteByQueryBasicTests.java | 72 +++++++++- .../index/reindex/ReindexBasicTests.java | 54 ++++++- .../index/reindex/ReindexTestCase.java | 50 +++++++ .../index/reindex/RethrottleTests.java | 27 ++-- .../index/reindex/RoundTripTests.java | 51 ++++--- .../TransportRethrottleActionTests.java | 10 +- .../reindex/UpdateByQueryBasicTests.java | 87 +++++++++-- .../test/delete_by_query/20_validation.yml | 4 +- .../test/delete_by_query/80_slices.yml | 74 ++++++++++ .../test/reindex/20_validation.yml | 4 +- .../rest-api-spec/test/reindex/80_slices.yml | 69 +++++++++ .../test/update_by_query/20_validation.yml | 4 +- .../test/update_by_query/70_slices.yml | 67 +++++++++ ...stractAsyncBulkByScrollActionTestCase.java | 6 +- 42 files changed, 1184 insertions(+), 413 deletions(-) rename core/src/main/java/org/elasticsearch/index/reindex/{ParentBulkByScrollTask.java => LeaderBulkByScrollTaskState.java} (64%) rename core/src/main/java/org/elasticsearch/index/reindex/{WorkingBulkByScrollTask.java => WorkerBulkByScrollTaskState.java} (84%) rename core/src/test/java/org/elasticsearch/index/reindex/{ParentBulkByScrollTaskTests.java => LeaderBulkByScrollTaskStateTests.java} (93%) rename core/src/test/java/org/elasticsearch/index/reindex/{WorkingBulkByScrollTaskTests.java => WorkerBulkByScrollTaskStateTests.java} (83%) diff --git a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 9f10304622b..0355eeaee47 100644 --- a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -45,6 +45,10 @@ public abstract class AbstractBulkByScrollRequest 1) { - return new ParentBulkByScrollTask(id, type, action, getDescription(), parentTaskId, slices); - } - /* Extract the slice from the search request so it'll be available in the status. This is potentially useful for users that manually - * slice their search requests so they can keep track of it and **absolutely** useful for automatically sliced reindex requests so - * they can properly track the responses. */ - Integer sliceId = searchRequest.source().slice() == null ? null : searchRequest.source().slice().getId(); - return new WorkingBulkByScrollTask(id, type, action, getDescription(), parentTaskId, sliceId, requestsPerSecond); + return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId); } @Override @@ -408,11 +409,7 @@ public abstract class AbstractBulkByScrollRequest 1) { - throw new IllegalArgumentException("Attempting to send sliced reindex-style request to a node that doesn't support " - + "it. Version is [" + out.getVersion() + "] but must be [" + Version.V_5_1_1 + "]"); - } + out.writeVInt(slices); } } diff --git a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestBuilder.java b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestBuilder.java index de3f22f0943..e3c5bd2197a 100644 --- a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestBuilder.java @@ -145,8 +145,8 @@ public abstract class AbstractBulkByScrollRequestBuilder< /** * The number of slices this task should be divided into. Defaults to 1 meaning the task isn't sliced into subtasks. */ - public Self setSlices(int workers) { - request.setSlices(workers); + public Self setSlices(int slices) { + request.setSlices(slices); return self(); } } diff --git a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkIndexByScrollRequest.java b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkIndexByScrollRequest.java index 62c2635b301..65cdcf52b6f 100644 --- a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkIndexByScrollRequest.java +++ b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkIndexByScrollRequest.java @@ -68,8 +68,8 @@ public abstract class AbstractBulkIndexByScrollRequest sliceInfo) { + if (isLeader() == false) { + throw new IllegalStateException("This task is not set to be a leader of other slice subtasks"); + } - /* - * Overridden to force children to return compatible status. - */ - public abstract BulkByScrollTask.Status getStatus(); + List sliceStatuses = Arrays.asList( + new BulkByScrollTask.StatusOrException[leaderState.getSlices()]); + for (TaskInfo t : sliceInfo) { + BulkByScrollTask.Status status = (BulkByScrollTask.Status) t.getStatus(); + sliceStatuses.set(status.getSliceId(), new BulkByScrollTask.StatusOrException(status)); + } + Status status = leaderState.getStatus(sliceStatuses); + return taskInfo(localNodeId, getDescription(), status); + } + + private BulkByScrollTask.Status emptyStatus() { + return new Status(Collections.emptyList(), getReasonCancelled()); + } /** - * Build the status for this task given a snapshot of the information of running slices. + * Returns true if this task is a leader for other slice subtasks */ - public abstract TaskInfo getInfoGivenSliceInfo(String localNodeId, List sliceInfo); + public boolean isLeader() { + return leaderState != null; + } + + /** + * Sets this task to be a leader task for {@code slices} sliced subtasks + */ + public void setWorkerCount(int slices) { + if (isLeader()) { + throw new IllegalStateException("This task is already a leader for other slice subtasks"); + } + if (isWorker()) { + throw new IllegalStateException("This task is already a worker"); + } + + leaderState = new LeaderBulkByScrollTaskState(this, slices); + } + + /** + * Returns the object that tracks the state of sliced subtasks. Throws IllegalStateException if this task is not set to be + * a leader task. + */ + public LeaderBulkByScrollTaskState getLeaderState() { + if (!isLeader()) { + throw new IllegalStateException("This task is not set to be a leader for other slice subtasks"); + } + return leaderState; + } + + /** + * Returns true if this task is a worker task that performs search requests. False otherwise + */ + public boolean isWorker() { + return workerState != null; + } + + /** + * Sets this task to be a worker task that performs search requests + * @param requestsPerSecond How many search requests per second this task should make + * @param sliceId If this is is a sliced task, which slice number this task corresponds to. Null if not sliced. + */ + public void setWorker(float requestsPerSecond, @Nullable Integer sliceId) { + if (isWorker()) { + throw new IllegalStateException("This task is already a worker"); + } + if (isLeader()) { + throw new IllegalStateException("This task is already a leader for other slice subtasks"); + } + + workerState = new WorkerBulkByScrollTaskState(this, sliceId, requestsPerSecond); + } + + /** + * Returns the object that manages sending search requests. Throws IllegalStateException if this task is not set to be a + * worker task. + */ + public WorkerBulkByScrollTaskState getWorkerState() { + if (!isWorker()) { + throw new IllegalStateException("This task is not set to be a worker"); + } + return workerState; + } + + @Override + public void onCancelled() { + if (isLeader()) { + // The task cancellation task automatically finds children and cancels them, nothing extra to do + } else if (isWorker()) { + workerState.handleCancel(); + } else { + throw new IllegalStateException("This task has not had its sliced state initialized and doesn't know how to cancel itself"); + } + } @Override public boolean shouldCancelChildrenOnCancellation() { diff --git a/core/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java b/core/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java index ad70748f3e4..20f87e047b6 100644 --- a/core/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java +++ b/core/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java @@ -81,8 +81,8 @@ public class DeleteByQueryRequest extends AbstractBulkByScrollRequest results; - private final AtomicInteger counter; + /** + * How many subtasks are still running + */ + private final AtomicInteger runningSubtasks; - public ParentBulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId, int slices) { - super(id, type, action, description, parentTaskId); - this.results = new AtomicArray<>(slices); - this.counter = new AtomicInteger(slices); + public LeaderBulkByScrollTaskState(BulkByScrollTask task, int slices) { + this.task = task; + this.slices = slices; + results = new AtomicArray<>(slices); + runningSubtasks = new AtomicInteger(slices); } - @Override - public void rethrottle(float newRequestsPerSecond) { - // Nothing to do because all rethrottling is done on slice sub tasks. + /** + * Returns the number of slices this BulkByScrollRequest will use + */ + public int getSlices() { + return slices; } - @Override - public Status getStatus() { + /** + * Get the combined statuses of slice subtasks, merged with the given list of statuses + */ + public BulkByScrollTask.Status getStatus(List statuses) { // We only have access to the statuses of requests that have finished so we return them - List statuses = Arrays.asList(new StatusOrException[results.length()]); - addResultsToList(statuses); - return new Status(unmodifiableList(statuses), getReasonCancelled()); - } - - @Override - public int runningSliceSubTasks() { - return counter.get(); - } - - @Override - public TaskInfo getInfoGivenSliceInfo(String localNodeId, List sliceInfo) { - /* Merge the list of finished sub requests with the provided info. If a slice is both finished and in the list then we prefer the - * finished status because we don't expect them to change after the task is finished. */ - List sliceStatuses = Arrays.asList(new StatusOrException[results.length()]); - for (TaskInfo t : sliceInfo) { - Status status = (Status) t.getStatus(); - sliceStatuses.set(status.getSliceId(), new StatusOrException(status)); + if (statuses.size() != results.length()) { + throw new IllegalArgumentException("Given number of statuses does not match amount of expected results"); } - addResultsToList(sliceStatuses); - Status status = new Status(sliceStatuses, getReasonCancelled()); - return taskInfo(localNodeId, getDescription(), status); + addResultsToList(statuses); + return new BulkByScrollTask.Status(unmodifiableList(statuses), task.getReasonCancelled()); } - private void addResultsToList(List sliceStatuses) { + /** + * Get the combined statuses of sliced subtasks + */ + public BulkByScrollTask.Status getStatus() { + return getStatus(Arrays.asList(new BulkByScrollTask.StatusOrException[results.length()])); + } + + /** + * The number of sliced subtasks that are still running + */ + public int runningSliceSubTasks() { + return runningSubtasks.get(); + } + + private void addResultsToList(List sliceStatuses) { for (Result t : results.asList()) { if (t.response != null) { - sliceStatuses.set(t.sliceId, new StatusOrException(t.response.getStatus())); + sliceStatuses.set(t.sliceId, new BulkByScrollTask.StatusOrException(t.response.getStatus())); } else { - sliceStatuses.set(t.sliceId, new StatusOrException(t.failure)); + sliceStatuses.set(t.sliceId, new BulkByScrollTask.StatusOrException(t.failure)); } } } @@ -111,7 +117,7 @@ public class ParentBulkByScrollTask extends BulkByScrollTask { } private void recordSliceCompletionAndRespondIfAllDone(ActionListener listener) { - if (counter.decrementAndGet() != 0) { + if (runningSubtasks.decrementAndGet() != 0) { return; } List responses = new ArrayList<>(results.length()); @@ -130,7 +136,7 @@ public class ParentBulkByScrollTask extends BulkByScrollTask { } } if (exception == null) { - listener.onResponse(new BulkByScrollResponse(responses, getReasonCancelled())); + listener.onResponse(new BulkByScrollResponse(responses, task.getReasonCancelled())); } else { listener.onFailure(exception); } diff --git a/core/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java b/core/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java index 76944c7b804..276c4559153 100644 --- a/core/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java +++ b/core/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java @@ -94,8 +94,8 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest 1 but was [" + getSlices() + "]", e); + if (getSlices() == AbstractBulkByScrollRequest.AUTO_SLICES || getSlices() > 1) { + e = addValidationError("reindex from remote sources doesn't support slices > 1 but was [" + getSlices() + "]", e); } } return e; @@ -127,8 +127,8 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest delayedPrepareBulkRequestReference = new AtomicReference<>(); - public WorkingBulkByScrollTask(long id, String type, String action, String description, TaskId parentTask, Integer sliceId, - float requestsPerSecond) { - super(id, type, action, description, parentTask); + public WorkerBulkByScrollTaskState(BulkByScrollTask task, Integer sliceId, float requestsPerSecond) { + this.task = task; this.sliceId = sliceId; setRequestsPerSecond(requestsPerSecond); } - @Override - public Status getStatus() { - return new Status(sliceId, total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), - noops.get(), bulkRetries.get(), searchRetries.get(), timeValueNanos(throttledNanos.get()), getRequestsPerSecond(), - getReasonCancelled(), throttledUntil()); + public BulkByScrollTask.Status getStatus() { + return new BulkByScrollTask.Status( + sliceId, + total.get(), + updated.get(), + created.get(), + deleted.get(), + batch.get(), + versionConflicts.get(), + noops.get(), + bulkRetries.get(), + searchRetries.get(), + timeValueNanos(throttledNanos.get()), + getRequestsPerSecond(), + task.getReasonCancelled(), + throttledUntil()); } - @Override - protected void onCancelled() { - /* Drop the throttle to 0, immediately rescheduling any throttled - * operation so it will wake up and cancel itself. */ + public void handleCancel() { + // Drop the throttle to 0, immediately rescheduling any throttle operation so it will wake up and cancel itself. rethrottle(Float.POSITIVE_INFINITY); } - @Override - public int runningSliceSubTasks() { - return 0; - } - - @Override - public TaskInfo getInfoGivenSliceInfo(String localNodeId, List sliceInfo) { - throw new UnsupportedOperationException("This is only supported by " + ParentBulkByScrollTask.class.getName() + "."); - } - - TimeValue throttledUntil() { - DelayedPrepareBulkRequest delayed = delayedPrepareBulkRequestReference.get(); - if (delayed == null) { - return timeValueNanos(0); - } - if (delayed.future == null) { - return timeValueNanos(0); - } - return timeValueNanos(max(0, delayed.future.getDelay(TimeUnit.NANOSECONDS))); - } - public void setTotal(long totalHits) { total.set(totalHits); } @@ -171,6 +161,17 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success return requestsPerSecond; } + TimeValue throttledUntil() { + DelayedPrepareBulkRequest delayed = delayedPrepareBulkRequestReference.get(); + if (delayed == null) { + return timeValueNanos(0); + } + if (delayed.future == null) { + return timeValueNanos(0); + } + return timeValueNanos(max(0, delayed.future.getDelay(TimeUnit.NANOSECONDS))); + } + /** * Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be * rescheduled over and over again. @@ -180,9 +181,9 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success // Synchronize so we are less likely to schedule the same request twice. synchronized (delayedPrepareBulkRequestReference) { TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize); - logger.debug("[{}]: preparing bulk request for [{}]", getId(), delay); + logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay); delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(), - delay, new RunOnce(prepareBulkRequestRunnable))); + delay, new RunOnce(prepareBulkRequestRunnable))); } } @@ -213,16 +214,18 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success this.requestsPerSecond = requestsPerSecond; } - @Override + /** + * Apply {@code newRequestsPerSecond} as the new rate limit for this task's search requests + */ public void rethrottle(float newRequestsPerSecond) { synchronized (delayedPrepareBulkRequestReference) { - logger.debug("[{}]: rethrottling to [{}] requests per second", getId(), newRequestsPerSecond); + logger.debug("[{}]: rethrottling to [{}] requests per second", task.getId(), newRequestsPerSecond); setRequestsPerSecond(newRequestsPerSecond); DelayedPrepareBulkRequest delayedPrepareBulkRequest = this.delayedPrepareBulkRequestReference.get(); if (delayedPrepareBulkRequest == null) { // No request has been queued so nothing to reschedule. - logger.debug("[{}]: skipping rescheduling because there is no scheduled task", getId()); + logger.debug("[{}]: skipping rescheduling because there is no scheduled task", task.getId()); return; } @@ -260,8 +263,8 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success * change in throttle take effect the next time we delay * prepareBulkRequest. We can't just reschedule the request further * out in the future because the bulk context might time out. */ - logger.debug("[{}]: skipping rescheduling because the new throttle [{}] is slower than the old one [{}]", getId(), - newRequestsPerSecond, requestsPerSecond); + logger.debug("[{}]: skipping rescheduling because the new throttle [{}] is slower than the old one [{}]", task.getId(), + newRequestsPerSecond, requestsPerSecond); return this; } @@ -269,7 +272,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success // Actually reschedule the task if (false == FutureUtils.cancel(future)) { // Couldn't cancel, probably because the task has finished or been scheduled. Either way we have nothing to do here. - logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", getId()); + logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", task.getId()); return this; } @@ -278,7 +281,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success * test it you'll find that requests sneak through. So each request * is given a runOnce boolean to prevent that. */ TimeValue newDelay = newDelay(remainingDelay, newRequestsPerSecond); - logger.debug("[{}]: rescheduling for [{}] in the future", getId(), newDelay); + logger.debug("[{}]: rescheduling for [{}] in the future", task.getId(), newDelay); return new DelayedPrepareBulkRequest(threadPool, requestsPerSecond, newDelay, command); } diff --git a/core/src/test/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestTestCase.java b/core/src/test/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestTestCase.java index 8a255d376af..143e2416b88 100644 --- a/core/src/test/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestTestCase.java @@ -39,16 +39,21 @@ public abstract class AbstractBulkByScrollRequestTestCase r < 0, ESTestCase::randomFloat)); if (randomBoolean()) { original.setSize(between(0, Integer.MAX_VALUE)); } + // it's not important how many slices there are, we just need a number for forSlice + int actualSlices = between(2, 1000); + original.setSlices(randomBoolean() + ? actualSlices + : AbstractBulkByScrollRequest.AUTO_SLICES); + TaskId slicingTask = new TaskId(randomAlphaOfLength(5), randomLong()); SearchRequest sliceRequest = new SearchRequest(); - R forSliced = original.forSlice(slicingTask, sliceRequest); + R forSliced = original.forSlice(slicingTask, sliceRequest, actualSlices); assertEquals(original.isAbortOnVersionConflict(), forSliced.isAbortOnVersionConflict()); assertEquals(original.isRefresh(), forSliced.isRefresh()); assertEquals(original.getTimeout(), forSliced.getTimeout()); @@ -57,10 +62,10 @@ public abstract class AbstractBulkByScrollRequestTestCase listener = slice < slices - 1 ? neverCalled() : mock(ActionListener.class); - task.onSliceResponse(listener, slice, + taskState.onSliceResponse(listener, slice, new BulkByScrollResponse(timeValueMillis(10), sliceStatus, emptyList(), emptyList(), false)); status = task.getStatus(); diff --git a/core/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java b/core/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java index 32b01237375..9f4b20ff35b 100644 --- a/core/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java +++ b/core/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java @@ -45,7 +45,7 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase 1 but was [" + reindex.getSlices() + "];", + "Validation Failed: 1: reindex from remote sources doesn't support slices > 1 but was [" + reindex.getSlices() + "];", e.getMessage()); } - public void testNoSliceWithWorkers() { + public void testNoSliceBuilderSetWithSlicedRequest() { ReindexRequest reindex = newRequest(); reindex.getSearchRequest().source().slice(new SliceBuilder(0, 4)); reindex.setSlices(between(2, Integer.MAX_VALUE)); ActionRequestValidationException e = reindex.validate(); - assertEquals("Validation Failed: 1: can't specify both slice and workers;", e.getMessage()); + assertEquals("Validation Failed: 1: can't specify both manual and automatic slicing at the same time;", e.getMessage()); } @Override diff --git a/core/src/test/java/org/elasticsearch/index/reindex/WorkingBulkByScrollTaskTests.java b/core/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java similarity index 83% rename from core/src/test/java/org/elasticsearch/index/reindex/WorkingBulkByScrollTaskTests.java rename to core/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java index 5d594d080b8..64bf52c319e 100644 --- a/core/src/test/java/org/elasticsearch/index/reindex/WorkingBulkByScrollTaskTests.java +++ b/core/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java @@ -46,12 +46,15 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; -public class WorkingBulkByScrollTaskTests extends ESTestCase { - private WorkingBulkByScrollTask task; +public class WorkerBulkByScrollTaskStateTests extends ESTestCase { + private BulkByScrollTask task; + private WorkerBulkByScrollTaskState workerState; @Before public void createTask() { - task = new WorkingBulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, null, Float.POSITIVE_INFINITY); + task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID); + task.setWorker(Float.POSITIVE_INFINITY, null); + workerState = task.getWorkerState(); } public void testBasicData() { @@ -78,7 +81,7 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { assertEquals(noops, status.getNoops()); long totalHits = randomIntBetween(10, 1000); - task.setTotal(totalHits); + workerState.setTotal(totalHits); for (long p = 0; p < totalHits; p++) { status = task.getStatus(); assertEquals(totalHits, status.getTotal()); @@ -91,28 +94,28 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { if (randomBoolean()) { created++; - task.countCreated(); + workerState.countCreated(); } else if (randomBoolean()) { updated++; - task.countUpdated(); + workerState.countUpdated(); } else { deleted++; - task.countDeleted(); + workerState.countDeleted(); } if (rarely()) { versionConflicts++; - task.countVersionConflict(); + workerState.countVersionConflict(); } if (rarely()) { batch++; - task.countBatch(); + workerState.countBatch(); } if (rarely()) { noops++; - task.countNoop(); + workerState.countNoop(); } } status = task.getStatus(); @@ -139,7 +142,7 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { * each time. */ float originalRequestsPerSecond = (float) randomDoubleBetween(1, 10000, true); - task.rethrottle(originalRequestsPerSecond); + workerState.rethrottle(originalRequestsPerSecond); TimeValue maxDelay = timeValueSeconds(between(1, 5)); assertThat(maxDelay.nanos(), greaterThanOrEqualTo(0L)); int batchSizeForMaxDelay = (int) (maxDelay.seconds() * originalRequestsPerSecond); @@ -151,20 +154,22 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { } }; try { - task.delayPrepareBulkRequest(threadPool, timeValueNanos(System.nanoTime()), batchSizeForMaxDelay, new AbstractRunnable() { - @Override - protected void doRun() throws Exception { - boolean oldValue = done.getAndSet(true); - if (oldValue) { - throw new RuntimeException("Ran twice oh no!"); + workerState.delayPrepareBulkRequest(threadPool, timeValueNanos(System.nanoTime()), batchSizeForMaxDelay, + new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + boolean oldValue = done.getAndSet(true); + if (oldValue) { + throw new RuntimeException("Ran twice oh no!"); + } + } + + @Override + public void onFailure(Exception e) { + errors.add(e); } } - - @Override - public void onFailure(Exception e) { - errors.add(e); - } - }); + ); // Rethrottle on a random number of threads, one of which is this thread. Runnable test = () -> { @@ -172,7 +177,7 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { int rethrottles = 0; while (false == done.get()) { float requestsPerSecond = (float) randomDoubleBetween(0, originalRequestsPerSecond * 2, true); - task.rethrottle(requestsPerSecond); + workerState.rethrottle(requestsPerSecond); rethrottles += 1; } logger.info("Rethrottled [{}] times", rethrottles); @@ -237,7 +242,7 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { }; try { // Have the task use the thread pool to delay a task that does nothing - task.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), 1, new AbstractRunnable() { + workerState.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), 1, new AbstractRunnable() { @Override protected void doRun() throws Exception { } @@ -254,12 +259,12 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { } public void testPerfectlyThrottledBatchTime() { - task.rethrottle(Float.POSITIVE_INFINITY); - assertThat((double) task.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f)); + workerState.rethrottle(Float.POSITIVE_INFINITY); + assertThat((double) workerState.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f)); int total = between(0, 1000000); - task.rethrottle(1); - assertThat((double) task.perfectlyThrottledBatchTime(total), + workerState.rethrottle(1); + assertThat((double) workerState.perfectlyThrottledBatchTime(total), closeTo(TimeUnit.SECONDS.toNanos(total), TimeUnit.SECONDS.toNanos(1))); } } diff --git a/core/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java b/core/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java index 009b9ab5d65..7ea3dd2094b 100644 --- a/core/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java +++ b/core/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java @@ -114,21 +114,21 @@ public class SliceBuilderTests extends ESTestCase { public void testInvalidArguments() throws Exception { Exception e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", -1, 10)); - assertEquals(e.getMessage(), "id must be greater than or equal to 0"); + assertEquals("id must be greater than or equal to 0", e.getMessage()); e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, -1)); - assertEquals(e.getMessage(), "max must be greater than 1"); + assertEquals("max must be greater than 1", e.getMessage()); e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, 0)); - assertEquals(e.getMessage(), "max must be greater than 1"); + assertEquals("max must be greater than 1", e.getMessage()); e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, 5)); - assertEquals(e.getMessage(), "max must be greater than id"); + assertEquals("max must be greater than id", e.getMessage()); e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 1000, 1000)); - assertEquals(e.getMessage(), "max must be greater than id"); + assertEquals("max must be greater than id", e.getMessage()); e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 1001, 1000)); - assertEquals(e.getMessage(), "max must be greater than id"); + assertEquals("max must be greater than id", e.getMessage()); } public void testToFilter() throws IOException { diff --git a/docs/reference/docs/delete-by-query.asciidoc b/docs/reference/docs/delete-by-query.asciidoc index 1e26aac6d61..b2a59231d34 100644 --- a/docs/reference/docs/delete-by-query.asciidoc +++ b/docs/reference/docs/delete-by-query.asciidoc @@ -339,11 +339,19 @@ take effect on after completing the current batch. This prevents scroll timeouts. [float] -[[docs-delete-by-query-manual-slice]] -=== Manually slicing +[[docs-delete-by-query-slice]] +=== Slicing -Delete-by-query supports <> allowing you to manually parallelize -the process relatively easily: +Delete-by-query supports <> to parallelize the deleting process. +This parallelization can improve efficiency and provide a convenient way to +break the request down into smaller parts. + +[float] +[[docs-delete-by-query-manual-slice]] +==== Manually slicing + +Slice a delete-by-query manually by providing a slice id and total number of +slices to each request: [source,js] ---------------------------------------------------------------- @@ -412,10 +420,11 @@ Which results in a sensible `total` like this one: [float] [[docs-delete-by-query-automatic-slice]] -=== Automatic slicing +==== Automatic slicing You can also let delete-by-query automatically parallelize using -<> to slice on `_uid`: +<> to slice on `_uid`. Use `slices` to specify the number of +slices to use: [source,js] ---------------------------------------------------------------- @@ -463,6 +472,11 @@ Which results in a sensible `total` like this one: ---------------------------------------------------------------- // TESTRESPONSE +Setting `slices` to `auto` will let Elasticsearch choose the number of slices +to use. This setting will use one slice per shard, up to a certain limit. If +there are multiple source indices, it will choose the number of slices based +on the index with the smallest number of shards. + Adding `slices` to `_delete_by_query` just automates the manual process used in the section above, creating sub-requests which means it has some quirks: @@ -489,18 +503,20 @@ though these are all taken at approximately the same time. [float] [[docs-delete-by-query-picking-slices]] -=== Picking the number of slices +===== Picking the number of slices -At this point we have a few recommendations around the number of `slices` to -use (the `max` parameter in the slice API if manually parallelizing): +If slicing automatically, setting `slices` to `auto` will choose a reasonable +number for most indices. If you're slicing manually or otherwise tuning +automatic slicing, use these guidelines. -* Don't use large numbers. `500` creates fairly massive CPU thrash. -* It is more efficient from a query performance standpoint to use some multiple -of the number of shards in the source index. -* Using exactly as many shards as are in the source index is the most efficient -from a query performance standpoint. -* Indexing performance should scale linearly across available resources with -the number of `slices`. -* Whether indexing or query performance dominates that process depends on lots -of factors like the documents being reindexed and the cluster doing the -reindexing. +Query performance is most efficient when the number of `slices` is equal to the +number of shards in the index. If that number is large, (for example, +500) choose a lower number as too many `slices` will hurt performance. Setting +`slices` higher than the number of shards generally does not improve efficiency +and adds overhead. + +Delete performance scales linearly across available resources with the +number of slices. + +Whether query or delete performance dominates the runtime depends on the +documents being reindexed and cluster resources. diff --git a/docs/reference/docs/reindex.asciidoc b/docs/reference/docs/reindex.asciidoc index 9056450bf7a..bb48703cd5e 100644 --- a/docs/reference/docs/reindex.asciidoc +++ b/docs/reference/docs/reindex.asciidoc @@ -787,11 +787,19 @@ and it'll look like: Or you can search by `tag` or whatever you want. +[float] +[[docs-reindex-slice]] +=== Slicing + +Reindex supports <> to parallelize the reindexing process. +This parallelization can improve efficiency and provide a convenient way to +break the request down into smaller parts. + [float] [[docs-reindex-manual-slice]] ==== Manual slicing -Reindex supports <>, allowing you to manually parallelize the -process relatively easily: +Slice a reindex request manually by providing a slice id and total number of +slices to each request: [source,js] ---------------------------------------------------------------- @@ -849,10 +857,10 @@ Which results in a sensible `total` like this one: [float] [[docs-reindex-automatic-slice]] -=== Automatic slicing +==== Automatic slicing You can also let reindex automatically parallelize using <> to -slice on `_uid`: +slice on `_uid`. Use `slices` to specify the number of slices to use: [source,js] ---------------------------------------------------------------- @@ -890,6 +898,11 @@ Which results in a sensible `total` like this one: ---------------------------------------------------------------- // TESTRESPONSE +Setting `slices` to `auto` will let Elasticsearch choose the number of slices +to use. This setting will use one slice per shard, up to a certain limit. If +there are multiple source indices, it will choose the number of slices based +on the index with the smallest number of shards. + Adding `slices` to `_reindex` just automates the manual process used in the section above, creating sub-requests which means it has some quirks: @@ -915,21 +928,23 @@ though these are all taken at approximately the same time. [float] [[docs-reindex-picking-slices]] -=== Picking the number of slices +===== Picking the number of slices -At this point we have a few recommendations around the number of `slices` to -use (the `max` parameter in the slice API if manually parallelizing): +If slicing automatically, setting `slices` to `auto` will choose a reasonable +number for most indices. If you're slicing manually or otherwise tuning +automatic slicing, use these guidelines. -* Don't use large numbers. `500` creates fairly massive CPU thrash. -* It is more efficient from a query performance standpoint to use some multiple -of the number of shards in the source index. -* Using exactly as many shards as are in the source index is the most efficient -from a query performance standpoint. -* Indexing performance should scale linearly across available resources with -the number of `slices`. -* Whether indexing or query performance dominates that process depends on lots -of factors like the documents being reindexed and the cluster doing the -reindexing. +Query performance is most efficient when the number of `slices` is equal to the +number of shards in the index. If that number is large, (for example, +500) choose a lower number as too many `slices` will hurt performance. Setting +`slices` higher than the number of shards generally does not improve efficiency +and adds overhead. + +Indexing performance scales linearly across available resources with the +number of slices. + +Whether query or indexing performance dominates the runtime depends on the +documents being reindexed and cluster resources. [float] === Reindex daily indices diff --git a/docs/reference/docs/update-by-query.asciidoc b/docs/reference/docs/update-by-query.asciidoc index 28c250dcfe1..2597fd28cb8 100644 --- a/docs/reference/docs/update-by-query.asciidoc +++ b/docs/reference/docs/update-by-query.asciidoc @@ -403,11 +403,19 @@ query takes effect immediately but rethrotting that slows down the query will take effect on after completing the current batch. This prevents scroll timeouts. +[float] +[[docs-update-by-query-slice]] +=== Slicing + +Update-by-query supports <> to parallelize the updating process. +This parallelization can improve efficiency and provide a convenient way to +break the request down into smaller parts. + [float] [[docs-update-by-query-manual-slice]] ==== Manual slicing -Update-by-query supports <> allowing you to manually parallelize -the process relatively easily: +Slice an update-by-query manually by providing a slice id and total number of +slices to each request: [source,js] ---------------------------------------------------------------- @@ -459,10 +467,11 @@ Which results in a sensible `total` like this one: [float] [[docs-update-by-query-automatic-slice]] -=== Automatic slicing +==== Automatic slicing You can also let update-by-query automatically parallelize using -<> to slice on `_uid`: +<> to slice on `_uid`. Use `slices` to specify the number of +slices to use: [source,js] ---------------------------------------------------------------- @@ -497,6 +506,11 @@ Which results in a sensible `total` like this one: ---------------------------------------------------------------- // TESTRESPONSE +Setting `slices` to `auto` will let Elasticsearch choose the number of slices +to use. This setting will use one slice per shard, up to a certain limit. If +there are multiple source indices, it will choose the number of slices based +on the index with the smallest number of shards. + Adding `slices` to `_update_by_query` just automates the manual process used in the section above, creating sub-requests which means it has some quirks: @@ -523,22 +537,23 @@ though these are all taken at approximately the same time. [float] [[docs-update-by-query-picking-slices]] -=== Picking the number of slices +===== Picking the number of slices -At this point we have a few recommendations around the number of `slices` to -use (the `max` parameter in the slice API if manually parallelizing): +If slicing automatically, setting `slices` to `auto` will choose a reasonable +number for most indices. If you're slicing manually or otherwise tuning +automatic slicing, use these guidelines. -* Don't use large numbers. `500` creates fairly massive CPU thrash. -* It is more efficient from a query performance standpoint to use some multiple -of the number of shards in the source index. -* Using exactly as many shards as are in the source index is the most efficient -from a query performance standpoint. -* Indexing performance should scale linearly across available resources with -the number of `slices`. -* Whether indexing or query performance dominates that process depends on lots -of factors like the documents being reindexed and the cluster doing the -reindexing. +Query performance is most efficient when the number of `slices` is equal to the +number of shards in the index. If that number is large, (for example, +500) choose a lower number as too many `slices` will hurt performance. Setting +`slices` higher than the number of shards generally does not improve efficiency +and adds overhead. +Update performance scales linearly across available resources with the +number of slices. + +Whether query or update performance dominates the runtime depends on the +documents being reindexed and cluster resources. [float] [[picking-up-a-new-property]] diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index 91673fd0a41..0fc89677f40 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -87,7 +87,8 @@ import static org.elasticsearch.search.sort.SortBuilders.fieldSort; */ public abstract class AbstractAsyncBulkByScrollAction> { protected final Logger logger; - protected final WorkingBulkByScrollTask task; + protected final BulkByScrollTask task; + protected final WorkerBulkByScrollTaskState worker; protected final ThreadPool threadPool; protected final ScriptService scriptService; protected final ClusterState clusterState; @@ -114,16 +115,22 @@ public abstract class AbstractAsyncBulkByScrollAction, ScrollableHitSource.Hit, RequestWrapper> scriptApplier; - public AbstractAsyncBulkByScrollAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState, ActionListener listener) { this(task, logger, client, threadPool, mainRequest, scriptService, clusterState, listener, client.settings()); } - public AbstractAsyncBulkByScrollAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState, ActionListener listener, Settings settings) { + this.task = task; + if (!task.isWorker()) { + throw new IllegalArgumentException("Given task [" + task.getId() + "] must have a child worker"); + } + this.worker = task.getWorkerState(); + this.logger = logger; this.client = client; this.settings = settings; @@ -133,7 +140,7 @@ public abstract class AbstractAsyncBulkByScrollAction 0) { total = min(total, mainRequest.getSize()); } - task.setTotal(total); + worker.setTotal(total); AbstractRunnable prepareBulkRequestRunnable = new AbstractRunnable() { @Override protected void doRun() throws Exception { @@ -289,7 +296,7 @@ public abstract class AbstractAsyncBulkByScrollAction hits = response.getHits(); if (mainRequest.getSize() != SIZE_ALL_MATCHES) { // Truncate the hits if we have more than the request size - long remaining = max(0, mainRequest.getSize() - task.getSuccessfullyProcessed()); + long remaining = max(0, mainRequest.getSize() - worker.getSuccessfullyProcessed()); if (remaining < hits.size()) { hits = hits.subList(0, (int) remaining); } @@ -372,16 +379,16 @@ public abstract class AbstractAsyncBulkByScrollAction= mainRequest.getSize()) { + if (mainRequest.getSize() != SIZE_ALL_MATCHES && worker.getSuccessfullyProcessed() >= mainRequest.getSize()) { // We've processed all the requested docs. refreshAndFinish(emptyList(), emptyList(), false); return; @@ -425,7 +432,7 @@ public abstract class AbstractAsyncBulkByScrollAction { onScrollResponse(lastBatchStartTime, lastBatchSize, response); }); @@ -433,7 +440,7 @@ public abstract class AbstractAsyncBulkByScrollAction failures) { if (failure.getStatus() == CONFLICT) { - task.countVersionConflict(); + worker.countVersionConflict(); if (false == mainRequest.isAbortOnVersionConflict()) { return; } @@ -759,9 +766,9 @@ public abstract class AbstractAsyncBulkByScrollAction, ScrollableHitSource.Hit, RequestWrapper> { + public abstract static class ScriptApplier implements BiFunction, ScrollableHitSource.Hit, RequestWrapper> { - private final WorkingBulkByScrollTask task; + private final WorkerBulkByScrollTaskState taskWorker; private final ScriptService scriptService; private final Script script; private final Map params; @@ -769,9 +776,11 @@ public abstract class AbstractAsyncBulkByScrollAction context; - public ScriptApplier(WorkingBulkByScrollTask task, ScriptService scriptService, Script script, + public ScriptApplier(WorkerBulkByScrollTaskState taskWorker, + ScriptService scriptService, + Script script, Map params) { - this.task = task; + this.taskWorker = taskWorker; this.scriptService = scriptService; this.script = script; this.params = params; @@ -864,7 +873,7 @@ public abstract class AbstractAsyncBulkByScrollAction scriptChangedOpType(RequestWrapper request, OpType oldOpType, OpType newOpType) { switch (newOpType) { case NOOP: - task.countNoop(); + taskWorker.countNoop(); return null; case DELETE: RequestWrapper delete = wrap(new DeleteRequest(request.getIndex(), request.getType(), request.getId())); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java index 64b02c4be81..4ea2592801d 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java @@ -29,6 +29,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.tasks.LoggingTaskListener; import org.elasticsearch.tasks.Task; @@ -90,7 +91,11 @@ public abstract class AbstractBaseReindexRestHandler< request.setRefresh(restRequest.paramAsBoolean("refresh", request.isRefresh())); request.setTimeout(restRequest.paramAsTime("timeout", request.getTimeout())); - request.setSlices(restRequest.paramAsInt("slices", request.getSlices())); + + Integer slices = parseSlices(restRequest); + if (slices != null) { + request.setSlices(slices); + } String waitForActiveShards = restRequest.param("wait_for_active_shards"); if (waitForActiveShards != null) { @@ -115,6 +120,32 @@ public abstract class AbstractBaseReindexRestHandler< }; } + private static Integer parseSlices(RestRequest request) { + String slicesString = request.param("slices"); + if (slicesString == null) { + return null; + } + + if (slicesString.equals(AbstractBulkByScrollRequest.AUTO_SLICES_VALUE)) { + return AbstractBulkByScrollRequest.AUTO_SLICES; + } + + int slices; + try { + slices = Integer.parseInt(slicesString); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "[slices] must be a positive integer or the string \"auto\", but was [" + slicesString + "]", e); + } + + if (slices < 1) { + throw new IllegalArgumentException( + "[slices] must be a positive integer or the string \"auto\", but was [" + slicesString + "]"); + } + + return slices; + } + /** * @return requests_per_second from the request as a float if it was on the request, null otherwise */ diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java index 2608f5715ba..8dd30a9fa9d 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java @@ -31,7 +31,7 @@ import org.elasticsearch.threadpool.ThreadPool; * Implementation of delete-by-query using scrolling and bulk. */ public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction { - public AsyncDeleteByQueryAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService, ClusterState clusterState, ActionListener listener) { super(task, logger, client, threadPool, request, scriptService, clusterState, listener); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java index 48f10306454..19cca917290 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java @@ -21,31 +21,118 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskManager; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /** * Helps parallelize reindex requests using sliced scrolls. */ class BulkByScrollParallelizationHelper { + + static final int AUTO_SLICE_CEILING = 20; + private BulkByScrollParallelizationHelper() {} - public static > void startSlices(Client client, TaskManager taskManager, - Action action, - String localNodeId, ParentBulkByScrollTask task, Request request, - ActionListener listener) { + /** + * Takes an action created by a {@link BulkByScrollTask} and runs it with regard to whether the request is sliced or not. + * + * If the request is not sliced (i.e. the number of slices is 1), the worker action in the given {@link Runnable} will be started on + * the local node. If the request is sliced (i.e. the number of slices is more than 1), then a subrequest will be created for each + * slice and sent. + * + * If slices are set as {@code "auto"}, this class will resolve that to a specific number based on characteristics of the source + * indices. A request with {@code "auto"} slices may end up being sliced or unsliced. + */ + static > void startSlicedAction( + Request request, + BulkByScrollTask task, + Action action, + ActionListener listener, + Client client, + DiscoveryNode node, + Runnable workerAction) { + + if (request.getSlices() == AbstractBulkByScrollRequest.AUTO_SLICES) { + ClusterSearchShardsRequest shardsRequest = new ClusterSearchShardsRequest(); + shardsRequest.indices(request.getSearchRequest().indices()); + client.admin().cluster().searchShards(shardsRequest, ActionListener.wrap( + response -> { + int actualNumSlices = countSlicesBasedOnShards(response); + sliceConditionally(request, task, action, listener, client, node, workerAction, actualNumSlices); + }, + listener::onFailure + )); + } else { + sliceConditionally(request, task, action, listener, client, node, workerAction, request.getSlices()); + } + } + + private static > void sliceConditionally( + Request request, + BulkByScrollTask task, + Action action, + ActionListener listener, + Client client, + DiscoveryNode node, + Runnable workerAction, + int slices) { + + if (slices > 1) { + task.setWorkerCount(slices); + sendSubRequests(client, action, node.getId(), task, request, listener); + } else { + SliceBuilder sliceBuilder = request.getSearchRequest().source().slice(); + Integer sliceId = sliceBuilder == null + ? null + : sliceBuilder.getId(); + task.setWorker(request.getRequestsPerSecond(), sliceId); + workerAction.run(); + } + } + + private static int countSlicesBasedOnShards(ClusterSearchShardsResponse response) { + Map countsByIndex = Arrays.stream(response.getGroups()).collect(Collectors.toMap( + group -> group.getShardId().getIndex(), + group -> 1, + (sum, term) -> sum + term + )); + Set counts = new HashSet<>(countsByIndex.values()); + int leastShards = Collections.min(counts); + return Math.min(leastShards, AUTO_SLICE_CEILING); + } + + private static > void sendSubRequests( + Client client, + Action action, + String localNodeId, + BulkByScrollTask task, + Request request, + ActionListener listener) { + + LeaderBulkByScrollTaskState worker = task.getLeaderState(); + int totalSlices = worker.getSlices(); TaskId parentTaskId = new TaskId(localNodeId, task.getId()); - for (final SearchRequest slice : sliceIntoSubRequests(request.getSearchRequest(), UidFieldMapper.NAME, request.getSlices())) { + for (final SearchRequest slice : sliceIntoSubRequests(request.getSearchRequest(), UidFieldMapper.NAME, totalSlices)) { // TODO move the request to the correct node. maybe here or somehow do it as part of startup for reindex in general.... - Request requestForSlice = request.forSlice(parentTaskId, slice); + Request requestForSlice = request.forSlice(parentTaskId, slice, totalSlices); ActionListener sliceListener = ActionListener.wrap( - r -> task.onSliceResponse(listener, slice.source().slice().getId(), r), - e -> task.onSliceFailure(listener, slice.source().slice().getId(), e)); + r -> worker.onSliceResponse(listener, slice.source().slice().getId(), r), + e -> worker.onSliceFailure(listener, slice.source().slice().getId(), e)); client.execute(action, requestForSlice, sliceListener); } } @@ -80,5 +167,4 @@ class BulkByScrollParallelizationHelper { } return slices; } - } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java index 99e1a9f166d..e2de5cd4ffc 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java @@ -51,15 +51,17 @@ public class TransportDeleteByQueryAction extends HandledTransportAction listener) { - if (request.getSlices() > 1) { - BulkByScrollParallelizationHelper.startSlices(client, taskManager, DeleteByQueryAction.INSTANCE, - clusterService.localNode().getId(), (ParentBulkByScrollTask) task, request, listener); - } else { - ClusterState state = clusterService.state(); - ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task); - new AsyncDeleteByQueryAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state, + BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task; + BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, DeleteByQueryAction.INSTANCE, listener, client, + clusterService.localNode(), + () -> { + ClusterState state = clusterService.state(); + ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), + bulkByScrollTask); + new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService, state, listener).start(); - } + } + ); } @Override diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java index 77a3f19ddae..92d7c9ee51f 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java @@ -109,18 +109,22 @@ public class TransportReindexAction extends HandledTransportAction listener) { - if (request.getSlices() > 1) { - BulkByScrollParallelizationHelper.startSlices(client, taskManager, ReindexAction.INSTANCE, clusterService.localNode().getId(), - (ParentBulkByScrollTask) task, request, listener); - } else { - checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo()); - ClusterState state = clusterService.state(); - validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(), - indexNameExpressionResolver, autoCreateIndex, state); - ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task); - new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state, + checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo()); + ClusterState state = clusterService.state(); + validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(), + indexNameExpressionResolver, autoCreateIndex, state); + + BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task; + + BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, ReindexAction.INSTANCE, listener, client, + clusterService.localNode(), + () -> { + ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), + bulkByScrollTask); + new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService, state, listener).start(); - } + } + ); } @Override @@ -244,13 +248,13 @@ public class TransportReindexAction extends HandledTransportAction createdThreads = emptyList(); - AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState, ActionListener listener) { this(task, logger, client, threadPool, request, scriptService, clusterState, listener, client.settings()); } - AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState, ActionListener listener, Settings settings) { super(task, logger, client, threadPool, request, scriptService, clusterState, listener, settings); @@ -271,8 +275,8 @@ public class TransportReindexAction extends HandledTransportAction()); RestClient restClient = buildRestClient(remoteInfo, task.getId(), createdThreads); - return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, task::countSearchRetry, this::finishHim, restClient, - remoteInfo.getQuery(), mainRequest.getSearchRequest()); + return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry, this::finishHim, + restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest()); } return super.buildScrollableResultSource(backoffPolicy); } @@ -293,7 +297,7 @@ public class TransportReindexAction extends HandledTransportAction, ScrollableHitSource.Hit, RequestWrapper> buildScriptApplier() { Script script = mainRequest.getScript(); if (script != null) { - return new ReindexScriptApplier(task, scriptService, script, script.getParams()); + return new ReindexScriptApplier(worker, scriptService, script, script.getParams()); } return super.buildScriptApplier(); } @@ -385,9 +389,9 @@ public class TransportReindexAction extends HandledTransportAction params) { - super(task, scriptService, script, params); + super(taskWorker, scriptService, script, params); } /* diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java index bcfb2813474..d8105e4a6ec 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java @@ -59,21 +59,48 @@ public class TransportRethrottleAction extends TransportTasksAction listener) { - int runningSubTasks = task.runningSliceSubTasks(); - if (runningSubTasks == 0) { - logger.debug("rethrottling local task [{}] to [{}] requests per second", task.getId(), newRequestsPerSecond); - task.rethrottle(newRequestsPerSecond); - listener.onResponse(task.taskInfo(localNodeId, true)); + + if (task.isWorker()) { + rethrottleChildTask(logger, localNodeId, task, newRequestsPerSecond, listener); return; } - RethrottleRequest subRequest = new RethrottleRequest(); - subRequest.setRequestsPerSecond(newRequestsPerSecond / runningSubTasks); - subRequest.setParentTaskId(new TaskId(localNodeId, task.getId())); - logger.debug("rethrottling children of task [{}] to [{}] requests per second", task.getId(), subRequest.getRequestsPerSecond()); - client.execute(RethrottleAction.INSTANCE, subRequest, ActionListener.wrap(r -> { - r.rethrowFailures("Rethrottle"); - listener.onResponse(task.getInfoGivenSliceInfo(localNodeId, r.getTasks())); - }, listener::onFailure)); + + if (task.isLeader()) { + rethrottleParentTask(logger, localNodeId, client, task, newRequestsPerSecond, listener); + return; + } + + throw new IllegalArgumentException("task [" + task.getId() + "] must be set as a child or parent"); + } + + private static void rethrottleParentTask(Logger logger, String localNodeId, Client client, BulkByScrollTask task, + float newRequestsPerSecond, ActionListener listener) { + final LeaderBulkByScrollTaskState leaderState = task.getLeaderState(); + final int runningSubtasks = leaderState.runningSliceSubTasks(); + + if (runningSubtasks > 0) { + RethrottleRequest subRequest = new RethrottleRequest(); + subRequest.setRequestsPerSecond(newRequestsPerSecond / runningSubtasks); + subRequest.setParentTaskId(new TaskId(localNodeId, task.getId())); + logger.debug("rethrottling children of task [{}] to [{}] requests per second", task.getId(), + subRequest.getRequestsPerSecond()); + client.execute(RethrottleAction.INSTANCE, subRequest, ActionListener.wrap( + r -> { + r.rethrowFailures("Rethrottle"); + listener.onResponse(task.taskInfoGivenSubtaskInfo(localNodeId, r.getTasks())); + }, + listener::onFailure)); + } else { + logger.debug("children of task [{}] are already finished, nothing to rethrottle", task.getId()); + listener.onResponse(task.taskInfo(localNodeId, true)); + } + } + + private static void rethrottleChildTask(Logger logger, String localNodeId, BulkByScrollTask task, float newRequestsPerSecond, + ActionListener listener) { + logger.debug("rethrottling local task [{}] to [{}] requests per second", task.getId(), newRequestsPerSecond); + task.getWorkerState().rethrottle(newRequestsPerSecond); + listener.onResponse(task.taskInfo(localNodeId, true)); } @Override diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java index 8924c7038c9..e21a6408bd8 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java @@ -64,15 +64,17 @@ public class TransportUpdateByQueryAction extends HandledTransportAction listener) { - if (request.getSlices() > 1) { - BulkByScrollParallelizationHelper.startSlices(client, taskManager, UpdateByQueryAction.INSTANCE, - clusterService.localNode().getId(), (ParentBulkByScrollTask) task, request, listener); - } else { - ClusterState state = clusterService.state(); - ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task); - new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state, + BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task; + BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, UpdateByQueryAction.INSTANCE, listener, client, + clusterService.localNode(), + () -> { + ClusterState state = clusterService.state(); + ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), + bulkByScrollTask); + new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService, state, listener).start(); - } + } + ); } @Override @@ -84,13 +86,13 @@ public class TransportUpdateByQueryAction extends HandledTransportAction { - AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState, ActionListener listener) { this(task, logger, client, threadPool, request, scriptService, clusterState, listener, client.settings()); } - AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState, ActionListener listener, Settings settings) { super(task, logger, client, threadPool, request, scriptService, clusterState, listener, settings); @@ -109,7 +111,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction, ScrollableHitSource.Hit, RequestWrapper> buildScriptApplier() { Script script = mainRequest.getScript(); if (script != null) { - return new UpdateByQueryScriptApplier(task, scriptService, script, script.getParams()); + return new UpdateByQueryScriptApplier(worker, scriptService, script, script.getParams()); } return super.buildScriptApplier(); } @@ -129,9 +131,9 @@ public class TransportUpdateByQueryAction extends HandledTransportAction params) { - super(task, scriptService, script, params); + UpdateByQueryScriptApplier(WorkerBulkByScrollTaskState taskWorker, ScriptService scriptService, Script script, + Map params) { + super(taskWorker, scriptService, script, params); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index 7f59987617f..fe754b38817 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -124,7 +124,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { private PlainActionFuture listener; private String scrollId; private TaskManager taskManager; - private WorkingBulkByScrollTask testTask; + private BulkByScrollTask testTask; + private WorkerBulkByScrollTaskState worker; private Map expectedHeaders = new HashMap<>(); private DiscoveryNode localNode; private TaskId taskId; @@ -141,7 +142,9 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { listener = new PlainActionFuture<>(); scrollId = null; taskManager = new TaskManager(Settings.EMPTY); - testTask = (WorkingBulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest); + testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest); + testTask.setWorker(testRequest.getRequestsPerSecond(), null); + worker = testTask.getWorkerState(); localNode = new DiscoveryNode("thenode", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); taskId = new TaskId(localNode.getId(), testTask.getId()); @@ -309,7 +312,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { * Mimicks a ThreadPool rejecting execution of the task. */ public void testThreadPoolRejectionsAbortRequest() throws Exception { - testTask.rethrottle(1); + worker.rethrottle(1); setupClient(new TestThreadPool(getTestName()) { @Override public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { @@ -439,7 +442,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { firstSearchRequest.scroll(timeValueSeconds(10)); // Set throttle to 1 request per second to make the math simpler - testTask.rethrottle(1f); + worker.rethrottle(1f); // Make the last batch look nearly instant but have 100 documents TimeValue lastBatchStartTime = timeValueNanos(System.nanoTime()); TimeValue now = timeValueNanos(lastBatchStartTime.nanos() + 1); @@ -459,7 +462,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { assertEquals(99, capturedDelay.get().seconds()); } else { // Let's rethrottle between the starting the scroll and getting the response - testTask.rethrottle(10f); + worker.rethrottle(10f); client.lastScroll.get().listener.onResponse(searchResponse); // The delay uses the new throttle assertEquals(9, capturedDelay.get().seconds()); @@ -624,7 +627,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { long total = randomIntBetween(0, Integer.MAX_VALUE); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null); // Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task - testTask.rethrottle(1); + worker.rethrottle(1); simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1000, response); // Now that we've got our cancel we'll just verify that it all came through all right @@ -694,7 +697,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { } @Override - public DummyAbstractBulkByScrollRequest forSlice(TaskId slicingTask, SearchRequest slice) { + public DummyAbstractBulkByScrollRequest forSlice(TaskId slicingTask, SearchRequest slice, int totalSlices) { throw new UnsupportedOperationException(); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryBasicTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryBasicTests.java index aba7cd69359..276dc955f82 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryBasicTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryBasicTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.alias.Alias; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilders; @@ -31,7 +30,10 @@ import org.elasticsearch.test.InternalSettingsPlugin; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE; @@ -224,7 +226,7 @@ public class DeleteByQueryBasicTests extends ReindexTestCase { assertHitCount(client().prepareSearch("test").setSize(0).get(), docs); } - public void testWorkers() throws Exception { + public void testSlices() throws Exception { indexRandom(true, client().prepareIndex("test", "test", "1").setSource("foo", "a"), client().prepareIndex("test", "test", "2").setSource("foo", "a"), @@ -236,18 +238,74 @@ public class DeleteByQueryBasicTests extends ReindexTestCase { ); assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 7); + int slices = randomSlices(); + int expectedSlices = expectedSliceStatuses(slices, "test"); + // Deletes the two docs that matches "foo:a" - assertThat(deleteByQuery().source("test").filter(termQuery("foo", "a")).refresh(true).setSlices(5).get(), - matcher().deleted(2).slices(hasSize(5))); + assertThat( + deleteByQuery() + .source("test") + .filter(termQuery("foo", "a")) + .refresh(true) + .setSlices(slices).get(), + matcher() + .deleted(2) + .slices(hasSize(expectedSlices))); assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 5); // Delete remaining docs - DeleteByQueryRequestBuilder request = deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true) - .setSlices(5); - assertThat(request.get(), matcher().deleted(5).slices(hasSize(5))); + assertThat( + deleteByQuery() + .source("test") + .filter(QueryBuilders.matchAllQuery()) + .refresh(true) + .setSlices(slices).get(), + matcher() + .deleted(5) + .slices(hasSize(expectedSlices))); assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 0); } + public void testMultipleSources() throws Exception { + int sourceIndices = between(2, 5); + + Map> docs = new HashMap<>(); + for (int sourceIndex = 0; sourceIndex < sourceIndices; sourceIndex++) { + String indexName = "test" + sourceIndex; + docs.put(indexName, new ArrayList<>()); + int numDocs = between(5, 15); + for (int i = 0; i < numDocs; i++) { + docs.get(indexName).add(client().prepareIndex(indexName, "test", Integer.toString(i)).setSource("foo", "a")); + } + } + + List allDocs = docs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + indexRandom(true, allDocs); + for (Map.Entry> entry : docs.entrySet()) { + assertHitCount(client().prepareSearch(entry.getKey()).setSize(0).get(), entry.getValue().size()); + } + + int slices = randomSlices(1, 10); + int expectedSlices = expectedSliceStatuses(slices, docs.keySet()); + + String[] sourceIndexNames = docs.keySet().toArray(new String[docs.size()]); + + assertThat( + deleteByQuery() + .source(sourceIndexNames) + .filter(QueryBuilders.matchAllQuery()) + .refresh(true) + .setSlices(slices).get(), + matcher() + .deleted(allDocs.size()) + .slices(hasSize(expectedSlices))); + + for (String index : docs.keySet()) { + assertHitCount(client().prepareSearch(index).setTypes("test").setSize(0).get(), 0); + } + + } + /** * Test delete by query support for filtering by type. This entire feature * can and should be removed when we drop support for types index with diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexBasicTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexBasicTests.java index 4f7753fca9a..43764bf25fc 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexBasicTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexBasicTests.java @@ -22,7 +22,11 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.index.IndexRequestBuilder; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -88,8 +92,6 @@ public class ReindexBasicTests extends ReindexTestCase { } public void testCopyManyWithSlices() throws Exception { - int workers = between(2, 10); - List docs = new ArrayList<>(); int max = between(150, 500); for (int i = 0; i < max; i++) { @@ -99,21 +101,61 @@ public class ReindexBasicTests extends ReindexTestCase { indexRandom(true, docs); assertHitCount(client().prepareSearch("source").setSize(0).get(), max); + int slices = randomSlices(); + int expectedSlices = expectedSliceStatuses(slices, "source"); + // Copy all the docs - ReindexRequestBuilder copy = reindex().source("source").destination("dest", "type").refresh(true).setSlices(workers); + ReindexRequestBuilder copy = reindex().source("source").destination("dest", "type").refresh(true).setSlices(slices); // Use a small batch size so we have to use more than one batch copy.source().setSize(5); - assertThat(copy.get(), matcher().created(max).batches(greaterThanOrEqualTo(max / 5)).slices(hasSize(workers))); + assertThat(copy.get(), matcher().created(max).batches(greaterThanOrEqualTo(max / 5)).slices(hasSize(expectedSlices))); assertHitCount(client().prepareSearch("dest").setTypes("type").setSize(0).get(), max); // Copy some of the docs int half = max / 2; - copy = reindex().source("source").destination("dest_half", "type").refresh(true).setSlices(workers); + copy = reindex().source("source").destination("dest_half", "type").refresh(true).setSlices(slices); // Use a small batch size so we have to use more than one batch copy.source().setSize(5); copy.size(half); // The real "size" of the request. BulkByScrollResponse response = copy.get(); - assertThat(response, matcher().created(lessThanOrEqualTo((long) half)).slices(hasSize(workers))); + assertThat(response, matcher().created(lessThanOrEqualTo((long) half)).slices(hasSize(expectedSlices))); assertHitCount(client().prepareSearch("dest_half").setSize(0).get(), response.getCreated()); } + + public void testMultipleSources() throws Exception { + int sourceIndices = between(2, 5); + + Map> docs = new HashMap<>(); + for (int sourceIndex = 0; sourceIndex < sourceIndices; sourceIndex++) { + String indexName = "source" + sourceIndex; + String typeName = "test" + sourceIndex; + docs.put(indexName, new ArrayList<>()); + int numDocs = between(50, 200); + for (int i = 0; i < numDocs; i++) { + docs.get(indexName).add(client().prepareIndex(indexName, typeName, "id_" + sourceIndex + "_" + i).setSource("foo", "a")); + } + } + + List allDocs = docs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + indexRandom(true, allDocs); + for (Map.Entry> entry : docs.entrySet()) { + assertHitCount(client().prepareSearch(entry.getKey()).setSize(0).get(), entry.getValue().size()); + } + + int slices = randomSlices(1, 10); + int expectedSlices = expectedSliceStatuses(slices, docs.keySet()); + + String[] sourceIndexNames = docs.keySet().toArray(new String[docs.size()]); + ReindexRequestBuilder request = reindex() + .source(sourceIndexNames) + .destination("dest", "type") + .refresh(true) + .setSlices(slices); + + BulkByScrollResponse response = request.get(); + assertThat(response, matcher().created(allDocs.size()).slices(hasSize(expectedSlices))); + assertHitCount(client().prepareSearch("dest").setSize(0).get(), allDocs.size()); + } + + } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java index fcf80ea283c..54854afb35e 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java @@ -25,7 +25,10 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.stream.Collectors; +import static java.util.Collections.singleton; import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; /** @@ -62,4 +65,51 @@ public abstract class ReindexTestCase extends ESIntegTestCase { public static BulkIndexByScrollResponseMatcher matcher() { return new BulkIndexByScrollResponseMatcher(); } + + static int randomSlices(int min, int max) { + if (randomBoolean()) { + return AbstractBulkByScrollRequest.AUTO_SLICES; + } else { + return between(min, max); + } + } + + static int randomSlices() { + return randomSlices(2, 10); + } + + /** + * Figures out how many slices the request handling will use + */ + protected int expectedSlices(int requestSlices, Collection indices) { + if (requestSlices == AbstractBulkByScrollRequest.AUTO_SLICES) { + int leastNumShards = Collections.min(indices.stream() + .map(sourceIndex -> getNumShards(sourceIndex).numPrimaries) + .collect(Collectors.toList())); + return Math.min(leastNumShards, BulkByScrollParallelizationHelper.AUTO_SLICE_CEILING); + } else { + return requestSlices; + } + } + + protected int expectedSlices(int requestSlices, String index) { + return expectedSlices(requestSlices, singleton(index)); + } + + /** + * Figures out how many slice statuses to expect in the response + */ + protected int expectedSliceStatuses(int requestSlices, Collection indices) { + int slicesConfigured = expectedSlices(requestSlices, indices); + + if (slicesConfigured > 1) { + return slicesConfigured; + } else { + return 0; + } + } + + protected int expectedSliceStatuses(int slicesConfigured, String index) { + return expectedSliceStatuses(slicesConfigured, singleton(index)); + } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RethrottleTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RethrottleTests.java index 228bae4ed4a..566c057a798 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RethrottleTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RethrottleTests.java @@ -60,15 +60,15 @@ public class RethrottleTests extends ReindexTestCase { } public void testReindexWithWorkers() throws Exception { - testCase(reindex().source("test").destination("dest").setSlices(between(2, 10)), ReindexAction.NAME); + testCase(reindex().source("test").destination("dest").setSlices(randomSlices()), ReindexAction.NAME); } public void testUpdateByQueryWithWorkers() throws Exception { - testCase(updateByQuery().source("test").setSlices(between(2, 10)), UpdateByQueryAction.NAME); + testCase(updateByQuery().source("test").setSlices(randomSlices()), UpdateByQueryAction.NAME); } public void testDeleteByQueryWithWorkers() throws Exception { - testCase(deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).setSlices(between(2, 10)), DeleteByQueryAction.NAME); + testCase(deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).setSlices(randomSlices()), DeleteByQueryAction.NAME); } private void testCase(AbstractBulkByScrollRequestBuilder request, String actionName) throws Exception { @@ -76,8 +76,12 @@ public class RethrottleTests extends ReindexTestCase { /* Add ten documents per slice so most slices will have many documents to process, having to go to multiple batches. * we can't rely on all of them doing so, but */ + + createIndex("test"); + int numSlices = expectedSlices(request.request().getSlices(), "test"); + List docs = new ArrayList<>(); - for (int i = 0; i < request.request().getSlices() * 10; i++) { + for (int i = 0; i < numSlices * 10; i++) { docs.add(client().prepareIndex("test", "test", Integer.toString(i)).setSource("foo", "bar")); } indexRandom(true, docs); @@ -87,15 +91,15 @@ public class RethrottleTests extends ReindexTestCase { request.source().setSize(1); // Make sure we use multiple batches ActionFuture responseListener = request.execute(); - TaskGroup taskGroupToRethrottle = findTaskToRethrottle(actionName, request.request().getSlices()); + TaskGroup taskGroupToRethrottle = findTaskToRethrottle(actionName, numSlices); TaskId taskToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId(); - if (request.request().getSlices() == 1) { + if (numSlices == 1) { assertThat(taskGroupToRethrottle.getChildTasks(), empty()); } else { // There should be a sane number of child tasks running assertThat(taskGroupToRethrottle.getChildTasks(), - hasSize(allOf(greaterThanOrEqualTo(1), lessThanOrEqualTo(request.request().getSlices())))); + hasSize(allOf(greaterThanOrEqualTo(1), lessThanOrEqualTo(numSlices)))); // Wait for all of the sub tasks to start (or finish, some might finish early, all that matters is that not all do) assertBusy(() -> { BulkByScrollTask.Status parent = (BulkByScrollTask.Status) client().admin().cluster().prepareGetTask(taskToRethrottle).get() @@ -103,7 +107,7 @@ public class RethrottleTests extends ReindexTestCase { long finishedSubTasks = parent.getSliceStatuses().stream().filter(Objects::nonNull).count(); ListTasksResponse list = client().admin().cluster().prepareListTasks().setParentTaskId(taskToRethrottle).get(); list.rethrowFailures("subtasks"); - assertThat(finishedSubTasks + list.getTasks().size(), greaterThanOrEqualTo((long) request.request().getSlices())); + assertThat(finishedSubTasks + list.getTasks().size(), greaterThanOrEqualTo((long) numSlices)); assertThat(list.getTasks().size(), greaterThan(0)); }); } @@ -114,8 +118,9 @@ public class RethrottleTests extends ReindexTestCase { rethrottleResponse.rethrowFailures("Rethrottle"); assertThat(rethrottleResponse.getTasks(), hasSize(1)); BulkByScrollTask.Status status = (BulkByScrollTask.Status) rethrottleResponse.getTasks().get(0).getStatus(); + // Now check the resulting requests per second. - if (request.request().getSlices() == 1) { + if (numSlices == 1) { // If there is a single slice it should match perfectly assertEquals(newRequestsPerSecond, status.getRequestsPerSecond(), Float.MIN_NORMAL); } else { @@ -128,7 +133,7 @@ public class RethrottleTests extends ReindexTestCase { float maxExpectedSliceRequestsPerSecond = newRequestsPerSecond == Float.POSITIVE_INFINITY ? Float.POSITIVE_INFINITY : (newRequestsPerSecond / unfinished) * 1.01F; float minExpectedSliceRequestsPerSecond = newRequestsPerSecond == Float.POSITIVE_INFINITY ? - Float.POSITIVE_INFINITY : (newRequestsPerSecond / request.request().getSlices()) * 0.99F; + Float.POSITIVE_INFINITY : (newRequestsPerSecond / numSlices) * 0.99F; boolean oneSliceRethrottled = false; float totalRequestsPerSecond = 0; for (BulkByScrollTask.StatusOrException statusOrException : status.getSliceStatuses()) { @@ -164,7 +169,7 @@ public class RethrottleTests extends ReindexTestCase { // Now the response should come back quickly because we've rethrottled the request BulkByScrollResponse response = responseListener.get(); assertThat("Entire request completed in a single batch. This may invalidate the test as throttling is done between batches.", - response.getBatches(), greaterThanOrEqualTo(request.request().getSlices())); + response.getBatches(), greaterThanOrEqualTo(numSlices)); } private TaskGroup findTaskToRethrottle(String actionName, int sliceCount) { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index 39806e475c7..946ab030c82 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -70,17 +70,16 @@ public class RoundTripTests extends ESTestCase { roundTrip(reindex, tripped); assertRequestEquals(reindex, tripped); - // Try slices with a version that doesn't support slices. That should fail. - reindex.setSlices(between(2, 1000)); - Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_5_0_0_rc1, reindex, null)); - assertEquals("Attempting to send sliced reindex-style request to a node that doesn't support it. " - + "Version is [5.0.0-rc1] but must be [5.1.1]", e.getMessage()); + // Try slices=auto with a version that doesn't support it, which should fail + reindex.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); + Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, reindex, null)); + assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage()); - // Try without slices with a version that doesn't support slices. That should work. + // Try regular slices with a version that doesn't support slices=auto, which should succeed tripped = new ReindexRequest(); - reindex.setSlices(1); - roundTrip(Version.V_5_0_0_rc1, reindex, tripped); - assertRequestEquals(Version.V_5_0_0_rc1, reindex, tripped); + reindex.setSlices(between(1, Integer.MAX_VALUE)); + roundTrip(Version.V_6_0_0_alpha1, reindex, tripped); + assertRequestEquals(Version.V_6_0_0_alpha1, reindex, tripped); } public void testUpdateByQueryRequest() throws IOException { @@ -94,16 +93,15 @@ public class RoundTripTests extends ESTestCase { assertRequestEquals(update, tripped); assertEquals(update.getPipeline(), tripped.getPipeline()); - // Try slices with a version that doesn't support slices. That should fail. - update.setSlices(between(2, 1000)); - Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_5_0_0_rc1, update, null)); - assertEquals("Attempting to send sliced reindex-style request to a node that doesn't support it. " - + "Version is [5.0.0-rc1] but must be [5.1.1]", e.getMessage()); + // Try slices=auto with a version that doesn't support it, which should fail + update.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); + Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, update, null)); + assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage()); - // Try without slices with a version that doesn't support slices. That should work. + // Try regular slices with a version that doesn't support slices=auto, which should succeed tripped = new UpdateByQueryRequest(); - update.setSlices(1); - roundTrip(Version.V_5_0_0_rc1, update, tripped); + update.setSlices(between(1, Integer.MAX_VALUE)); + roundTrip(Version.V_6_0_0_alpha1, update, tripped); assertRequestEquals(update, tripped); assertEquals(update.getPipeline(), tripped.getPipeline()); } @@ -115,16 +113,15 @@ public class RoundTripTests extends ESTestCase { roundTrip(delete, tripped); assertRequestEquals(delete, tripped); - // Try slices with a version that doesn't support slices. That should fail. - delete.setSlices(between(2, 1000)); - Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_5_0_0_rc1, delete, null)); - assertEquals("Attempting to send sliced reindex-style request to a node that doesn't support it. " - + "Version is [5.0.0-rc1] but must be [5.1.1]", e.getMessage()); + // Try slices=auto with a version that doesn't support it, which should fail + delete.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); + Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, delete, null)); + assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage()); - // Try without slices with a version that doesn't support slices. That should work. + // Try regular slices with a version that doesn't support slices=auto, which should succeed tripped = new DeleteByQueryRequest(); - delete.setSlices(1); - roundTrip(Version.V_5_0_0_rc1, delete, tripped); + delete.setSlices(between(1, Integer.MAX_VALUE)); + roundTrip(Version.V_6_0_0_alpha1, delete, tripped); assertRequestEquals(delete, tripped); } @@ -139,7 +136,9 @@ public class RoundTripTests extends ESTestCase { request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), null, "test")); request.setWaitForActiveShards(randomIntBetween(0, 10)); request.setRequestsPerSecond(between(0, Integer.MAX_VALUE)); - request.setSlices(between(1, Integer.MAX_VALUE)); + + int slices = ReindexTestCase.randomSlices(1, Integer.MAX_VALUE); + request.setSlices(slices); } private void randomRequest(AbstractBulkIndexByScrollRequest request) { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java index 222aedd2e9e..62a2c34ea58 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java @@ -48,12 +48,13 @@ import static org.mockito.Mockito.verify; public class TransportRethrottleActionTests extends ESTestCase { private int slices; - private ParentBulkByScrollTask task; + private BulkByScrollTask task; @Before public void createTask() { slices = between(2, 50); - task = new ParentBulkByScrollTask(1, "test_type", "test_action", "test", null, slices); + task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID); + task.setWorkerCount(slices); } /** @@ -113,7 +114,7 @@ public class TransportRethrottleActionTests extends ESTestCase { List sliceStatuses = new ArrayList<>(slices); for (int i = 0; i < succeeded; i++) { BulkByScrollTask.Status status = believeableCompletedStatus(i); - task.onSliceResponse(neverCalled(), i, + task.getLeaderState().onSliceResponse(neverCalled(), i, new BulkByScrollResponse(timeValueMillis(10), status, emptyList(), emptyList(), false)); sliceStatuses.add(new BulkByScrollTask.StatusOrException(status)); } @@ -134,7 +135,8 @@ public class TransportRethrottleActionTests extends ESTestCase { @SuppressWarnings("unchecked") ActionListener listener = i < slices - 1 ? neverCalled() : mock(ActionListener.class); BulkByScrollTask.Status status = believeableCompletedStatus(i); - task.onSliceResponse(listener, i, new BulkByScrollResponse(timeValueMillis(10), status, emptyList(), emptyList(), false)); + task.getLeaderState().onSliceResponse(listener, i, new BulkByScrollResponse(timeValueMillis(10), status, emptyList(), + emptyList(), false)); if (i == slices - 1) { // The whole thing succeeded so we should have got the success captureResponse(BulkByScrollResponse.class, listener).getStatus(); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryBasicTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryBasicTests.java index 663575a2933..ce254b87969 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryBasicTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryBasicTests.java @@ -19,8 +19,16 @@ package org.elasticsearch.index.reindex; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.search.sort.SortOrder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.hasSize; @@ -63,32 +71,91 @@ public class UpdateByQueryBasicTests extends ReindexTestCase { assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion()); } - public void testWorkers() throws Exception { - indexRandom(true, client().prepareIndex("test", "test", "1").setSource("foo", "a"), - client().prepareIndex("test", "test", "2").setSource("foo", "a"), - client().prepareIndex("test", "test", "3").setSource("foo", "b"), - client().prepareIndex("test", "test", "4").setSource("foo", "c")); + public void testSlices() throws Exception { + indexRandom(true, + client().prepareIndex("test", "test", "1").setSource("foo", "a"), + client().prepareIndex("test", "test", "2").setSource("foo", "a"), + client().prepareIndex("test", "test", "3").setSource("foo", "b"), + client().prepareIndex("test", "test", "4").setSource("foo", "c")); assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 4); assertEquals(1, client().prepareGet("test", "test", "1").get().getVersion()); assertEquals(1, client().prepareGet("test", "test", "4").get().getVersion()); + int slices = randomSlices(2, 10); + int expectedSlices = expectedSliceStatuses(slices, "test"); + // Reindex all the docs - assertThat(updateByQuery().source("test").refresh(true).setSlices(5).get(), matcher().updated(4).slices(hasSize(5))); + assertThat( + updateByQuery() + .source("test") + .refresh(true) + .setSlices(slices).get(), + matcher() + .updated(4) + .slices(hasSize(expectedSlices))); assertEquals(2, client().prepareGet("test", "test", "1").get().getVersion()); assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion()); // Now none of them - assertThat(updateByQuery().source("test").filter(termQuery("foo", "no_match")).setSlices(5).refresh(true).get(), - matcher().updated(0).slices(hasSize(5))); + assertThat( + updateByQuery() + .source("test") + .filter(termQuery("foo", "no_match")) + .setSlices(slices) + .refresh(true).get(), + matcher() + .updated(0) + .slices(hasSize(expectedSlices))); assertEquals(2, client().prepareGet("test", "test", "1").get().getVersion()); assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion()); // Now half of them - assertThat(updateByQuery().source("test").filter(termQuery("foo", "a")).refresh(true).setSlices(5).get(), - matcher().updated(2).slices(hasSize(5))); + assertThat( + updateByQuery() + .source("test") + .filter(termQuery("foo", "a")) + .refresh(true) + .setSlices(slices).get(), + matcher() + .updated(2) + .slices(hasSize(expectedSlices))); assertEquals(3, client().prepareGet("test", "test", "1").get().getVersion()); assertEquals(3, client().prepareGet("test", "test", "2").get().getVersion()); assertEquals(2, client().prepareGet("test", "test", "3").get().getVersion()); assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion()); } + + public void testMultipleSources() throws Exception { + int sourceIndices = between(2, 5); + + Map> docs = new HashMap<>(); + for (int sourceIndex = 0; sourceIndex < sourceIndices; sourceIndex++) { + String indexName = "test" + sourceIndex; + docs.put(indexName, new ArrayList<>()); + int numDocs = between(5, 15); + for (int i = 0; i < numDocs; i++) { + docs.get(indexName).add(client().prepareIndex(indexName, "test", Integer.toString(i)).setSource("foo", "a")); + } + } + + List allDocs = docs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + indexRandom(true, allDocs); + for (Map.Entry> entry : docs.entrySet()) { + assertHitCount(client().prepareSearch(entry.getKey()).setSize(0).get(), entry.getValue().size()); + } + + int slices = randomSlices(1, 10); + int expectedSlices = expectedSliceStatuses(slices, docs.keySet()); + + String[] sourceIndexNames = docs.keySet().toArray(new String[docs.size()]); + BulkByScrollResponse response = updateByQuery().source(sourceIndexNames).refresh(true).setSlices(slices).get(); + assertThat(response, matcher().updated(allDocs.size()).slices(hasSize(expectedSlices))); + + for (Map.Entry> entry : docs.entrySet()) { + String index = entry.getKey(); + List indexDocs = entry.getValue(); + int randomDoc = between(0, indexDocs.size() - 1); + assertEquals(2, client().prepareGet(index, "test", Integer.toString(randomDoc)).get().getVersion()); + } + } } diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/20_validation.yml b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/20_validation.yml index 9daf1502a36..715e81f5ded 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/20_validation.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/20_validation.yml @@ -126,7 +126,7 @@ --- "junk in slices fails": - do: - catch: /Failed to parse int parameter \[slices\] with value \[junk\]/ + catch: /\[slices\] must be a positive integer or the string "auto"/ delete_by_query: slices: junk index: test @@ -136,7 +136,7 @@ --- "zero slices fails": - do: - catch: /\[slices\] must be at least 1/ + catch: /\[slices\] must be a positive integer or the string "auto"/ delete_by_query: slices: 0 index: test diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/80_slices.yml b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/80_slices.yml index fe0c816ee14..6e911e3f1ba 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/80_slices.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/80_slices.yml @@ -45,6 +45,7 @@ - match: {throttled_millis: 0} - gte: { took: 0 } - is_false: task + - length: {slices: 5} - match: {slices.0.version_conflicts: 0} - match: {slices.0.throttled_millis: 0} - match: {slices.1.version_conflicts: 0} @@ -128,6 +129,7 @@ - match: {response.throttled_millis: 0} - gte: { response.took: 0 } - is_false: response.task + - length: {response.slices: 5} - match: {response.slices.0.version_conflicts: 0} - match: {response.slices.0.throttled_millis: 0} - match: {response.slices.1.version_conflicts: 0} @@ -142,6 +144,7 @@ - match: {task.status.deleted: 4} - match: {task.status.version_conflicts: 0} - match: {task.status.throttled_millis: 0} + - length: {task.status.slices: 5} - match: {task.status.slices.0.version_conflicts: 0} - match: {task.status.slices.0.throttled_millis: 0} - match: {task.status.slices.1.version_conflicts: 0} @@ -252,6 +255,7 @@ - match: {response.throttled_millis: 0} - gte: { response.took: 0 } - is_false: response.task + - length: {response.slices: 2} - match: {response.slices.0.version_conflicts: 0} - match: {response.slices.0.throttled_millis: 0} - match: {response.slices.1.version_conflicts: 0} @@ -260,6 +264,7 @@ - match: {task.status.deleted: 6} - match: {task.status.version_conflicts: 0} - match: {task.status.throttled_millis: 0} + - length: {task.status.slices: 2} - match: {task.status.slices.0.version_conflicts: 0} - match: {task.status.slices.0.throttled_millis: 0} - match: {task.status.slices.1.version_conflicts: 0} @@ -277,3 +282,72 @@ count: index: test - match: {count: 0} + +--- +"Multiple slices with auto slice": + - do: + indices.create: + index: test + body: + settings: + index: + number_of_shards: 3 + - do: + index: + index: test + type: foo + id: 1 + body: { "text": "test" } + - do: + index: + index: test + type: foo + id: 2 + body: { "text": "test" } + - do: + index: + index: test + type: foo + id: 3 + body: { "text": "test" } + - do: + index: + index: test + type: foo + id: 4 + body: { "text": "test" } + - do: + indices.refresh: {} + + - do: + delete_by_query: + index: test + slices: auto + body: + query: + match_all: {} + + - is_false: timed_out + - match: {deleted: 4} + - is_false: created + - is_false: updated + - match: {version_conflicts: 0} + - match: {failures: []} + - match: {noops: 0} + - match: {throttled_millis: 0} + - gte: { took: 0 } + - is_false: task + - length: {slices: 3} + - match: {slices.0.version_conflicts: 0} + - match: {slices.0.throttled_millis: 0} + - match: {slices.1.version_conflicts: 0} + - match: {slices.1.throttled_millis: 0} + - match: {slices.2.version_conflicts: 0} + - match: {slices.2.throttled_millis: 0} + + - do: + indices.refresh: {} + - do: + count: + index: test + - match: {count: 0} diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yml index b64eaac7dec..bef31b1bd79 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yml @@ -275,7 +275,7 @@ --- "junk in slices fails": - do: - catch: /Failed to parse int parameter \[slices\] with value \[junk\]/ + catch: /\[slices\] must be a positive integer or the string "auto"/ reindex: slices: junk body: @@ -287,7 +287,7 @@ --- "zero slices fails": - do: - catch: /\[slices\] must be at least 1/ + catch: /\[slices\] must be a positive integer or the string "auto"/ reindex: slices: 0 body: diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_slices.yml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_slices.yml index 5c54adb5c08..fb06018d7c0 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_slices.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_slices.yml @@ -43,6 +43,7 @@ - gte: { took: 0 } - is_false: task - is_false: deleted + - length: {slices: 5} - match: {slices.0.updated: 0} - match: {slices.0.version_conflicts: 0} - match: {slices.0.throttled_millis: 0} @@ -127,6 +128,7 @@ - gte: { response.took: 0 } - is_false: response.task - is_false: response.deleted + - length: {response.slices: 5} - match: {response.slices.0.updated: 0} - match: {response.slices.0.version_conflicts: 0} - match: {response.slices.0.throttled_millis: 0} @@ -147,6 +149,7 @@ - match: {task.status.updated: 0} - match: {task.status.version_conflicts: 0} - match: {task.status.throttled_millis: 0} + - length: {task.status.slices: 5} - match: {task.status.slices.0.updated: 0} - match: {task.status.slices.0.version_conflicts: 0} - match: {task.status.slices.0.throttled_millis: 0} @@ -260,6 +263,7 @@ - gte: { response.took: 0 } - is_false: response.task - is_false: response.deleted + - length: {response.slices: 2} - match: {response.slices.0.updated: 0} - match: {response.slices.0.version_conflicts: 0} - match: {response.slices.0.throttled_millis: 0} @@ -271,6 +275,7 @@ - match: {task.status.updated: 0} - match: {task.status.version_conflicts: 0} - match: {task.status.throttled_millis: 0} + - length: {task.status.slices: 2} - match: {task.status.slices.0.updated: 0} - match: {task.status.slices.0.version_conflicts: 0} - match: {task.status.slices.0.throttled_millis: 0} @@ -285,3 +290,67 @@ search: index: .tasks - match: { hits.total: 1 } + + +--- +"Multiple slices with auto slice": + - do: + indices.create: + index: source + body: + settings: + index: + number_of_shards: 3 + - do: + index: + index: source + type: foo + id: 1 + body: { "text": "test" } + - do: + index: + index: source + type: foo + id: 2 + body: { "text": "test" } + - do: + index: + index: source + type: foo + id: 3 + body: { "text": "test" } + - do: + index: + index: source + type: foo + id: 4 + body: { "text": "test" } + - do: + indices.refresh: {} + + - do: + reindex: + slices: auto + body: + source: + index: source + dest: + index: dest + - match: {created: 4} + - match: {updated: 0} + - match: {version_conflicts: 0} + - match: {failures: []} + - match: {throttled_millis: 0} + - gte: { took: 0 } + - is_false: task + - is_false: deleted + - length: {slices: 3} + - match: {slices.0.updated: 0} + - match: {slices.0.version_conflicts: 0} + - match: {slices.0.throttled_millis: 0} + - match: {slices.1.updated: 0} + - match: {slices.1.version_conflicts: 0} + - match: {slices.1.throttled_millis: 0} + - match: {slices.2.updated: 0} + - match: {slices.2.version_conflicts: 0} + - match: {slices.2.throttled_millis: 0} diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/20_validation.yml b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/20_validation.yml index 8f8d492df3a..b7499180cda 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/20_validation.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/20_validation.yml @@ -108,7 +108,7 @@ --- "junk in slices fails": - do: - catch: /Failed to parse int parameter \[slices\] with value \[junk\]/ + catch: /\[slices\] must be a positive integer or the string "auto"/ update_by_query: slices: junk index: test @@ -116,7 +116,7 @@ --- "zero slices fails": - do: - catch: /\[slices\] must be at least 1/ + catch: /\[slices\] must be a positive integer or the string "auto"/ update_by_query: slices: 0 index: test diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/70_slices.yml b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/70_slices.yml index f390d43f9f3..dbb79f87037 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/70_slices.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/70_slices.yml @@ -44,6 +44,7 @@ - match: {throttled_millis: 0} - gte: { took: 0 } - is_false: task + - length: {slices: 5} - match: {slices.0.version_conflicts: 0} - match: {slices.0.throttled_millis: 0} - match: {slices.1.version_conflicts: 0} @@ -120,6 +121,7 @@ - match: {response.throttled_millis: 0} - gte: { response.took: 0 } - is_false: response.task + - length: {response.slices: 5} - match: {response.slices.0.version_conflicts: 0} - match: {response.slices.0.throttled_millis: 0} - match: {response.slices.1.version_conflicts: 0} @@ -134,6 +136,7 @@ - match: {task.status.updated: 4} - match: {task.status.version_conflicts: 0} - match: {task.status.throttled_millis: 0} + - length: {task.status.slices: 5} - match: {task.status.slices.0.version_conflicts: 0} - match: {task.status.slices.0.throttled_millis: 0} - match: {task.status.slices.1.version_conflicts: 0} @@ -239,6 +242,7 @@ - match: {response.throttled_millis: 0} - gte: { response.took: 0 } - is_false: response.task + - length: {response.slices: 2} - match: {response.slices.0.version_conflicts: 0} - match: {response.slices.0.throttled_millis: 0} - match: {response.slices.1.version_conflicts: 0} @@ -247,6 +251,7 @@ - match: {task.status.updated: 6} - match: {task.status.version_conflicts: 0} - match: {task.status.throttled_millis: 0} + - length: {task.status.slices: 2} - match: {task.status.slices.0.version_conflicts: 0} - match: {task.status.slices.0.throttled_millis: 0} - match: {task.status.slices.1.version_conflicts: 0} @@ -259,3 +264,65 @@ search: index: .tasks - match: { hits.total: 1 } + + +--- +"Multiple slices with auto slice": + - do: + indices.create: + index: test + body: + settings: + index: + number_of_shards: 3 + - do: + index: + index: test + type: foo + id: 1 + body: { "text": "test" } + - do: + index: + index: test + type: foo + id: 2 + body: { "text": "test" } + - do: + index: + index: test + type: foo + id: 3 + body: { "text": "test" } + - do: + index: + index: test + type: foo + id: 4 + body: { "text": "test" } + - do: + indices.refresh: {} + + - do: + update_by_query: + index: test + slices: auto + body: + query: + match_all: {} + + - is_false: timed_out + - match: {updated: 4} + - is_false: created + - match: {version_conflicts: 0} + - match: {failures: []} + - match: {noops: 0} + - match: {throttled_millis: 0} + - gte: { took: 0 } + - is_false: task + - length: {slices: 3} + - match: {slices.0.version_conflicts: 0} + - match: {slices.0.throttled_millis: 0} + - match: {slices.1.version_conflicts: 0} + - match: {slices.1.throttled_millis: 0} + - match: {slices.2.version_conflicts: 0} + - match: {slices.2.throttled_millis: 0} diff --git a/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java index 079c784342b..a0752b00485 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java @@ -32,12 +32,14 @@ public abstract class AbstractAsyncBulkByScrollActionTestCase< Response extends BulkByScrollResponse> extends ESTestCase { protected ThreadPool threadPool; - protected WorkingBulkByScrollTask task; + protected BulkByScrollTask task; @Before public void setupForTest() { threadPool = new TestThreadPool(getTestName()); - task = new WorkingBulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID, null, Float.MAX_VALUE); + task = new BulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID); + task.setWorker(Float.POSITIVE_INFINITY, null); + } @After