[reindex] Add throttling support

The throttle is applied when starting the next scroll request so that its
timeout can include the throttle time.
This commit is contained in:
Nik Everett 2016-03-09 15:53:06 -05:00
parent ea93b803d2
commit da96b6e41d
16 changed files with 476 additions and 87 deletions

View File

@ -961,6 +961,10 @@ public final class XContentBuilder implements BytesStream, Releasable {
return this;
}
/**
 * Writes a time value field by delegating to the raw-time overload, passing the value converted to milliseconds.
 *
 * @param rawFieldName name of the field that carries the raw value
 * @param readableFieldName name of the field that carries the human readable value
 * @param timeValue the time to write; must not be null because its millisecond value is read here
 * @return whatever the raw-time overload returns
 * @throws IOException if the raw-time overload throws it
 */
public XContentBuilder timeValueField(String rawFieldName, String readableFieldName, TimeValue timeValue) throws IOException {
    final long rawMillis = timeValue.millis();
    return timeValueField(rawFieldName, readableFieldName, rawMillis, TimeUnit.MILLISECONDS);
}
public XContentBuilder timeValueField(String rawFieldName, String readableFieldName, long rawTime, TimeUnit timeUnit) throws
IOException {
if (humanReadable) {

View File

@ -299,7 +299,8 @@ POST /_reindex
=== URL Parameters
In addition to the standard parameters like `pretty`, the Reindex API also
supports `refresh`, `wait_for_completion`, `consistency`, and `timeout`.
supports `refresh`, `wait_for_completion`, `consistency`, `timeout`, and
`requests_per_second`.
Sending the `refresh` url parameter will cause all indexes to which the request
wrote to be refreshed. This is different than the Index API's `refresh`
@ -317,8 +318,14 @@ request. `timeout` controls how long each write request waits for unavailable
shards to become available. Both work exactly how they work in the
{ref}/docs-bulk.html[Bulk API].
`timeout` controls how long each batch waits for the target shard to become
available. It works exactly how it works in the {ref}/docs-bulk.html[Bulk API].
`requests_per_second` can be set to any decimal number (1.4, 6, 1000, etc) and
throttles the number of requests per second that the reindex issues. The
throttling is done by waiting between bulk batches so that it can manipulate the
scroll timeout. The wait time is the difference between the time it took the
batch to complete and the time `requests_in_the_batch / requests_per_second`.
Since the batch isn't broken into multiple bulk requests, large batch sizes will
cause Elasticsearch to create many requests and then wait for a while before
starting the next set. This is "bursty" instead of "smooth".
[float]
=== Response body
@ -333,6 +340,8 @@ The JSON response looks like this:
"created": 123,
"batches": 1,
"version_conflicts": 2,
"retries": 0,
"throttled_millis": 0,
"failures" : [ ]
}
--------------------------------------------------
@ -357,6 +366,14 @@ The number of scroll responses pulled back by the the reindex.
The number of version conflicts that reindex hit.
`retries`::
The number of retries that the reindex did in response to a full queue.
`throttled_millis`::
Number of milliseconds the request slept to conform to `requests_per_second`.
`failures`::
Array of all indexing failures. If this is non-empty then the request aborted
@ -403,7 +420,9 @@ The responses looks like:
"deleted" : 0,
"batches" : 36,
"version_conflicts" : 0,
"noops" : 0
"noops" : 0,
"retries": 0,
"throttled_millis": 0
},
"description" : ""
} ]

View File

@ -169,8 +169,14 @@ request. `timeout` controls how long each write request waits for unavailable
shards to become available. Both work exactly how they work in the
{ref}/docs-bulk.html[Bulk API].
`timeout` controls how long each batch waits for the target shard to become
available. It works exactly how it works in the {ref}/docs-bulk.html[Bulk API].
`requests_per_second` can be set to any decimal number (1.4, 6, 1000, etc) and
throttles the number of requests per second that the update by query issues. The
throttling is done by waiting between bulk batches so that it can manipulate the
scroll timeout. The wait time is the difference between the time it took the
batch to complete and the time `requests_in_the_batch / requests_per_second`.
Since the batch isn't broken into multiple bulk requests, large batch sizes will
cause Elasticsearch to create many requests and then wait for a while before
starting the next set. This is "bursty" instead of "smooth".
[float]
=== Response body
@ -184,6 +190,8 @@ The JSON response looks like this:
"updated": 0,
"batches": 1,
"version_conflicts": 2,
"retries": 0,
"throttled_millis": 0,
"failures" : [ ]
}
--------------------------------------------------
@ -204,6 +212,14 @@ The number of scroll responses pulled back by the the update by query.
The number of version conflicts that the update by query hit.
`retries`::
The number of retries that the update by query did in response to a full queue.
`throttled_millis`::
Number of milliseconds the request slept to conform to `requests_per_second`.
`failures`::
Array of all indexing failures. If this is non-empty then the request aborted
@ -251,7 +267,9 @@ The responses looks like:
"deleted" : 0,
"batches" : 36,
"version_conflicts" : 0,
"noops" : 0
"noops" : 0,
"retries": 0,
"throttled_millis": 0
},
"description" : ""
} ]

