Reindex should retry on search failures

This uses the same backoff policy we use for bulk and just retries until
the request isn't rejected.

Instead of `{"retries": 12}` in the response to count retries this now
looks like `{"retries": {"bulk": 12", "search": 1}`.

Closes #18059
This commit is contained in:
Nik Everett 2016-05-12 16:07:46 -04:00
parent 584be0b3f8
commit fe4823eae0
22 changed files with 518 additions and 109 deletions

View File

@ -33,7 +33,10 @@ That will return something like this:
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": "unlimited",
"throttled_until_millis": 0,
@ -386,7 +389,10 @@ The JSON response looks like this:
"created": 123,
"batches": 1,
"version_conflicts": 2,
"retries": 0,
"retries": {
"bulk": 0,
"search": 0
}
"throttled_millis": 0,
"failures" : [ ]
}
@ -414,7 +420,8 @@ The number of version conflicts that reindex hit.
`retries`::
The number of retries that the reindex did in response to a full queue.
The number of retries attempted by reindex. `bulk` is the number of bulk
actions retried and `search` is the number of search actions retried.
`throttled_millis`::
@ -468,7 +475,10 @@ The responses looks like:
"batches" : 4,
"version_conflicts" : 0,
"noops" : 0,
"retries": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0
},
"description" : ""

View File

@ -26,7 +26,10 @@ That will return something like this:
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": "unlimited",
"throttled_until_millis": 0,
@ -220,7 +223,10 @@ The JSON response looks like this:
"updated": 0,
"batches": 1,
"version_conflicts": 2,
"retries": 0,
"retries": {
"bulk": 0,
"search": 0
}
"throttled_millis": 0,
"failures" : [ ]
}
@ -244,7 +250,8 @@ The number of version conflicts that the update by query hit.
`retries`::
The number of retries that the update by query did in response to a full queue.
The number of retries attempted by update-by-query. `bulk` is the number of bulk
actions retried and `search` is the number of search actions retried.
`throttled_millis`::
@ -299,7 +306,10 @@ The responses looks like:
"batches" : 4,
"version_conflicts" : 0,
"noops" : 0,
"retries": 0,
"retries": {
"bulk": 0,
"search": 0
}
"throttled_millis": 0
},
"description" : ""

View File

@ -34,6 +34,7 @@ way to do this is to upgrade to Elasticsearch 2.3 or later and to use the
* <<breaking_50_percolator>>
* <<breaking_50_suggester>>
* <<breaking_50_index_apis>>
* <<breaking_50_document_api_changes>>
* <<breaking_50_settings_changes>>
* <<breaking_50_allocation>>
* <<breaking_50_http_changes>>

View File

