diff --git a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 9f10304622b..0355eeaee47 100644 --- a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -45,6 +45,10 @@ public abstract class AbstractBulkByScrollRequest 1) { - return new ParentBulkByScrollTask(id, type, action, getDescription(), parentTaskId, slices); - } - /* Extract the slice from the search request so it'll be available in the status. This is potentially useful for users that manually - * slice their search requests so they can keep track of it and **absolutely** useful for automatically sliced reindex requests so - * they can properly track the responses. */ - Integer sliceId = searchRequest.source().slice() == null ? null : searchRequest.source().slice().getId(); - return new WorkingBulkByScrollTask(id, type, action, getDescription(), parentTaskId, sliceId, requestsPerSecond); + return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId); } @Override @@ -408,11 +409,7 @@ public abstract class AbstractBulkByScrollRequest 1) { - throw new IllegalArgumentException("Attempting to send sliced reindex-style request to a node that doesn't support " - + "it. Version is [" + out.getVersion() + "] but must be [" + Version.V_5_1_1 + "]"); - } + out.writeVInt(slices); } } diff --git a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestBuilder.java b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestBuilder.java index de3f22f0943..e3c5bd2197a 100644 --- a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestBuilder.java @@ -145,8 +145,8 @@ public abstract class AbstractBulkByScrollRequestBuilder< /** * The number of slices this task should be divided into. Defaults to 1 meaning the task isn't sliced into subtasks. */ - public Self setSlices(int workers) { - request.setSlices(workers); + public Self setSlices(int slices) { + request.setSlices(slices); return self(); } } diff --git a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkIndexByScrollRequest.java b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkIndexByScrollRequest.java index 62c2635b301..65cdcf52b6f 100644 --- a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkIndexByScrollRequest.java +++ b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkIndexByScrollRequest.java @@ -68,8 +68,8 @@ public abstract class AbstractBulkIndexByScrollRequest sliceInfo) { + if (isLeader() == false) { + throw new IllegalStateException("This task is not set to be a leader of other slice subtasks"); + } - /* - * Overridden to force children to return compatible status. 
- */ - public abstract BulkByScrollTask.Status getStatus(); + List sliceStatuses = Arrays.asList( + new BulkByScrollTask.StatusOrException[leaderState.getSlices()]); + for (TaskInfo t : sliceInfo) { + BulkByScrollTask.Status status = (BulkByScrollTask.Status) t.getStatus(); + sliceStatuses.set(status.getSliceId(), new BulkByScrollTask.StatusOrException(status)); + } + Status status = leaderState.getStatus(sliceStatuses); + return taskInfo(localNodeId, getDescription(), status); + } + + private BulkByScrollTask.Status emptyStatus() { + return new Status(Collections.emptyList(), getReasonCancelled()); + } /** - * Build the status for this task given a snapshot of the information of running slices. + * Returns true if this task is a leader for other slice subtasks */ - public abstract TaskInfo getInfoGivenSliceInfo(String localNodeId, List sliceInfo); + public boolean isLeader() { + return leaderState != null; + } + + /** + * Sets this task to be a leader task for {@code slices} sliced subtasks + */ + public void setWorkerCount(int slices) { + if (isLeader()) { + throw new IllegalStateException("This task is already a leader for other slice subtasks"); + } + if (isWorker()) { + throw new IllegalStateException("This task is already a worker"); + } + + leaderState = new LeaderBulkByScrollTaskState(this, slices); + } + + /** + * Returns the object that tracks the state of sliced subtasks. Throws IllegalStateException if this task is not set to be + * a leader task. + */ + public LeaderBulkByScrollTaskState getLeaderState() { + if (!isLeader()) { + throw new IllegalStateException("This task is not set to be a leader for other slice subtasks"); + } + return leaderState; + } + + /** + * Returns true if this task is a worker task that performs search requests. False otherwise + */ + public boolean isWorker() { + return workerState != null; + } + + /** + * Sets this task to be a worker task that performs search requests + * @param requestsPerSecond How many search requests per second this task should make + * @param sliceId If this is is a sliced task, which slice number this task corresponds to. Null if not sliced. + */ + public void setWorker(float requestsPerSecond, @Nullable Integer sliceId) { + if (isWorker()) { + throw new IllegalStateException("This task is already a worker"); + } + if (isLeader()) { + throw new IllegalStateException("This task is already a leader for other slice subtasks"); + } + + workerState = new WorkerBulkByScrollTaskState(this, sliceId, requestsPerSecond); + } + + /** + * Returns the object that manages sending search requests. Throws IllegalStateException if this task is not set to be a + * worker task. 
+ */ + public WorkerBulkByScrollTaskState getWorkerState() { + if (!isWorker()) { + throw new IllegalStateException("This task is not set to be a worker"); + } + return workerState; + } + + @Override + public void onCancelled() { + if (isLeader()) { + // The task cancellation task automatically finds children and cancels them, nothing extra to do + } else if (isWorker()) { + workerState.handleCancel(); + } else { + throw new IllegalStateException("This task has not had its sliced state initialized and doesn't know how to cancel itself"); + } + } @Override public boolean shouldCancelChildrenOnCancellation() { diff --git a/core/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java b/core/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java index ad70748f3e4..20f87e047b6 100644 --- a/core/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java +++ b/core/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java @@ -81,8 +81,8 @@ public class DeleteByQueryRequest extends AbstractBulkByScrollRequest results; - private final AtomicInteger counter; + /** + * How many subtasks are still running + */ + private final AtomicInteger runningSubtasks; - public ParentBulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId, int slices) { - super(id, type, action, description, parentTaskId); - this.results = new AtomicArray<>(slices); - this.counter = new AtomicInteger(slices); + public LeaderBulkByScrollTaskState(BulkByScrollTask task, int slices) { + this.task = task; + this.slices = slices; + results = new AtomicArray<>(slices); + runningSubtasks = new AtomicInteger(slices); } - @Override - public void rethrottle(float newRequestsPerSecond) { - // Nothing to do because all rethrottling is done on slice sub tasks. + /** + * Returns the number of slices this BulkByScrollRequest will use + */ + public int getSlices() { + return slices; } - @Override - public Status getStatus() { + /** + * Get the combined statuses of slice subtasks, merged with the given list of statuses + */ + public BulkByScrollTask.Status getStatus(List statuses) { // We only have access to the statuses of requests that have finished so we return them - List statuses = Arrays.asList(new StatusOrException[results.length()]); - addResultsToList(statuses); - return new Status(unmodifiableList(statuses), getReasonCancelled()); - } - - @Override - public int runningSliceSubTasks() { - return counter.get(); - } - - @Override - public TaskInfo getInfoGivenSliceInfo(String localNodeId, List sliceInfo) { - /* Merge the list of finished sub requests with the provided info. If a slice is both finished and in the list then we prefer the - * finished status because we don't expect them to change after the task is finished. 
*/ - List sliceStatuses = Arrays.asList(new StatusOrException[results.length()]); - for (TaskInfo t : sliceInfo) { - Status status = (Status) t.getStatus(); - sliceStatuses.set(status.getSliceId(), new StatusOrException(status)); + if (statuses.size() != results.length()) { + throw new IllegalArgumentException("Given number of statuses does not match amount of expected results"); } - addResultsToList(sliceStatuses); - Status status = new Status(sliceStatuses, getReasonCancelled()); - return taskInfo(localNodeId, getDescription(), status); + addResultsToList(statuses); + return new BulkByScrollTask.Status(unmodifiableList(statuses), task.getReasonCancelled()); } - private void addResultsToList(List sliceStatuses) { + /** + * Get the combined statuses of sliced subtasks + */ + public BulkByScrollTask.Status getStatus() { + return getStatus(Arrays.asList(new BulkByScrollTask.StatusOrException[results.length()])); + } + + /** + * The number of sliced subtasks that are still running + */ + public int runningSliceSubTasks() { + return runningSubtasks.get(); + } + + private void addResultsToList(List sliceStatuses) { for (Result t : results.asList()) { if (t.response != null) { - sliceStatuses.set(t.sliceId, new StatusOrException(t.response.getStatus())); + sliceStatuses.set(t.sliceId, new BulkByScrollTask.StatusOrException(t.response.getStatus())); } else { - sliceStatuses.set(t.sliceId, new StatusOrException(t.failure)); + sliceStatuses.set(t.sliceId, new BulkByScrollTask.StatusOrException(t.failure)); } } } @@ -111,7 +117,7 @@ public class ParentBulkByScrollTask extends BulkByScrollTask { } private void recordSliceCompletionAndRespondIfAllDone(ActionListener listener) { - if (counter.decrementAndGet() != 0) { + if (runningSubtasks.decrementAndGet() != 0) { return; } List responses = new ArrayList<>(results.length()); @@ -130,7 +136,7 @@ public class ParentBulkByScrollTask extends BulkByScrollTask { } } if (exception == null) { - listener.onResponse(new BulkByScrollResponse(responses, getReasonCancelled())); + listener.onResponse(new BulkByScrollResponse(responses, task.getReasonCancelled())); } else { listener.onFailure(exception); } diff --git a/core/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java b/core/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java index 76944c7b804..276c4559153 100644 --- a/core/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java +++ b/core/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java @@ -94,8 +94,8 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest 1 but was [" + getSlices() + "]", e); + if (getSlices() == AbstractBulkByScrollRequest.AUTO_SLICES || getSlices() > 1) { + e = addValidationError("reindex from remote sources doesn't support slices > 1 but was [" + getSlices() + "]", e); } } return e; @@ -127,8 +127,8 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest delayedPrepareBulkRequestReference = new AtomicReference<>(); - public WorkingBulkByScrollTask(long id, String type, String action, String description, TaskId parentTask, Integer sliceId, - float requestsPerSecond) { - super(id, type, action, description, parentTask); + public WorkerBulkByScrollTaskState(BulkByScrollTask task, Integer sliceId, float requestsPerSecond) { + this.task = task; this.sliceId = sliceId; setRequestsPerSecond(requestsPerSecond); } - @Override - public Status getStatus() { - return new Status(sliceId, total.get(), updated.get(), created.get(), deleted.get(), batch.get(), 
versionConflicts.get(), - noops.get(), bulkRetries.get(), searchRetries.get(), timeValueNanos(throttledNanos.get()), getRequestsPerSecond(), - getReasonCancelled(), throttledUntil()); + public BulkByScrollTask.Status getStatus() { + return new BulkByScrollTask.Status( + sliceId, + total.get(), + updated.get(), + created.get(), + deleted.get(), + batch.get(), + versionConflicts.get(), + noops.get(), + bulkRetries.get(), + searchRetries.get(), + timeValueNanos(throttledNanos.get()), + getRequestsPerSecond(), + task.getReasonCancelled(), + throttledUntil()); } - @Override - protected void onCancelled() { - /* Drop the throttle to 0, immediately rescheduling any throttled - * operation so it will wake up and cancel itself. */ + public void handleCancel() { + // Drop the throttle to 0, immediately rescheduling any throttle operation so it will wake up and cancel itself. rethrottle(Float.POSITIVE_INFINITY); } - @Override - public int runningSliceSubTasks() { - return 0; - } - - @Override - public TaskInfo getInfoGivenSliceInfo(String localNodeId, List sliceInfo) { - throw new UnsupportedOperationException("This is only supported by " + ParentBulkByScrollTask.class.getName() + "."); - } - - TimeValue throttledUntil() { - DelayedPrepareBulkRequest delayed = delayedPrepareBulkRequestReference.get(); - if (delayed == null) { - return timeValueNanos(0); - } - if (delayed.future == null) { - return timeValueNanos(0); - } - return timeValueNanos(max(0, delayed.future.getDelay(TimeUnit.NANOSECONDS))); - } - public void setTotal(long totalHits) { total.set(totalHits); } @@ -171,6 +161,17 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success return requestsPerSecond; } + TimeValue throttledUntil() { + DelayedPrepareBulkRequest delayed = delayedPrepareBulkRequestReference.get(); + if (delayed == null) { + return timeValueNanos(0); + } + if (delayed.future == null) { + return timeValueNanos(0); + } + return timeValueNanos(max(0, delayed.future.getDelay(TimeUnit.NANOSECONDS))); + } + /** * Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be * rescheduled over and over again. @@ -180,9 +181,9 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success // Synchronize so we are less likely to schedule the same request twice. 
synchronized (delayedPrepareBulkRequestReference) { TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize); - logger.debug("[{}]: preparing bulk request for [{}]", getId(), delay); + logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay); delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(), - delay, new RunOnce(prepareBulkRequestRunnable))); + delay, new RunOnce(prepareBulkRequestRunnable))); } } @@ -213,16 +214,18 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success this.requestsPerSecond = requestsPerSecond; } - @Override + /** + * Apply {@code newRequestsPerSecond} as the new rate limit for this task's search requests + */ public void rethrottle(float newRequestsPerSecond) { synchronized (delayedPrepareBulkRequestReference) { - logger.debug("[{}]: rethrottling to [{}] requests per second", getId(), newRequestsPerSecond); + logger.debug("[{}]: rethrottling to [{}] requests per second", task.getId(), newRequestsPerSecond); setRequestsPerSecond(newRequestsPerSecond); DelayedPrepareBulkRequest delayedPrepareBulkRequest = this.delayedPrepareBulkRequestReference.get(); if (delayedPrepareBulkRequest == null) { // No request has been queued so nothing to reschedule. - logger.debug("[{}]: skipping rescheduling because there is no scheduled task", getId()); + logger.debug("[{}]: skipping rescheduling because there is no scheduled task", task.getId()); return; } @@ -260,8 +263,8 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success * change in throttle take effect the next time we delay * prepareBulkRequest. We can't just reschedule the request further * out in the future because the bulk context might time out. */ - logger.debug("[{}]: skipping rescheduling because the new throttle [{}] is slower than the old one [{}]", getId(), - newRequestsPerSecond, requestsPerSecond); + logger.debug("[{}]: skipping rescheduling because the new throttle [{}] is slower than the old one [{}]", task.getId(), + newRequestsPerSecond, requestsPerSecond); return this; } @@ -269,7 +272,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success // Actually reschedule the task if (false == FutureUtils.cancel(future)) { // Couldn't cancel, probably because the task has finished or been scheduled. Either way we have nothing to do here. - logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", getId()); + logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", task.getId()); return this; } @@ -278,7 +281,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success * test it you'll find that requests sneak through. So each request * is given a runOnce boolean to prevent that. 
*/ TimeValue newDelay = newDelay(remainingDelay, newRequestsPerSecond); - logger.debug("[{}]: rescheduling for [{}] in the future", getId(), newDelay); + logger.debug("[{}]: rescheduling for [{}] in the future", task.getId(), newDelay); return new DelayedPrepareBulkRequest(threadPool, requestsPerSecond, newDelay, command); } diff --git a/core/src/test/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestTestCase.java b/core/src/test/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestTestCase.java index 8a255d376af..143e2416b88 100644 --- a/core/src/test/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestTestCase.java @@ -39,16 +39,21 @@ public abstract class AbstractBulkByScrollRequestTestCase r < 0, ESTestCase::randomFloat)); if (randomBoolean()) { original.setSize(between(0, Integer.MAX_VALUE)); } + // it's not important how many slices there are, we just need a number for forSlice + int actualSlices = between(2, 1000); + original.setSlices(randomBoolean() + ? actualSlices + : AbstractBulkByScrollRequest.AUTO_SLICES); + TaskId slicingTask = new TaskId(randomAlphaOfLength(5), randomLong()); SearchRequest sliceRequest = new SearchRequest(); - R forSliced = original.forSlice(slicingTask, sliceRequest); + R forSliced = original.forSlice(slicingTask, sliceRequest, actualSlices); assertEquals(original.isAbortOnVersionConflict(), forSliced.isAbortOnVersionConflict()); assertEquals(original.isRefresh(), forSliced.isRefresh()); assertEquals(original.getTimeout(), forSliced.getTimeout()); @@ -57,10 +62,10 @@ public abstract class AbstractBulkByScrollRequestTestCase listener = slice < slices - 1 ? neverCalled() : mock(ActionListener.class); - task.onSliceResponse(listener, slice, + taskState.onSliceResponse(listener, slice, new BulkByScrollResponse(timeValueMillis(10), sliceStatus, emptyList(), emptyList(), false)); status = task.getStatus(); diff --git a/core/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java b/core/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java index 32b01237375..9f4b20ff35b 100644 --- a/core/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java +++ b/core/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java @@ -45,7 +45,7 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase 1 but was [" + reindex.getSlices() + "];", + "Validation Failed: 1: reindex from remote sources doesn't support slices > 1 but was [" + reindex.getSlices() + "];", e.getMessage()); } - public void testNoSliceWithWorkers() { + public void testNoSliceBuilderSetWithSlicedRequest() { ReindexRequest reindex = newRequest(); reindex.getSearchRequest().source().slice(new SliceBuilder(0, 4)); reindex.setSlices(between(2, Integer.MAX_VALUE)); ActionRequestValidationException e = reindex.validate(); - assertEquals("Validation Failed: 1: can't specify both slice and workers;", e.getMessage()); + assertEquals("Validation Failed: 1: can't specify both manual and automatic slicing at the same time;", e.getMessage()); } @Override diff --git a/core/src/test/java/org/elasticsearch/index/reindex/WorkingBulkByScrollTaskTests.java b/core/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java similarity index 83% rename from core/src/test/java/org/elasticsearch/index/reindex/WorkingBulkByScrollTaskTests.java rename to 
core/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java index 5d594d080b8..64bf52c319e 100644 --- a/core/src/test/java/org/elasticsearch/index/reindex/WorkingBulkByScrollTaskTests.java +++ b/core/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java @@ -46,12 +46,15 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; -public class WorkingBulkByScrollTaskTests extends ESTestCase { - private WorkingBulkByScrollTask task; +public class WorkerBulkByScrollTaskStateTests extends ESTestCase { + private BulkByScrollTask task; + private WorkerBulkByScrollTaskState workerState; @Before public void createTask() { - task = new WorkingBulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, null, Float.POSITIVE_INFINITY); + task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID); + task.setWorker(Float.POSITIVE_INFINITY, null); + workerState = task.getWorkerState(); } public void testBasicData() { @@ -78,7 +81,7 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { assertEquals(noops, status.getNoops()); long totalHits = randomIntBetween(10, 1000); - task.setTotal(totalHits); + workerState.setTotal(totalHits); for (long p = 0; p < totalHits; p++) { status = task.getStatus(); assertEquals(totalHits, status.getTotal()); @@ -91,28 +94,28 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { if (randomBoolean()) { created++; - task.countCreated(); + workerState.countCreated(); } else if (randomBoolean()) { updated++; - task.countUpdated(); + workerState.countUpdated(); } else { deleted++; - task.countDeleted(); + workerState.countDeleted(); } if (rarely()) { versionConflicts++; - task.countVersionConflict(); + workerState.countVersionConflict(); } if (rarely()) { batch++; - task.countBatch(); + workerState.countBatch(); } if (rarely()) { noops++; - task.countNoop(); + workerState.countNoop(); } } status = task.getStatus(); @@ -139,7 +142,7 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { * each time. */ float originalRequestsPerSecond = (float) randomDoubleBetween(1, 10000, true); - task.rethrottle(originalRequestsPerSecond); + workerState.rethrottle(originalRequestsPerSecond); TimeValue maxDelay = timeValueSeconds(between(1, 5)); assertThat(maxDelay.nanos(), greaterThanOrEqualTo(0L)); int batchSizeForMaxDelay = (int) (maxDelay.seconds() * originalRequestsPerSecond); @@ -151,20 +154,22 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { } }; try { - task.delayPrepareBulkRequest(threadPool, timeValueNanos(System.nanoTime()), batchSizeForMaxDelay, new AbstractRunnable() { - @Override - protected void doRun() throws Exception { - boolean oldValue = done.getAndSet(true); - if (oldValue) { - throw new RuntimeException("Ran twice oh no!"); + workerState.delayPrepareBulkRequest(threadPool, timeValueNanos(System.nanoTime()), batchSizeForMaxDelay, + new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + boolean oldValue = done.getAndSet(true); + if (oldValue) { + throw new RuntimeException("Ran twice oh no!"); + } + } + + @Override + public void onFailure(Exception e) { + errors.add(e); } } - - @Override - public void onFailure(Exception e) { - errors.add(e); - } - }); + ); // Rethrottle on a random number of threads, one of which is this thread. 
Runnable test = () -> { @@ -172,7 +177,7 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { int rethrottles = 0; while (false == done.get()) { float requestsPerSecond = (float) randomDoubleBetween(0, originalRequestsPerSecond * 2, true); - task.rethrottle(requestsPerSecond); + workerState.rethrottle(requestsPerSecond); rethrottles += 1; } logger.info("Rethrottled [{}] times", rethrottles); @@ -237,7 +242,7 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { }; try { // Have the task use the thread pool to delay a task that does nothing - task.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), 1, new AbstractRunnable() { + workerState.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), 1, new AbstractRunnable() { @Override protected void doRun() throws Exception { } @@ -254,12 +259,12 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase { } public void testPerfectlyThrottledBatchTime() { - task.rethrottle(Float.POSITIVE_INFINITY); - assertThat((double) task.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f)); + workerState.rethrottle(Float.POSITIVE_INFINITY); + assertThat((double) workerState.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f)); int total = between(0, 1000000); - task.rethrottle(1); - assertThat((double) task.perfectlyThrottledBatchTime(total), + workerState.rethrottle(1); + assertThat((double) workerState.perfectlyThrottledBatchTime(total), closeTo(TimeUnit.SECONDS.toNanos(total), TimeUnit.SECONDS.toNanos(1))); } } diff --git a/core/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java b/core/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java index 009b9ab5d65..7ea3dd2094b 100644 --- a/core/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java +++ b/core/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java @@ -114,21 +114,21 @@ public class SliceBuilderTests extends ESTestCase { public void testInvalidArguments() throws Exception { Exception e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", -1, 10)); - assertEquals(e.getMessage(), "id must be greater than or equal to 0"); + assertEquals("id must be greater than or equal to 0", e.getMessage()); e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, -1)); - assertEquals(e.getMessage(), "max must be greater than 1"); + assertEquals("max must be greater than 1", e.getMessage()); e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, 0)); - assertEquals(e.getMessage(), "max must be greater than 1"); + assertEquals("max must be greater than 1", e.getMessage()); e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 10, 5)); - assertEquals(e.getMessage(), "max must be greater than id"); + assertEquals("max must be greater than id", e.getMessage()); e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 1000, 1000)); - assertEquals(e.getMessage(), "max must be greater than id"); + assertEquals("max must be greater than id", e.getMessage()); e = expectThrows(IllegalArgumentException.class, () -> new SliceBuilder("field", 1001, 1000)); - assertEquals(e.getMessage(), "max must be greater than id"); + assertEquals("max must be greater than id", e.getMessage()); } public void testToFilter() throws IOException { diff --git a/docs/reference/docs/delete-by-query.asciidoc b/docs/reference/docs/delete-by-query.asciidoc index 1e26aac6d61..b2a59231d34 100644 --- 
a/docs/reference/docs/delete-by-query.asciidoc
+++ b/docs/reference/docs/delete-by-query.asciidoc
@@ -339,11 +339,19 @@ take effect after completing the current batch. This prevents
 scroll timeouts.

 [float]
-[[docs-delete-by-query-manual-slice]]
-=== Manually slicing
+[[docs-delete-by-query-slice]]
+=== Slicing

-Delete-by-query supports <<sliced-scroll>> allowing you to manually parallelize
-the process relatively easily:
+Delete-by-query supports <<sliced-scroll>> to parallelize the deleting process.
+This parallelization can improve efficiency and provide a convenient way to
+break the request down into smaller parts.
+
+[float]
+[[docs-delete-by-query-manual-slice]]
+==== Manually slicing
+
+Slice a delete-by-query manually by providing a slice id and total number of
+slices to each request:

 [source,js]
 ----------------------------------------------------------------
@@ -412,10 +420,11 @@ Which results in a sensible `total` like this one:

 [float]
 [[docs-delete-by-query-automatic-slice]]
-=== Automatic slicing
+==== Automatic slicing

 You can also let delete-by-query automatically parallelize using
-<<sliced-scroll>> to slice on `_uid`:
+<<sliced-scroll>> to slice on `_uid`. Use `slices` to specify the number of
+slices to use:

 [source,js]
 ----------------------------------------------------------------
@@ -463,6 +472,11 @@ Which results in a sensible `total` like this one:
 ----------------------------------------------------------------
 // TESTRESPONSE

+Setting `slices` to `auto` will let Elasticsearch choose the number of slices
+to use. This setting will use one slice per shard, up to a certain limit. If
+there are multiple source indices, it will choose the number of slices based
+on the index with the smallest number of shards.
+
 Adding `slices` to `_delete_by_query` just automates the manual process used in
 the section above, creating sub-requests which means it has some quirks:
@@ -489,18 +503,20 @@ though these are all taken at approximately the same time.

 [float]
 [[docs-delete-by-query-picking-slices]]
-=== Picking the number of slices
+===== Picking the number of slices

-At this point we have a few recommendations around the number of `slices` to
-use (the `max` parameter in the slice API if manually parallelizing):
+If slicing automatically, setting `slices` to `auto` will choose a reasonable
+number for most indices. If you're slicing manually or otherwise tuning
+automatic slicing, use these guidelines.

-* Don't use large numbers. `500` creates fairly massive CPU thrash.
-* It is more efficient from a query performance standpoint to use some multiple
-of the number of shards in the source index.
-* Using exactly as many shards as are in the source index is the most efficient
-from a query performance standpoint.
-* Indexing performance should scale linearly across available resources with
-the number of `slices`.
-* Whether indexing or query performance dominates that process depends on lots
-of factors like the documents being reindexed and the cluster doing the
-reindexing.
+Query performance is most efficient when the number of `slices` is equal to
+the number of shards in the index. If that number is large (for example, 500),
+choose a lower number, as too many `slices` will hurt performance. Setting
+`slices` higher than the number of shards generally does not improve efficiency
+and adds overhead.
+
+Delete performance scales linearly across available resources with the
+number of slices.
+
+Whether query or delete performance dominates the runtime depends on the
+documents being deleted and cluster resources.
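For concreteness, here is a minimal sketch of the automatic-slicing request that the documentation above describes. The `twitter` index and the `match_all` query are illustrative assumptions; any query accepted by the search API works the same way:

[source,js]
----------------------------------------------------------------
POST twitter/_delete_by_query?refresh&slices=auto
{
  "query": {
    "match_all": {}
  }
}
----------------------------------------------------------------

Per the parallelization helper added in this change, `slices=auto` resolves to one slice per shard of the source index, capped at `AUTO_SLICE_CEILING` (20); when several source indices are targeted, the index with the fewest shards determines the count.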
diff --git a/docs/reference/docs/reindex.asciidoc b/docs/reference/docs/reindex.asciidoc
index 9056450bf7a..bb48703cd5e 100644
--- a/docs/reference/docs/reindex.asciidoc
+++ b/docs/reference/docs/reindex.asciidoc
@@ -787,11 +787,19 @@ and it'll look like:

 Or you can search by `tag` or whatever you want.

+[float]
+[[docs-reindex-slice]]
+=== Slicing
+
+Reindex supports <<sliced-scroll>> to parallelize the reindexing process.
+This parallelization can improve efficiency and provide a convenient way to
+break the request down into smaller parts.
+
 [float]
 [[docs-reindex-manual-slice]]
 ==== Manual slicing
-Reindex supports <<sliced-scroll>>, allowing you to manually parallelize the
-process relatively easily:
+Slice a reindex request manually by providing a slice id and total number of
+slices to each request:

 [source,js]
 ----------------------------------------------------------------
@@ -849,10 +857,10 @@ Which results in a sensible `total` like this one:

 [float]
 [[docs-reindex-automatic-slice]]
-=== Automatic slicing
+==== Automatic slicing

 You can also let reindex automatically parallelize using <<sliced-scroll>> to
-slice on `_uid`:
+slice on `_uid`. Use `slices` to specify the number of slices to use:

 [source,js]
 ----------------------------------------------------------------
@@ -890,6 +898,11 @@ Which results in a sensible `total` like this one:
 ----------------------------------------------------------------
 // TESTRESPONSE

+Setting `slices` to `auto` will let Elasticsearch choose the number of slices
+to use. This setting will use one slice per shard, up to a certain limit. If
+there are multiple source indices, it will choose the number of slices based
+on the index with the smallest number of shards.
+
 Adding `slices` to `_reindex` just automates the manual process used in the
 section above, creating sub-requests which means it has some quirks:
@@ -915,21 +928,23 @@ though these are all taken at approximately the same time.

 [float]
 [[docs-reindex-picking-slices]]
-=== Picking the number of slices
+===== Picking the number of slices

-At this point we have a few recommendations around the number of `slices` to
-use (the `max` parameter in the slice API if manually parallelizing):
+If slicing automatically, setting `slices` to `auto` will choose a reasonable
+number for most indices. If you're slicing manually or otherwise tuning
+automatic slicing, use these guidelines.

-* Don't use large numbers. `500` creates fairly massive CPU thrash.
-* It is more efficient from a query performance standpoint to use some multiple
-of the number of shards in the source index.
-* Using exactly as many shards as are in the source index is the most efficient
-from a query performance standpoint.
-* Indexing performance should scale linearly across available resources with
-the number of `slices`.
-* Whether indexing or query performance dominates that process depends on lots
-of factors like the documents being reindexed and the cluster doing the
-reindexing.
+Query performance is most efficient when the number of `slices` is equal to
+the number of shards in the index. If that number is large (for example, 500),
+choose a lower number, as too many `slices` will hurt performance. Setting
+`slices` higher than the number of shards generally does not improve efficiency
+and adds overhead.
+
+Indexing performance scales linearly across available resources with the
+number of slices.
+
+Whether query or indexing performance dominates the runtime depends on the
+documents being reindexed and cluster resources.

 [float]
 === Reindex daily indices
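The same `auto` value applies to reindex. A sketch, with the `twitter` and `new_twitter` index names as illustrative assumptions:

[source,js]
----------------------------------------------------------------
POST _reindex?slices=auto&refresh
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}
----------------------------------------------------------------

Note that reindex-from-remote rejects this form: the validation added to ReindexRequest.validate() fails any remote reindex whose `slices` value is `auto` or greater than 1.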
diff --git a/docs/reference/docs/update-by-query.asciidoc b/docs/reference/docs/update-by-query.asciidoc
index 28c250dcfe1..2597fd28cb8 100644
--- a/docs/reference/docs/update-by-query.asciidoc
+++ b/docs/reference/docs/update-by-query.asciidoc
@@ -403,11 +403,19 @@ query takes effect immediately, but rethrottling that slows down the query will
 take effect after completing the current batch. This prevents scroll
 timeouts.

+[float]
+[[docs-update-by-query-slice]]
+=== Slicing
+
+Update-by-query supports <<sliced-scroll>> to parallelize the updating process.
+This parallelization can improve efficiency and provide a convenient way to
+break the request down into smaller parts.
+
 [float]
 [[docs-update-by-query-manual-slice]]
 ==== Manual slicing
-Update-by-query supports <<sliced-scroll>> allowing you to manually parallelize
-the process relatively easily:
+Slice an update-by-query manually by providing a slice id and total number of
+slices to each request:

 [source,js]
 ----------------------------------------------------------------
@@ -459,10 +467,11 @@ Which results in a sensible `total` like this one:

 [float]
 [[docs-update-by-query-automatic-slice]]
-=== Automatic slicing
+==== Automatic slicing

 You can also let update-by-query automatically parallelize using
-<<sliced-scroll>> to slice on `_uid`:
+<<sliced-scroll>> to slice on `_uid`. Use `slices` to specify the number of
+slices to use:

 [source,js]
 ----------------------------------------------------------------
@@ -497,6 +506,11 @@ Which results in a sensible `total` like this one:
 ----------------------------------------------------------------
 // TESTRESPONSE

+Setting `slices` to `auto` will let Elasticsearch choose the number of slices
+to use. This setting will use one slice per shard, up to a certain limit. If
+there are multiple source indices, it will choose the number of slices based
+on the index with the smallest number of shards.
+
 Adding `slices` to `_update_by_query` just automates the manual process used in
 the section above, creating sub-requests which means it has some quirks:
@@ -523,22 +537,23 @@ though these are all taken at approximately the same time.

 [float]
 [[docs-update-by-query-picking-slices]]
-=== Picking the number of slices
+===== Picking the number of slices

-At this point we have a few recommendations around the number of `slices` to
-use (the `max` parameter in the slice API if manually parallelizing):
+If slicing automatically, setting `slices` to `auto` will choose a reasonable
+number for most indices. If you're slicing manually or otherwise tuning
+automatic slicing, use these guidelines.

-* Don't use large numbers. `500` creates fairly massive CPU thrash.
-* It is more efficient from a query performance standpoint to use some multiple
-of the number of shards in the source index.
-* Using exactly as many shards as are in the source index is the most efficient
-from a query performance standpoint.
-* Indexing performance should scale linearly across available resources with
-the number of `slices`.
-* Whether indexing or query performance dominates that process depends on lots
-of factors like the documents being reindexed and the cluster doing the
-reindexing.
+Query performance is most efficient when the number of `slices` is equal to
+the number of shards in the index. If that number is large (for example, 500),
+choose a lower number, as too many `slices` will hurt performance. Setting
+`slices` higher than the number of shards generally does not improve efficiency
+and adds overhead.
+Update performance scales linearly across available resources with the +number of slices. + +Whether query or update performance dominates the runtime depends on the +documents being reindexed and cluster resources. [float] [[picking-up-a-new-property]] diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index 91673fd0a41..0fc89677f40 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -87,7 +87,8 @@ import static org.elasticsearch.search.sort.SortBuilders.fieldSort; */ public abstract class AbstractAsyncBulkByScrollAction> { protected final Logger logger; - protected final WorkingBulkByScrollTask task; + protected final BulkByScrollTask task; + protected final WorkerBulkByScrollTaskState worker; protected final ThreadPool threadPool; protected final ScriptService scriptService; protected final ClusterState clusterState; @@ -114,16 +115,22 @@ public abstract class AbstractAsyncBulkByScrollAction, ScrollableHitSource.Hit, RequestWrapper> scriptApplier; - public AbstractAsyncBulkByScrollAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState, ActionListener listener) { this(task, logger, client, threadPool, mainRequest, scriptService, clusterState, listener, client.settings()); } - public AbstractAsyncBulkByScrollAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState, ActionListener listener, Settings settings) { + this.task = task; + if (!task.isWorker()) { + throw new IllegalArgumentException("Given task [" + task.getId() + "] must have a child worker"); + } + this.worker = task.getWorkerState(); + this.logger = logger; this.client = client; this.settings = settings; @@ -133,7 +140,7 @@ public abstract class AbstractAsyncBulkByScrollAction 0) { total = min(total, mainRequest.getSize()); } - task.setTotal(total); + worker.setTotal(total); AbstractRunnable prepareBulkRequestRunnable = new AbstractRunnable() { @Override protected void doRun() throws Exception { @@ -289,7 +296,7 @@ public abstract class AbstractAsyncBulkByScrollAction hits = response.getHits(); if (mainRequest.getSize() != SIZE_ALL_MATCHES) { // Truncate the hits if we have more than the request size - long remaining = max(0, mainRequest.getSize() - task.getSuccessfullyProcessed()); + long remaining = max(0, mainRequest.getSize() - worker.getSuccessfullyProcessed()); if (remaining < hits.size()) { hits = hits.subList(0, (int) remaining); } @@ -372,16 +379,16 @@ public abstract class AbstractAsyncBulkByScrollAction= mainRequest.getSize()) { + if (mainRequest.getSize() != SIZE_ALL_MATCHES && worker.getSuccessfullyProcessed() >= mainRequest.getSize()) { // We've processed all the requested docs. 
refreshAndFinish(emptyList(), emptyList(), false); return; @@ -425,7 +432,7 @@ public abstract class AbstractAsyncBulkByScrollAction { onScrollResponse(lastBatchStartTime, lastBatchSize, response); }); @@ -433,7 +440,7 @@ public abstract class AbstractAsyncBulkByScrollAction failures) { if (failure.getStatus() == CONFLICT) { - task.countVersionConflict(); + worker.countVersionConflict(); if (false == mainRequest.isAbortOnVersionConflict()) { return; } @@ -759,9 +766,9 @@ public abstract class AbstractAsyncBulkByScrollAction, ScrollableHitSource.Hit, RequestWrapper> { + public abstract static class ScriptApplier implements BiFunction, ScrollableHitSource.Hit, RequestWrapper> { - private final WorkingBulkByScrollTask task; + private final WorkerBulkByScrollTaskState taskWorker; private final ScriptService scriptService; private final Script script; private final Map params; @@ -769,9 +776,11 @@ public abstract class AbstractAsyncBulkByScrollAction context; - public ScriptApplier(WorkingBulkByScrollTask task, ScriptService scriptService, Script script, + public ScriptApplier(WorkerBulkByScrollTaskState taskWorker, + ScriptService scriptService, + Script script, Map params) { - this.task = task; + this.taskWorker = taskWorker; this.scriptService = scriptService; this.script = script; this.params = params; @@ -864,7 +873,7 @@ public abstract class AbstractAsyncBulkByScrollAction scriptChangedOpType(RequestWrapper request, OpType oldOpType, OpType newOpType) { switch (newOpType) { case NOOP: - task.countNoop(); + taskWorker.countNoop(); return null; case DELETE: RequestWrapper delete = wrap(new DeleteRequest(request.getIndex(), request.getType(), request.getId())); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java index 64b02c4be81..4ea2592801d 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java @@ -29,6 +29,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.tasks.LoggingTaskListener; import org.elasticsearch.tasks.Task; @@ -90,7 +91,11 @@ public abstract class AbstractBaseReindexRestHandler< request.setRefresh(restRequest.paramAsBoolean("refresh", request.isRefresh())); request.setTimeout(restRequest.paramAsTime("timeout", request.getTimeout())); - request.setSlices(restRequest.paramAsInt("slices", request.getSlices())); + + Integer slices = parseSlices(restRequest); + if (slices != null) { + request.setSlices(slices); + } String waitForActiveShards = restRequest.param("wait_for_active_shards"); if (waitForActiveShards != null) { @@ -115,6 +120,32 @@ public abstract class AbstractBaseReindexRestHandler< }; } + private static Integer parseSlices(RestRequest request) { + String slicesString = request.param("slices"); + if (slicesString == null) { + return null; + } + + if (slicesString.equals(AbstractBulkByScrollRequest.AUTO_SLICES_VALUE)) { + return AbstractBulkByScrollRequest.AUTO_SLICES; + } + + int slices; + try { + slices = Integer.parseInt(slicesString); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "[slices] must be a 
positive integer or the string \"auto\", but was [" + slicesString + "]", e); + } + + if (slices < 1) { + throw new IllegalArgumentException( + "[slices] must be a positive integer or the string \"auto\", but was [" + slicesString + "]"); + } + + return slices; + } + /** * @return requests_per_second from the request as a float if it was on the request, null otherwise */ diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java index 2608f5715ba..8dd30a9fa9d 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java @@ -31,7 +31,7 @@ import org.elasticsearch.threadpool.ThreadPool; * Implementation of delete-by-query using scrolling and bulk. */ public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction { - public AsyncDeleteByQueryAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService, ClusterState clusterState, ActionListener listener) { super(task, logger, client, threadPool, request, scriptService, clusterState, listener); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java index 48f10306454..19cca917290 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollParallelizationHelper.java @@ -21,31 +21,118 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskManager; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /** * Helps parallelize reindex requests using sliced scrolls. */ class BulkByScrollParallelizationHelper { + + static final int AUTO_SLICE_CEILING = 20; + private BulkByScrollParallelizationHelper() {} - public static > void startSlices(Client client, TaskManager taskManager, - Action action, - String localNodeId, ParentBulkByScrollTask task, Request request, - ActionListener listener) { + /** + * Takes an action created by a {@link BulkByScrollTask} and runs it with regard to whether the request is sliced or not. + * + * If the request is not sliced (i.e. the number of slices is 1), the worker action in the given {@link Runnable} will be started on + * the local node. 
If the request is sliced (i.e. the number of slices is more than 1), then a subrequest will be created for each + * slice and sent. + * + * If slices are set as {@code "auto"}, this class will resolve that to a specific number based on characteristics of the source + * indices. A request with {@code "auto"} slices may end up being sliced or unsliced. + */ + static > void startSlicedAction( + Request request, + BulkByScrollTask task, + Action action, + ActionListener listener, + Client client, + DiscoveryNode node, + Runnable workerAction) { + + if (request.getSlices() == AbstractBulkByScrollRequest.AUTO_SLICES) { + ClusterSearchShardsRequest shardsRequest = new ClusterSearchShardsRequest(); + shardsRequest.indices(request.getSearchRequest().indices()); + client.admin().cluster().searchShards(shardsRequest, ActionListener.wrap( + response -> { + int actualNumSlices = countSlicesBasedOnShards(response); + sliceConditionally(request, task, action, listener, client, node, workerAction, actualNumSlices); + }, + listener::onFailure + )); + } else { + sliceConditionally(request, task, action, listener, client, node, workerAction, request.getSlices()); + } + } + + private static > void sliceConditionally( + Request request, + BulkByScrollTask task, + Action action, + ActionListener listener, + Client client, + DiscoveryNode node, + Runnable workerAction, + int slices) { + + if (slices > 1) { + task.setWorkerCount(slices); + sendSubRequests(client, action, node.getId(), task, request, listener); + } else { + SliceBuilder sliceBuilder = request.getSearchRequest().source().slice(); + Integer sliceId = sliceBuilder == null + ? null + : sliceBuilder.getId(); + task.setWorker(request.getRequestsPerSecond(), sliceId); + workerAction.run(); + } + } + + private static int countSlicesBasedOnShards(ClusterSearchShardsResponse response) { + Map countsByIndex = Arrays.stream(response.getGroups()).collect(Collectors.toMap( + group -> group.getShardId().getIndex(), + group -> 1, + (sum, term) -> sum + term + )); + Set counts = new HashSet<>(countsByIndex.values()); + int leastShards = Collections.min(counts); + return Math.min(leastShards, AUTO_SLICE_CEILING); + } + + private static > void sendSubRequests( + Client client, + Action action, + String localNodeId, + BulkByScrollTask task, + Request request, + ActionListener listener) { + + LeaderBulkByScrollTaskState worker = task.getLeaderState(); + int totalSlices = worker.getSlices(); TaskId parentTaskId = new TaskId(localNodeId, task.getId()); - for (final SearchRequest slice : sliceIntoSubRequests(request.getSearchRequest(), UidFieldMapper.NAME, request.getSlices())) { + for (final SearchRequest slice : sliceIntoSubRequests(request.getSearchRequest(), UidFieldMapper.NAME, totalSlices)) { // TODO move the request to the correct node. maybe here or somehow do it as part of startup for reindex in general.... 
- Request requestForSlice = request.forSlice(parentTaskId, slice); + Request requestForSlice = request.forSlice(parentTaskId, slice, totalSlices); ActionListener sliceListener = ActionListener.wrap( - r -> task.onSliceResponse(listener, slice.source().slice().getId(), r), - e -> task.onSliceFailure(listener, slice.source().slice().getId(), e)); + r -> worker.onSliceResponse(listener, slice.source().slice().getId(), r), + e -> worker.onSliceFailure(listener, slice.source().slice().getId(), e)); client.execute(action, requestForSlice, sliceListener); } } @@ -80,5 +167,4 @@ class BulkByScrollParallelizationHelper { } return slices; } - } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java index 99e1a9f166d..e2de5cd4ffc 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java @@ -51,15 +51,17 @@ public class TransportDeleteByQueryAction extends HandledTransportAction listener) { - if (request.getSlices() > 1) { - BulkByScrollParallelizationHelper.startSlices(client, taskManager, DeleteByQueryAction.INSTANCE, - clusterService.localNode().getId(), (ParentBulkByScrollTask) task, request, listener); - } else { - ClusterState state = clusterService.state(); - ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task); - new AsyncDeleteByQueryAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state, + BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task; + BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, DeleteByQueryAction.INSTANCE, listener, client, + clusterService.localNode(), + () -> { + ClusterState state = clusterService.state(); + ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), + bulkByScrollTask); + new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService, state, listener).start(); - } + } + ); } @Override diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java index 77a3f19ddae..92d7c9ee51f 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java @@ -109,18 +109,22 @@ public class TransportReindexAction extends HandledTransportAction listener) { - if (request.getSlices() > 1) { - BulkByScrollParallelizationHelper.startSlices(client, taskManager, ReindexAction.INSTANCE, clusterService.localNode().getId(), - (ParentBulkByScrollTask) task, request, listener); - } else { - checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo()); - ClusterState state = clusterService.state(); - validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(), - indexNameExpressionResolver, autoCreateIndex, state); - ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task); - new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state, + 
checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo()); + ClusterState state = clusterService.state(); + validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(), + indexNameExpressionResolver, autoCreateIndex, state); + + BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task; + + BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, ReindexAction.INSTANCE, listener, client, + clusterService.localNode(), + () -> { + ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), + bulkByScrollTask); + new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService, state, listener).start(); - } + } + ); } @Override @@ -244,13 +248,13 @@ public class TransportReindexAction extends HandledTransportAction createdThreads = emptyList(); - AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState, ActionListener listener) { this(task, logger, client, threadPool, request, scriptService, clusterState, listener, client.settings()); } - AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState, ActionListener listener, Settings settings) { super(task, logger, client, threadPool, request, scriptService, clusterState, listener, settings); @@ -271,8 +275,8 @@ public class TransportReindexAction extends HandledTransportAction()); RestClient restClient = buildRestClient(remoteInfo, task.getId(), createdThreads); - return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, task::countSearchRetry, this::finishHim, restClient, - remoteInfo.getQuery(), mainRequest.getSearchRequest()); + return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry, this::finishHim, + restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest()); } return super.buildScrollableResultSource(backoffPolicy); } @@ -293,7 +297,7 @@ public class TransportReindexAction extends HandledTransportAction, ScrollableHitSource.Hit, RequestWrapper> buildScriptApplier() { Script script = mainRequest.getScript(); if (script != null) { - return new ReindexScriptApplier(task, scriptService, script, script.getParams()); + return new ReindexScriptApplier(worker, scriptService, script, script.getParams()); } return super.buildScriptApplier(); } @@ -385,9 +389,9 @@ public class TransportReindexAction extends HandledTransportAction params) { - super(task, scriptService, script, params); + super(taskWorker, scriptService, script, params); } /* diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java index bcfb2813474..d8105e4a6ec 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java @@ -59,21 +59,48 @@ public class TransportRethrottleAction extends 
TransportTasksAction listener) { - int runningSubTasks = task.runningSliceSubTasks(); - if (runningSubTasks == 0) { - logger.debug("rethrottling local task [{}] to [{}] requests per second", task.getId(), newRequestsPerSecond); - task.rethrottle(newRequestsPerSecond); - listener.onResponse(task.taskInfo(localNodeId, true)); + + if (task.isWorker()) { + rethrottleChildTask(logger, localNodeId, task, newRequestsPerSecond, listener); return; } - RethrottleRequest subRequest = new RethrottleRequest(); - subRequest.setRequestsPerSecond(newRequestsPerSecond / runningSubTasks); - subRequest.setParentTaskId(new TaskId(localNodeId, task.getId())); - logger.debug("rethrottling children of task [{}] to [{}] requests per second", task.getId(), subRequest.getRequestsPerSecond()); - client.execute(RethrottleAction.INSTANCE, subRequest, ActionListener.wrap(r -> { - r.rethrowFailures("Rethrottle"); - listener.onResponse(task.getInfoGivenSliceInfo(localNodeId, r.getTasks())); - }, listener::onFailure)); + + if (task.isLeader()) { + rethrottleParentTask(logger, localNodeId, client, task, newRequestsPerSecond, listener); + return; + } + + throw new IllegalArgumentException("task [" + task.getId() + "] must be set as a child or parent"); + } + + private static void rethrottleParentTask(Logger logger, String localNodeId, Client client, BulkByScrollTask task, + float newRequestsPerSecond, ActionListener listener) { + final LeaderBulkByScrollTaskState leaderState = task.getLeaderState(); + final int runningSubtasks = leaderState.runningSliceSubTasks(); + + if (runningSubtasks > 0) { + RethrottleRequest subRequest = new RethrottleRequest(); + subRequest.setRequestsPerSecond(newRequestsPerSecond / runningSubtasks); + subRequest.setParentTaskId(new TaskId(localNodeId, task.getId())); + logger.debug("rethrottling children of task [{}] to [{}] requests per second", task.getId(), + subRequest.getRequestsPerSecond()); + client.execute(RethrottleAction.INSTANCE, subRequest, ActionListener.wrap( + r -> { + r.rethrowFailures("Rethrottle"); + listener.onResponse(task.taskInfoGivenSubtaskInfo(localNodeId, r.getTasks())); + }, + listener::onFailure)); + } else { + logger.debug("children of task [{}] are already finished, nothing to rethrottle", task.getId()); + listener.onResponse(task.taskInfo(localNodeId, true)); + } + } + + private static void rethrottleChildTask(Logger logger, String localNodeId, BulkByScrollTask task, float newRequestsPerSecond, + ActionListener listener) { + logger.debug("rethrottling local task [{}] to [{}] requests per second", task.getId(), newRequestsPerSecond); + task.getWorkerState().rethrottle(newRequestsPerSecond); + listener.onResponse(task.taskInfo(localNodeId, true)); } @Override diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java index 8924c7038c9..e21a6408bd8 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java @@ -64,15 +64,17 @@ public class TransportUpdateByQueryAction extends HandledTransportAction listener) { - if (request.getSlices() > 1) { - BulkByScrollParallelizationHelper.startSlices(client, taskManager, UpdateByQueryAction.INSTANCE, - clusterService.localNode().getId(), (ParentBulkByScrollTask) task, request, listener); - } else { - ClusterState state = 
clusterService.state(); - ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task); - new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state, + BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task; + BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, UpdateByQueryAction.INSTANCE, listener, client, + clusterService.localNode(), + () -> { + ClusterState state = clusterService.state(); + ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), + bulkByScrollTask); + new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService, state, listener).start(); - } + } + ); } @Override @@ -84,13 +86,13 @@ public class TransportUpdateByQueryAction extends HandledTransportAction { - AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState, ActionListener listener) { this(task, logger, client, threadPool, request, scriptService, clusterState, listener, client.settings()); } - AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, + AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client, ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState, ActionListener listener, Settings settings) { super(task, logger, client, threadPool, request, scriptService, clusterState, listener, settings); @@ -109,7 +111,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction, ScrollableHitSource.Hit, RequestWrapper> buildScriptApplier() { Script script = mainRequest.getScript(); if (script != null) { - return new UpdateByQueryScriptApplier(task, scriptService, script, script.getParams()); + return new UpdateByQueryScriptApplier(worker, scriptService, script, script.getParams()); } return super.buildScriptApplier(); } @@ -129,9 +131,9 @@ public class TransportUpdateByQueryAction extends HandledTransportAction params) { - super(task, scriptService, script, params); + UpdateByQueryScriptApplier(WorkerBulkByScrollTaskState taskWorker, ScriptService scriptService, Script script, + Map params) { + super(taskWorker, scriptService, script, params); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index 7f59987617f..fe754b38817 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -124,7 +124,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { private PlainActionFuture listener; private String scrollId; private TaskManager taskManager; - private WorkingBulkByScrollTask testTask; + private BulkByScrollTask testTask; + private WorkerBulkByScrollTaskState worker; private Map expectedHeaders = new HashMap<>(); private DiscoveryNode localNode; private TaskId taskId; @@ -141,7 +142,9 @@ public class AsyncBulkByScrollActionTests extends 
ESTestCase { listener = new PlainActionFuture<>(); scrollId = null; taskManager = new TaskManager(Settings.EMPTY); - testTask = (WorkingBulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest); + testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest); + testTask.setWorker(testRequest.getRequestsPerSecond(), null); + worker = testTask.getWorkerState(); localNode = new DiscoveryNode("thenode", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); taskId = new TaskId(localNode.getId(), testTask.getId()); @@ -309,7 +312,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { * Mimicks a ThreadPool rejecting execution of the task. */ public void testThreadPoolRejectionsAbortRequest() throws Exception { - testTask.rethrottle(1); + worker.rethrottle(1); setupClient(new TestThreadPool(getTestName()) { @Override public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { @@ -439,7 +442,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { firstSearchRequest.scroll(timeValueSeconds(10)); // Set throttle to 1 request per second to make the math simpler - testTask.rethrottle(1f); + worker.rethrottle(1f); // Make the last batch look nearly instant but have 100 documents TimeValue lastBatchStartTime = timeValueNanos(System.nanoTime()); TimeValue now = timeValueNanos(lastBatchStartTime.nanos() + 1); @@ -459,7 +462,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { assertEquals(99, capturedDelay.get().seconds()); } else { // Let's rethrottle between the starting the scroll and getting the response - testTask.rethrottle(10f); + worker.rethrottle(10f); client.lastScroll.get().listener.onResponse(searchResponse); // The delay uses the new throttle assertEquals(9, capturedDelay.get().seconds()); @@ -624,7 +627,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { long total = randomIntBetween(0, Integer.MAX_VALUE); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null); // Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task - testTask.rethrottle(1); + worker.rethrottle(1); simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1000, response); // Now that we've got our cancel we'll just verify that it all came through all right @@ -694,7 +697,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { } @Override - public DummyAbstractBulkByScrollRequest forSlice(TaskId slicingTask, SearchRequest slice) { + public DummyAbstractBulkByScrollRequest forSlice(TaskId slicingTask, SearchRequest slice, int totalSlices) { throw new UnsupportedOperationException(); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryBasicTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryBasicTests.java index aba7cd69359..276dc955f82 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryBasicTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryBasicTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.alias.Alias; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.index.IndexNotFoundException; 
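To ground the task-related test changes above, here is a minimal sketch of the two modes a single BulkByScrollTask can be put into under this patch (the constructor and method names come from the patch itself; the id/type/action/description arguments are placeholder values):

    static void taskModesSketch() {
        // Worker mode: the task that actually executes the scroll/bulk work.
        BulkByScrollTask workerTask = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID);
        workerTask.setWorker(Float.POSITIVE_INFINITY, null); // unthrottled, and not a slice of a larger request
        workerTask.getWorkerState().rethrottle(500f);        // throttling is now handled by the worker state

        // Leader mode: the task that coordinates sliced subtasks.
        BulkByScrollTask leaderTask = new BulkByScrollTask(2, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID);
        leaderTask.setWorkerCount(5);                        // expect five slice subtasks
        assert workerTask.isWorker() && leaderTask.isLeader();
    }

The tests below exercise exactly this split: unsliced requests run as workers, while sliced requests register a leader whose slice subtasks are themselves workers.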
import org.elasticsearch.index.query.QueryBuilders; @@ -31,7 +30,10 @@ import org.elasticsearch.test.InternalSettingsPlugin; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE; @@ -224,7 +226,7 @@ public class DeleteByQueryBasicTests extends ReindexTestCase { assertHitCount(client().prepareSearch("test").setSize(0).get(), docs); } - public void testWorkers() throws Exception { + public void testSlices() throws Exception { indexRandom(true, client().prepareIndex("test", "test", "1").setSource("foo", "a"), client().prepareIndex("test", "test", "2").setSource("foo", "a"), @@ -236,18 +238,74 @@ public class DeleteByQueryBasicTests extends ReindexTestCase { ); assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 7); + int slices = randomSlices(); + int expectedSlices = expectedSliceStatuses(slices, "test"); + // Deletes the two docs that matches "foo:a" - assertThat(deleteByQuery().source("test").filter(termQuery("foo", "a")).refresh(true).setSlices(5).get(), - matcher().deleted(2).slices(hasSize(5))); + assertThat( + deleteByQuery() + .source("test") + .filter(termQuery("foo", "a")) + .refresh(true) + .setSlices(slices).get(), + matcher() + .deleted(2) + .slices(hasSize(expectedSlices))); assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 5); // Delete remaining docs - DeleteByQueryRequestBuilder request = deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true) - .setSlices(5); - assertThat(request.get(), matcher().deleted(5).slices(hasSize(5))); + assertThat( + deleteByQuery() + .source("test") + .filter(QueryBuilders.matchAllQuery()) + .refresh(true) + .setSlices(slices).get(), + matcher() + .deleted(5) + .slices(hasSize(expectedSlices))); assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 0); } + public void testMultipleSources() throws Exception { + int sourceIndices = between(2, 5); + + Map> docs = new HashMap<>(); + for (int sourceIndex = 0; sourceIndex < sourceIndices; sourceIndex++) { + String indexName = "test" + sourceIndex; + docs.put(indexName, new ArrayList<>()); + int numDocs = between(5, 15); + for (int i = 0; i < numDocs; i++) { + docs.get(indexName).add(client().prepareIndex(indexName, "test", Integer.toString(i)).setSource("foo", "a")); + } + } + + List allDocs = docs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + indexRandom(true, allDocs); + for (Map.Entry> entry : docs.entrySet()) { + assertHitCount(client().prepareSearch(entry.getKey()).setSize(0).get(), entry.getValue().size()); + } + + int slices = randomSlices(1, 10); + int expectedSlices = expectedSliceStatuses(slices, docs.keySet()); + + String[] sourceIndexNames = docs.keySet().toArray(new String[docs.size()]); + + assertThat( + deleteByQuery() + .source(sourceIndexNames) + .filter(QueryBuilders.matchAllQuery()) + .refresh(true) + .setSlices(slices).get(), + matcher() + .deleted(allDocs.size()) + .slices(hasSize(expectedSlices))); + + for (String index : docs.keySet()) { + assertHitCount(client().prepareSearch(index).setTypes("test").setSize(0).get(), 0); + } + + } + /** * Test delete by query support for filtering by type. 
This entire feature * can and should be removed when we drop support for types index with diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexBasicTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexBasicTests.java index 4f7753fca9a..43764bf25fc 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexBasicTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexBasicTests.java @@ -22,7 +22,11 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.index.IndexRequestBuilder; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -88,8 +92,6 @@ public class ReindexBasicTests extends ReindexTestCase { } public void testCopyManyWithSlices() throws Exception { - int workers = between(2, 10); - List docs = new ArrayList<>(); int max = between(150, 500); for (int i = 0; i < max; i++) { @@ -99,21 +101,61 @@ indexRandom(true, docs); assertHitCount(client().prepareSearch("source").setSize(0).get(), max); + int slices = randomSlices(); + int expectedSlices = expectedSliceStatuses(slices, "source"); + // Copy all the docs - ReindexRequestBuilder copy = reindex().source("source").destination("dest", "type").refresh(true).setSlices(workers); + ReindexRequestBuilder copy = reindex().source("source").destination("dest", "type").refresh(true).setSlices(slices); // Use a small batch size so we have to use more than one batch copy.source().setSize(5); - assertThat(copy.get(), matcher().created(max).batches(greaterThanOrEqualTo(max / 5)).slices(hasSize(workers))); + assertThat(copy.get(), matcher().created(max).batches(greaterThanOrEqualTo(max / 5)).slices(hasSize(expectedSlices))); assertHitCount(client().prepareSearch("dest").setTypes("type").setSize(0).get(), max); // Copy some of the docs int half = max / 2; - copy = reindex().source("source").destination("dest_half", "type").refresh(true).setSlices(workers); + copy = reindex().source("source").destination("dest_half", "type").refresh(true).setSlices(slices); // Use a small batch size so we have to use more than one batch copy.source().setSize(5); copy.size(half); // The real "size" of the request.
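    // The builder chain used in this test also accepts the automatic mode added
    // by this patch. A hypothetical variant, not part of the original test (the
    // "dest_auto" destination is illustrative; AUTO_SLICES is the constant from
    // AbstractBulkByScrollRequest):
    //
    //     copy = reindex().source("source").destination("dest_auto", "type")
    //             .refresh(true).setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
    //     copy.get(); // the slice count is derived from the source index's shards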
BulkByScrollResponse response = copy.get(); - assertThat(response, matcher().created(lessThanOrEqualTo((long) half)).slices(hasSize(workers))); + assertThat(response, matcher().created(lessThanOrEqualTo((long) half)).slices(hasSize(expectedSlices))); assertHitCount(client().prepareSearch("dest_half").setSize(0).get(), response.getCreated()); } + + public void testMultipleSources() throws Exception { + int sourceIndices = between(2, 5); + + Map> docs = new HashMap<>(); + for (int sourceIndex = 0; sourceIndex < sourceIndices; sourceIndex++) { + String indexName = "source" + sourceIndex; + String typeName = "test" + sourceIndex; + docs.put(indexName, new ArrayList<>()); + int numDocs = between(50, 200); + for (int i = 0; i < numDocs; i++) { + docs.get(indexName).add(client().prepareIndex(indexName, typeName, "id_" + sourceIndex + "_" + i).setSource("foo", "a")); + } + } + + List allDocs = docs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + indexRandom(true, allDocs); + for (Map.Entry> entry : docs.entrySet()) { + assertHitCount(client().prepareSearch(entry.getKey()).setSize(0).get(), entry.getValue().size()); + } + + int slices = randomSlices(1, 10); + int expectedSlices = expectedSliceStatuses(slices, docs.keySet()); + + String[] sourceIndexNames = docs.keySet().toArray(new String[docs.size()]); + ReindexRequestBuilder request = reindex() + .source(sourceIndexNames) + .destination("dest", "type") + .refresh(true) + .setSlices(slices); + + BulkByScrollResponse response = request.get(); + assertThat(response, matcher().created(allDocs.size()).slices(hasSize(expectedSlices))); + assertHitCount(client().prepareSearch("dest").setSize(0).get(), allDocs.size()); + } + + } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java index fcf80ea283c..54854afb35e 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java @@ -25,7 +25,10 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.stream.Collectors; +import static java.util.Collections.singleton; import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; /** @@ -62,4 +65,51 @@ public abstract class ReindexTestCase extends ESIntegTestCase { public static BulkIndexByScrollResponseMatcher matcher() { return new BulkIndexByScrollResponseMatcher(); } + + static int randomSlices(int min, int max) { + if (randomBoolean()) { + return AbstractBulkByScrollRequest.AUTO_SLICES; + } else { + return between(min, max); + } + } + + static int randomSlices() { + return randomSlices(2, 10); + } + + /** + * Figures out how many slices the request handling will use + */ + protected int expectedSlices(int requestSlices, Collection indices) { + if (requestSlices == AbstractBulkByScrollRequest.AUTO_SLICES) { + int leastNumShards = Collections.min(indices.stream() + .map(sourceIndex -> getNumShards(sourceIndex).numPrimaries) + .collect(Collectors.toList())); + return Math.min(leastNumShards, BulkByScrollParallelizationHelper.AUTO_SLICE_CEILING); + } else { + return requestSlices; + } + } + + protected int expectedSlices(int requestSlices, String index) { + return expectedSlices(requestSlices, singleton(index)); + } + + /** + * Figures out how many slice statuses to expect in 
the response + */ + protected int expectedSliceStatuses(int requestSlices, Collection indices) { + int slicesConfigured = expectedSlices(requestSlices, indices); + + if (slicesConfigured > 1) { + return slicesConfigured; + } else { + return 0; + } + } + + protected int expectedSliceStatuses(int slicesConfigured, String index) { + return expectedSliceStatuses(slicesConfigured, singleton(index)); + } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RethrottleTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RethrottleTests.java index 228bae4ed4a..566c057a798 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RethrottleTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RethrottleTests.java @@ -60,15 +60,15 @@ public class RethrottleTests extends ReindexTestCase { } public void testReindexWithWorkers() throws Exception { - testCase(reindex().source("test").destination("dest").setSlices(between(2, 10)), ReindexAction.NAME); + testCase(reindex().source("test").destination("dest").setSlices(randomSlices()), ReindexAction.NAME); } public void testUpdateByQueryWithWorkers() throws Exception { - testCase(updateByQuery().source("test").setSlices(between(2, 10)), UpdateByQueryAction.NAME); + testCase(updateByQuery().source("test").setSlices(randomSlices()), UpdateByQueryAction.NAME); } public void testDeleteByQueryWithWorkers() throws Exception { - testCase(deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).setSlices(between(2, 10)), DeleteByQueryAction.NAME); + testCase(deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).setSlices(randomSlices()), DeleteByQueryAction.NAME); } private void testCase(AbstractBulkByScrollRequestBuilder request, String actionName) throws Exception { @@ -76,8 +76,12 @@ public class RethrottleTests extends ReindexTestCase { /* Add ten documents per slice so most slices will have many documents to process, having to go to multiple batches.
* we can't rely on all of them doing so, but */ + + createIndex("test"); + int numSlices = expectedSlices(request.request().getSlices(), "test"); + List docs = new ArrayList<>(); - for (int i = 0; i < request.request().getSlices() * 10; i++) { + for (int i = 0; i < numSlices * 10; i++) { docs.add(client().prepareIndex("test", "test", Integer.toString(i)).setSource("foo", "bar")); } indexRandom(true, docs); @@ -87,15 +91,15 @@ public class RethrottleTests extends ReindexTestCase { request.source().setSize(1); // Make sure we use multiple batches ActionFuture responseListener = request.execute(); - TaskGroup taskGroupToRethrottle = findTaskToRethrottle(actionName, request.request().getSlices()); + TaskGroup taskGroupToRethrottle = findTaskToRethrottle(actionName, numSlices); TaskId taskToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId(); - if (request.request().getSlices() == 1) { + if (numSlices == 1) { assertThat(taskGroupToRethrottle.getChildTasks(), empty()); } else { // There should be a sane number of child tasks running assertThat(taskGroupToRethrottle.getChildTasks(), - hasSize(allOf(greaterThanOrEqualTo(1), lessThanOrEqualTo(request.request().getSlices())))); + hasSize(allOf(greaterThanOrEqualTo(1), lessThanOrEqualTo(numSlices)))); // Wait for all of the sub tasks to start (or finish, some might finish early, all that matters is that not all do) assertBusy(() -> { BulkByScrollTask.Status parent = (BulkByScrollTask.Status) client().admin().cluster().prepareGetTask(taskToRethrottle).get() @@ -103,7 +107,7 @@ public class RethrottleTests extends ReindexTestCase { long finishedSubTasks = parent.getSliceStatuses().stream().filter(Objects::nonNull).count(); ListTasksResponse list = client().admin().cluster().prepareListTasks().setParentTaskId(taskToRethrottle).get(); list.rethrowFailures("subtasks"); - assertThat(finishedSubTasks + list.getTasks().size(), greaterThanOrEqualTo((long) request.request().getSlices())); + assertThat(finishedSubTasks + list.getTasks().size(), greaterThanOrEqualTo((long) numSlices)); assertThat(list.getTasks().size(), greaterThan(0)); }); } @@ -114,8 +118,9 @@ public class RethrottleTests extends ReindexTestCase { rethrottleResponse.rethrowFailures("Rethrottle"); assertThat(rethrottleResponse.getTasks(), hasSize(1)); BulkByScrollTask.Status status = (BulkByScrollTask.Status) rethrottleResponse.getTasks().get(0).getStatus(); + // Now check the resulting requests per second. - if (request.request().getSlices() == 1) { + if (numSlices == 1) { // If there is a single slice it should match perfectly assertEquals(newRequestsPerSecond, status.getRequestsPerSecond(), Float.MIN_NORMAL); } else { @@ -128,7 +133,7 @@ public class RethrottleTests extends ReindexTestCase { float maxExpectedSliceRequestsPerSecond = newRequestsPerSecond == Float.POSITIVE_INFINITY ? Float.POSITIVE_INFINITY : (newRequestsPerSecond / unfinished) * 1.01F; float minExpectedSliceRequestsPerSecond = newRequestsPerSecond == Float.POSITIVE_INFINITY ? 
- Float.POSITIVE_INFINITY : (newRequestsPerSecond / request.request().getSlices()) * 0.99F; + Float.POSITIVE_INFINITY : (newRequestsPerSecond / numSlices) * 0.99F; boolean oneSliceRethrottled = false; float totalRequestsPerSecond = 0; for (BulkByScrollTask.StatusOrException statusOrException : status.getSliceStatuses()) { @@ -164,7 +169,7 @@ public class RethrottleTests extends ReindexTestCase { // Now the response should come back quickly because we've rethrottled the request BulkByScrollResponse response = responseListener.get(); assertThat("Entire request completed in a single batch. This may invalidate the test as throttling is done between batches.", - response.getBatches(), greaterThanOrEqualTo(request.request().getSlices())); + response.getBatches(), greaterThanOrEqualTo(numSlices)); } private TaskGroup findTaskToRethrottle(String actionName, int sliceCount) { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index 39806e475c7..946ab030c82 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -70,17 +70,16 @@ public class RoundTripTests extends ESTestCase { roundTrip(reindex, tripped); assertRequestEquals(reindex, tripped); - // Try slices with a version that doesn't support slices. That should fail. - reindex.setSlices(between(2, 1000)); - Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_5_0_0_rc1, reindex, null)); - assertEquals("Attempting to send sliced reindex-style request to a node that doesn't support it. " - + "Version is [5.0.0-rc1] but must be [5.1.1]", e.getMessage()); + // Try slices=auto with a version that doesn't support it, which should fail + reindex.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); + Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, reindex, null)); + assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage()); - // Try without slices with a version that doesn't support slices. That should work. + // Try regular slices with a version that doesn't support slices=auto, which should succeed tripped = new ReindexRequest(); - reindex.setSlices(1); - roundTrip(Version.V_5_0_0_rc1, reindex, tripped); - assertRequestEquals(Version.V_5_0_0_rc1, reindex, tripped); + reindex.setSlices(between(1, Integer.MAX_VALUE)); + roundTrip(Version.V_6_0_0_alpha1, reindex, tripped); + assertRequestEquals(Version.V_6_0_0_alpha1, reindex, tripped); } public void testUpdateByQueryRequest() throws IOException { @@ -94,16 +93,15 @@ public class RoundTripTests extends ESTestCase { assertRequestEquals(update, tripped); assertEquals(update.getPipeline(), tripped.getPipeline()); - // Try slices with a version that doesn't support slices. That should fail. - update.setSlices(between(2, 1000)); - Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_5_0_0_rc1, update, null)); - assertEquals("Attempting to send sliced reindex-style request to a node that doesn't support it. 
" - + "Version is [5.0.0-rc1] but must be [5.1.1]", e.getMessage()); + // Try slices=auto with a version that doesn't support it, which should fail + update.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); + Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, update, null)); + assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage()); - // Try without slices with a version that doesn't support slices. That should work. + // Try regular slices with a version that doesn't support slices=auto, which should succeed tripped = new UpdateByQueryRequest(); - update.setSlices(1); - roundTrip(Version.V_5_0_0_rc1, update, tripped); + update.setSlices(between(1, Integer.MAX_VALUE)); + roundTrip(Version.V_6_0_0_alpha1, update, tripped); assertRequestEquals(update, tripped); assertEquals(update.getPipeline(), tripped.getPipeline()); } @@ -115,16 +113,15 @@ public class RoundTripTests extends ESTestCase { roundTrip(delete, tripped); assertRequestEquals(delete, tripped); - // Try slices with a version that doesn't support slices. That should fail. - delete.setSlices(between(2, 1000)); - Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_5_0_0_rc1, delete, null)); - assertEquals("Attempting to send sliced reindex-style request to a node that doesn't support it. " - + "Version is [5.0.0-rc1] but must be [5.1.1]", e.getMessage()); + // Try slices=auto with a version that doesn't support it, which should fail + delete.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); + Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, delete, null)); + assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage()); - // Try without slices with a version that doesn't support slices. That should work. 
+ // Try regular slices with a version that doesn't support slices=auto, which should succeed tripped = new DeleteByQueryRequest(); - delete.setSlices(1); - roundTrip(Version.V_5_0_0_rc1, delete, tripped); + delete.setSlices(between(1, Integer.MAX_VALUE)); + roundTrip(Version.V_6_0_0_alpha1, delete, tripped); assertRequestEquals(delete, tripped); } @@ -139,7 +136,9 @@ public class RoundTripTests extends ESTestCase { request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), null, "test")); request.setWaitForActiveShards(randomIntBetween(0, 10)); request.setRequestsPerSecond(between(0, Integer.MAX_VALUE)); - request.setSlices(between(1, Integer.MAX_VALUE)); + + int slices = ReindexTestCase.randomSlices(1, Integer.MAX_VALUE); + request.setSlices(slices); } private void randomRequest(AbstractBulkIndexByScrollRequest request) { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java index 222aedd2e9e..62a2c34ea58 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java @@ -48,12 +48,13 @@ import static org.mockito.Mockito.verify; public class TransportRethrottleActionTests extends ESTestCase { private int slices; - private ParentBulkByScrollTask task; + private BulkByScrollTask task; @Before public void createTask() { slices = between(2, 50); - task = new ParentBulkByScrollTask(1, "test_type", "test_action", "test", null, slices); + task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID); + task.setWorkerCount(slices); } /** @@ -113,7 +114,7 @@ public class TransportRethrottleActionTests extends ESTestCase { List sliceStatuses = new ArrayList<>(slices); for (int i = 0; i < succeeded; i++) { BulkByScrollTask.Status status = believeableCompletedStatus(i); - task.onSliceResponse(neverCalled(), i, + task.getLeaderState().onSliceResponse(neverCalled(), i, new BulkByScrollResponse(timeValueMillis(10), status, emptyList(), emptyList(), false)); sliceStatuses.add(new BulkByScrollTask.StatusOrException(status)); } @@ -134,7 +135,8 @@ public class TransportRethrottleActionTests extends ESTestCase { @SuppressWarnings("unchecked") ActionListener listener = i < slices - 1 ? 
neverCalled() : mock(ActionListener.class); BulkByScrollTask.Status status = believeableCompletedStatus(i); - task.onSliceResponse(listener, i, new BulkByScrollResponse(timeValueMillis(10), status, emptyList(), emptyList(), false)); + task.getLeaderState().onSliceResponse(listener, i, new BulkByScrollResponse(timeValueMillis(10), status, emptyList(), + emptyList(), false)); if (i == slices - 1) { // The whole thing succeeded so we should have got the success captureResponse(BulkByScrollResponse.class, listener).getStatus(); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryBasicTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryBasicTests.java index 663575a2933..ce254b87969 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryBasicTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryBasicTests.java @@ -19,8 +19,16 @@ package org.elasticsearch.index.reindex; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.search.sort.SortOrder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.hasSize; @@ -63,32 +71,91 @@ public class UpdateByQueryBasicTests extends ReindexTestCase { assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion()); } - public void testWorkers() throws Exception { - indexRandom(true, client().prepareIndex("test", "test", "1").setSource("foo", "a"), - client().prepareIndex("test", "test", "2").setSource("foo", "a"), - client().prepareIndex("test", "test", "3").setSource("foo", "b"), - client().prepareIndex("test", "test", "4").setSource("foo", "c")); + public void testSlices() throws Exception { + indexRandom(true, + client().prepareIndex("test", "test", "1").setSource("foo", "a"), + client().prepareIndex("test", "test", "2").setSource("foo", "a"), + client().prepareIndex("test", "test", "3").setSource("foo", "b"), + client().prepareIndex("test", "test", "4").setSource("foo", "c")); assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 4); assertEquals(1, client().prepareGet("test", "test", "1").get().getVersion()); assertEquals(1, client().prepareGet("test", "test", "4").get().getVersion()); + int slices = randomSlices(2, 10); + int expectedSlices = expectedSliceStatuses(slices, "test"); + // Reindex all the docs - assertThat(updateByQuery().source("test").refresh(true).setSlices(5).get(), matcher().updated(4).slices(hasSize(5))); + assertThat( + updateByQuery() + .source("test") + .refresh(true) + .setSlices(slices).get(), + matcher() + .updated(4) + .slices(hasSize(expectedSlices))); assertEquals(2, client().prepareGet("test", "test", "1").get().getVersion()); assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion()); // Now none of them - assertThat(updateByQuery().source("test").filter(termQuery("foo", "no_match")).setSlices(5).refresh(true).get(), - matcher().updated(0).slices(hasSize(5))); + assertThat( + updateByQuery() + .source("test") + .filter(termQuery("foo", "no_match")) + .setSlices(slices) + .refresh(true).get(), + matcher() + .updated(0) + .slices(hasSize(expectedSlices))); assertEquals(2, 
client().prepareGet("test", "test", "1").get().getVersion()); assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion()); // Now half of them - assertThat(updateByQuery().source("test").filter(termQuery("foo", "a")).refresh(true).setSlices(5).get(), - matcher().updated(2).slices(hasSize(5))); + assertThat( + updateByQuery() + .source("test") + .filter(termQuery("foo", "a")) + .refresh(true) + .setSlices(slices).get(), + matcher() + .updated(2) + .slices(hasSize(expectedSlices))); assertEquals(3, client().prepareGet("test", "test", "1").get().getVersion()); assertEquals(3, client().prepareGet("test", "test", "2").get().getVersion()); assertEquals(2, client().prepareGet("test", "test", "3").get().getVersion()); assertEquals(2, client().prepareGet("test", "test", "4").get().getVersion()); } + + public void testMultipleSources() throws Exception { + int sourceIndices = between(2, 5); + + Map> docs = new HashMap<>(); + for (int sourceIndex = 0; sourceIndex < sourceIndices; sourceIndex++) { + String indexName = "test" + sourceIndex; + docs.put(indexName, new ArrayList<>()); + int numDocs = between(5, 15); + for (int i = 0; i < numDocs; i++) { + docs.get(indexName).add(client().prepareIndex(indexName, "test", Integer.toString(i)).setSource("foo", "a")); + } + } + + List allDocs = docs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + indexRandom(true, allDocs); + for (Map.Entry> entry : docs.entrySet()) { + assertHitCount(client().prepareSearch(entry.getKey()).setSize(0).get(), entry.getValue().size()); + } + + int slices = randomSlices(1, 10); + int expectedSlices = expectedSliceStatuses(slices, docs.keySet()); + + String[] sourceIndexNames = docs.keySet().toArray(new String[docs.size()]); + BulkByScrollResponse response = updateByQuery().source(sourceIndexNames).refresh(true).setSlices(slices).get(); + assertThat(response, matcher().updated(allDocs.size()).slices(hasSize(expectedSlices))); + + for (Map.Entry> entry : docs.entrySet()) { + String index = entry.getKey(); + List indexDocs = entry.getValue(); + int randomDoc = between(0, indexDocs.size() - 1); + assertEquals(2, client().prepareGet(index, "test", Integer.toString(randomDoc)).get().getVersion()); + } + } } diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/20_validation.yml b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/20_validation.yml index 9daf1502a36..715e81f5ded 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/20_validation.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/20_validation.yml @@ -126,7 +126,7 @@ --- "junk in slices fails": - do: - catch: /Failed to parse int parameter \[slices\] with value \[junk\]/ + catch: /\[slices\] must be a positive integer or the string "auto"/ delete_by_query: slices: junk index: test @@ -136,7 +136,7 @@ --- "zero slices fails": - do: - catch: /\[slices\] must be at least 1/ + catch: /\[slices\] must be a positive integer or the string "auto"/ delete_by_query: slices: 0 index: test diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/80_slices.yml b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/80_slices.yml index fe0c816ee14..6e911e3f1ba 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/80_slices.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/80_slices.yml @@ -45,6 +45,7 @@ - match: {throttled_millis: 0} - 
gte: { took: 0 } - is_false: task + - length: {slices: 5} - match: {slices.0.version_conflicts: 0} - match: {slices.0.throttled_millis: 0} - match: {slices.1.version_conflicts: 0} @@ -128,6 +129,7 @@ - match: {response.throttled_millis: 0} - gte: { response.took: 0 } - is_false: response.task + - length: {response.slices: 5} - match: {response.slices.0.version_conflicts: 0} - match: {response.slices.0.throttled_millis: 0} - match: {response.slices.1.version_conflicts: 0} @@ -142,6 +144,7 @@ - match: {task.status.deleted: 4} - match: {task.status.version_conflicts: 0} - match: {task.status.throttled_millis: 0} + - length: {task.status.slices: 5} - match: {task.status.slices.0.version_conflicts: 0} - match: {task.status.slices.0.throttled_millis: 0} - match: {task.status.slices.1.version_conflicts: 0} @@ -252,6 +255,7 @@ - match: {response.throttled_millis: 0} - gte: { response.took: 0 } - is_false: response.task + - length: {response.slices: 2} - match: {response.slices.0.version_conflicts: 0} - match: {response.slices.0.throttled_millis: 0} - match: {response.slices.1.version_conflicts: 0} @@ -260,6 +264,7 @@ - match: {task.status.deleted: 6} - match: {task.status.version_conflicts: 0} - match: {task.status.throttled_millis: 0} + - length: {task.status.slices: 2} - match: {task.status.slices.0.version_conflicts: 0} - match: {task.status.slices.0.throttled_millis: 0} - match: {task.status.slices.1.version_conflicts: 0} @@ -277,3 +282,72 @@ count: index: test - match: {count: 0} + +--- +"Multiple slices with auto slice": + - do: + indices.create: + index: test + body: + settings: + index: + number_of_shards: 3 + - do: + index: + index: test + type: foo + id: 1 + body: { "text": "test" } + - do: + index: + index: test + type: foo + id: 2 + body: { "text": "test" } + - do: + index: + index: test + type: foo + id: 3 + body: { "text": "test" } + - do: + index: + index: test + type: foo + id: 4 + body: { "text": "test" } + - do: + indices.refresh: {} + + - do: + delete_by_query: + index: test + slices: auto + body: + query: + match_all: {} + + - is_false: timed_out + - match: {deleted: 4} + - is_false: created + - is_false: updated + - match: {version_conflicts: 0} + - match: {failures: []} + - match: {noops: 0} + - match: {throttled_millis: 0} + - gte: { took: 0 } + - is_false: task + - length: {slices: 3} + - match: {slices.0.version_conflicts: 0} + - match: {slices.0.throttled_millis: 0} + - match: {slices.1.version_conflicts: 0} + - match: {slices.1.throttled_millis: 0} + - match: {slices.2.version_conflicts: 0} + - match: {slices.2.throttled_millis: 0} + + - do: + indices.refresh: {} + - do: + count: + index: test + - match: {count: 0} diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yml index b64eaac7dec..bef31b1bd79 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yml @@ -275,7 +275,7 @@ --- "junk in slices fails": - do: - catch: /Failed to parse int parameter \[slices\] with value \[junk\]/ + catch: /\[slices\] must be a positive integer or the string "auto"/ reindex: slices: junk body: @@ -287,7 +287,7 @@ --- "zero slices fails": - do: - catch: /\[slices\] must be at least 1/ + catch: /\[slices\] must be a positive integer or the string "auto"/ reindex: slices: 0 body: diff --git 
a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_slices.yml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_slices.yml index 5c54adb5c08..fb06018d7c0 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_slices.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_slices.yml @@ -43,6 +43,7 @@ - gte: { took: 0 } - is_false: task - is_false: deleted + - length: {slices: 5} - match: {slices.0.updated: 0} - match: {slices.0.version_conflicts: 0} - match: {slices.0.throttled_millis: 0} @@ -127,6 +128,7 @@ - gte: { response.took: 0 } - is_false: response.task - is_false: response.deleted + - length: {response.slices: 5} - match: {response.slices.0.updated: 0} - match: {response.slices.0.version_conflicts: 0} - match: {response.slices.0.throttled_millis: 0} @@ -147,6 +149,7 @@ - match: {task.status.updated: 0} - match: {task.status.version_conflicts: 0} - match: {task.status.throttled_millis: 0} + - length: {task.status.slices: 5} - match: {task.status.slices.0.updated: 0} - match: {task.status.slices.0.version_conflicts: 0} - match: {task.status.slices.0.throttled_millis: 0} @@ -260,6 +263,7 @@ - gte: { response.took: 0 } - is_false: response.task - is_false: response.deleted + - length: {response.slices: 2} - match: {response.slices.0.updated: 0} - match: {response.slices.0.version_conflicts: 0} - match: {response.slices.0.throttled_millis: 0} @@ -271,6 +275,7 @@ - match: {task.status.updated: 0} - match: {task.status.version_conflicts: 0} - match: {task.status.throttled_millis: 0} + - length: {task.status.slices: 2} - match: {task.status.slices.0.updated: 0} - match: {task.status.slices.0.version_conflicts: 0} - match: {task.status.slices.0.throttled_millis: 0} @@ -285,3 +290,67 @@ search: index: .tasks - match: { hits.total: 1 } + + +--- +"Multiple slices with auto slice": + - do: + indices.create: + index: source + body: + settings: + index: + number_of_shards: 3 + - do: + index: + index: source + type: foo + id: 1 + body: { "text": "test" } + - do: + index: + index: source + type: foo + id: 2 + body: { "text": "test" } + - do: + index: + index: source + type: foo + id: 3 + body: { "text": "test" } + - do: + index: + index: source + type: foo + id: 4 + body: { "text": "test" } + - do: + indices.refresh: {} + + - do: + reindex: + slices: auto + body: + source: + index: source + dest: + index: dest + - match: {created: 4} + - match: {updated: 0} + - match: {version_conflicts: 0} + - match: {failures: []} + - match: {throttled_millis: 0} + - gte: { took: 0 } + - is_false: task + - is_false: deleted + - length: {slices: 3} + - match: {slices.0.updated: 0} + - match: {slices.0.version_conflicts: 0} + - match: {slices.0.throttled_millis: 0} + - match: {slices.1.updated: 0} + - match: {slices.1.version_conflicts: 0} + - match: {slices.1.throttled_millis: 0} + - match: {slices.2.updated: 0} + - match: {slices.2.version_conflicts: 0} + - match: {slices.2.throttled_millis: 0} diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/20_validation.yml b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/20_validation.yml index 8f8d492df3a..b7499180cda 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/20_validation.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/20_validation.yml @@ -108,7 +108,7 @@ --- "junk in slices fails": - do: - catch: /Failed to parse int parameter \[slices\] with value \[junk\]/ + catch: 
/\[slices\] must be a positive integer or the string "auto"/ update_by_query: slices: junk index: test @@ -116,7 +116,7 @@ --- "zero slices fails": - do: - catch: /\[slices\] must be at least 1/ + catch: /\[slices\] must be a positive integer or the string "auto"/ update_by_query: slices: 0 index: test diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/70_slices.yml b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/70_slices.yml index f390d43f9f3..dbb79f87037 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/70_slices.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/70_slices.yml @@ -44,6 +44,7 @@ - match: {throttled_millis: 0} - gte: { took: 0 } - is_false: task + - length: {slices: 5} - match: {slices.0.version_conflicts: 0} - match: {slices.0.throttled_millis: 0} - match: {slices.1.version_conflicts: 0} @@ -120,6 +121,7 @@ - match: {response.throttled_millis: 0} - gte: { response.took: 0 } - is_false: response.task + - length: {response.slices: 5} - match: {response.slices.0.version_conflicts: 0} - match: {response.slices.0.throttled_millis: 0} - match: {response.slices.1.version_conflicts: 0} @@ -134,6 +136,7 @@ - match: {task.status.updated: 4} - match: {task.status.version_conflicts: 0} - match: {task.status.throttled_millis: 0} + - length: {task.status.slices: 5} - match: {task.status.slices.0.version_conflicts: 0} - match: {task.status.slices.0.throttled_millis: 0} - match: {task.status.slices.1.version_conflicts: 0} @@ -239,6 +242,7 @@ - match: {response.throttled_millis: 0} - gte: { response.took: 0 } - is_false: response.task + - length: {response.slices: 2} - match: {response.slices.0.version_conflicts: 0} - match: {response.slices.0.throttled_millis: 0} - match: {response.slices.1.version_conflicts: 0} @@ -247,6 +251,7 @@ - match: {task.status.updated: 6} - match: {task.status.version_conflicts: 0} - match: {task.status.throttled_millis: 0} + - length: {task.status.slices: 2} - match: {task.status.slices.0.version_conflicts: 0} - match: {task.status.slices.0.throttled_millis: 0} - match: {task.status.slices.1.version_conflicts: 0} @@ -259,3 +264,65 @@ search: index: .tasks - match: { hits.total: 1 } + + +--- +"Multiple slices with auto slice": + - do: + indices.create: + index: test + body: + settings: + index: + number_of_shards: 3 + - do: + index: + index: test + type: foo + id: 1 + body: { "text": "test" } + - do: + index: + index: test + type: foo + id: 2 + body: { "text": "test" } + - do: + index: + index: test + type: foo + id: 3 + body: { "text": "test" } + - do: + index: + index: test + type: foo + id: 4 + body: { "text": "test" } + - do: + indices.refresh: {} + + - do: + update_by_query: + index: test + slices: auto + body: + query: + match_all: {} + + - is_false: timed_out + - match: {updated: 4} + - is_false: created + - match: {version_conflicts: 0} + - match: {failures: []} + - match: {noops: 0} + - match: {throttled_millis: 0} + - gte: { took: 0 } + - is_false: task + - length: {slices: 3} + - match: {slices.0.version_conflicts: 0} + - match: {slices.0.throttled_millis: 0} + - match: {slices.1.version_conflicts: 0} + - match: {slices.1.throttled_millis: 0} + - match: {slices.2.version_conflicts: 0} + - match: {slices.2.throttled_millis: 0} diff --git a/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java 
b/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java index 079c784342b..a0752b00485 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java @@ -32,12 +32,14 @@ public abstract class AbstractAsyncBulkByScrollActionTestCase< Response extends BulkByScrollResponse> extends ESTestCase { protected ThreadPool threadPool; - protected WorkingBulkByScrollTask task; + protected BulkByScrollTask task; @Before public void setupForTest() { threadPool = new TestThreadPool(getTestName()); - task = new WorkingBulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID, null, Float.MAX_VALUE); + task = new BulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID); + task.setWorker(Float.POSITIVE_INFINITY, null); + } @After
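One rule worth pulling out of the test helpers above: with slices=auto, the resolved slice count is the smallest primary-shard count across the source indices, capped by BulkByScrollParallelizationHelper.AUTO_SLICE_CEILING (its value is not shown in this patch). A self-contained restatement, with a class and method name of our choosing:

    import java.util.Collection;
    import java.util.Collections;

    final class AutoSlices {
        /** Mirrors expectedSlices(...) in ReindexTestCase: fewest primary shards, capped at the ceiling. */
        static int resolve(Collection<Integer> primaryShardsPerSourceIndex, int autoSliceCeiling) {
            int leastNumShards = Collections.min(primaryShardsPerSourceIndex);
            return Math.min(leastNumShards, autoSliceCeiling);
        }
    }

This is why the three-shard indices created in the YAML tests above report exactly three slice statuses (length: {slices: 3}), while a request that resolves to a single slice reports none.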