View File

@ -54,6 +54,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -63,13 +64,14 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
import static org.elasticsearch.rest.RestStatus.CONFLICT;
import static org.elasticsearch.search.sort.SortBuilders.fieldSort;
/**
* Abstract base for scrolling across a search and executing bulk actions on all
* results.
* results. All package private methods are package private so their tests can use them.
*/
public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>, Response> {
/**
@ -81,6 +83,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
private final AtomicLong startTime = new AtomicLong(-1);
private final AtomicReference<String> scroll = new AtomicReference<>();
private final AtomicLong lastBatchStartTime = new AtomicLong(-1);
private final Set<String> destinationIndices = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ESLogger logger;
@ -107,15 +110,10 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
protected abstract Response buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures,
boolean timedOut);
/**
* Start the action by firing the initial search request.
*/
public void start() {
initialSearch();
}
public BulkByScrollTask getTask() {
return task;
}
void initialSearch() {
if (task.isCancelled()) {
finishHim(null);
return;
@ -137,7 +135,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
@Override
public void onResponse(SearchResponse response) {
logger.debug("[{}] documents match query", response.getHits().getTotalHits());
onScrollResponse(response);
onScrollResponse(timeValueSeconds(0), response);
}
@Override
@ -151,13 +149,11 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
}
/**
* Set the last returned scrollId. Package private for testing.
* Process a scroll response.
* @param delay how long to delay processing the response. This delay is how throttling is applied to the action.
* @param searchResponse the scroll response to process
*/
void setScroll(String scroll) {
this.scroll.set(scroll);
}
void onScrollResponse(SearchResponse searchResponse) {
void onScrollResponse(TimeValue delay, SearchResponse searchResponse) {
if (task.isCancelled()) {
finishHim(null);
return;
@ -177,9 +173,15 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
total = min(total, mainRequest.getSize());
}
task.setTotal(total);
threadPool.generic().execute(new AbstractRunnable() {
task.countThrottle(delay);
threadPool.schedule(delay, ThreadPool.Names.GENERIC, threadPool.getThreadContext().preserveContext(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
if (task.isCancelled()) {
finishHim(null);
return;
}
lastBatchStartTime.set(System.nanoTime());
SearchHit[] docs = searchResponse.getHits().getHits();
logger.debug("scroll returned [{}] documents with a scroll id of [{}]", docs.length, searchResponse.getScrollId());
if (docs.length == 0) {
@ -200,7 +202,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
/*
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
*/
startNextScroll();
startNextScroll(0);
return;
}
request.timeout(mainRequest.getTimeout());
@ -216,9 +218,12 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
public void onFailure(Throwable t) {
finishHim(t);
}
});
}));
}
/**
* Send a bulk request, handling retries.
*/
void sendBulkRequest(BulkRequest request) {
if (task.isCancelled()) {
finishHim(null);
@ -237,6 +242,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
});
}
/**
* Processes bulk responses, accounting for failures.
*/
void onBulkResponse(BulkResponse response) {
if (task.isCancelled()) {
finishHim(null);
@ -282,23 +290,32 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
startNormalTermination(emptyList(), emptyList(), false);
return;
}
startNextScroll();
startNextScroll(response.getItems().length);
} catch (Throwable t) {
finishHim(t);
}
}
void startNextScroll() {
/**
* Start the next scroll request.
*
* @param lastBatchSize the number of requests sent in the last batch. This is used to calculate the throttling values which are applied
* when the scroll returns
*/
void startNextScroll(int lastBatchSize) {
if (task.isCancelled()) {
finishHim(null);
return;
}
long earliestNextBatchStartTime = lastBatchStartTime.get() + (long) perfectlyThrottledBatchTime(lastBatchSize);
long waitTime = max(0, earliestNextBatchStartTime - System.nanoTime());
SearchScrollRequest request = new SearchScrollRequest();
request.scrollId(scroll.get()).scroll(firstSearchRequest.scroll());
// Add the wait time into the scroll timeout so it won't timeout while we wait for throttling
request.scrollId(scroll.get()).scroll(timeValueNanos(firstSearchRequest.scroll().keepAlive().nanos() + waitTime));
client.searchScroll(request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
onScrollResponse(response);
onScrollResponse(timeValueNanos(max(0, earliestNextBatchStartTime - System.nanoTime())), response);
}
@Override
@ -308,6 +325,21 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
});
}
/**
 * Computes how long, in nanoseconds, a batch of {@code lastBatchSize} requests would have taken had it run at exactly the
 * request's configured requests-per-second rate. Package private for testing.
 *
 * @param lastBatchSize the number of requests in the batch
 * @return the target batch duration in nanoseconds, or 0 when the configured rate is 0 (no throttle)
 */
