From da96b6e41d1b44eec8bfa170d8d0d8647e57dfef Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Wed, 9 Mar 2016 15:53:06 -0500 Subject: [PATCH] [reindex] Add throttling support The throttle is applied when starting the next scroll request so that its timeout can include the throttle time. --- .../common/xcontent/XContentBuilder.java | 4 + docs/reference/docs/reindex.asciidoc | 27 ++- docs/reference/docs/update-by-query.asciidoc | 24 ++- .../AbstractAsyncBulkByScrollAction.java | 104 ++++++--- .../AbstractBaseReindexRestHandler.java | 9 +- .../reindex/AbstractBulkByScrollRequest.java | 24 +++ .../index/reindex/BulkByScrollTask.java | 29 ++- .../reindex/AsyncBulkByScrollActionTests.java | 199 ++++++++++++++---- .../index/reindex/BulkByScrollTaskTests.java | 20 +- .../index/reindex/RoundTripTests.java | 7 +- .../rest-api-spec/test/reindex/10_basic.yaml | 4 + .../test/reindex/80_throttle.yaml | 53 +++++ .../test/update-by-query/10_basic.yaml | 4 + .../test/update-by-query/70_throttle.yaml | 39 ++++ .../resources/rest-api-spec/api/reindex.json | 11 +- .../rest-api-spec/api/update-by-query.json | 5 + 16 files changed, 476 insertions(+), 87 deletions(-) create mode 100644 modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_throttle.yaml create mode 100644 modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/70_throttle.yaml diff --git a/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java b/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java index 8ca53af186c..2229d45840b 100644 --- a/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java +++ b/core/src/main/java/org/elasticsearch/common/xcontent/XContentBuilder.java @@ -961,6 +961,10 @@ public final class XContentBuilder implements BytesStream, Releasable { return this; } + public XContentBuilder timeValueField(String rawFieldName, String readableFieldName, TimeValue timeValue) throws IOException { + return 
timeValueField(rawFieldName, readableFieldName, timeValue.millis(), TimeUnit.MILLISECONDS); + } + public XContentBuilder timeValueField(String rawFieldName, String readableFieldName, long rawTime, TimeUnit timeUnit) throws IOException { if (humanReadable) { diff --git a/docs/reference/docs/reindex.asciidoc b/docs/reference/docs/reindex.asciidoc index 8173503054f..5f4641ca187 100644 --- a/docs/reference/docs/reindex.asciidoc +++ b/docs/reference/docs/reindex.asciidoc @@ -299,7 +299,8 @@ POST /_reindex === URL Parameters In addition to the standard parameters like `pretty`, the Reindex API also -supports `refresh`, `wait_for_completion`, `consistency`, and `timeout`. +supports `refresh`, `wait_for_completion`, `consistency`, `timeout`, and +`requests_per_second`. Sending the `refresh` url parameter will cause all indexes to which the request wrote to be refreshed. This is different than the Index API's `refresh` @@ -317,8 +318,14 @@ request. `timeout` controls how long each write request waits for unavailable shards to become available. Both work exactly how they work in the {ref}/docs-bulk.html[Bulk API]. -`timeout` controls how long each batch waits for the target shard to become -available. It works exactly how it works in the {ref}/docs-bulk.html[Bulk API]. +`requests_per_second` can be set to any decimal number (1.4, 6, 1000, etc) and +throttle the number of requests per second that the reindex issues. The +throttling is done waiting between bulk batches so that it can manipulate the +scroll timeout. The wait time is the difference between the time it took the +batch to complete and the time `requests_per_second * requests_in_the_batch`. +Since the batch isn't broken into multiple bulk requests large batch sizes will +cause Elasticsearch to create many requests and then wait for a while before +starting the next set. This is "bursty" instead of "smooth". 
[float] === Response body @@ -333,6 +340,8 @@ The JSON response looks like this: "created": 123, "batches": 1, "version_conflicts": 2, + "retries": 0, + "throttled_millis": 0, "failures" : [ ] } -------------------------------------------------- @@ -357,6 +366,14 @@ The number of scroll responses pulled back by the the reindex. The number of version conflicts that reindex hit. +`retries`:: + +The number of retries that the reindex did in response to a full queue. + +`throttled_millis`:: + +Number of milliseconds the request slept to conform to `requests_per_second`. + `failures`:: Array of all indexing failures. If this is non-empty then the request aborted @@ -403,7 +420,9 @@ The responses looks like: "deleted" : 0, "batches" : 36, "version_conflicts" : 0, - "noops" : 0 + "noops" : 0, + "retries": 0, + "throttled_millis": 0 }, "description" : "" } ] diff --git a/docs/reference/docs/update-by-query.asciidoc b/docs/reference/docs/update-by-query.asciidoc index c0336491d9a..52667bf79a8 100644 --- a/docs/reference/docs/update-by-query.asciidoc +++ b/docs/reference/docs/update-by-query.asciidoc @@ -169,8 +169,14 @@ request. `timeout` controls how long each write request waits for unavailable shards to become available. Both work exactly how they work in the {ref}/docs-bulk.html[Bulk API]. -`timeout` controls how long each batch waits for the target shard to become -available. It works exactly how it works in the {ref}/docs-bulk.html[Bulk API]. +`requests_per_second` can be set to any decimal number (1.4, 6, 1000, etc) and +throttle the number of requests per second that the update by query issues. The +throttling is done waiting between bulk batches so that it can manipulate the +scroll timeout. The wait time is the difference between the time it took the +batch to complete and the time `requests_per_second * requests_in_the_batch`. 
+Since the batch isn't broken into multiple bulk requests large batch sizes will +cause Elasticsearch to create many requests and then wait for a while before +starting the next set. This is "bursty" instead of "smooth". [float] === Response body @@ -184,6 +190,8 @@ The JSON response looks like this: "updated": 0, "batches": 1, "version_conflicts": 2, + "retries": 0, + "throttled_millis": 0, "failures" : [ ] } -------------------------------------------------- @@ -204,6 +212,14 @@ The number of scroll responses pulled back by the the update by query. The number of version conflicts that the update by query hit. +`retries`:: + +The number of retries that the update by query did in response to a full queue. + +`throttled_millis`:: + +Number of milliseconds the request slept to conform to `requests_per_second`. + `failures`:: Array of all indexing failures. If this is non-empty then the request aborted @@ -251,7 +267,9 @@ The responses looks like: "deleted" : 0, "batches" : 36, "version_conflicts" : 0, - "noops" : 0 + "noops" : 0, + "retries": 0, + "throttled_millis": 0 }, "description" : "" } ] diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index 2eb0cc5ba78..a1564aa136a 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -54,6 +54,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -63,13 +64,14 @@ import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableList; import static 
org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES; import static org.elasticsearch.rest.RestStatus.CONFLICT; import static org.elasticsearch.search.sort.SortBuilders.fieldSort; /** * Abstract base for scrolling across a search and executing bulk actions on all - * results. + * results. All package private methods are package private so their tests can use them. */ public abstract class AbstractAsyncBulkByScrollAction, Response> { /** @@ -81,6 +83,7 @@ public abstract class AbstractAsyncBulkByScrollAction scroll = new AtomicReference<>(); + private final AtomicLong lastBatchStartTime = new AtomicLong(-1); private final Set destinationIndices = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final ESLogger logger; @@ -107,15 +110,10 @@ public abstract class AbstractAsyncBulkByScrollAction indexingFailures, List searchFailures, boolean timedOut); + /** + * Start the action by firing the initial search request. 
+ */ public void start() { - initialSearch(); - } - - public BulkByScrollTask getTask() { - return task; - } - - void initialSearch() { if (task.isCancelled()) { finishHim(null); return; @@ -137,7 +135,7 @@ public abstract class AbstractAsyncBulkByScrollAction() { @Override public void onResponse(SearchResponse response) { - onScrollResponse(response); + onScrollResponse(timeValueNanos(max(0, earliestNextBatchStartTime - System.nanoTime())), response); } @Override @@ -308,6 +325,21 @@ public abstract class AbstractAsyncBulkByScrollAction failures) { if (failure.getStatus() == CONFLICT) { task.countVersionConflict(); @@ -318,6 +350,9 @@ public abstract class AbstractAsyncBulkByScrollAction indexingFailures, List searchFailures, boolean timedOut) { if (task.isCancelled() || false == mainRequest.isRefresh()) { finishHim(null, indexingFailures, searchFailures, timedOut); @@ -385,7 +420,7 @@ public abstract class AbstractAsyncBulkByScrollAction, Response extends BulkIndexByScrollResponse, - TA extends TransportAction> extends BaseRestHandler { +public abstract class AbstractBaseReindexRestHandler< + Request extends AbstractBulkByScrollRequest, + Response extends BulkIndexByScrollResponse, + TA extends TransportAction + > extends BaseRestHandler { protected final IndicesQueriesRegistry indicesQueriesRegistry; protected final AggregatorParsers aggParsers; protected final Suggesters suggesters; @@ -59,6 +61,7 @@ public abstract class AbstractBaseReindexRestHandler(channel)); return; diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 41b436e6074..29e2acb352e 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -85,6 +85,13 @@ public abstract class 
AbstractBulkByScrollRequest 0) { + throttledNanos.addAndGet(nanos); + } + } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index a4e9c42a33e..daf9a15daea 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationRequest; @@ -67,12 +68,14 @@ import org.junit.After; import org.junit.Before; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -83,10 +86,15 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; import static org.apache.lucene.util.TestUtil.randomSimpleString; import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff; +import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; +import static 
org.elasticsearch.common.unit.TimeValue.timeValueSeconds; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.emptyCollectionOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; @@ -99,17 +107,24 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { private String scrollId; private TaskManager taskManager; private BulkByScrollTask task; + private Map expectedHeaders = new HashMap<>(); @Before public void setupForTest() { client = new MyMockClient(new NoOpClient(getTestName())); threadPool = new ThreadPool(getTestName()); mainRequest = new DummyAbstractBulkByScrollRequest(); - firstSearchRequest = null; + firstSearchRequest = new SearchRequest().scroll(timeValueSeconds(10)); listener = new PlainActionFuture<>(); scrollId = null; taskManager = new TaskManager(Settings.EMPTY); task = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", mainRequest); + + // Fill the context with something random so we can make sure we inherited it appropriately. 
+ expectedHeaders.clear(); + expectedHeaders.put(randomSimpleString(random()), randomSimpleString(random())); + threadPool.getThreadContext().newStoredContext(); + threadPool.getThreadContext().putHeader(expectedHeaders); } @After @@ -135,34 +150,35 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { long total = randomIntBetween(0, Integer.MAX_VALUE); InternalSearchHits hits = new InternalSearchHits(null, total, 0); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); - new DummyAbstractAsyncBulkByScrollAction() - .onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); + new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), + new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); assertEquals(total, task.getStatus().getTotal()); } - public void testEachScrollResponseIsABatch() { - // Replace the generic thread pool with one that executes immediately so the batch is updated immediately - threadPool.shutdown(); - threadPool = new ThreadPool(getTestName()) { - @Override - public Executor generic() { - return new Executor() { - @Override - public void execute(Runnable command) { - command.run(); - } - }; - } - }; + /** + * Tests that each scroll response is a batch and that the batch is launched properly. 
+ */ + public void testScrollResponseBatchingBehavior() throws Exception { int maxBatches = randomIntBetween(0, 100); for (int batches = 1; batches < maxBatches; batches++) { + long now = System.nanoTime(); InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap()); InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] { hit }, 0, 0); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); - new DummyAbstractAsyncBulkByScrollAction() - .onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); + DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); + action.onScrollResponse(timeValueSeconds(0), + new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); - assertEquals(batches, task.getStatus().getBatches()); + // Use assert busy because the update happens on another thread + final int expectedBatches = batches; + assertBusy(() -> assertEquals(expectedBatches, task.getStatus().getBatches())); + + /* + * While we're here we can check that getting a scroll response sets the last scroll start time which makes sure the wait time + * isn't counted as time that the last batch took. + */ + assertThat(action.getLastBatchStartTime(), greaterThanOrEqualTo(now)); + assertEquals(expectedHeaders, client.lastHeaders.get()); } } @@ -220,22 +236,20 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { * Mimicks a ThreadPool rejecting execution of the task. 
*/ public void testThreadPoolRejectionsAbortRequest() throws Exception { + TimeValue expectedDelay = parseTimeValue(randomPositiveTimeValue(), "test"); threadPool.shutdown(); threadPool = new ThreadPool(getTestName()) { @Override - public Executor generic() { - return new Executor() { - @Override - public void execute(Runnable command) { - ((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test")); - } - }; + public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + assertEquals(expectedDelay, delay); // While we're here we can check that the sleep made it through + ((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test")); + return null; } }; InternalSearchHits hits = new InternalSearchHits(null, 0, 0); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); new DummyAbstractAsyncBulkByScrollAction() - .onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); + .onScrollResponse(expectedDelay, new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); try { listener.get(); fail("Expected a failure"); @@ -243,6 +257,9 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]")); } assertThat(client.scrollsCleared, contains(scrollId)); + + // While we're mocking the threadPool lets also check that we incremented the throttle counter + assertEquals(expectedDelay, task.getStatus().getThrottled()); } /** @@ -252,7 +269,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { public void testShardFailuresAbortRequest() throws Exception { ShardSearchFailure shardFailure = new ShardSearchFailure(new RuntimeException("test")); InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, false, null); - new DummyAbstractAsyncBulkByScrollAction().onScrollResponse( + new 
DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure })); BulkIndexByScrollResponse response = listener.get(); assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class)); @@ -267,8 +284,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { */ public void testSearchTimeoutsAbortRequest() throws Exception { InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, true, null); - new DummyAbstractAsyncBulkByScrollAction() - .onScrollResponse(new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[0])); + new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), + new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[0])); BulkIndexByScrollResponse response = listener.get(); assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class)); assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class)); @@ -304,7 +321,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap()); InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] {hit}, 0, 0); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); - action.onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); + action.onScrollResponse(timeValueSeconds(0), new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); try { listener.get(); fail("Expected failure."); @@ -334,6 +351,55 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { assertEquals(retryAttempts, task.getStatus().getRetries()); } + public void testPerfectlyThrottledBatchTime() { + 
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); + mainRequest.setRequestsPerSecond(0); + assertThat((double) action.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f)); + + int total = between(0, 1000000); + mainRequest.setRequestsPerSecond(1); + assertThat((double) action.perfectlyThrottledBatchTime(total), + closeTo(TimeUnit.SECONDS.toNanos(total), TimeUnit.SECONDS.toNanos(1))); + } + + public void testScrollDelay() throws Exception { + /* + * Replace the thread pool with one that will save the delay sent for the command. We'll use that to check that we used a proper + * delay for throttling. + */ + AtomicReference capturedDelay = new AtomicReference<>(); + threadPool.shutdown(); + threadPool = new ThreadPool(getTestName()) { + @Override + public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + capturedDelay.set(delay); + return null; + } + }; + + DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); + action.setScroll(scrollId()); + + // We'd like to get about 1 request a second + mainRequest.setRequestsPerSecond(1f); + // Make the last scroll look nearly instant + action.setLastBatchStartTime(System.nanoTime()); + // The last batch had 100 documents + action.startNextScroll(100); + + // So the next request is going to have to wait an extra 100 seconds or so (base was 10, so 110ish) + assertThat(client.lastScroll.get().request.scroll().keepAlive().seconds(), either(equalTo(110L)).or(equalTo(109L))); + + // Now we can simulate a response and check the delay that we used for the task + InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap()); + InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] { hit }, 0, 0); + InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); + client.lastScroll.get().listener.onResponse(new SearchResponse(searchResponse, 
scrollId(), 5, 4, randomLong(), null)); + + // The delay is still 100ish seconds because there hasn't been much time between when we requested the bulk and when we got it. + assertThat(capturedDelay.get().seconds(), either(equalTo(100L)).or(equalTo(99L))); + } + private long retryTestCase(boolean failWithRejection) throws Exception { int totalFailures = randomIntBetween(1, mainRequest.getMaxRetries()); int size = randomIntBetween(1, 100); @@ -353,7 +419,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { } @Override - void startNextScroll() { + void startNextScroll(int lastBatchSize) { successLatch.countDown(); } }; @@ -418,12 +484,12 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { } public void testCancelBeforeInitialSearch() throws Exception { - cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.initialSearch()); + cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.start()); } public void testCancelBeforeScrollResponse() throws Exception { // We bail so early we don't need to pass in a half way valid response. 
- cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.onScrollResponse(null)); + cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.onScrollResponse(timeValueSeconds(0), null)); } public void testCancelBeforeSendBulkRequest() throws Exception { @@ -437,7 +503,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { } public void testCancelBeforeStartNextScroll() throws Exception { - cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNextScroll()); + cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNextScroll(0)); } public void testCancelBeforeStartNormalTermination() throws Exception { @@ -447,6 +513,46 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { assertNull("No refresh was attempted", client.lastRefreshRequest.get()); } + /** + * Tests that we can cancel the request during its throttling delay. This can't use {@link #cancelTaskCase(Consumer)} because it needs + * to send the request un-canceled and cancel it at a specific time. + */ + public void testCancelWhileDelayedAfterScrollResponse() throws Exception { + String reason = randomSimpleString(random()); + + /* + * Replace the thread pool with one that will cancel the task as soon as anything is scheduled, which reindex tries to do when there + * is a delay. 
+ */ + threadPool.shutdown(); + threadPool = new ThreadPool(getTestName()) { + @Override + public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + taskManager.cancel(task, reason, (Set s) -> {}); + command.run(); + return null; + } + }; + + // Send the scroll response which will trigger the custom thread pool above, canceling the request before running the response + DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); + boolean previousScrollSet = usually(); + if (previousScrollSet) { + action.setScroll(scrollId()); + } + long total = randomIntBetween(0, Integer.MAX_VALUE); + InternalSearchHits hits = new InternalSearchHits(null, total, 0); + InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); + action.onScrollResponse(timeValueSeconds(0), new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); + + // Now that we've got our cancel we'll just verify that it all came through allright + assertEquals(reason, listener.get().getReasonCancelled()); + if (previousScrollSet) { + // Canceled tasks always start to clear the scroll before they die. 
+ assertThat(client.scrollsCleared, contains(scrollId)); + } + } + private void cancelTaskCase(Consumer testMe) throws Exception { DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); boolean previousScrollSet = usually(); @@ -489,10 +595,12 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { } } - private static class MyMockClient extends FilterClient { + private class MyMockClient extends FilterClient { private final List scrollsCleared = new ArrayList<>(); private final AtomicInteger bulksAttempts = new AtomicInteger(); + private final AtomicReference> lastHeaders = new AtomicReference<>(); private final AtomicReference lastRefreshRequest = new AtomicReference<>(); + private final AtomicReference> lastScroll = new AtomicReference<>(); private int bulksToReject = 0; @@ -505,11 +613,16 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { protected , Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder> void doExecute( Action action, Request request, ActionListener listener) { + lastHeaders.set(threadPool.getThreadContext().getHeaders()); if (request instanceof RefreshRequest) { lastRefreshRequest.set((RefreshRequest) request); listener.onResponse(null); return; } + if (request instanceof SearchScrollRequest) { + lastScroll.set(new RequestAndListener<>((SearchScrollRequest) request, (ActionListener) listener)); + return; + } if (request instanceof ClearScrollRequest) { ClearScrollRequest clearScroll = (ClearScrollRequest) request; scrollsCleared.addAll(clearScroll.getScrollIds()); @@ -561,4 +674,14 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { super.doExecute(action, request, listener); } } + + private static class RequestAndListener, Response> { + private final Request request; + private final ActionListener listener; + + public RequestAndListener(Request request, ActionListener listener) { + this.request = request; + this.listener = listener; + } + } } diff 
--git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java index 81a3a2cc706..442943be21f 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java @@ -19,9 +19,12 @@ package org.elasticsearch.index.reindex; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; + public class BulkByScrollTaskTests extends ESTestCase { private BulkByScrollTask task; @@ -101,13 +104,14 @@ public class BulkByScrollTaskTests extends ESTestCase { } public void testStatusHatesNegatives() { - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(-1, 0, 0, 0, 0, 0, 0, 0, null)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, -1, 0, 0, 0, 0, 0, 0, null)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, -1, 0, 0, 0, 0, 0, null)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, -1, 0, 0, 0, 0, null)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, -1, 0, 0, 0, null)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, -1, 0, 0, null)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, -1, 0, null)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, 0, -1, null)); + TimeValue throttle = parseTimeValue(randomPositiveTimeValue(), "test"); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(-1, 0, 0, 0, 0, 0, 0, 0, throttle, null)); + 
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, -1, 0, 0, 0, 0, 0, 0, throttle, null)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, -1, 0, 0, 0, 0, 0, throttle, null)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, -1, 0, 0, 0, 0, throttle, null)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, -1, 0, 0, 0, throttle, null)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, -1, 0, 0, throttle, null)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, -1, 0, throttle, null)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, 0, -1, throttle, null)); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index 6e1cbb59e86..cfa763e5dba 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -42,6 +42,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static org.apache.lucene.util.TestUtil.randomSimpleString; +import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; /** @@ -77,6 +78,7 @@ public class RoundTripTests extends ESTestCase { request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), null, "test")); request.setConsistency(randomFrom(WriteConsistencyLevel.values())); request.setScript(random().nextBoolean() ? 
null : randomScript()); + request.setRequestsPerSecond(between(0, Integer.MAX_VALUE)); } private void assertRequestEquals(AbstractBulkIndexByScrollRequest request, @@ -90,6 +92,7 @@ public class RoundTripTests extends ESTestCase { assertEquals(request.getScript(), tripped.getScript()); assertEquals(request.getRetryBackoffInitialTime(), tripped.getRetryBackoffInitialTime()); assertEquals(request.getMaxRetries(), tripped.getMaxRetries()); + assertEquals(request.getRequestsPerSecond(), tripped.getRequestsPerSecond(), 0d); } public void testBulkByTaskStatus() throws IOException { @@ -119,7 +122,7 @@ public class RoundTripTests extends ESTestCase { private BulkByScrollTask.Status randomStatus() { return new BulkByScrollTask.Status(randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), randomPositiveInt(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), - random().nextBoolean() ? null : randomSimpleString(random())); + parseTimeValue(randomPositiveTimeValue(), "test"), random().nextBoolean() ? 
null : randomSimpleString(random())); } private List randomIndexingFailures() { @@ -194,5 +197,7 @@ public class RoundTripTests extends ESTestCase { assertEquals(expected.getVersionConflicts(), actual.getVersionConflicts()); assertEquals(expected.getNoops(), actual.getNoops()); assertEquals(expected.getRetries(), actual.getRetries()); + assertEquals(expected.getThrottled(), actual.getThrottled()); + assertEquals(expected.getReasonCancelled(), actual.getReasonCancelled()); } } diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml index 31e97967af0..413c8d1c143 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml @@ -21,6 +21,7 @@ - match: {version_conflicts: 0} - match: {batches: 1} - match: {failures: []} + - match: {throttled_millis: 0} - is_true: took - is_false: task @@ -53,6 +54,7 @@ - match: {version_conflicts: 0} - match: {batches: 1} - match: {failures: []} + - match: {throttled_millis: 0} - is_true: took - is_false: task @@ -84,6 +86,7 @@ - is_false: failures - is_false: noops - is_false: took + - is_false: throttled_millis - is_false: created - do: @@ -163,6 +166,7 @@ - match: {version_conflicts: 1} - match: {batches: 1} - match: {failures: []} + - match: {throttled_millis: 0} - is_true: took --- diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_throttle.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_throttle.yaml new file mode 100644 index 00000000000..2543670d5e6 --- /dev/null +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_throttle.yaml @@ -0,0 +1,53 @@ +--- +"Throttle the request": + # Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard + # and a small batch size on the request + - do: + 
indices.create: + index: source + body: + settings: + number_of_shards: "1" + number_of_replicas: "0" + - do: + cluster.health: + wait_for_status: yellow + - do: + index: + index: source + type: foo + id: 1 + body: { "text": "test" } + - do: + index: + index: source + type: foo + id: 2 + body: { "text": "test" } + - do: + index: + index: source + type: foo + id: 3 + body: { "text": "test" } + - do: + indices.refresh: {} + + - do: + reindex: + requests_per_second: 1 + body: + source: + index: source + size: 1 + dest: + index: dest + - match: {created: 3} + - match: {updated: 0} + - match: {version_conflicts: 0} + - match: {batches: 3} + - match: {failures: []} + - gt: {throttled_millis: 1000} + - lt: {throttled_millis: 4000} + - is_true: took + - is_false: task diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml index 65db8a5e66f..bf54ac5584f 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/10_basic.yaml @@ -18,6 +18,7 @@ - match: {batches: 1} - match: {failures: []} - match: {noops: 0} + - match: {throttled_millis: 0} - is_true: took - is_false: created # Update by query can't create - is_false: task @@ -45,6 +46,7 @@ - is_false: failures - is_false: noops - is_false: took + - is_false: throttled_millis - is_false: created - do: @@ -125,6 +127,7 @@ - match: {batches: 1} - match: {noops: 0} - match: {failures: []} + - match: {throttled_millis: 0} - is_true: took --- @@ -182,6 +185,7 @@ - match: {version_conflicts: 0} - match: {batches: 1} - match: {failures: []} + - match: {throttled_millis: 0} - is_true: took --- diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/70_throttle.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/70_throttle.yaml new file mode 
100644 index 00000000000..f0e75b8b2d5 --- /dev/null +++ b/modules/reindex/src/test/resources/rest-api-spec/test/update-by-query/70_throttle.yaml @@ -0,0 +1,39 @@ +"Throttle the request": + # Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard + # and a small batch size on the request + - do: + indices.create: + index: test + body: + settings: + number_of_shards: 1 + - do: + cluster.health: + wait_for_status: yellow + - do: + index: + index: test + type: foo + body: { "text": "test" } + - do: + index: + index: test + type: foo + body: { "text": "test" } + - do: + index: + index: test + type: foo + body: { "text": "test" } + - do: + indices.refresh: {} + + - do: + update-by-query: + index: test + scroll_size: 1 + requests_per_second: 1 + - match: {batches: 3} + - match: {updated: 3} + - gt: {throttled_millis: 1000} + - lt: {throttled_millis: 4000} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/reindex.json b/rest-api-spec/src/main/resources/rest-api-spec/api/reindex.json index f09efef7c91..bfe12c981dc 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/reindex.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/reindex.json @@ -22,9 +22,14 @@ "description" : "Explicit write consistency setting for the operation" }, "wait_for_completion": { - "type" : "boolean", - "default": false, - "description" : "Should the request should block until the reindex is complete." + "type" : "boolean", + "default": false, + "description" : "Should the request block until the reindex is complete." + }, + "requests_per_second": { + "type": "float", + "default": 0, + "description": "The throttle for this request in sub-requests per second. 0 means set no throttle."
} } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/update-by-query.json b/rest-api-spec/src/main/resources/rest-api-spec/api/update-by-query.json index dca49cbcc6a..fe7fdf8a840 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/update-by-query.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/update-by-query.json @@ -198,6 +198,11 @@ "type" : "boolean", "default": false, "description" : "Should the request should block until the reindex is complete." + }, + "requests_per_second": { + "type": "float", + "default": 0, + "description": "The throttle for this request in sub-requests per second. 0 means set no throttle." } } },