Fix a race condition in reindex's rethrottle

If you rethrottled the request while is was performing a scroll
request then it wouldn't properly pick up the rethrottle for that
batch. This was causing test failure and might cause issues for
users. The work around is simple though: just issue the rethrottle
again with a slightly faster throttle than the first time.

Caught by:
https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+multijob-os-compatibility/os=centos/525/console
This commit is contained in:
Nik Everett 2016-06-03 15:41:40 -04:00
parent 4b21157906
commit 5b94c4a25b
4 changed files with 111 additions and 115 deletions

View File

@ -55,7 +55,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -66,7 +65,6 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff; import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES; import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
import static org.elasticsearch.rest.RestStatus.CONFLICT; import static org.elasticsearch.rest.RestStatus.CONFLICT;
import static org.elasticsearch.search.sort.SortBuilders.fieldSort; import static org.elasticsearch.search.sort.SortBuilders.fieldSort;
@ -85,7 +83,6 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
private final AtomicLong startTime = new AtomicLong(-1); private final AtomicLong startTime = new AtomicLong(-1);
private final AtomicReference<String> scroll = new AtomicReference<>(); private final AtomicReference<String> scroll = new AtomicReference<>();
private final AtomicLong lastBatchStartTime = new AtomicLong(-1);
private final Set<String> destinationIndices = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set<String> destinationIndices = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ESLogger logger; private final ESLogger logger;
@ -147,16 +144,17 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
} }
searchWithRetry(listener -> client.search(firstSearchRequest, listener), (SearchResponse response) -> { searchWithRetry(listener -> client.search(firstSearchRequest, listener), (SearchResponse response) -> {
logger.debug("[{}] documents match query", response.getHits().getTotalHits()); logger.debug("[{}] documents match query", response.getHits().getTotalHits());
onScrollResponse(timeValueSeconds(0), response); onScrollResponse(timeValueNanos(System.nanoTime()), 0, response);
}); });
} }
/** /**
* Process a scroll response. * Process a scroll response.
* @param delay how long to delay processesing the response. This delay is how throttling is applied to the action. * @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay.
* @param lastBatchSize the size of the last batch. Used to calculate the throttling delay.
* @param searchResponse the scroll response to process * @param searchResponse the scroll response to process
*/ */
void onScrollResponse(TimeValue delay, SearchResponse searchResponse) { void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, SearchResponse searchResponse) {
if (task.isCancelled()) { if (task.isCancelled()) {
finishHim(null); finishHim(null);
return; return;
@ -179,7 +177,11 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
AbstractRunnable prepareBulkRequestRunnable = new AbstractRunnable() { AbstractRunnable prepareBulkRequestRunnable = new AbstractRunnable() {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
prepareBulkRequest(searchResponse); /*
* It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time
* waiting on the scroll doesn't count against this batch in the throttle.
*/
prepareBulkRequest(timeValueNanos(System.nanoTime()), searchResponse);
} }
@Override @Override
@ -188,7 +190,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
} }
}; };
prepareBulkRequestRunnable = (AbstractRunnable) threadPool.getThreadContext().preserveContext(prepareBulkRequestRunnable); prepareBulkRequestRunnable = (AbstractRunnable) threadPool.getThreadContext().preserveContext(prepareBulkRequestRunnable);
task.delayPrepareBulkRequest(threadPool, delay, prepareBulkRequestRunnable); task.delayPrepareBulkRequest(threadPool, lastBatchStartTime, lastBatchSize, prepareBulkRequestRunnable);
} }
/** /**
@ -196,12 +198,11 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
* delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the * delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the
* thread may be blocked by the user script. * thread may be blocked by the user script.
*/ */
void prepareBulkRequest(SearchResponse searchResponse) { void prepareBulkRequest(TimeValue thisBatchStartTime, SearchResponse searchResponse) {
if (task.isCancelled()) { if (task.isCancelled()) {
finishHim(null); finishHim(null);
return; return;
} }
lastBatchStartTime.set(System.nanoTime());
SearchHit[] docs = searchResponse.getHits().getHits(); SearchHit[] docs = searchResponse.getHits().getHits();
logger.debug("scroll returned [{}] documents with a scroll id of [{}]", docs.length, searchResponse.getScrollId()); logger.debug("scroll returned [{}] documents with a scroll id of [{}]", docs.length, searchResponse.getScrollId());
if (docs.length == 0) { if (docs.length == 0) {
@ -222,7 +223,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
/* /*
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation. * If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
*/ */
startNextScroll(0); startNextScroll(thisBatchStartTime, 0);
return; return;
} }
request.timeout(mainRequest.getTimeout()); request.timeout(mainRequest.getTimeout());
@ -231,13 +232,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
logger.debug("sending [{}] entry, [{}] bulk request", request.requests().size(), logger.debug("sending [{}] entry, [{}] bulk request", request.requests().size(),
new ByteSizeValue(request.estimatedSizeInBytes())); new ByteSizeValue(request.estimatedSizeInBytes()));
} }
sendBulkRequest(request); sendBulkRequest(thisBatchStartTime, request);
} }
/** /**
* Send a bulk request, handling retries. * Send a bulk request, handling retries.
*/ */
void sendBulkRequest(BulkRequest request) { void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) {
if (task.isCancelled()) { if (task.isCancelled()) {
finishHim(null); finishHim(null);
return; return;
@ -245,7 +246,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
bulkRetry.withAsyncBackoff(client, request, new ActionListener<BulkResponse>() { bulkRetry.withAsyncBackoff(client, request, new ActionListener<BulkResponse>() {
@Override @Override
public void onResponse(BulkResponse response) { public void onResponse(BulkResponse response) {
onBulkResponse(response); onBulkResponse(thisBatchStartTime, response);
} }
@Override @Override
@ -258,7 +259,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
/** /**
* Processes bulk responses, accounting for failures. * Processes bulk responses, accounting for failures.
*/ */
void onBulkResponse(BulkResponse response) { void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
try { try {
List<Failure> failures = new ArrayList<Failure>(); List<Failure> failures = new ArrayList<Failure>();
Set<String> destinationIndicesThisBatch = new HashSet<>(); Set<String> destinationIndicesThisBatch = new HashSet<>();
@ -306,7 +307,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
return; return;
} }
startNextScroll(response.getItems().length); startNextScroll(thisBatchStartTime, response.getItems().length);
} catch (Throwable t) { } catch (Throwable t) {
finishHim(t); finishHim(t);
} }
@ -318,36 +319,20 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
* @param lastBatchSize the number of requests sent in the last batch. This is used to calculate the throttling values which are applied * @param lastBatchSize the number of requests sent in the last batch. This is used to calculate the throttling values which are applied
* when the scroll returns * when the scroll returns
*/ */
void startNextScroll(int lastBatchSize) { void startNextScroll(TimeValue lastBatchStartTime, int lastBatchSize) {
if (task.isCancelled()) { if (task.isCancelled()) {
finishHim(null); finishHim(null);
return; return;
} }
long earliestNextBatchStartTime = lastBatchStartTime.get() + (long) perfectlyThrottledBatchTime(lastBatchSize);
long waitTime = max(0, earliestNextBatchStartTime - System.nanoTime());
SearchScrollRequest request = new SearchScrollRequest(); SearchScrollRequest request = new SearchScrollRequest();
// Add the wait time into the scroll timeout so it won't timeout while we wait for throttling // Add the wait time into the scroll timeout so it won't timeout while we wait for throttling
request.scrollId(scroll.get()).scroll(timeValueNanos(firstSearchRequest.scroll().keepAlive().nanos() + waitTime)); request.scrollId(scroll.get()).scroll(timeValueNanos(
firstSearchRequest.scroll().keepAlive().nanos() + task.throttleWaitTime(lastBatchStartTime, lastBatchSize).nanos()));
searchWithRetry(listener -> client.searchScroll(request, listener), (SearchResponse response) -> { searchWithRetry(listener -> client.searchScroll(request, listener), (SearchResponse response) -> {
onScrollResponse(timeValueNanos(max(0, earliestNextBatchStartTime - System.nanoTime())), response); onScrollResponse(lastBatchStartTime, lastBatchSize, response);
}); });
} }
/**
* How many nanoseconds should a batch of lastBatchSize have taken if it were perfectly throttled? Package private for testing.
*/
float perfectlyThrottledBatchTime(int lastBatchSize) {
if (task.getRequestsPerSecond() == Float.POSITIVE_INFINITY) {
return 0;
}
// requests
// ------------------- == seconds
// request per seconds
float targetBatchTimeInSeconds = lastBatchSize / task.getRequestsPerSecond();
// nanoseconds per seconds * seconds == nanoseconds
return TimeUnit.SECONDS.toNanos(1) * targetBatchTimeInSeconds;
}
private void recordFailure(Failure failure, List<Failure> failures) { private void recordFailure(Failure failure, List<Failure> failures) {
if (failure.getStatus() == CONFLICT) { if (failure.getStatus() == CONFLICT) {
task.countVersionConflict(); task.countVersionConflict();
@ -453,20 +438,6 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
this.scroll.set(scroll); this.scroll.set(scroll);
} }
/**
* Set the last batch's start time. Exists entirely for testing.
*/
void setLastBatchStartTime(long newValue) {
lastBatchStartTime.set(newValue);
}
/**
* Get the last batch's start time. Exists entirely for testing.
*/
long getLastBatchStartTime() {
return lastBatchStartTime.get();
}
/** /**
* Wraps a backoffPolicy in another policy that counts the number of backoffs acquired. Used to count bulk backoffs. * Wraps a backoffPolicy in another policy that counts the number of backoffs acquired. Used to count bulk backoffs.
*/ */

View File

@ -49,7 +49,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
* Task storing information about a currently running BulkByScroll request. * Task storing information about a currently running BulkByScroll request.
*/ */
public class BulkByScrollTask extends CancellableTask { public class BulkByScrollTask extends CancellableTask {
private static final ESLogger logger = ESLoggerFactory.getLogger(BulkByScrollTask.class.getName()); private static final ESLogger logger = ESLoggerFactory.getLogger(BulkByScrollTask.class.getPackage().getName());
/** /**
* The total number of documents this request will process. 0 means we don't yet know or, possibly, there are actually 0 documents * The total number of documents this request will process. 0 means we don't yet know or, possibly, there are actually 0 documents
@ -403,9 +403,11 @@ public class BulkByScrollTask extends CancellableTask {
* Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be * Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be
* rescheduled over and over again. * rescheduled over and over again.
*/ */
void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue delay, AbstractRunnable prepareBulkRequestRunnable) { void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchStartTime, int lastBatchSize,
AbstractRunnable prepareBulkRequestRunnable) {
// Synchronize so we are less likely to schedule the same request twice. // Synchronize so we are less likely to schedule the same request twice.
synchronized (delayedPrepareBulkRequestReference) { synchronized (delayedPrepareBulkRequestReference) {
TimeValue delay = throttleWaitTime(lastBatchStartTime, lastBatchSize);
AbstractRunnable oneTime = new AbstractRunnable() { AbstractRunnable oneTime = new AbstractRunnable() {
private final AtomicBoolean hasRun = new AtomicBoolean(false); private final AtomicBoolean hasRun = new AtomicBoolean(false);
@ -426,6 +428,26 @@ public class BulkByScrollTask extends CancellableTask {
} }
} }
TimeValue throttleWaitTime(TimeValue lastBatchStartTime, int lastBatchSize) {
long earliestNextBatchStartTime = lastBatchStartTime.nanos() + (long) perfectlyThrottledBatchTime(lastBatchSize);
return timeValueNanos(max(0, earliestNextBatchStartTime - System.nanoTime()));
}
/**
* How many nanoseconds should a batch of lastBatchSize have taken if it were perfectly throttled? Package private for testing.
*/
float perfectlyThrottledBatchTime(int lastBatchSize) {
if (requestsPerSecond == Float.POSITIVE_INFINITY) {
return 0;
}
// requests
// ------------------- == seconds
// request per seconds
float targetBatchTimeInSeconds = lastBatchSize / requestsPerSecond;
// nanoseconds per seconds * seconds == nanoseconds
return TimeUnit.SECONDS.toNanos(1) * targetBatchTimeInSeconds;
}
private void setRequestsPerSecond(float requestsPerSecond) { private void setRequestsPerSecond(float requestsPerSecond) {
this.requestsPerSecond = requestsPerSecond; this.requestsPerSecond = requestsPerSecond;
} }

View File

@ -97,19 +97,18 @@ import static java.util.Collections.emptySet;
import static java.util.Collections.singleton; import static java.util.Collections.singleton;
import static org.apache.lucene.util.TestUtil.randomSimpleString; import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff; import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.emptyCollectionOf; import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class AsyncBulkByScrollActionTests extends ESTestCase { public class AsyncBulkByScrollActionTests extends ESTestCase {
private MyMockClient client; private MyMockClient client;
@ -189,7 +188,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
client.scrollsToReject = randomIntBetween(0, testRequest.getMaxRetries() - 1); client.scrollsToReject = randomIntBetween(0, testRequest.getMaxRetries() - 1);
DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
action.setScroll(scrollId()); action.setScroll(scrollId());
action.startNextScroll(0); action.startNextScroll(timeValueNanos(System.nanoTime()), 0);
assertBusy(() -> assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get())); assertBusy(() -> assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get()));
if (listener.isDone()) { if (listener.isDone()) {
Object result = listener.get(); Object result = listener.get();
@ -203,7 +202,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
client.scrollsToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100); client.scrollsToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100);
DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff(); DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
action.setScroll(scrollId()); action.setScroll(scrollId());
action.startNextScroll(0); action.startNextScroll(timeValueNanos(System.nanoTime()), 0);
assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get())); assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get()));
assertBusy(() -> assertTrue(listener.isDone())); assertBusy(() -> assertTrue(listener.isDone()));
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get()); ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
@ -219,7 +218,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
long total = randomIntBetween(0, Integer.MAX_VALUE); long total = randomIntBetween(0, Integer.MAX_VALUE);
InternalSearchHits hits = new InternalSearchHits(null, total, 0); InternalSearchHits hits = new InternalSearchHits(null, total, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), 0,
new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
assertEquals(total, testTask.getStatus().getTotal()); assertEquals(total, testTask.getStatus().getTotal());
} }
@ -230,24 +229,17 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
public void testScrollResponseBatchingBehavior() throws Exception { public void testScrollResponseBatchingBehavior() throws Exception {
int maxBatches = randomIntBetween(0, 100); int maxBatches = randomIntBetween(0, 100);
for (int batches = 1; batches < maxBatches; batches++) { for (int batches = 1; batches < maxBatches; batches++) {
long now = System.nanoTime();
InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap()); InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap());
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] { hit }, 0, 0); InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] { hit }, 0, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
action.onScrollResponse(timeValueSeconds(0), action.onScrollResponse(timeValueNanos(System.nanoTime()), 0,
new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
// Use assert busy because the update happens on another thread // Use assert busy because the update happens on another thread
final int expectedBatches = batches; final int expectedBatches = batches;
assertBusy(() -> assertEquals(expectedBatches, testTask.getStatus().getBatches())); assertBusy(() -> assertEquals(expectedBatches, testTask.getStatus().getBatches()));
/*
* While we're here we can check that getting a scroll response sets the last scroll start time which makes sure the wait time
* isn't counted as time that the last batch took.
*/
assertThat(action.getLastBatchStartTime(), greaterThanOrEqualTo(now));
/* /*
* Also while we're here check that we preserved the headers from the last request. assertBusy because no requests might have * Also while we're here check that we preserved the headers from the last request. assertBusy because no requests might have
* come in yet. * come in yet.
@ -297,7 +289,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
} }
responses[i] = new BulkItemResponse(i, opType, new IndexResponse(shardId, "type", "id" + i, randomInt(), createdResponse)); responses[i] = new BulkItemResponse(i, opType, new IndexResponse(shardId, "type", "id" + i, randomInt(), createdResponse));
} }
new DummyAbstractAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0)); new DummyAbstractAsyncBulkByScrollAction().onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(responses, 0));
assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts()); assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts());
assertEquals(updated, testTask.getStatus().getUpdated()); assertEquals(updated, testTask.getStatus().getUpdated());
assertEquals(created, testTask.getStatus().getCreated()); assertEquals(created, testTask.getStatus().getCreated());
@ -310,20 +302,22 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
* Mimicks a ThreadPool rejecting execution of the task. * Mimicks a ThreadPool rejecting execution of the task.
*/ */
public void testThreadPoolRejectionsAbortRequest() throws Exception { public void testThreadPoolRejectionsAbortRequest() throws Exception {
TimeValue expectedDelay = parseTimeValue(randomPositiveTimeValue(), "test"); testTask.rethrottle(1);
threadPool.shutdown(); threadPool.shutdown();
threadPool = new TestThreadPool(getTestName()) { threadPool = new TestThreadPool(getTestName()) {
@Override @Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) { public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
assertEquals(expectedDelay, delay); // While we're here we can check that the sleep made it through // While we're here we can check that the sleep made it through
assertThat(delay.nanos(), greaterThan(0L));
assertThat(delay.seconds(), lessThanOrEqualTo(10L));
((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test")); ((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test"));
return null; return null;
} }
}; };
InternalSearchHits hits = new InternalSearchHits(null, 0, 0); InternalSearchHits hits = new InternalSearchHits(null, 0, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
new DummyAbstractAsyncBulkByScrollAction() new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueNanos(System.nanoTime()), 10,
.onScrollResponse(expectedDelay, new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
try { try {
listener.get(); listener.get();
fail("Expected a failure"); fail("Expected a failure");
@ -343,7 +337,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
public void testShardFailuresAbortRequest() throws Exception { public void testShardFailuresAbortRequest() throws Exception {
ShardSearchFailure shardFailure = new ShardSearchFailure(new RuntimeException("test")); ShardSearchFailure shardFailure = new ShardSearchFailure(new RuntimeException("test"));
InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, false, null); InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, false, null);
new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueNanos(System.nanoTime()), 0,
new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure })); new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure }));
BulkIndexByScrollResponse response = listener.get(); BulkIndexByScrollResponse response = listener.get();
assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class)); assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class));
@ -358,7 +352,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
*/ */
public void testSearchTimeoutsAbortRequest() throws Exception { public void testSearchTimeoutsAbortRequest() throws Exception {
InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, true, null); InternalSearchResponse internalResponse = new InternalSearchResponse(null, null, null, null, true, null);
new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueSeconds(0), new DummyAbstractAsyncBulkByScrollAction().onScrollResponse(timeValueNanos(System.nanoTime()), 0,
new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[0])); new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[0]));
BulkIndexByScrollResponse response = listener.get(); BulkIndexByScrollResponse response = listener.get();
assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class)); assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class));
@ -368,14 +362,14 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
assertThat(client.scrollsCleared, contains(scrollId)); assertThat(client.scrollsCleared, contains(scrollId));
} }
/** /**
* Mimicks bulk indexing failures. * Mimicks bulk indexing failures.
*/ */
public void testBulkFailuresAbortRequest() throws Exception { public void testBulkFailuresAbortRequest() throws Exception {
Failure failure = new Failure("index", "type", "id", new RuntimeException("test")); Failure failure = new Failure("index", "type", "id", new RuntimeException("test"));
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
action.onBulkResponse(new BulkResponse(new BulkItemResponse[] {new BulkItemResponse(0, "index", failure)}, randomLong())); BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[] {new BulkItemResponse(0, "index", failure)}, randomLong());
action.onBulkResponse(timeValueNanos(System.nanoTime()), bulkResponse);
BulkIndexByScrollResponse response = listener.get(); BulkIndexByScrollResponse response = listener.get();
assertThat(response.getIndexingFailures(), contains(failure)); assertThat(response.getIndexingFailures(), contains(failure));
assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class)); assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class));
@ -394,15 +388,12 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
}; };
InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap()); InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap());
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] {hit}, 0, 0); InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] {hit}, 0, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false);
action.onScrollResponse(timeValueSeconds(0), new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), null);
try { action.onScrollResponse(timeValueNanos(System.nanoTime()), 0, searchResponse);
listener.get(); ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
fail("Expected failure."); assertThat(e.getCause(), instanceOf(RuntimeException.class));
} catch (ExecutionException e) { assertThat(e.getCause().getMessage(), equalTo("surprise"));
assertThat(e.getCause(), instanceOf(RuntimeException.class));
assertThat(e.getCause().getMessage(), equalTo("surprise"));
}
} }
/** /**
@ -426,17 +417,6 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
assertEquals(testRequest.getMaxRetries(), testTask.getStatus().getBulkRetries()); assertEquals(testRequest.getMaxRetries(), testTask.getStatus().getBulkRetries());
} }
public void testPerfectlyThrottledBatchTime() {
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
testRequest.setRequestsPerSecond(Float.POSITIVE_INFINITY);
assertThat((double) action.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f));
int total = between(0, 1000000);
testTask.rethrottle(1);
assertThat((double) action.perfectlyThrottledBatchTime(total),
closeTo(TimeUnit.SECONDS.toNanos(total), TimeUnit.SECONDS.toNanos(1)));
}
public void testScrollDelay() throws Exception { public void testScrollDelay() throws Exception {
/* /*
* Replace the thread pool with one that will save the delay sent for the command. We'll use that to check that we used a proper * Replace the thread pool with one that will save the delay sent for the command. We'll use that to check that we used a proper
@ -460,12 +440,10 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
// Set the base for the scroll to wait - this is added to the figure we calculate below // Set the base for the scroll to wait - this is added to the figure we calculate below
firstSearchRequest.scroll(timeValueSeconds(10)); firstSearchRequest.scroll(timeValueSeconds(10));
// We'd like to get about 1 request a second // Set throttle to 1 request per second to make the math simpler
testTask.rethrottle(1f); testTask.rethrottle(1f);
// Make the last scroll look nearly instant // Make the last batch look nearly instant but have 100 documents
action.setLastBatchStartTime(System.nanoTime()); action.startNextScroll(timeValueNanos(System.nanoTime()), 100);
// The last batch had 100 documents
action.startNextScroll(100);
// So the next request is going to have to wait an extra 100 seconds or so (base was 10 seconds, so 110ish) // So the next request is going to have to wait an extra 100 seconds or so (base was 10 seconds, so 110ish)
assertThat(client.lastScroll.get().request.scroll().keepAlive().seconds(), either(equalTo(110L)).or(equalTo(109L))); assertThat(client.lastScroll.get().request.scroll().keepAlive().seconds(), either(equalTo(110L)).or(equalTo(109L)));
@ -473,11 +451,20 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
// Now we can simulate a response and check the delay that we used for the task // Now we can simulate a response and check the delay that we used for the task
InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap()); InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap());
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] { hit }, 0, 0); InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] { hit }, 0, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false);
client.lastScroll.get().listener.onResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, randomLong(), null);
// The delay is still 100ish seconds because there hasn't been much time between when we requested the bulk and when we got it. if (randomBoolean()) {
assertThat(capturedDelay.get().seconds(), either(equalTo(100L)).or(equalTo(99L))); client.lastScroll.get().listener.onResponse(searchResponse);
// The delay is still 100ish seconds because there hasn't been much time between when we requested the bulk and when we got it.
assertThat(capturedDelay.get().seconds(), either(equalTo(100L)).or(equalTo(99L)));
} else {
// Let's rethrottle between the starting the scroll and getting the response
testTask.rethrottle(10f);
client.lastScroll.get().listener.onResponse(searchResponse);
// The delay uses the new throttle
assertThat(capturedDelay.get().seconds(), either(equalTo(10L)).or(equalTo(9L)));
}
// Running the command ought to increment the delay counter on the task. // Running the command ought to increment the delay counter on the task.
capturedCommand.get().run(); capturedCommand.get().run();
@ -501,7 +488,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
CountDownLatch successLatch = new CountDownLatch(1); CountDownLatch successLatch = new CountDownLatch(1);
DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff() { DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff() {
@Override @Override
void startNextScroll(int lastBatchSize) { void startNextScroll(TimeValue lastBatchStartTime, int lastBatchSize) {
successLatch.countDown(); successLatch.countDown();
} }
}; };
@ -509,7 +496,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
for (int i = 0; i < size + 1; i++) { for (int i = 0; i < size + 1; i++) {
request.add(new IndexRequest("index", "type", "id" + i)); request.add(new IndexRequest("index", "type", "id" + i));
} }
action.sendBulkRequest(request); action.sendBulkRequest(timeValueNanos(System.nanoTime()), request);
if (failWithRejection) { if (failWithRejection) {
BulkIndexByScrollResponse response = listener.get(); BulkIndexByScrollResponse response = listener.get();
assertThat(response.getIndexingFailures(), hasSize(1)); assertThat(response.getIndexingFailures(), hasSize(1));
@ -576,22 +563,23 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
public void testCancelBeforeScrollResponse() throws Exception { public void testCancelBeforeScrollResponse() throws Exception {
// We bail so early we don't need to pass in a half way valid response. // We bail so early we don't need to pass in a half way valid response.
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.onScrollResponse(timeValueSeconds(0), null)); cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.onScrollResponse(timeValueNanos(System.nanoTime()), 1,
null));
} }
public void testCancelBeforeSendBulkRequest() throws Exception { public void testCancelBeforeSendBulkRequest() throws Exception {
// We bail so early we don't need to pass in a half way valid request. // We bail so early we don't need to pass in a half way valid request.
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.sendBulkRequest(null)); cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.sendBulkRequest(timeValueNanos(System.nanoTime()), null));
} }
public void testCancelBeforeOnBulkResponse() throws Exception { public void testCancelBeforeOnBulkResponse() throws Exception {
// We bail so early we don't need to pass in a half way valid response. // We bail so early we don't need to pass in a half way valid response.
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) ->
action.onBulkResponse(new BulkResponse(new BulkItemResponse[0], 0))); action.onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(new BulkItemResponse[0], 0)));
} }
public void testCancelBeforeStartNextScroll() throws Exception { public void testCancelBeforeStartNextScroll() throws Exception {
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNextScroll(0)); cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNextScroll(timeValueNanos(System.nanoTime()), 0));
} }
public void testCancelBeforeStartNormalTermination() throws Exception { public void testCancelBeforeStartNormalTermination() throws Exception {
@ -640,7 +628,9 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
InternalSearchHits hits = new InternalSearchHits(null, total, 0); InternalSearchHits hits = new InternalSearchHits(null, total, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
// Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task // Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task
action.onScrollResponse(timeValueMinutes(10), new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); SearchResponse scrollResponse = new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null);
testTask.rethrottle(1);
action.onScrollResponse(timeValueNanos(System.nanoTime()), 1000, scrollResponse);
// Now that we've got our cancel we'll just verify that it all came through all right // Now that we've got our cancel we'll just verify that it all came through all right
assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled()); assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled());

View File

@ -42,8 +42,10 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -165,10 +167,11 @@ public class BulkByScrollTaskTests extends ESTestCase {
* We never end up waiting this long because the test rethrottles over and over again, ratcheting down the delay a random amount * We never end up waiting this long because the test rethrottles over and over again, ratcheting down the delay a random amount
* each time. * each time.
*/ */
float originalRequestsPerSecond = (float) randomDoubleBetween(0, 10000, true); float originalRequestsPerSecond = (float) randomDoubleBetween(1, 10000, true);
task.rethrottle(originalRequestsPerSecond); task.rethrottle(originalRequestsPerSecond);
TimeValue maxDelay = timeValueSeconds(between(1, 5)); TimeValue maxDelay = timeValueSeconds(between(1, 5));
assertThat(maxDelay.nanos(), greaterThanOrEqualTo(0L)); assertThat(maxDelay.nanos(), greaterThanOrEqualTo(0L));
int batchSizeForMaxDelay = (int) (maxDelay.seconds() * originalRequestsPerSecond);
ThreadPool threadPool = new TestThreadPool(getTestName()) { ThreadPool threadPool = new TestThreadPool(getTestName()) {
@Override @Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) { public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
@ -177,7 +180,7 @@ public class BulkByScrollTaskTests extends ESTestCase {
} }
}; };
try { try {
task.delayPrepareBulkRequest(threadPool, maxDelay, new AbstractRunnable() { task.delayPrepareBulkRequest(threadPool, timeValueNanos(System.nanoTime()), batchSizeForMaxDelay, new AbstractRunnable() {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
boolean oldValue = done.getAndSet(true); boolean oldValue = done.getAndSet(true);
@ -263,7 +266,7 @@ public class BulkByScrollTaskTests extends ESTestCase {
}; };
try { try {
// Have the task use the thread pool to delay a task that does nothing // Have the task use the thread pool to delay a task that does nothing
task.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), new AbstractRunnable() { task.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), 1, new AbstractRunnable() {
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
} }
@ -284,4 +287,14 @@ public class BulkByScrollTaskTests extends ESTestCase {
task.getStatus().toXContent(builder, ToXContent.EMPTY_PARAMS); task.getStatus().toXContent(builder, ToXContent.EMPTY_PARAMS);
assertThat(builder.string(), containsString("\"requests_per_second\":\"unlimited\"")); assertThat(builder.string(), containsString("\"requests_per_second\":\"unlimited\""));
} }
public void testPerfectlyThrottledBatchTime() {
task.rethrottle(Float.POSITIVE_INFINITY);
assertThat((double) task.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f));
int total = between(0, 1000000);
task.rethrottle(1);
assertThat((double) task.perfectlyThrottledBatchTime(total),
closeTo(TimeUnit.SECONDS.toNanos(total), TimeUnit.SECONDS.toNanos(1)));
}
} }