float perfectlyThrottledBatchTime(int lastBatchSize) {
    final float requestsPerSecond = mainRequest.getRequestsPerSecond();
    if (requestsPerSecond == 0) {
        return 0;
    }
    final long nanosPerSecond = TimeUnit.SECONDS.toNanos(1);
    // requests / (requests per second) == seconds, and (nanoseconds per second) * seconds == nanoseconds
    return nanosPerSecond * (lastBatchSize / requestsPerSecond);
}
private void recordFailure(Failure failure, List<Failure> failures) {
if (failure.getStatus() == CONFLICT) {
task.countVersionConflict();
@ -318,6 +350,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
failures.add(failure);
}
/**
* Start terminating a request that finished non-catastrophically.
*/
void startNormalTermination(List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures, boolean timedOut) {
if (task.isCancelled() || false == mainRequest.isRefresh()) {
finishHim(null, indexingFailures, searchFailures, timedOut);
@ -385,7 +420,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
}
/**
* Build the backoff policy for use with retries. Package private for testing.
* Build the backoff policy for use with retries.
*/
BackoffPolicy backoffPolicy() {
return exponentialBackoff(mainRequest.getRetryBackoffInitialTime(), mainRequest.getMaxRetries());
@ -399,6 +434,27 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
destinationIndices.addAll(indices);
}
/**
 * Set the last returned scrollId, replacing whatever value is currently stored. Exists entirely for testing.
 *
 * @param scroll the scroll id to store
 */
void setScroll(String scroll) {
this.scroll.set(scroll);
}
/**
 * Set the last batch's start time. Exists entirely for testing.
 *
 * @param newValue the start time to store; the action itself stores {@code System.nanoTime()} readings here, so tests should
 *        supply a value on the same clock
 */
void setLastBatchStartTime(long newValue) {
lastBatchStartTime.set(newValue);
}
/**
 * Get the last batch's start time. Exists entirely for testing.
 *
 * @return the stored start time; the field is initialized to -1 and the action overwrites it with {@code System.nanoTime()}
 *         readings
 */
long getLastBatchStartTime() {
return lastBatchStartTime.get();
}
/**
* Wraps a backoffPolicy in another policy that counts the number of backoffs acquired.
*/

View File

@ -19,7 +19,6 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
@ -39,8 +38,11 @@ import org.elasticsearch.tasks.Task;
import java.io.IOException;
public abstract class AbstractBaseReindexRestHandler<Request extends ActionRequest<Request>, Response extends BulkIndexByScrollResponse,
TA extends TransportAction<Request, Response>> extends BaseRestHandler {
public abstract class AbstractBaseReindexRestHandler<
Request extends AbstractBulkByScrollRequest<Request>,
Response extends BulkIndexByScrollResponse,
TA extends TransportAction<Request, Response>
> extends BaseRestHandler {
protected final IndicesQueriesRegistry indicesQueriesRegistry;
protected final AggregatorParsers aggParsers;
protected final Suggesters suggesters;
@ -59,6 +61,7 @@ public abstract class AbstractBaseReindexRestHandler<Request extends ActionReque
}
protected void execute(RestRequest request, Request internalRequest, RestChannel channel) throws IOException {
internalRequest.setRequestsPerSecond(request.paramAsFloat("requests_per_second", internalRequest.getRequestsPerSecond()));
if (request.paramAsBoolean("wait_for_completion", true)) {
action.execute(internalRequest, new BulkIndexByScrollResponseContentListener<Response>(channel));
return;

View File

@ -85,6 +85,13 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
*/
private int maxRetries = 11;
/**
* The throttle for this request in sub-requests per second. 0 means set no throttle and that is the default. Throttling is done between
* batches, as we start the next scroll request. That way we can increase the scroll's timeout to make sure that it contains any time
* that we might wait.
*/
private float requestsPerSecond = 0;
public AbstractBulkByScrollRequest() {
}
@ -252,6 +259,21 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
return self();
}
/**
 * The throttle for this request in sub-requests per second. 0 means set no throttle and that is the default.
 *
 * @return the configured rate in sub-requests per second
 */
public float getRequestsPerSecond() {
return requestsPerSecond;
}
/**
 * Set the throttle for this request in sub-requests per second. 0 means set no throttle and that is the default. The value is
 * stored as given; this setter performs no validation.
 *
 * @param requestsPerSecond the rate in sub-requests per second
 * @return this request, so calls can be chained
 */
public Self setRequestsPerSecond(float requestsPerSecond) {
this.requestsPerSecond = requestsPerSecond;
return self();
}
@Override
public Task createTask(long id, String type, String action) {
return new BulkByScrollTask(id, type, action, getDescription());
@ -269,6 +291,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
consistency = WriteConsistencyLevel.fromId(in.readByte());
retryBackoffInitialTime = TimeValue.readTimeValue(in);
maxRetries = in.readVInt();
requestsPerSecond = in.readFloat();
}
@Override
@ -282,6 +305,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
out.writeByte(consistency.id());
retryBackoffInitialTime.writeTo(out);
out.writeVInt(maxRetries);
out.writeFloat(requestsPerSecond);
}
/**

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
@ -30,6 +31,8 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
/**
* Task storing information about a currently running BulkByScroll request.
*/
@ -46,6 +49,7 @@ public class BulkByScrollTask extends CancellableTask {
private final AtomicInteger batch = new AtomicInteger(0);
private final AtomicLong versionConflicts = new AtomicLong(0);
private final AtomicLong retries = new AtomicLong(0);
private final AtomicLong throttledNanos = new AtomicLong();
public BulkByScrollTask(long id, String type, String action, String description) {
super(id, type, action, description);
@ -54,7 +58,7 @@ public class BulkByScrollTask extends CancellableTask {
@Override
public Status getStatus() {
return new Status(total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), noops.get(),
retries.get(), getReasonCancelled());
retries.get(), timeValueNanos(throttledNanos.get()), getReasonCancelled());
}
/**
@ -65,7 +69,7 @@ public class BulkByScrollTask extends CancellableTask {
}
public static class Status implements Task.Status {
public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0, 0, null);
public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0, 0, timeValueNanos(0), null);
private final long total;
private final long updated;
@ -75,10 +79,11 @@ public class BulkByScrollTask extends CancellableTask {
private final long versionConflicts;
private final long noops;
private final long retries;
private final TimeValue throttled;
private final String reasonCancelled;
public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops, long retries,
@Nullable String reasonCancelled) {
TimeValue throttled, @Nullable String reasonCancelled) {
this.total = checkPositive(total, "total");
this.updated = checkPositive(updated, "updated");
this.created = checkPositive(created, "created");
@ -87,6 +92,7 @@ public class BulkByScrollTask extends CancellableTask {
this.versionConflicts = checkPositive(versionConflicts, "versionConflicts");
this.noops = checkPositive(noops, "noops");
this.retries = checkPositive(retries, "retries");
this.throttled = throttled;
this.reasonCancelled = reasonCancelled;
}
@ -99,6 +105,7 @@ public class BulkByScrollTask extends CancellableTask {
versionConflicts = in.readVLong();
noops = in.readVLong();
retries = in.readVLong();
throttled = TimeValue.readTimeValue(in);
reasonCancelled = in.readOptionalString();
}
@ -112,6 +119,7 @@ public class BulkByScrollTask extends CancellableTask {
out.writeVLong(versionConflicts);
out.writeVLong(noops);
out.writeVLong(retries);
throttled.writeTo(out);
out.writeOptionalString(reasonCancelled);
}
@ -136,6 +144,7 @@ public class BulkByScrollTask extends CancellableTask {
builder.field("version_conflicts", versionConflicts);
builder.field("noops", noops);
builder.field("retries", retries);
builder.timeValueField("throttled_millis", "throttled", throttled);
if (reasonCancelled != null) {
builder.field("canceled", reasonCancelled);
}
@ -234,6 +243,13 @@ public class BulkByScrollTask extends CancellableTask {
return retries;
}
/**
 * The total time this request has throttled itself.
 *
 * @return the throttled time held by this status object
 */
public TimeValue getThrottled() {
return throttled;
}
/**
* The reason that the request was canceled or null if it hasn't been.
*/
@ -287,4 +303,11 @@ public class BulkByScrollTask extends CancellableTask {
void countRetry() {
retries.incrementAndGet();
}
/**
 * Adds {@code delay} to the running total of time this task has spent throttled. Delays that are not strictly positive are
 * ignored so they never change the total.
 *
 * @param delay the throttling delay to record
 */
public void countThrottle(TimeValue delay) {
    final long delayNanos = delay.nanos();
    if (delayNanos <= 0) {
        return;
    }
    throttledNanos.addAndGet(delayNanos);
}
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationRequest;
@ -67,12 +68,14 @@ import org.junit.After;
import org.junit.Before;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -83,10 +86,15 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
@ -99,17 +107,24 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
private String scrollId;
private TaskManager taskManager;
private BulkByScrollTask task;
private Map<String, String> expectedHeaders = new HashMap<>();
@Before
public void setupForTest() {
client = new MyMockClient(new NoOpClient(getTestName()));
threadPool = new ThreadPool(getTestName());
mainRequest = new DummyAbstractBulkByScrollRequest();
firstSearchRequest = null;
firstSearchRequest = new SearchRequest().scroll(timeValueSeconds(10));
listener = new PlainActionFuture<>();
scrollId = null;
taskManager = new TaskManager(Settings.EMPTY);
task = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", mainRequest);
// Fill the context with something random so we can make sure we inherited it appropriately.
expectedHeaders.clear();
expectedHeaders.put(randomSimpleString(random()), randomSimpleString(random()));
threadPool.getThreadContext().newStoredContext();
threadPool.getThreadContext().putHeader(expectedHeaders);
}
@After
@ -135,34 +150,35 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
long total = randomIntBetween(0, Integer.MAX_VALUE);
InternalSearchHits hits = new InternalSearchHits(null, total, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
new DummyAbstractAsyncBulkByScrollAction()
.onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0),
new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
assertEquals(total, task.getStatus().getTotal());
}
public void testEachScrollResponseIsABatch() {
// Replace the generic thread pool with one that executes immediately so the batch is updated immediately
threadPool.shutdown();
threadPool = new ThreadPool(getTestName()) {
@Override
public Executor generic() {
return new Executor() {
@Override
public void execute(Runnable command) {
command.run();
}
};
}
};
/**
* Tests that each scroll response is a batch and that the batch is launched properly.
*/
public void testScrollResponseBatchingBehavior() throws Exception {
int maxBatches = randomIntBetween(0, 100);
for (int batches = 1; batches < maxBatches; batches++) {
long now = System.nanoTime();
InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap());
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] { hit }, 0, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
new DummyAbstractAsyncBulkByScrollAction()
.onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
action.onScrollResponse(timeValueSeconds(0),
new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
assertEquals(batches, task.getStatus().getBatches());
// Use assert busy because the update happens on another thread
final int expectedBatches = batches;
assertBusy(() -> assertEquals(expectedBatches, task.getStatus().getBatches()));
/*
* While we're here we can check that getting a scroll response sets the last scroll start time which makes sure the wait time
* isn't counted as time that the last batch took.
*/
assertThat(action.getLastBatchStartTime(), greaterThanOrEqualTo(now));
assertEquals(expectedHeaders, client.lastHeaders.get());
}
}
@ -220,22 +236,20 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
* Mimics a ThreadPool rejecting execution of the task.
*/
public void testThreadPoolRejectionsAbortRequest() throws Exception {
TimeValue expectedDelay = parseTimeValue(randomPositiveTimeValue(), "test");
threadPool.shutdown();
threadPool = new ThreadPool(getTestName()) {
@Override
public Executor generic() {
return new Executor() {
@Override
public void execute(Runnable command) {
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
assertEquals(expectedDelay, delay); // While we're here we can check that the sleep made it through
((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test"));
}
};
return null;
}
};
InternalSearchHits hits = new InternalSearchHits(null, 0, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
new DummyAbstractAsyncBulkByScrollAction()
.onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
.onScrollResponse(expectedDelay, new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
try {
listener.get();
fail("Expected a failure");
@ -243,6 +257,9 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]"));
}
assertThat(client.scrollsCleared, contains(scrollId));
// While we're mocking the threadPool lets also check that we incremented the throttle counter
assertEquals(expectedDelay, task.getStatus().getThrottled());
}
/**
@ -252,7 +269,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
public void testShardFailuresAbortRequest() throws Exception {
ShardSearchFailure shardFailure = new ShardSearchFailure(new RuntimeException("test"));
InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, false, null);
new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(
new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0),
new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure }));
BulkIndexByScrollResponse response = listener.get();
assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class));
@ -267,8 +284,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
*/
public void testSearchTimeoutsAbortRequest() throws Exception {
InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, true, null);
new DummyAbstractAsyncBulkByScrollAction()
.onScrollResponse(new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[0]));
new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0),
new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[0]));
BulkIndexByScrollResponse response = listener.get();
assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class));
assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class));
@ -304,7 +321,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap());
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] {hit}, 0, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
action.onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
action.onScrollResponse(timeValueSeconds(0), new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
try {
listener.get();
fail("Expected failure.");
@ -334,6 +351,55 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
assertEquals(retryAttempts, task.getStatus().getRetries());
}
/**
 * Checks the target batch time calculation: a rate of 0 always yields 0, and a rate of 1 request per second yields roughly one
 * second (in nanoseconds) per request in the batch.
 */
public void testPerfectlyThrottledBatchTime() {
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
// A rate of 0 means no throttle so the target time is exactly 0 regardless of the batch size
mainRequest.setRequestsPerSecond(0);
assertThat((double) action.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f));
int total = between(0, 1000000);
// At 1 request per second the batch should take about total seconds; allow a second of error for float rounding
mainRequest.setRequestsPerSecond(1);
assertThat((double) action.perfectlyThrottledBatchTime(total),
closeTo(TimeUnit.SECONDS.toNanos(total), TimeUnit.SECONDS.toNanos(1)));
}
/**
 * Checks that starting the next scroll adds the throttling wait time to the scroll's keep alive and that the same wait is used
 * as the delay when the scroll response is scheduled for processing.
 */
