Teach reindex to retry on rejection
And count it in the status too!
parent 35307054ea
commit 4642111bac
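In outline, the change is: a bulk request that gets rejected (EsRejectedExecutionException) is no longer a hard failure; it is resent under an exponential backoff built from two new request settings, retryBackoffInitialTime (default 50ms) and maxRetries (default 10), and every backoff handed out bumps a retry counter on the task so it shows up in the task status. Below is a rough, self-contained sketch of the counting wrapper; it assumes the BackoffPolicy API visible in this diff, and the AtomicLong is only a stand-in for the task's countRetry():

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;

class RetryCountingSketch {
    // Stand-in for the BulkByScrollTask retries counter (task.countRetry() in the real code).
    final AtomicLong retries = new AtomicLong();

    BackoffPolicy countingBackoffPolicy() {
        // Same defaults the request gains in this commit: 50ms initial delay, 10 retries.
        BackoffPolicy delegatePolicy = exponentialBackoff(timeValueMillis(50), 10);
        return new BackoffPolicy() {
            @Override
            public Iterator<TimeValue> iterator() {
                Iterator<TimeValue> delegate = delegatePolicy.iterator();
                return new Iterator<TimeValue>() {
                    @Override
                    public boolean hasNext() {
                        return delegate.hasNext();
                    }

                    @Override
                    public TimeValue next() {
                        // Each delay handed out corresponds to one retried bulk request.
                        retries.incrementAndGet();
                        return delegate.next();
                    }
                };
            }
        };
    }
}

In the real change the wrapped policy is wired into Retry.on(EsRejectedExecutionException.class).policy(...) and each bulk goes out through retry.withAsyncBackoff(client, request, listener), which is also why Retry is made public in the first hunk below.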
@ -38,7 +38,7 @@ import java.util.function.Predicate;
/**
* Encapsulates synchronous and asynchronous retry logic.
*/
class Retry {
public class Retry {
private final Class<? extends Throwable> retryOnThrowable;

private BackoffPolicy backoffPolicy;
@ -22,10 +22,12 @@ package org.elasticsearch.plugin.reindex;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.Retry;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;

@ -39,6 +41,7 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;

@ -46,6 +49,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@ -56,6 +60,7 @@ import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.plugin.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
import static org.elasticsearch.rest.RestStatus.CONFLICT;

@ -82,6 +87,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
private final ThreadPool threadPool;
private final SearchRequest firstSearchRequest;
private final ActionListener<Response> listener;
private final Retry retry;

public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, Client client, ThreadPool threadPool,
Request mainRequest, SearchRequest firstSearchRequest, ActionListener<Response> listener) {

@ -92,6 +98,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
this.mainRequest = mainRequest;
this.firstSearchRequest = firstSearchRequest;
this.listener = listener;
retry = Retry.on(EsRejectedExecutionException.class).policy(wrapBackoffPolicy(backoffPolicy()));
}

protected abstract BulkRequest buildBulk(Iterable<SearchHit> docs);
@ -151,52 +158,36 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
threadPool.generic().execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
try {
SearchHit[] docs = searchResponse.getHits().getHits();
logger.debug("scroll returned [{}] documents with a scroll id of [{}]", docs.length, searchResponse.getScrollId());
if (docs.length == 0) {
startNormalTermination(emptyList(), emptyList());
return;
}
task.countBatch();
List<SearchHit> docsIterable = Arrays.asList(docs);
if (mainRequest.getSize() != SIZE_ALL_MATCHES) {
// Truncate the docs if we have more than the request size
long remaining = max(0, mainRequest.getSize() - task.getSuccessfullyProcessed());
if (remaining < docs.length) {
docsIterable = docsIterable.subList(0, (int) remaining);
}
}
BulkRequest request = buildBulk(docsIterable);
if (request.requests().isEmpty()) {
/*
* If we noop-ed the entire batch then just skip to the next
* batch or the BulkRequest would fail validation.
*/
startNextScrollRequest();
return;
}
request.timeout(mainRequest.getTimeout());
request.consistencyLevel(mainRequest.getConsistency());
if (logger.isDebugEnabled()) {
logger.debug("sending [{}] entry, [{}] bulk request", request.requests().size(),
new ByteSizeValue(request.estimatedSizeInBytes()));
}
// NOCOMMIT handle rejections
client.bulk(request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
onBulkResponse(response);
}

@Override
public void onFailure(Throwable e) {
finishHim(e);
}
});
} catch (Throwable t) {
finishHim(t);
SearchHit[] docs = searchResponse.getHits().getHits();
logger.debug("scroll returned [{}] documents with a scroll id of [{}]", docs.length, searchResponse.getScrollId());
if (docs.length == 0) {
startNormalTermination(emptyList(), emptyList());
return;
}
task.countBatch();
List<SearchHit> docsIterable = Arrays.asList(docs);
if (mainRequest.getSize() != SIZE_ALL_MATCHES) {
// Truncate the docs if we have more than the request size
long remaining = max(0, mainRequest.getSize() - task.getSuccessfullyProcessed());
if (remaining < docs.length) {
docsIterable = docsIterable.subList(0, (int) remaining);
}
}
BulkRequest request = buildBulk(docsIterable);
if (request.requests().isEmpty()) {
/*
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
*/
startNextScrollRequest();
return;
}
request.timeout(mainRequest.getTimeout());
request.consistencyLevel(mainRequest.getConsistency());
if (logger.isDebugEnabled()) {
logger.debug("sending [{}] entry, [{}] bulk request", request.requests().size(),
new ByteSizeValue(request.estimatedSizeInBytes()));
}
sendBulkRequest(request);
}

@Override
@ -206,6 +197,20 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
});
}

void sendBulkRequest(BulkRequest request) {
retry.withAsyncBackoff(client, request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
onBulkResponse(response);
}

@Override
public void onFailure(Throwable e) {
finishHim(e);
}
});
}

void onBulkResponse(BulkResponse response) {
try {
List<Failure> failures = new ArrayList<Failure>();

@ -342,4 +347,38 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
listener.onFailure(failure);
}
}

/**
* Build the backoff policy for use with retries. Package private for testing.
*/
BackoffPolicy backoffPolicy() {
return exponentialBackoff(mainRequest.getRetryBackoffInitialTime(), mainRequest.getMaxRetries());
}

/**
* Wraps a backoffPolicy in another policy that counts the number of backoffs acquired.
*/
private BackoffPolicy wrapBackoffPolicy(BackoffPolicy backoffPolicy) {
return new BackoffPolicy() {
@Override
public Iterator<TimeValue> iterator() {
return new Iterator<TimeValue>() {
private final Iterator<TimeValue> delegate = backoffPolicy.iterator();
@Override
public boolean hasNext() {
return delegate.hasNext();
}

@Override
public TimeValue next() {
if (false == delegate.hasNext()) {
return null;
}
task.countRetry();
return delegate.next();
}
};
}
};
}
}
@ -34,11 +34,13 @@ import java.io.IOException;
import java.util.Arrays;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;

public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScrollRequest<Self>>
extends ActionRequest<Self> {
public static final int SIZE_ALL_MATCHES = -1;
private static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(5);
private static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5);
private static final int DEFAULT_SCROLL_SIZE = 100;

/**

@ -72,6 +74,17 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
*/
private WriteConsistencyLevel consistency = WriteConsistencyLevel.DEFAULT;

/**
* Initial delay after a rejection before retrying a bulk request. With the default maxRetries the total backoff for retrying rejections
* is about 25 seconds per bulk request. Once the entire bulk request is successful the retry counter resets.
*/
private TimeValue retryBackoffInitialTime = timeValueMillis(50);

/**
* Total number of retries attempted for rejections. There is no way to ask for unlimited retries.
*/
private int maxRetries = 10;

public AbstractBulkByScrollRequest() {
}

@ -97,6 +110,9 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
if (source.source().from() != -1) {
e = addValidationError("from is not supported in this context", e);
}
if (maxRetries < 0) {
e = addValidationError("retries cannnot be negative", e);
}
if (false == (size == -1 || size > 0)) {
e = addValidationError(
"size should be greater than 0 if the request is limited to some number of documents or -1 if it isn't but it was ["

@ -206,6 +222,36 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
return self();
}

/**
* Initial delay after a rejection before retrying request.
*/
public TimeValue getRetryBackoffInitialTime() {
return retryBackoffInitialTime;
}

/**
* Set the initial delay after a rejection before retrying request.
*/
public Self setRetryBackoffInitialTime(TimeValue retryBackoffInitialTime) {
this.retryBackoffInitialTime = retryBackoffInitialTime;
return self();
}

/**
* Total number of retries attempted for rejections.
*/
public int getMaxRetries() {
return maxRetries;
}

/**
* Set the total number of retries attempted for rejections. There is no way to ask for unlimited retries.
*/
public Self setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
return self();
}

@Override
public Task createTask(long id, String type, String action) {
return new BulkByScrollTask(id, type, action, this::getDescription);

@ -221,6 +267,8 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
refresh = in.readBoolean();
timeout = TimeValue.readTimeValue(in);
consistency = WriteConsistencyLevel.fromId(in.readByte());
retryBackoffInitialTime = TimeValue.readTimeValue(in);
maxRetries = in.readVInt();
}

@Override

@ -232,6 +280,8 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
out.writeBoolean(refresh);
timeout.writeTo(out);
out.writeByte(consistency.id());
retryBackoffInitialTime.writeTo(out);
out.writeVInt(maxRetries);
}

/**

@ -248,5 +298,4 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
b.append(Arrays.toString(source.types()));
}
}

}
@ -44,6 +44,7 @@ public class BulkByScrollTask extends Task {
private final AtomicLong noops = new AtomicLong(0);
private final AtomicInteger batch = new AtomicInteger(0);
private final AtomicLong versionConflicts = new AtomicLong(0);
private final AtomicLong retries = new AtomicLong(0);

public BulkByScrollTask(long id, String type, String action, Provider<String> description) {
super(id, type, action, description);

@ -51,7 +52,8 @@ public class BulkByScrollTask extends Task {

@Override
public Status getStatus() {
return new Status(total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), noops.get());
return new Status(total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), noops.get(),
retries.get());
}

/**

@ -62,7 +64,7 @@ public class BulkByScrollTask extends Task {
}

public static class Status implements Task.Status {
public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0);
public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0, 0);

private final long total;
private final long updated;

@ -71,8 +73,9 @@ public class BulkByScrollTask extends Task {
private final int batches;
private final long versionConflicts;
private final long noops;
private final long retries;

public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops) {
public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops, long retries) {
this.total = checkPositive(total, "total");
this.updated = checkPositive(updated, "updated");
this.created = checkPositive(created, "created");

@ -80,6 +83,7 @@ public class BulkByScrollTask extends Task {
this.batches = checkPositive(batches, "batches");
this.versionConflicts = checkPositive(versionConflicts, "versionConflicts");
this.noops = checkPositive(noops, "noops");
this.retries = checkPositive(retries, "retries");
}

public Status(StreamInput in) throws IOException {

@ -90,6 +94,7 @@ public class BulkByScrollTask extends Task {
batches = in.readVInt();
versionConflicts = in.readVLong();
noops = in.readVLong();
retries = in.readVLong();
}

@Override

@ -101,6 +106,7 @@ public class BulkByScrollTask extends Task {
out.writeVInt(batches);
out.writeVLong(versionConflicts);
out.writeVLong(noops);
out.writeVLong(retries);
}

@Override

@ -123,6 +129,7 @@ public class BulkByScrollTask extends Task {
builder.field("batches", batches);
builder.field("version_conflicts", versionConflicts);
builder.field("noops", noops);
builder.field("retries", retries);
return builder;
}

@ -145,6 +152,7 @@ public class BulkByScrollTask extends Task {
builder.append(",batches=").append(batches);
builder.append(",versionConflicts=").append(versionConflicts);
builder.append(",noops=").append(noops);
builder.append(",retries=").append(retries);
}

@Override

@ -207,6 +215,13 @@ public class BulkByScrollTask extends Task {
return noops;
}

/**
* Number of retries that had to be attempted due to rejected executions.
*/
public long getRetries() {
return retries;
}

private int checkPositive(int value, String name) {
if (value < 0) {
throw new IllegalArgumentException(name + " must be greater than 0 but was [" + value + "]");

@ -249,4 +264,8 @@ public class BulkByScrollTask extends Task {
void countVersionConflict() {
versionConflicts.incrementAndGet();
}

void countRetry() {
retries.incrementAndGet();
}
}
@ -24,10 +24,15 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;

@ -35,6 +40,9 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.common.text.Text;

@ -44,6 +52,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;

@ -55,18 +64,26 @@ import org.junit.After;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyMap;
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.iterableWithSize;

public class AsyncBulkByScrollActionTests extends ESTestCase {
private MockClearScrollClient client;
private MyMockClient client;
private ThreadPool threadPool;
private DummyAbstractBulkByScrollRequest mainRequest;
private SearchRequest firstSearchRequest;

@ -76,7 +93,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {

@Before
public void setupForTest() {
client = new MockClearScrollClient(new NoOpClient(getTestName()));
client = new MyMockClient(new NoOpClient(getTestName()));
threadPool = new ThreadPool(getTestName());
mainRequest = new DummyAbstractBulkByScrollRequest();
firstSearchRequest = null;
@ -147,7 +164,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
long updated = 0;
long deleted = 0;
for (int batches = 0; batches < maxBatches; batches++) {
BulkItemResponse[] responses = new BulkItemResponse[randomIntBetween(0, 10000)];
BulkItemResponse[] responses = new BulkItemResponse[randomIntBetween(0, 100)];
for (int i = 0; i < responses.length; i++) {
ShardId shardId = new ShardId(new Index("name", "uid"), 0);
String opType;

@ -244,6 +261,105 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class));
}

/**
* Mimicks script failures or general wrongness by implementers.
*/
public void testListenerReceiveBuildBulkExceptions() throws Exception {
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction() {
@Override
protected BulkRequest buildBulk(Iterable<SearchHit> docs) {
throw new RuntimeException("surprise");
}
};
InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap());
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] {hit}, 0, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
action.onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
try {
listener.get();
fail("Expected failure.");
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(RuntimeException.class));
assertThat(e.getCause().getMessage(), equalTo("surprise"));
}
}

/**
* Mimicks bulk rejections. These should be retried and eventually succeed.
*/
public void testBulkRejectionsRetryWithEnoughRetries() throws Exception {
int bulksToTry = randomIntBetween(1, 10);
long retryAttempts = 0;
for (int i = 0; i < bulksToTry; i++) {
retryAttempts += retryTestCase(false);
assertEquals(retryAttempts, task.getStatus().getRetries());
}
}

/**
* Mimicks bulk rejections. These should be retried but we fail anyway because we run out of retries.
*/
public void testBulkRejectionsRetryAndFailAnyway() throws Exception {
long retryAttempts = retryTestCase(true);
assertEquals(retryAttempts, task.getStatus().getRetries());
}

private long retryTestCase(boolean failWithRejection) throws Exception {
int totalFailures = randomIntBetween(1, mainRequest.getMaxRetries());
int size = randomIntBetween(1, 100);
int retryAttempts = totalFailures - (failWithRejection ? 1 : 0);

client.bulksToReject = client.bulksAttempts.get() + totalFailures;
/*
* When we get a successful bulk response we usually start the next scroll request but lets just intercept that so we don't have to
* deal with it. We just wait for it to happen.
*/
CountDownLatch successLatch = new CountDownLatch(1);
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction() {
@Override
BackoffPolicy backoffPolicy() {
// Force a backoff time of 0 to prevent sleeping
return constantBackoff(timeValueMillis(0), retryAttempts);
}

@Override
void startNextScrollRequest() {
successLatch.countDown();
}
};
BulkRequest request = new BulkRequest();
for (int i = 0; i < size + 1; i++) {
request.add(new IndexRequest("index", "type", "id" + i));
}
action.sendBulkRequest(request);
if (failWithRejection) {
BulkIndexByScrollResponse response = listener.get();
assertThat(response.getIndexingFailures(), iterableWithSize(equalTo(1)));
assertEquals(response.getIndexingFailures().get(0).getStatus(), RestStatus.TOO_MANY_REQUESTS);
assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class));
} else {
successLatch.await(10, TimeUnit.SECONDS);
}
return retryAttempts;
}

/**
* The default retry time matches what we say it is in the javadoc for the request.
*/
public void testDefaultRetryTimes() {
Iterator<TimeValue> policy = new DummyAbstractAsyncBulkByScrollAction().backoffPolicy().iterator();
long millis = 0;
while (policy.hasNext()) {
millis += policy.next().millis();
}
/*
* This is the total number of milliseconds that a reindex made with the default settings will backoff before attempting one final
* time. If that request is rejected then the whole process fails with a rejected exception.
*/
int defaultBackoffBeforeFailing = 24670;
assertEquals(defaultBackoffBeforeFailing, millis);
}

private class DummyAbstractAsyncBulkByScrollAction
extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest, BulkIndexByScrollResponse> {
public DummyAbstractAsyncBulkByScrollAction() {
@ -270,10 +386,13 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
}
}

private static class MockClearScrollClient extends FilterClient {
private List<String> scrollsCleared = new ArrayList<>();
private static class MyMockClient extends FilterClient {
private final List<String> scrollsCleared = new ArrayList<>();
private final AtomicInteger bulksAttempts = new AtomicInteger();

public MockClearScrollClient(Client in) {
private int bulksToReject = 0;

public MyMockClient(Client in) {
super(in);
}

@ -288,6 +407,48 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
listener.onResponse((Response) new ClearScrollResponse(true, clearScroll.getScrollIds().size()));
return;
}
if (request instanceof BulkRequest) {
BulkRequest bulk = (BulkRequest) request;
int toReject;
if (bulksAttempts.incrementAndGet() > bulksToReject) {
toReject = -1;
} else {
toReject = randomIntBetween(0, bulk.requests().size() - 1);
}
BulkItemResponse[] responses = new BulkItemResponse[bulk.requests().size()];
for (int i = 0; i < bulk.requests().size(); i++) {
ActionRequest<?> item = bulk.requests().get(i);
String opType;
DocWriteResponse response;
ShardId shardId = new ShardId(new Index(((ReplicationRequest<?>) item).index(), "uuid"), 0);
if (item instanceof IndexRequest) {
IndexRequest index = (IndexRequest) item;
opType = index.opType().lowercase();
response = new IndexResponse(shardId, index.type(), index.id(), randomIntBetween(0, Integer.MAX_VALUE),
true);
} else if (item instanceof UpdateRequest) {
UpdateRequest update = (UpdateRequest) item;
opType = "update";
response = new UpdateResponse(shardId, update.type(), update.id(),
randomIntBetween(0, Integer.MAX_VALUE), true);
} else if (item instanceof DeleteRequest) {
DeleteRequest delete = (DeleteRequest) item;
opType = "delete";
response = new DeleteResponse(shardId, delete.type(), delete.id(), randomIntBetween(0, Integer.MAX_VALUE),
true);
} else {
throw new RuntimeException("Unknown request: " + item);
}
if (i == toReject) {
responses[i] = new BulkItemResponse(i, opType,
new Failure(response.getIndex(), response.getType(), response.getId(), new EsRejectedExecutionException()));
} else {
responses[i] = new BulkItemResponse(i, opType, response);
}
}
listener.onResponse((Response) new BulkResponse(responses, 1));
return;
}
super.doExecute(action, request, listener);
}
}
@ -101,12 +101,13 @@ public class BulkByScrollTaskTests extends ESTestCase {
}

public void testStatusHatesNegatives() {
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(-1, 0, 0, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, -1, 0, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, -1, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, -1, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, -1, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, -1, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, -1));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(-1, 0, 0, 0, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, -1, 0, 0, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, -1, 0, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, -1, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, -1, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, -1, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, -1, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, 0, -1));
}
}
@ -88,6 +88,8 @@ public class RoundTripTests extends ESTestCase {
assertEquals(request.getTimeout(), tripped.getTimeout());
assertEquals(request.getConsistency(), tripped.getConsistency());
assertEquals(request.getScript(), tripped.getScript());
assertEquals(request.getRetryBackoffInitialTime(), tripped.getRetryBackoffInitialTime());
assertEquals(request.getMaxRetries(), tripped.getMaxRetries());
}

public void testBulkByTaskStatus() throws IOException {

@ -116,7 +118,7 @@ public class RoundTripTests extends ESTestCase {

private BulkByScrollTask.Status randomStatus() {
return new BulkByScrollTask.Status(randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
randomPositiveInt(), randomPositiveLong(), randomPositiveLong());
randomPositiveInt(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong());
}

private List<Failure> randomIndexingFailures() {

@ -185,8 +187,11 @@ public class RoundTripTests extends ESTestCase {

private void assertTaskStatusEquals(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) {
assertEquals(expected.getUpdated(), actual.getUpdated());
assertEquals(expected.getCreated(), actual.getCreated());
assertEquals(expected.getDeleted(), actual.getDeleted());
assertEquals(expected.getBatches(), actual.getBatches());
assertEquals(expected.getVersionConflicts(), actual.getVersionConflicts());
assertEquals(expected.getNoops(), actual.getNoops());
assertEquals(expected.getRetries(), actual.getRetries());
}
}