diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index 0921abcb677..ceb00b2f81f 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -55,7 +55,6 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -66,7 +65,6 @@ import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableList; import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES; import static org.elasticsearch.rest.RestStatus.CONFLICT; import static org.elasticsearch.search.sort.SortBuilders.fieldSort; @@ -85,7 +83,6 @@ public abstract class AbstractAsyncBulkByScrollAction scroll = new AtomicReference<>(); - private final AtomicLong lastBatchStartTime = new AtomicLong(-1); private final Set destinationIndices = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final ESLogger logger; @@ -147,16 +144,17 @@ public abstract class AbstractAsyncBulkByScrollAction client.search(firstSearchRequest, listener), (SearchResponse response) -> { logger.debug("[{}] documents match query", response.getHits().getTotalHits()); - onScrollResponse(timeValueSeconds(0), response); + onScrollResponse(timeValueNanos(System.nanoTime()), 0, response); }); } /** * Process a scroll response. - * @param delay how long to delay processesing the response. This delay is how throttling is applied to the action. + * @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay. + * @param lastBatchSize the size of the last batch. Used to calculate the throttling delay. * @param searchResponse the scroll response to process */ - void onScrollResponse(TimeValue delay, SearchResponse searchResponse) { + void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, SearchResponse searchResponse) { if (task.isCancelled()) { finishHim(null); return; @@ -179,7 +177,11 @@ public abstract class AbstractAsyncBulkByScrollAction() { @Override public void onResponse(BulkResponse response) { - onBulkResponse(response); + onBulkResponse(thisBatchStartTime, response); } @Override @@ -258,7 +259,7 @@ public abstract class AbstractAsyncBulkByScrollAction failures = new ArrayList(); Set destinationIndicesThisBatch = new HashSet<>(); @@ -306,7 +307,7 @@ public abstract class AbstractAsyncBulkByScrollAction client.searchScroll(request, listener), (SearchResponse response) -> { - onScrollResponse(timeValueNanos(max(0, earliestNextBatchStartTime - System.nanoTime())), response); + onScrollResponse(lastBatchStartTime, lastBatchSize, response); }); } - /** - * How many nanoseconds should a batch of lastBatchSize have taken if it were perfectly throttled? Package private for testing. - */ - float perfectlyThrottledBatchTime(int lastBatchSize) { - if (task.getRequestsPerSecond() == Float.POSITIVE_INFINITY) { - return 0; - } - // requests - // ------------------- == seconds - // request per seconds - float targetBatchTimeInSeconds = lastBatchSize / task.getRequestsPerSecond(); - // nanoseconds per seconds * seconds == nanoseconds - return TimeUnit.SECONDS.toNanos(1) * targetBatchTimeInSeconds; - } - private void recordFailure(Failure failure, List failures) { if (failure.getStatus() == CONFLICT) { task.countVersionConflict(); @@ -453,20 +438,6 @@ public abstract class AbstractAsyncBulkByScrollAction assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get())); if (listener.isDone()) { Object result = listener.get(); @@ -203,7 +202,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { client.scrollsToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100); DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); action.setScroll(scrollId()); - action.startNextScroll(0); + action.startNextScroll(timeValueNanos(System.nanoTime()), 0); assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get())); assertBusy(() -> assertTrue(listener.isDone())); ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get()); @@ -219,7 +218,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { long total = randomIntBetween(0, Integer.MAX_VALUE); InternalSearchHits hits = new InternalSearchHits(null, total, 0); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); - new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), + new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), 0, new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); assertEquals(total, testTask.getStatus().getTotal()); } @@ -230,24 +229,17 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { public void testScrollResponseBatchingBehavior() throws Exception { int maxBatches = randomIntBetween(0, 100); for (int batches = 1; batches < maxBatches; batches++) { - long now = System.nanoTime(); InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap()); InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] { hit }, 0, 0); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); - action.onScrollResponse(timeValueSeconds(0), + action.onScrollResponse(timeValueNanos(System.nanoTime()), 0, new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); // Use assert busy because the update happens on another thread final int expectedBatches = batches; assertBusy(() -> assertEquals(expectedBatches, testTask.getStatus().getBatches())); - /* - * While we're here we can check that getting a scroll response sets the last scroll start time which makes sure the wait time - * isn't counted as time that the last batch took. - */ - assertThat(action.getLastBatchStartTime(), greaterThanOrEqualTo(now)); - /* * Also while we're here check that we preserved the headers from the last request. assertBusy because no requests might have * come in yet. @@ -297,7 +289,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { } responses[i] = new BulkItemResponse(i, opType, new IndexResponse(shardId, "type", "id" + i, randomInt(), createdResponse)); } - new DummyAbstractAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0)); + new DummyAbstractAsyncBulkByScrollAction().onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(responses, 0)); assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts()); assertEquals(updated, testTask.getStatus().getUpdated()); assertEquals(created, testTask.getStatus().getCreated()); @@ -310,20 +302,22 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { * Mimicks a ThreadPool rejecting execution of the task. */ public void testThreadPoolRejectionsAbortRequest() throws Exception { - TimeValue expectedDelay = parseTimeValue(randomPositiveTimeValue(), "test"); + testTask.rethrottle(1); threadPool.shutdown(); threadPool = new TestThreadPool(getTestName()) { @Override public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { - assertEquals(expectedDelay, delay); // While we're here we can check that the sleep made it through + // While we're here we can check that the sleep made it through + assertThat(delay.nanos(), greaterThan(0L)); + assertThat(delay.seconds(), lessThanOrEqualTo(10L)); ((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test")); return null; } }; InternalSearchHits hits = new InternalSearchHits(null, 0, 0); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); - new DummyAbstractAsyncBulkByScrollAction() - .onScrollResponse(expectedDelay, new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); + new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueNanos(System.nanoTime()), 10, + new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); try { listener.get(); fail("Expected a failure"); @@ -343,7 +337,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { public void testShardFailuresAbortRequest() throws Exception { ShardSearchFailure shardFailure = new ShardSearchFailure(new RuntimeException("test")); InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, false, null); - new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), + new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueNanos(System.nanoTime()), 0, new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure })); BulkIndexByScrollResponse response = listener.get(); assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class)); @@ -358,7 +352,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { */ public void testSearchTimeoutsAbortRequest() throws Exception { InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, true, null); - new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), + new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueNanos(System.nanoTime()), 0, new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[0])); BulkIndexByScrollResponse response = listener.get(); assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class)); @@ -368,14 +362,14 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { assertThat(client.scrollsCleared, contains(scrollId)); } - /** * Mimicks bulk indexing failures. */ public void testBulkFailuresAbortRequest() throws Exception { Failure failure = new Failure("index", "type", "id", new RuntimeException("test")); DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); - action.onBulkResponse(new BulkResponse(new BulkItemResponse[] {new BulkItemResponse(0, "index", failure)}, randomLong())); + BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[] {new BulkItemResponse(0, "index", failure)}, randomLong()); + action.onBulkResponse(timeValueNanos(System.nanoTime()), bulkResponse); BulkIndexByScrollResponse response = listener.get(); assertThat(response.getIndexingFailures(), contains(failure)); assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class)); @@ -394,15 +388,12 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { }; InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap()); InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] {hit}, 0, 0); - InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); - action.onScrollResponse(timeValueSeconds(0), new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); - try { - listener.get(); - fail("Expected failure."); - } catch (ExecutionException e) { - assertThat(e.getCause(), instanceOf(RuntimeException.class)); - assertThat(e.getCause().getMessage(), equalTo("surprise")); - } + InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false); + SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), null); + action.onScrollResponse(timeValueNanos(System.nanoTime()), 0, searchResponse); + ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get()); + assertThat(e.getCause(), instanceOf(RuntimeException.class)); + assertThat(e.getCause().getMessage(), equalTo("surprise")); } /** @@ -426,17 +417,6 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { assertEquals(testRequest.getMaxRetries(), testTask.getStatus().getBulkRetries()); } - public void testPerfectlyThrottledBatchTime() { - DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); - testRequest.setRequestsPerSecond(Float.POSITIVE_INFINITY); - assertThat((double) action.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f)); - - int total = between(0, 1000000); - testTask.rethrottle(1); - assertThat((double) action.perfectlyThrottledBatchTime(total), - closeTo(TimeUnit.SECONDS.toNanos(total), TimeUnit.SECONDS.toNanos(1))); - } - public void testScrollDelay() throws Exception { /* * Replace the thread pool with one that will save the delay sent for the command. We'll use that to check that we used a proper @@ -460,12 +440,10 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { // Set the base for the scroll to wait - this is added to the figure we calculate below firstSearchRequest.scroll(timeValueSeconds(10)); - // We'd like to get about 1 request a second + // Set throttle to 1 request per second to make the math simpler testTask.rethrottle(1f); - // Make the last scroll look nearly instant - action.setLastBatchStartTime(System.nanoTime()); - // The last batch had 100 documents - action.startNextScroll(100); + // Make the last batch look nearly instant but have 100 documents + action.startNextScroll(timeValueNanos(System.nanoTime()), 100); // So the next request is going to have to wait an extra 100 seconds or so (base was 10 seconds, so 110ish) assertThat(client.lastScroll.get().request.scroll().keepAlive().seconds(), either(equalTo(110L)).or(equalTo(109L))); @@ -473,11 +451,20 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { // Now we can simulate a response and check the delay that we used for the task InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap()); InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] { hit }, 0, 0); - InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); - client.lastScroll.get().listener.onResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); + InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false); + SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), null); - // The delay is still 100ish seconds because there hasn't been much time between when we requested the bulk and when we got it. - assertThat(capturedDelay.get().seconds(), either(equalTo(100L)).or(equalTo(99L))); + if (randomBoolean()) { + client.lastScroll.get().listener.onResponse(searchResponse); + // The delay is still 100ish seconds because there hasn't been much time between when we requested the bulk and when we got it. + assertThat(capturedDelay.get().seconds(), either(equalTo(100L)).or(equalTo(99L))); + } else { + // Let's rethrottle between the starting the scroll and getting the response + testTask.rethrottle(10f); + client.lastScroll.get().listener.onResponse(searchResponse); + // The delay uses the new throttle + assertThat(capturedDelay.get().seconds(), either(equalTo(10L)).or(equalTo(9L))); + } // Running the command ought to increment the delay counter on the task. capturedCommand.get().run(); @@ -501,7 +488,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { CountDownLatch successLatch = new CountDownLatch(1); DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff() { @Override - void startNextScroll(int lastBatchSize) { + void startNextScroll(TimeValue lastBatchStartTime, int lastBatchSize) { successLatch.countDown(); } }; @@ -509,7 +496,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { for (int i = 0; i < size + 1; i++) { request.add(new IndexRequest("index", "type", "id" + i)); } - action.sendBulkRequest(request); + action.sendBulkRequest(timeValueNanos(System.nanoTime()), request); if (failWithRejection) { BulkIndexByScrollResponse response = listener.get(); assertThat(response.getIndexingFailures(), hasSize(1)); @@ -576,22 +563,23 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { public void testCancelBeforeScrollResponse() throws Exception { // We bail so early we don't need to pass in a half way valid response. - cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.onScrollResponse(timeValueSeconds(0), null)); + cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.onScrollResponse(timeValueNanos(System.nanoTime()), 1, + null)); } public void testCancelBeforeSendBulkRequest() throws Exception { // We bail so early we don't need to pass in a half way valid request. - cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.sendBulkRequest(null)); + cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.sendBulkRequest(timeValueNanos(System.nanoTime()), null)); } public void testCancelBeforeOnBulkResponse() throws Exception { // We bail so early we don't need to pass in a half way valid response. cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> - action.onBulkResponse(new BulkResponse(new BulkItemResponse[0], 0))); + action.onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(new BulkItemResponse[0], 0))); } public void testCancelBeforeStartNextScroll() throws Exception { - cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNextScroll(0)); + cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNextScroll(timeValueNanos(System.nanoTime()), 0)); } public void testCancelBeforeStartNormalTermination() throws Exception { @@ -640,7 +628,9 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { InternalSearchHits hits = new InternalSearchHits(null, total, 0); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); // Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task - action.onScrollResponse(timeValueMinutes(10), new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); + SearchResponse scrollResponse = new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null); + testTask.rethrottle(1); + action.onScrollResponse(timeValueNanos(System.nanoTime()), 1000, scrollResponse); // Now that we've got our cancel we'll just verify that it all came through all right assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled()); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java index fd1a17a439d..05699c6f7af 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java @@ -42,8 +42,10 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; +import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -165,10 +167,11 @@ public class BulkByScrollTaskTests extends ESTestCase { * We never end up waiting this long because the test rethrottles over and over again, ratcheting down the delay a random amount * each time. */ - float originalRequestsPerSecond = (float) randomDoubleBetween(0, 10000, true); + float originalRequestsPerSecond = (float) randomDoubleBetween(1, 10000, true); task.rethrottle(originalRequestsPerSecond); TimeValue maxDelay = timeValueSeconds(between(1, 5)); assertThat(maxDelay.nanos(), greaterThanOrEqualTo(0L)); + int batchSizeForMaxDelay = (int) (maxDelay.seconds() * originalRequestsPerSecond); ThreadPool threadPool = new TestThreadPool(getTestName()) { @Override public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { @@ -177,7 +180,7 @@ public class BulkByScrollTaskTests extends ESTestCase { } }; try { - task.delayPrepareBulkRequest(threadPool, maxDelay, new AbstractRunnable() { + task.delayPrepareBulkRequest(threadPool, timeValueNanos(System.nanoTime()), batchSizeForMaxDelay, new AbstractRunnable() { @Override protected void doRun() throws Exception { boolean oldValue = done.getAndSet(true); @@ -263,7 +266,7 @@ public class BulkByScrollTaskTests extends ESTestCase { }; try { // Have the task use the thread pool to delay a task that does nothing - task.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), new AbstractRunnable() { + task.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), 1, new AbstractRunnable() { @Override protected void doRun() throws Exception { } @@ -284,4 +287,14 @@ public class BulkByScrollTaskTests extends ESTestCase { task.getStatus().toXContent(builder, ToXContent.EMPTY_PARAMS); assertThat(builder.string(), containsString("\"requests_per_second\":\"unlimited\"")); } + + public void testPerfectlyThrottledBatchTime() { + task.rethrottle(Float.POSITIVE_INFINITY); + assertThat((double) task.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f)); + + int total = between(0, 1000000); + task.rethrottle(1); + assertThat((double) task.perfectlyThrottledBatchTime(total), + closeTo(TimeUnit.SECONDS.toNanos(total), TimeUnit.SECONDS.toNanos(1))); + } }