public void testScrollDelay() throws Exception {
/*
* Replace the thread pool with one that will save the delay sent for the command. We'll use that to check that we used a proper
* delay for throttling.
*/
AtomicReference<TimeValue> capturedDelay = new AtomicReference<>();
threadPool.shutdown();
threadPool = new ThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
capturedDelay.set(delay);
return null;
}
};
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
action.setScroll(scrollId());
// We'd like to get about 1 request a second
mainRequest.setRequestsPerSecond(1f);
// Make the last scroll look nearly instant
action.setLastBatchStartTime(System.nanoTime());
// The last batch had 100 documents
action.startNextScroll(100);
// So the next request is going to have to wait an extra 100 seconds or so (base was 10, so 110ish)
assertThat(client.lastScroll.get().request.scroll().keepAlive().seconds(), either(equalTo(110L)).or(equalTo(109L)));
// Now we can simulate a response and check the delay that we used for the task
InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap());
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] { hit }, 0, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
client.lastScroll.get().listener.onResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
// The delay is still 100ish seconds because there hasn't been much time between when we requested the scroll and when we got its response.
assertThat(capturedDelay.get().seconds(), either(equalTo(100L)).or(equalTo(99L)));
}
private long retryTestCase(boolean failWithRejection) throws Exception {
int totalFailures = randomIntBetween(1, mainRequest.getMaxRetries());
int size = randomIntBetween(1, 100);
@ -353,7 +419,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
}
@Override
void startNextScroll() {
void startNextScroll(int lastBatchSize) {
successLatch.countDown();
}
};
@ -418,12 +484,12 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
}
/**
 * Hands the action's entry point to {@code cancelTaskCase}.
 */