@ -0,0 +1,33 @@
[[breaking_50_document_api_changes]]
=== Document API changes
==== Reindex and Update By Query
Before 5.0.0 `_reindex` and `_update_by_query` only retried bulk failures so
they used the following response format:
[source,js]
----------------------
{
...
"retries": 10
...
}
----------------------
Where `retries` counts the number of bulk retries. Now they retry on search
failures as well and use this response format:
[source,js]
----------------------
{
...
"retries": {
"bulk": 10,
"search": 1
}
...
}
----------------------
Where `bulk` counts the number of bulk retries and `search` counts the number
of search retries.

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@ -57,6 +58,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static java.lang.Math.max;
import static java.lang.Math.min;
@ -91,7 +93,8 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
private final ThreadPool threadPool;
private final SearchRequest firstSearchRequest;
private final ActionListener<Response> listener;
private final Retry retry;
private final BackoffPolicy backoffPolicy;
private final Retry bulkRetry;
public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest, ActionListener<Response> listener) {
@ -102,7 +105,8 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
this.mainRequest = mainRequest;
this.firstSearchRequest = firstSearchRequest;
this.listener = listener;
retry = Retry.on(EsRejectedExecutionException.class).policy(wrapBackoffPolicy(backoffPolicy()));
backoffPolicy = buildBackoffPolicy();
bulkRetry = Retry.on(EsRejectedExecutionException.class).policy(wrapBackoffPolicy(backoffPolicy));
}
protected abstract BulkRequest buildBulk(Iterable<SearchHit> docs);
@ -131,21 +135,14 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
firstSearchRequest.types() == null || firstSearchRequest.types().length == 0 ? ""
: firstSearchRequest.types());
}
client.search(firstSearchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
logger.debug("[{}] documents match query", response.getHits().getTotalHits());
onScrollResponse(timeValueSeconds(0), response);
}
@Override
public void onFailure(Throwable e) {
finishHim(e);
}
});
} catch (Throwable t) {
finishHim(t);
return;
}
searchWithRetry(listener -> client.search(firstSearchRequest, listener), (SearchResponse response) -> {
logger.debug("[{}] documents match query", response.getHits().getTotalHits());
onScrollResponse(timeValueSeconds(0), response);
});
}
/**
@ -239,7 +236,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
finishHim(null);
return;
}
retry.withAsyncBackoff(client, request, new ActionListener<BulkResponse>() {
bulkRetry.withAsyncBackoff(client, request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
onBulkResponse(response);
@ -322,16 +319,8 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
SearchScrollRequest request = new SearchScrollRequest();
// Add the wait time into the scroll timeout so it won't timeout while we wait for throttling
request.scrollId(scroll.get()).scroll(timeValueNanos(firstSearchRequest.scroll().keepAlive().nanos() + waitTime));
client.searchScroll(request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
onScrollResponse(timeValueNanos(max(0, earliestNextBatchStartTime - System.nanoTime())), response);
}
@Override
public void onFailure(Throwable e) {
finishHim(e);
}
searchWithRetry(listener -> client.searchScroll(request, listener), (SearchResponse response) -> {
onScrollResponse(timeValueNanos(max(0, earliestNextBatchStartTime - System.nanoTime())), response);
});
}
@ -434,9 +423,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
}
/**
* Build the backoff policy for use with retries.
* Get the backoff policy for use with retries.
*/
BackoffPolicy backoffPolicy() {
BackoffPolicy buildBackoffPolicy() {
return exponentialBackoff(mainRequest.getRetryBackoffInitialTime(), mainRequest.getMaxRetries());
}
@ -470,7 +459,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
}
/**
* Wraps a backoffPolicy in another policy that counts the number of backoffs acquired.
* Wraps a backoffPolicy in another policy that counts the number of backoffs acquired. Used to count bulk backoffs.
*/
private BackoffPolicy wrapBackoffPolicy(BackoffPolicy backoffPolicy) {
return new BackoffPolicy() {
@ -488,11 +477,52 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
if (false == delegate.hasNext()) {
return null;
}
task.countRetry();
task.countBulkRetry();
return delegate.next();
}
};
}
};
}
/**
* Run a search action and call onResponse when a the response comes in, retrying if the action fails with an exception caused by
* rejected execution.
*
* @param action consumes a listener and starts the action. The listener it consumes is rigged to retry on failure.
* @param onResponse consumes the response from the action
*/
private <T> void searchWithRetry(Consumer<ActionListener<T>> action, Consumer<T> onResponse) {
class RetryHelper extends AbstractRunnable implements ActionListener<T> {
private final Iterator<TimeValue> retries = backoffPolicy.iterator();
@Override
public void onResponse(T response) {
onResponse.accept(response);
}
@Override
protected void doRun() throws Exception {
action.accept(this);
}
@Override
public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrap(e, EsRejectedExecutionException.class) != null) {
if (retries.hasNext()) {
logger.trace("retrying rejected search", e);
threadPool.schedule(retries.next(), ThreadPool.Names.SAME, this);
task.countSearchRetry();
} else {
logger.warn("giving up on search because we retried {} times without success", e, retries);
finishHim(e);
}
} else {
logger.warn("giving up on search because it failed with a non-retryable exception", e);
finishHim(e);
}
}
}
new RetryHelper().run();
}
}

View File

@ -19,9 +19,6 @@
package org.elasticsearch.index.reindex;
import java.io.IOException;
import java.util.Arrays;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
@ -34,6 +31,9 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Arrays;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;

View File