public void testCancelBeforeInitialSearch() throws Exception {
// NOTE(review): the two calls below differ only in the entry point (initialSearch() vs start()). This looks like the removed and
// the added line of a diff shown together - confirm which one the current file keeps.
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.initialSearch());
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.start());
}
/**
 * Hands the scroll-response handler, invoked with a {@code null} response, to {@code cancelTaskCase}.
 */
public void testCancelBeforeScrollResponse() throws Exception {
// We bail so early we don't need to pass in a half way valid response.
// NOTE(review): the one-argument and two-argument onScrollResponse calls look like the removed and the added line of a diff shown
// together - confirm which signature the current file uses.
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.onScrollResponse(null));
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.onScrollResponse(timeValueSeconds(0), null));
}
public void testCancelBeforeSendBulkRequest() throws Exception {
@ -437,7 +503,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
}
/**
 * Hands the step that starts the next scroll to {@code cancelTaskCase}.
 */
public void testCancelBeforeStartNextScroll() throws Exception {
// NOTE(review): the no-argument call and the call passing a last batch size of 0 look like the removed and the added line of a
// diff shown together - confirm which signature the current file uses.
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNextScroll());
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNextScroll(0));
}
public void testCancelBeforeStartNormalTermination() throws Exception {
@ -447,6 +513,46 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
assertNull("No refresh was attempted", client.lastRefreshRequest.get());
}
/**
 * Verifies that a request can be canceled while it is waiting out its throttling delay. {@link #cancelTaskCase(Consumer)} is not
 * usable here because the request has to be sent un-canceled and then canceled at one specific moment.
 */
public void testCancelWhileDelayedAfterScrollResponse() throws Exception {
    String cancelReason = randomSimpleString(random());

    /*
     * Install a thread pool that cancels the task the moment anything gets scheduled (which is what reindex does when it has to
     * delay) and then runs the scheduled command right away.
     */
    threadPool.shutdown();
    threadPool = new ThreadPool(getTestName()) {
        @Override
        public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
            taskManager.cancel(task, cancelReason, (Set<String> ignored) -> {});
            command.run();
            return null;
        }
    };

    // Deliver a scroll response. That hits the thread pool above, so the request is canceled before the response is processed.
    DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
    boolean hadScroll = usually();
    if (hadScroll) {
        action.setScroll(scrollId());
    }
    long totalHits = randomIntBetween(0, Integer.MAX_VALUE);
    InternalSearchHits searchHits = new InternalSearchHits(null, totalHits, 0);
    InternalSearchResponse internalResponse = new InternalSearchResponse(searchHits, null, null, null, false, false);
    SearchResponse response = new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), null);
    action.onScrollResponse(timeValueSeconds(0), response);

    // The cancel has happened, so check that it was reported all right.
    assertEquals(cancelReason, listener.get().getReasonCancelled());
    if (hadScroll) {
        // Canceled tasks always start to clear the scroll before they die.
        assertThat(client.scrollsCleared, contains(scrollId));
    }
}
private void cancelTaskCase(Consumer<DummyAbstractAsyncBulkByScrollAction> testMe) throws Exception {
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
boolean previousScrollSet = usually();
@ -489,10 +595,12 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
}
}
private static class MyMockClient extends FilterClient {
private class MyMockClient extends FilterClient {
private final List<String> scrollsCleared = new ArrayList<>();
private final AtomicInteger bulksAttempts = new AtomicInteger();
private final AtomicReference<Map<String, String>> lastHeaders = new AtomicReference<>();
private final AtomicReference<RefreshRequest> lastRefreshRequest = new AtomicReference<>();
private final AtomicReference<RequestAndListener<SearchScrollRequest, SearchResponse>> lastScroll = new AtomicReference<>();
private int bulksToReject = 0;
@ -505,11 +613,16 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
protected <Request extends ActionRequest<Request>, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
lastHeaders.set(threadPool.getThreadContext().getHeaders());
if (request instanceof RefreshRequest) {
lastRefreshRequest.set((RefreshRequest) request);
listener.onResponse(null);
return;
}
if (request instanceof SearchScrollRequest) {
lastScroll.set(new RequestAndListener<>((SearchScrollRequest) request, (ActionListener<SearchResponse>) listener));
return;
}
if (request instanceof ClearScrollRequest) {
ClearScrollRequest clearScroll = (ClearScrollRequest) request;
scrollsCleared.addAll(clearScroll.getScrollIds());
@ -561,4 +674,14 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
super.doExecute(action, request, listener);
}
}
/**
 * Immutable holder that pairs a request with the listener that was supplied for its response, so a test can inspect the request
 * and later drive the listener by hand.
 *
 * @param <Request> type of the captured request
 * @param <Response> type of response the captured listener accepts
 */
private static class RequestAndListener<Request extends ActionRequest<Request>, Response> {
// The captured request.
private final Request request;
// The listener that was passed along with the request.
private final ActionListener<Response> listener;
/**
 * Stores both references as given; no validation or copying is done.
 */
public RequestAndListener(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
}
}
}