@ -107,6 +107,23 @@ public abstract class AbstractBulkByScrollRequestBuilder<
return self();
}
/**
* Initial delay after a rejection before retrying a bulk request. With the default maxRetries the total backoff for retrying rejections
* is about one minute per bulk request. Once the entire bulk request is successful the retry counter resets.
*/
public Self setRetryBackoffInitialTime(TimeValue retryBackoffInitialTime) {
request.setRetryBackoffInitialTime(retryBackoffInitialTime);
return self();
}
/**
* Total number of retries attempted for rejections. There is no way to ask for unlimited retries.
*/
public Self setMaxRetries(int maxRetries) {
request.setMaxRetries(maxRetries);
return self();
}
/**
* Set the throttle for this request in sub-requests per second. {@link Float#POSITIVE_INFINITY} means set no throttle and that is the
* default. Throttling is done between batches, as we start the next scroll requests. That way we can increase the scroll's timeout to

View File

@ -26,12 +26,11 @@ import org.elasticsearch.script.Script;
public abstract class AbstractBulkIndexByScrollRequestBuilder<
Request extends AbstractBulkIndexByScrollRequest<Request>,
Response extends BulkIndexByScrollResponse,
Self extends AbstractBulkIndexByScrollRequestBuilder<Request, Response, Self>>
extends AbstractBulkByScrollRequestBuilder<Request, Response, Self> {
Self extends AbstractBulkIndexByScrollRequestBuilder<Request, Self>>
extends AbstractBulkByScrollRequestBuilder<Request, BulkIndexByScrollResponse, Self> {
protected AbstractBulkIndexByScrollRequestBuilder(ElasticsearchClient client,
Action<Request, Response, Self> action, SearchRequestBuilder search, Request request) {
Action<Request, BulkIndexByScrollResponse, Self> action, SearchRequestBuilder search, Request request) {
super(client, action, search, request);
}

View File

@ -58,7 +58,8 @@ public class BulkByScrollTask extends CancellableTask {
private final AtomicLong noops = new AtomicLong(0);
private final AtomicInteger batch = new AtomicInteger(0);
private final AtomicLong versionConflicts = new AtomicLong(0);
private final AtomicLong retries = new AtomicLong(0);
private final AtomicLong bulkRetries = new AtomicLong(0);
private final AtomicLong searchRetries = new AtomicLong(0);
private final AtomicLong throttledNanos = new AtomicLong();
/**
* The number of requests per second to which to throttle the request that this task represents. The other variables are all AtomicXXX
@ -84,7 +85,8 @@ public class BulkByScrollTask extends CancellableTask {
@Override
public Status getStatus() {
return new Status(total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), noops.get(),
retries.get(), timeValueNanos(throttledNanos.get()), getRequestsPerSecond(), getReasonCancelled(), throttledUntil());
bulkRetries.get(), searchRetries.get(), timeValueNanos(throttledNanos.get()), getRequestsPerSecond(), getReasonCancelled(),
throttledUntil());
}
private TimeValue throttledUntil() {
@ -133,14 +135,16 @@ public class BulkByScrollTask extends CancellableTask {
private final int batches;
private final long versionConflicts;
private final long noops;
private final long retries;
private final long bulkRetries;
private final long searchRetries;
private final TimeValue throttled;
private final float requestsPerSecond;
private final String reasonCancelled;
private final TimeValue throttledUntil;
public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops, long retries,
TimeValue throttled, float requestsPerSecond, @Nullable String reasonCancelled, TimeValue throttledUntil) {
public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops,
long bulkRetries, long searchRetries, TimeValue throttled, float requestsPerSecond, @Nullable String reasonCancelled,
TimeValue throttledUntil) {
this.total = checkPositive(total, "total");
this.updated = checkPositive(updated, "updated");
this.created = checkPositive(created, "created");
@ -148,7 +152,8 @@ public class BulkByScrollTask extends CancellableTask {
this.batches = checkPositive(batches, "batches");
this.versionConflicts = checkPositive(versionConflicts, "versionConflicts");
this.noops = checkPositive(noops, "noops");
this.retries = checkPositive(retries, "retries");
this.bulkRetries = checkPositive(bulkRetries, "bulkRetries");
this.searchRetries = checkPositive(searchRetries, "searchRetries");
this.throttled = throttled;
this.requestsPerSecond = requestsPerSecond;
this.reasonCancelled = reasonCancelled;
@ -163,7 +168,8 @@ public class BulkByScrollTask extends CancellableTask {
batches = in.readVInt();
versionConflicts = in.readVLong();
noops = in.readVLong();
retries = in.readVLong();
bulkRetries = in.readVLong();
searchRetries = in.readVLong();
throttled = TimeValue.readTimeValue(in);
requestsPerSecond = in.readFloat();
reasonCancelled = in.readOptionalString();
@ -179,7 +185,8 @@ public class BulkByScrollTask extends CancellableTask {
out.writeVInt(batches);
out.writeVLong(versionConflicts);
out.writeVLong(noops);
out.writeVLong(retries);
out.writeVLong(bulkRetries);
out.writeVLong(searchRetries);
throttled.writeTo(out);
out.writeFloat(requestsPerSecond);
out.writeOptionalString(reasonCancelled);
@ -208,7 +215,11 @@ public class BulkByScrollTask extends CancellableTask {
builder.field("batches", batches);
builder.field("version_conflicts", versionConflicts);
builder.field("noops", noops);
builder.field("retries", retries);
builder.startObject("retries"); {
builder.field("bulk", bulkRetries);
builder.field("search", searchRetries);
}
builder.endObject();
builder.timeValueField("throttled_millis", "throttled", throttled);
builder.field("requests_per_second", requestsPerSecond == Float.POSITIVE_INFINITY ? "unlimited" : requestsPerSecond);
if (reasonCancelled != null) {
@ -233,7 +244,7 @@ public class BulkByScrollTask extends CancellableTask {
builder.append(",batches=").append(batches);
builder.append(",versionConflicts=").append(versionConflicts);
builder.append(",noops=").append(noops);
builder.append(",retries=").append(retries);
builder.append(",retries=").append(bulkRetries);
if (reasonCancelled != null) {
builder.append(",canceled=").append(reasonCancelled);
}
@ -296,10 +307,17 @@ public class BulkByScrollTask extends CancellableTask {
}
/**
* Number of retries that had to be attempted due to rejected executions.
* Number of retries that had to be attempted due to bulk actions being rejected.
*/
public long getRetries() {
return retries;
public long getBulkRetries() {
return bulkRetries;
}
/**
* Number of retries that had to be attempted due to search actions being rejected.
*/
public long getSearchRetries() {
return searchRetries;
}
/**
@ -373,8 +391,12 @@ public class BulkByScrollTask extends CancellableTask {
versionConflicts.incrementAndGet();
}
void countRetry() {
retries.incrementAndGet();
void countBulkRetry() {
bulkRetries.incrementAndGet();
}
void countSearchRetry() {
searchRetries.incrementAndGet();
}
float getRequestsPerSecond() {

View File

@ -98,6 +98,20 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
return status.getReasonCancelled();
}
/**
* The number of times that the request had retry bulk actions.
*/
public long getBulkRetries() {
return status.getBulkRetries();
}
/**
* The number of times that the request had retry search actions.
*/
public long getSearchRetries() {
return status.getSearchRetries();
}
/**
* All of the indexing failures. Version conflicts are only included if the request sets abortOnVersionConflict to true (the
* default).

View File

@ -19,10 +19,6 @@
package org.elasticsearch.index.reindex;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.IndicesRequest;
@ -32,8 +28,11 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import static java.util.Collections.unmodifiableList;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.index.VersionType.INTERNAL;

View File

@ -27,7 +27,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
public class ReindexRequestBuilder extends
AbstractBulkIndexByScrollRequestBuilder<ReindexRequest, BulkIndexByScrollResponse, ReindexRequestBuilder> {
AbstractBulkIndexByScrollRequestBuilder<ReindexRequest, ReindexRequestBuilder> {
private final IndexRequestBuilder destination;
public ReindexRequestBuilder(ElasticsearchClient client,

View File

@ -19,14 +19,14 @@
package org.elasticsearch.index.reindex;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import java.util.ArrayList;
import java.util.List;
import static java.util.Collections.unmodifiableList;
/**

View File

@ -25,7 +25,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
public class UpdateByQueryRequestBuilder extends
AbstractBulkIndexByScrollRequestBuilder<UpdateByQueryRequest, BulkIndexByScrollResponse, UpdateByQueryRequestBuilder> {
AbstractBulkIndexByScrollRequestBuilder<UpdateByQueryRequest, UpdateByQueryRequestBuilder> {
public UpdateByQueryRequestBuilder(ElasticsearchClient client,
Action<UpdateByQueryRequest, BulkIndexByScrollResponse, UpdateByQueryRequestBuilder> action) {

View File

@ -19,6 +19,8 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
@ -38,6 +40,8 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
@ -98,6 +102,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.equalTo;
@ -122,8 +127,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
public void setupForTest() {
client = new MyMockClient(new NoOpClient(getTestName()));
threadPool = new ThreadPool(getTestName());
testRequest = new DummyAbstractBulkByScrollRequest();
firstSearchRequest = new SearchRequest().scroll(timeValueSeconds(10));
firstSearchRequest = new SearchRequest();
testRequest = new DummyAbstractBulkByScrollRequest(firstSearchRequest);
listener = new PlainActionFuture<>();
scrollId = null;
taskManager = new TaskManager(Settings.EMPTY);
@ -150,10 +155,62 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
* random scroll id so it is checked instead.
*/
private String scrollId() {
scrollId = randomSimpleString(random(), 1, 1000); // Empty string's get special behavior we don't want
scrollId = randomSimpleString(random(), 1, 1000); // Empty strings get special behavior we don't want
return scrollId;
}
public void testStartRetriesOnRejectionAndSucceeds() throws Exception {
client.searchesToReject = randomIntBetween(0, testRequest.getMaxRetries() - 1);
DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
action.start();
assertBusy(() -> assertEquals(client.searchesToReject + 1, client.searchAttempts.get()));
if (listener.isDone()) {
Object result = listener.get();
fail("Expected listener not to be done but it was and had " + result);
}
assertNotNull("There should be a search attempt pending that we didn't reject", client.lastSearch.get());
assertEquals(client.searchesToReject, testTask.getStatus().getSearchRetries());
}
public void testStartRetriesOnRejectionButFailsOnTooManyRejections() throws Exception {
client.searchesToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100);
DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
action.start();
assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.searchAttempts.get()));
assertBusy(() -> assertTrue(listener.isDone()));
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(ExceptionsHelper.stackTrace(e), containsString(EsRejectedExecutionException.class.getSimpleName()));
assertNull("There shouldn't be a search attempt pending that we didn't reject", client.lastSearch.get());
assertEquals(testRequest.getMaxRetries(), testTask.getStatus().getSearchRetries());
}
public void testStartNextScrollRetriesOnRejectionAndSucceeds() throws Exception {
client.scrollsToReject = randomIntBetween(0, testRequest.getMaxRetries() - 1);
DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
action.setScroll(scrollId());
action.startNextScroll(0);
assertBusy(() -> assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get()));
if (listener.isDone()) {
Object result = listener.get();
fail("Expected listener not to be done but it was and had " + result);
}
assertNotNull("There should be a scroll attempt pending that we didn't reject", client.lastScroll.get());
assertEquals(client.scrollsToReject, testTask.getStatus().getSearchRetries());
}
public void testStartNextScrollRetriesOnRejectionButFailsOnTooManyRejections() throws Exception {
client.scrollsToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100);
DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
action.setScroll(scrollId());
action.startNextScroll(0);
assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get()));
assertBusy(() -> assertTrue(listener.isDone()));
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(ExceptionsHelper.stackTrace(e), containsString(EsRejectedExecutionException.class.getSimpleName()));
assertNull("There shouldn't be a scroll attempt pending that we didn't reject", client.lastScroll.get());
assertEquals(testRequest.getMaxRetries(), testTask.getStatus().getSearchRetries());
}
public void testScrollResponseSetsTotal() {
// Default is 0, meaning unstarted
assertEquals(0, testTask.getStatus().getTotal());
@ -354,8 +411,9 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
int bulksToTry = randomIntBetween(1, 10);
long retryAttempts = 0;
for (int i = 0; i < bulksToTry; i++) {
retryAttempts += retryTestCase(false);
assertEquals(retryAttempts, testTask.getStatus().getRetries());
bulkRetryTestCase(false);
retryAttempts += testRequest.getMaxRetries();
assertEquals(retryAttempts, testTask.getStatus().getBulkRetries());
}
}
@ -363,8 +421,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
* Mimicks bulk rejections. These should be retried but we fail anyway because we run out of retries.
*/
public void testBulkRejectionsRetryAndFailAnyway() throws Exception {
long retryAttempts = retryTestCase(true);
assertEquals(retryAttempts, testTask.getStatus().getRetries());
bulkRetryTestCase(true);
assertEquals(testRequest.getMaxRetries(), testTask.getStatus().getBulkRetries());
}
public void testPerfectlyThrottledBatchTime() {
@ -398,6 +456,9 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
action.setScroll(scrollId());
// Set the base for the scroll to wait - this is added to the figure we calculate below
firstSearchRequest.scroll(timeValueSeconds(10));
// We'd like to get about 1 request a second
testTask.rethrottle(1f);
// Make the last scroll look nearly instant
@ -405,7 +466,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
// The last batch had 100 documents
action.startNextScroll(100);
// So the next request is going to have to wait an extra 100 seconds or so (base was 10, so 110ish)
// So the next request is going to have to wait an extra 100 seconds or so (base was 10 seconds, so 110ish)
assertThat(client.lastScroll.get().request.scroll().keepAlive().seconds(), either(equalTo(110L)).or(equalTo(109L)));
// Now we can simulate a response and check the delay that we used for the task
@ -422,10 +483,14 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
assertEquals(capturedDelay.get(), testTask.getStatus().getThrottled());
}
private long retryTestCase(boolean failWithRejection) throws Exception {
/**
* Execute a bulk retry test case. The total number of failures is random and the number of retries attempted is set to
* testRequest.getMaxRetries and controled by the failWithRejection parameter.
*/
private void bulkRetryTestCase(boolean failWithRejection) throws Exception {
int totalFailures = randomIntBetween(1, testRequest.getMaxRetries());
int size = randomIntBetween(1, 100);
int retryAttempts = totalFailures - (failWithRejection ? 1 : 0);
testRequest.setMaxRetries(totalFailures - (failWithRejection ? 1 : 0));
client.bulksToReject = client.bulksAttempts.get() + totalFailures;
/*
@ -433,13 +498,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
* deal with it. We just wait for it to happen.
*/
CountDownLatch successLatch = new CountDownLatch(1);
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction() {
@Override
BackoffPolicy backoffPolicy() {
// Force a backoff time of 0 to prevent sleeping
return constantBackoff(timeValueMillis(0), retryAttempts);
}
DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff() {
@Override
void startNextScroll(int lastBatchSize) {
successLatch.countDown();
@ -459,14 +518,13 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
} else {
successLatch.await(10, TimeUnit.SECONDS);
}
return retryAttempts;
}
/**
* The default retry time matches what we say it is in the javadoc for the request.
*/
public void testDefaultRetryTimes() {
Iterator<TimeValue> policy = new DummyAbstractAsyncBulkByScrollAction().backoffPolicy().iterator();
Iterator<TimeValue> policy = new DummyAbstractAsyncBulkByScrollAction().buildBackoffPolicy().iterator();
long millis = 0;
while (policy.hasNext()) {
millis += policy.next().millis();
@ -625,7 +683,22 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
}
}
/**
* An extension to {@linkplain DummyAbstractAsyncBulkByScrollAction} that uses a 0 delaying backoff policy.
*/
private class DummyActionWithoutBackoff extends DummyAbstractAsyncBulkByScrollAction {
@Override
BackoffPolicy buildBackoffPolicy() {
// Force a backoff time of 0 to prevent sleeping
return constantBackoff(timeValueMillis(0), testRequest.getMaxRetries());
}
}
private static class DummyAbstractBulkByScrollRequest extends AbstractBulkByScrollRequest<DummyAbstractBulkByScrollRequest> {
public DummyAbstractBulkByScrollRequest(SearchRequest searchRequest) {
super(searchRequest);
}
@Override
protected DummyAbstractBulkByScrollRequest self() {
return this;
@ -635,11 +708,23 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
private class MyMockClient extends FilterClient {
private final List<String> scrollsCleared = new ArrayList<>();
private final AtomicInteger bulksAttempts = new AtomicInteger();
private final AtomicInteger searchAttempts = new AtomicInteger();
private final AtomicInteger scrollAttempts = new AtomicInteger();
private final AtomicReference<Map<String, String>> lastHeaders = new AtomicReference<>();
private final AtomicReference<RefreshRequest> lastRefreshRequest = new AtomicReference<>();
/**
* Last search attempt that wasn't rejected outright.
*/
private final AtomicReference<RequestAndListener<SearchRequest, SearchResponse>> lastSearch = new AtomicReference<>();
/**
* Last scroll attempt that wasn't rejected outright.
*/
private final AtomicReference<RequestAndListener<SearchScrollRequest, SearchResponse>> lastScroll = new AtomicReference<>();
private int bulksToReject = 0;
private int searchesToReject = 0;
private int scrollsToReject = 0;
public MyMockClient(Client in) {
super(in);
@ -661,7 +746,19 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
listener.onResponse(null);
return;
}
if (request instanceof SearchRequest) {
if (searchAttempts.incrementAndGet() <= searchesToReject) {
listener.onFailure(wrappedRejectedException());
return;
}
lastSearch.set(new RequestAndListener<>((SearchRequest) request, (ActionListener<SearchResponse>) listener));
return;
}
if (request instanceof SearchScrollRequest) {
if (scrollAttempts.incrementAndGet() <= scrollsToReject) {
listener.onFailure(wrappedRejectedException());
return;
}
lastScroll.set(new RequestAndListener<>((SearchScrollRequest) request, (ActionListener<SearchResponse>) listener));
return;
}
@ -715,6 +812,25 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
}
super.doExecute(action, request, listener);
}
private Throwable wrappedRejectedException() {
Exception e = new EsRejectedExecutionException();
int wraps = randomIntBetween(0, 4);
for (int i = 0; i < wraps; i++) {
switch (randomIntBetween(0, 2)) {
case 0:
e = new SearchPhaseExecutionException("test", "test failure", e, new ShardSearchFailure[0]);
continue;
case 1:
e = new ReduceSearchPhaseException("test", "test failure", e, new ShardSearchFailure[0]);
continue;
case 2:
e = new ElasticsearchException(e);
continue;
}
}
return e;
}
}
private static class RequestAndListener<Request extends ActionRequest<Request>, Response> {

View File

@ -127,25 +127,28 @@ public class BulkByScrollTaskTests extends ESTestCase {
}
public void testStatusHatesNegatives() {
expectThrows(IllegalArgumentException.class, status(-1, 0, 0, 0, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, status(0, -1, 0, 0, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, status(0, 0, -1, 0, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, status(0, 0, 0, -1, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, status(0, 0, 0, 0, -1, 0, 0, 0));
expectThrows(IllegalArgumentException.class, status(0, 0, 0, 0, 0, -1, 0, 0));
expectThrows(IllegalArgumentException.class, status(0, 0, 0, 0, 0, 0, -1, 0));
expectThrows(IllegalArgumentException.class, status(0, 0, 0, 0, 0, 0, 0, -1));
checkStatusNegatives(-1, 0, 0, 0, 0, 0, 0, 0, 0, "total");
checkStatusNegatives(0, -1, 0, 0, 0, 0, 0, 0, 0, "updated");
checkStatusNegatives(0, 0, -1, 0, 0, 0, 0, 0, 0, "created");
checkStatusNegatives(0, 0, 0, -1, 0, 0, 0, 0, 0, "deleted");
checkStatusNegatives(0, 0, 0, 0, -1, 0, 0, 0, 0, "batches");
checkStatusNegatives(0, 0, 0, 0, 0, -1, 0, 0, 0, "versionConflicts");
checkStatusNegatives(0, 0, 0, 0, 0, 0, -1, 0, 0, "noops");
checkStatusNegatives(0, 0, 0, 0, 0, 0, 0, -1, 0, "bulkRetries");
checkStatusNegatives(0, 0, 0, 0, 0, 0, 0, 0, -1, "searchRetries");
}
/**
* Build a task status with only some values. Used for testing negative values.
*/
private ThrowingRunnable status(long total, long updated, long created, long deleted, int batches, long versionConflicts,
long noops, long retries) {
private void checkStatusNegatives(long total, long updated, long created, long deleted, int batches, long versionConflicts,
long noops, long bulkRetries, long searchRetries, String fieldName) {
TimeValue throttle = parseTimeValue(randomPositiveTimeValue(), "test");
TimeValue throttledUntil = parseTimeValue(randomPositiveTimeValue(), "test");
return () -> new BulkByScrollTask.Status(-1, 0, 0, 0, 0, 0, 0, 0, throttle, 0f, null, throttledUntil);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(total, updated, created,
deleted, batches, versionConflicts, noops, bulkRetries, searchRetries, throttle, 0f, null, throttledUntil));
assertEquals(e.getMessage(), fieldName + " must be greater than 0 but was [-1]");
}
/**

View File

@ -118,7 +118,7 @@ public class BulkIndexByScrollResponseMatcher extends TypeSafeMatcher<BulkIndexB
@Override
public void describeTo(Description description) {
description.appendText("indexed matches ").appendDescriptionOf(updatedMatcher);
description.appendText("updated matches ").appendDescriptionOf(updatedMatcher);
description.appendText(" and created matches ").appendDescriptionOf(createdMatcher);
if (batchesMatcher != null) {
description.appendText(" and batches matches ").appendDescriptionOf(batchesMatcher);

View File

@ -61,16 +61,15 @@ public class CancelTestUtils {
private static final CyclicBarrier barrier = new CyclicBarrier(2);
public static <Request extends AbstractBulkIndexByScrollRequest<Request>,
Response extends BulkIndexByScrollResponse,
Builder extends AbstractBulkIndexByScrollRequestBuilder<Request, Response, Builder>>
Response testCancel(ESIntegTestCase test, Builder request, String actionToCancel) throws Exception {
Builder extends AbstractBulkIndexByScrollRequestBuilder<Request, Builder>>
BulkIndexByScrollResponse testCancel(ESIntegTestCase test, Builder request, String actionToCancel) throws Exception {
test.indexRandom(true, client().prepareIndex("source", "test", "1").setSource("foo", "a"),
client().prepareIndex("source", "test", "2").setSource("foo", "a"));
request.source("source").script(new Script("sticky", ScriptType.INLINE, "native", emptyMap()));
request.source().setSize(1);
ListenableActionFuture<Response> response = request.execute();
ListenableActionFuture<BulkIndexByScrollResponse> response = request.execute();
// Wait until the script is on the first document.
barrier.await(30, TimeUnit.SECONDS);

View File

@ -43,7 +43,7 @@ public class RethrottleTests extends ReindexTestCase {
testCase(updateByQuery().source("test"), UpdateByQueryAction.NAME);
}
private void testCase(AbstractBulkIndexByScrollRequestBuilder<?, ? extends BulkIndexByScrollResponse, ?> request, String actionName)
private void testCase(AbstractBulkIndexByScrollRequestBuilder<?, ?> request, String actionName)
throws Exception {
// Use a single shard so the reindex has to happen in multiple batches
client().admin().indices().prepareCreate("test").setSettings("index.number_of_shards", 1).get();

View File

@ -0,0 +1,155 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.Retry;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.MockSearchService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.IntFunction;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;
/**
* Integration test for retry behavior. Useful because retrying relies on the way that the rest of Elasticsearch throws exceptions and unit
* tests won't verify that.
*/
public class RetryTests extends ReindexTestCase {
/**
* The number of concurrent requests to test.
*/
private static final int CONCURRENT = 12;
/**
* Enough docs that the requests will likely step on each other.
*/
private static final int DOC_COUNT = 200;
/**
* Lower the queue sizes to be small enough that both bulk and searches will time out and have to be retried.
*/
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal));
settings.put("threadpool.bulk.queue_size", 1);
settings.put("threadpool.bulk.size", 1);
settings.put("threadpool.search.queue_size", 1);
settings.put("threadpool.search.size", 1);
return settings.build();
}
/**
* Disable search context leak detection because we expect leaks when there is an {@link EsRejectedExecutionException} queueing the
* reduce phase.
*/
@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
List<Class<? extends Plugin>> mockPlugins = new ArrayList<>();
for (Class<? extends Plugin> plugin: super.getMockPlugins()) {
if (plugin.equals(MockSearchService.TestPlugin.class)) {
continue;
}
mockPlugins.add(plugin);
}
return mockPlugins;
}
public void testReindex() throws Exception {
setupSourceIndex("source");
testCase(true, i -> reindex().source("source").destination("dest" + i));
}
public void testUpdateByQuery() throws Exception {
for (int i = 0; i < CONCURRENT; i++) {
setupSourceIndex("source" + i);
}
testCase(false, i -> updateByQuery().source("source" + i));
}
private void testCase(boolean expectCreated, IntFunction<AbstractBulkIndexByScrollRequestBuilder<?, ?>> requestBuilder)
throws Exception {
List<ListenableActionFuture<BulkIndexByScrollResponse>> futures = new ArrayList<>(CONCURRENT);
for (int i = 0; i < CONCURRENT; i++) {
AbstractBulkIndexByScrollRequestBuilder<?, ?> request = requestBuilder.apply(i);
// Make sure we use more than one batch so we get the full reindex behavior
request.source().setSize(DOC_COUNT / randomIntBetween(2, 10));
// Use a low, random initial wait so we are unlikely collide with others retrying.
request.setRetryBackoffInitialTime(timeValueMillis(randomIntBetween(10, 300)));
futures.add(request.execute());
}
// Finish all the requests
List<BulkIndexByScrollResponse> responses = new ArrayList<>(CONCURRENT);
for (ListenableActionFuture<BulkIndexByScrollResponse> future : futures) {
responses.add(future.get());
}
// Now check them
long bulkRetries = 0;
long searchRetries = 0;
BulkIndexByScrollResponseMatcher matcher = matcher();
if (expectCreated) {
matcher.created(DOC_COUNT);
} else {
matcher.updated(DOC_COUNT);
}
for (BulkIndexByScrollResponse response : responses) {
assertThat(response, matcher);
bulkRetries += response.getBulkRetries();
searchRetries += response.getSearchRetries();
}
// We expect at least one retry or this test isn't very useful
assertThat(bulkRetries, greaterThan(0L));
assertThat(searchRetries, greaterThan(0L));
}
private void setupSourceIndex(String name) {
try {
// Build the test index with a single shard so we can be sure that a search request *can* complete with the one thread
assertAcked(client().admin().indices().prepareCreate(name).setSettings(
"index.number_of_shards", 1,
"index.number_of_replicas", 0).get());
waitForRelocation(ClusterHealthStatus.GREEN);
// Build the test data. Don't use indexRandom because that won't work consistently with such small thread pools.
BulkRequestBuilder bulk = client().prepareBulk();
for (int i = 0; i < DOC_COUNT; i++) {
bulk.add(client().prepareIndex(name, "test").setSource("foo", "bar " + i));
}
Retry retry = Retry.on(EsRejectedExecutionException.class).policy(BackoffPolicy.exponentialBackoff());
BulkResponse response = retry.withSyncBackoff(client(), bulk.request());
assertFalse(response.buildFailureMessage(), response.hasFailures());
refresh(name);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -138,7 +138,7 @@ public class RoundTripTests extends ESTestCase {
private BulkByScrollTask.Status randomStatus() {
return new BulkByScrollTask.Status(randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
randomInt(Integer.MAX_VALUE), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
randomInt(Integer.MAX_VALUE), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
parseTimeValue(randomPositiveTimeValue(), "test"), abs(random().nextFloat()),
random().nextBoolean() ? null : randomSimpleString(random()), parseTimeValue(randomPositiveTimeValue(), "test"));
}
@ -210,7 +210,8 @@ public class RoundTripTests extends ESTestCase {
assertEquals(expected.getBatches(), actual.getBatches());
assertEquals(expected.getVersionConflicts(), actual.getVersionConflicts());
assertEquals(expected.getNoops(), actual.getNoops());
assertEquals(expected.getRetries(), actual.getRetries());
assertEquals(expected.getBulkRetries(), actual.getBulkRetries());
assertEquals(expected.getSearchRetries(), actual.getSearchRetries());
assertEquals(expected.getThrottled(), actual.getThrottled());
assertEquals(expected.getRequestsPerSecond(), actual.getRequestsPerSecond(), 0f);
assertEquals(expected.getReasonCancelled(), actual.getReasonCancelled());

View File

@ -19,12 +19,12 @@
package org.elasticsearch.index.reindex;
import java.util.List;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.test.ESTestCase;
import java.util.List;
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.hasSize;