View File

@ -19,9 +19,12 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
public class BulkByScrollTaskTests extends ESTestCase {
private BulkByScrollTask task;
@ -101,13 +104,14 @@ public class BulkByScrollTaskTests extends ESTestCase {
}
/**
 * Checks that constructing a {@code BulkByScrollTask.Status} with {@code -1} in any one of its first eight numeric arguments throws
 * {@link IllegalArgumentException}. Each call puts the negative value in a different position.
 */
public void testStatusHatesNegatives() {
// NOTE(review): this first group calls a nine-argument constructor while the group after "throttle" calls a ten-argument one. It
// looks like the removed and the added lines of a diff shown together - confirm which constructor the current file targets.
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(-1, 0, 0, 0, 0, 0, 0, 0, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, -1, 0, 0, 0, 0, 0, 0, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, -1, 0, 0, 0, 0, 0, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, -1, 0, 0, 0, 0, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, -1, 0, 0, 0, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, -1, 0, 0, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, -1, 0, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, 0, -1, null));
// A randomly chosen positive throttle value is passed in the ninth position of every call below.
TimeValue throttle = parseTimeValue(randomPositiveTimeValue(), "test");
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(-1, 0, 0, 0, 0, 0, 0, 0, throttle, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, -1, 0, 0, 0, 0, 0, 0, throttle, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, -1, 0, 0, 0, 0, 0, throttle, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, -1, 0, 0, 0, 0, throttle, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, -1, 0, 0, 0, throttle, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, -1, 0, 0, throttle, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, -1, 0, throttle, null));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, 0, -1, throttle, null));
}
}

View File

@ -42,6 +42,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
@ -77,6 +78,7 @@ public class RoundTripTests extends ESTestCase {
request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), null, "test"));
request.setConsistency(randomFrom(WriteConsistencyLevel.values()));
request.setScript(random().nextBoolean() ? null : randomScript());
request.setRequestsPerSecond(between(0, Integer.MAX_VALUE));
}
private void assertRequestEquals(AbstractBulkIndexByScrollRequest<?> request,
@ -90,6 +92,7 @@ public class RoundTripTests extends ESTestCase {
assertEquals(request.getScript(), tripped.getScript());
assertEquals(request.getRetryBackoffInitialTime(), tripped.getRetryBackoffInitialTime());
assertEquals(request.getMaxRetries(), tripped.getMaxRetries());
assertEquals(request.getRequestsPerSecond(), tripped.getRequestsPerSecond(), 0d);
}
public void testBulkByTaskStatus() throws IOException {
@ -119,7 +122,7 @@ public class RoundTripTests extends ESTestCase {
/**
 * Builds a {@code BulkByScrollTask.Status} from randomly generated positive counters and a cancellation reason that is randomly
 * either {@code null} or a random simple string.
 */
private BulkByScrollTask.Status randomStatus() {
// NOTE(review): the last two argument lines below both supply the trailing cancellation reason, and only the second one also
// supplies a parsed random positive time value. This looks like the removed and the added line of a diff shown together -
// confirm which one the current file keeps.
return new BulkByScrollTask.Status(randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
randomPositiveInt(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
random().nextBoolean() ? null : randomSimpleString(random()));
parseTimeValue(randomPositiveTimeValue(), "test"), random().nextBoolean() ? null : randomSimpleString(random()));
}
private List<Failure> randomIndexingFailures() {
@ -194,5 +197,7 @@ public class RoundTripTests extends ESTestCase {
assertEquals(expected.getVersionConflicts(), actual.getVersionConflicts());
assertEquals(expected.getNoops(), actual.getNoops());
assertEquals(expected.getRetries(), actual.getRetries());
assertEquals(expected.getThrottled(), actual.getThrottled());
assertEquals(expected.getReasonCancelled(), actual.getReasonCancelled());
}
}

View File

@ -21,6 +21,7 @@
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- is_true: took
- is_false: task
@ -53,6 +54,7 @@
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- is_true: took
- is_false: task
@ -84,6 +86,7 @@
- is_false: failures
- is_false: noops
- is_false: took
- is_false: throttled_millis
- is_false: created
- do:
@ -163,6 +166,7 @@
- match: {version_conflicts: 1}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- is_true: took
---

View File

@ -0,0 +1,53 @@
---
"Throttle the request":
  # Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
  # and a small batch size on the request
  - do:
      indices.create:
        index: source
        body:
          settings:
            number_of_shards: "1"
            number_of_replicas: "0"
  - do:
      cluster.health:
        wait_for_status: yellow
  # Three documents with a batch size of 1 (see "size" below) give three batches.
  - do:
      index:
        index: source
        type: foo
        id: 1
        body: { "text": "test" }
  - do:
      index:
        index: source
        type: foo
        id: 2
        body: { "text": "test" }
  - do:
      index:
        index: source
        type: foo
        id: 3
        body: { "text": "test" }
  - do:
      indices.refresh: {}

  - do:
      reindex:
        requests_per_second: 1
        body:
          source:
            index: source
            size: 1
          dest:
            index: dest
  - match: {created: 3}
  - match: {updated: 0}
  - match: {version_conflicts: 0}
  - match: {batches: 3}
  - match: {failures: []}
  # The total throttle time is bounded on both sides: more than one second, less than four.
  - gt: {throttled_millis: 1000}
  - lt: {throttled_millis: 4000}
  - is_true: took
  - is_false: task

View File

@ -18,6 +18,7 @@
- match: {batches: 1}
- match: {failures: []}
- match: {noops: 0}
- match: {throttled_millis: 0}
- is_true: took
- is_false: created # Update by query can't create
- is_false: task
@ -45,6 +46,7 @@
- is_false: failures
- is_false: noops
- is_false: took
- is_false: throttled_millis
- is_false: created
- do:
@ -125,6 +127,7 @@
- match: {batches: 1}
- match: {noops: 0}
- match: {failures: []}
- match: {throttled_millis: 0}
- is_true: took
---
@ -182,6 +185,7 @@
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- is_true: took
---

View File

@ -0,0 +1,39 @@
"Throttle the request":
  # Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
  # and a small batch size on the request
  - do:
      indices.create:
        index: test
        body:
          settings:
            number_of_shards: 1
  - do:
      cluster.health:
        wait_for_status: yellow
  # Three documents with a scroll_size of 1 (see below) give three batches.
  - do:
      index:
        index: test
        type: foo
        body: { "text": "test" }
  - do:
      index:
        index: test
        type: foo
        body: { "text": "test" }
  - do:
      index:
        index: test
        type: foo
        body: { "text": "test" }
  - do:
      indices.refresh: {}

  - do:
      update-by-query:
        index: test
        scroll_size: 1
        requests_per_second: 1
  - match: {batches: 3}
  - match: {updated: 3}
  # The total throttle time is bounded on both sides: more than one second, less than four.
  - gt: {throttled_millis: 1000}
  - lt: {throttled_millis: 4000}

View File

@ -25,6 +25,11 @@
"type" : "boolean",
"default": false,
"description" : "Should the request block until the reindex is complete."
},
"requests_per_second": {
"type": "float",
"default": 0,
"description": "The throttle for this request in sub-requests per second. 0 means set no throttle."
}
}
},

View File

@ -198,6 +198,11 @@
"type" : "boolean",
"default": false,
"description" : "Should the request block until the reindex is complete."
},
"requests_per_second": {
"type": "float",
"default": 0,
"description": "The throttle for this request in sub-requests per second. 0 means set no throttle."
}
}
},