diff --git a/core/src/main/java/org/elasticsearch/action/bulk/Retry.java b/core/src/main/java/org/elasticsearch/action/bulk/Retry.java index 72e0da71921..acaa784ac87 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/Retry.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/Retry.java @@ -38,7 +38,7 @@ import java.util.function.Predicate; /** * Encapsulates synchronous and asynchronous retry logic. */ -class Retry { +public class Retry { private final Class retryOnThrowable; private BackoffPolicy backoffPolicy; diff --git a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkByScrollAction.java b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkByScrollAction.java index 77d7e144372..fd67cb8618f 100644 --- a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkByScrollAction.java +++ b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkByScrollAction.java @@ -22,10 +22,12 @@ package org.elasticsearch.plugin.reindex; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.Retry; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; @@ -39,6 +41,7 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; @@ -46,6 +49,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -56,6 +60,7 @@ import static java.lang.Math.max; import static java.lang.Math.min; import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableList; +import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; import static org.elasticsearch.plugin.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES; import static org.elasticsearch.rest.RestStatus.CONFLICT; @@ -82,6 +87,7 @@ public abstract class AbstractAsyncBulkByScrollAction listener; + private final Retry retry; public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, Client client, ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest, ActionListener listener) { @@ -92,6 +98,7 @@ public abstract class AbstractAsyncBulkByScrollAction docs); @@ -151,52 +158,36 @@ public abstract class AbstractAsyncBulkByScrollAction docsIterable = Arrays.asList(docs); - if (mainRequest.getSize() != SIZE_ALL_MATCHES) { - // Truncate the docs if we have more than the request size - long remaining = max(0, mainRequest.getSize() - task.getSuccessfullyProcessed()); - if (remaining < docs.length) { - docsIterable = docsIterable.subList(0, (int) remaining); - } - } - BulkRequest request = buildBulk(docsIterable); - if (request.requests().isEmpty()) { - /* - * If we noop-ed the entire batch then just skip to the next - * batch or the BulkRequest would fail validation. - */ - startNextScrollRequest(); - return; - } - request.timeout(mainRequest.getTimeout()); - request.consistencyLevel(mainRequest.getConsistency()); - if (logger.isDebugEnabled()) { - logger.debug("sending [{}] entry, [{}] bulk request", request.requests().size(), - new ByteSizeValue(request.estimatedSizeInBytes())); - } - // NOCOMMIT handle rejections - client.bulk(request, new ActionListener() { - @Override - public void onResponse(BulkResponse response) { - onBulkResponse(response); - } - - @Override - public void onFailure(Throwable e) { - finishHim(e); - } - }); - } catch (Throwable t) { - finishHim(t); + SearchHit[] docs = searchResponse.getHits().getHits(); + logger.debug("scroll returned [{}] documents with a scroll id of [{}]", docs.length, searchResponse.getScrollId()); + if (docs.length == 0) { + startNormalTermination(emptyList(), emptyList()); + return; } + task.countBatch(); + List docsIterable = Arrays.asList(docs); + if (mainRequest.getSize() != SIZE_ALL_MATCHES) { + // Truncate the docs if we have more than the request size + long remaining = max(0, mainRequest.getSize() - task.getSuccessfullyProcessed()); + if (remaining < docs.length) { + docsIterable = docsIterable.subList(0, (int) remaining); + } + } + BulkRequest request = buildBulk(docsIterable); + if (request.requests().isEmpty()) { + /* + * If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation. + */ + startNextScrollRequest(); + return; + } + request.timeout(mainRequest.getTimeout()); + request.consistencyLevel(mainRequest.getConsistency()); + if (logger.isDebugEnabled()) { + logger.debug("sending [{}] entry, [{}] bulk request", request.requests().size(), + new ByteSizeValue(request.estimatedSizeInBytes())); + } + sendBulkRequest(request); } @Override @@ -206,6 +197,20 @@ public abstract class AbstractAsyncBulkByScrollAction() { + @Override + public void onResponse(BulkResponse response) { + onBulkResponse(response); + } + + @Override + public void onFailure(Throwable e) { + finishHim(e); + } + }); + } + void onBulkResponse(BulkResponse response) { try { List failures = new ArrayList(); @@ -342,4 +347,38 @@ public abstract class AbstractAsyncBulkByScrollAction iterator() { + return new Iterator() { + private final Iterator delegate = backoffPolicy.iterator(); + @Override + public boolean hasNext() { + return delegate.hasNext(); + } + + @Override + public TimeValue next() { + if (false == delegate.hasNext()) { + return null; + } + task.countRetry(); + return delegate.next(); + } + }; + } + }; + } } diff --git a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractBulkByScrollRequest.java b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractBulkByScrollRequest.java index fd7b32f2cb5..f1e6198170e 100644 --- a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractBulkByScrollRequest.java +++ b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractBulkByScrollRequest.java @@ -34,11 +34,13 @@ import java.io.IOException; import java.util.Arrays; import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; +import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes; public abstract class AbstractBulkByScrollRequest> extends ActionRequest { public static final int SIZE_ALL_MATCHES = -1; - private static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(5); + private static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5); private static final int DEFAULT_SCROLL_SIZE = 100; /** @@ -72,6 +74,17 @@ public abstract class AbstractBulkByScrollRequest 0)) { e = addValidationError( "size should be greater than 0 if the request is limited to some number of documents or -1 if it isn't but it was [" @@ -206,6 +222,36 @@ public abstract class AbstractBulkByScrollRequest description) { super(id, type, action, description); @@ -51,7 +52,8 @@ public class BulkByScrollTask extends Task { @Override public Status getStatus() { - return new Status(total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), noops.get()); + return new Status(total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), noops.get(), + retries.get()); } /** @@ -62,7 +64,7 @@ public class BulkByScrollTask extends Task { } public static class Status implements Task.Status { - public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0); + public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0, 0); private final long total; private final long updated; @@ -71,8 +73,9 @@ public class BulkByScrollTask extends Task { private final int batches; private final long versionConflicts; private final long noops; + private final long retries; - public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops) { + public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops, long retries) { this.total = checkPositive(total, "total"); this.updated = checkPositive(updated, "updated"); this.created = checkPositive(created, "created"); @@ -80,6 +83,7 @@ public class BulkByScrollTask extends Task { this.batches = checkPositive(batches, "batches"); this.versionConflicts = checkPositive(versionConflicts, "versionConflicts"); this.noops = checkPositive(noops, "noops"); + this.retries = checkPositive(retries, "retries"); } public Status(StreamInput in) throws IOException { @@ -90,6 +94,7 @@ public class BulkByScrollTask extends Task { batches = in.readVInt(); versionConflicts = in.readVLong(); noops = in.readVLong(); + retries = in.readVLong(); } @Override @@ -101,6 +106,7 @@ public class BulkByScrollTask extends Task { out.writeVInt(batches); out.writeVLong(versionConflicts); out.writeVLong(noops); + out.writeVLong(retries); } @Override @@ -123,6 +129,7 @@ public class BulkByScrollTask extends Task { builder.field("batches", batches); builder.field("version_conflicts", versionConflicts); builder.field("noops", noops); + builder.field("retries", retries); return builder; } @@ -145,6 +152,7 @@ public class BulkByScrollTask extends Task { builder.append(",batches=").append(batches); builder.append(",versionConflicts=").append(versionConflicts); builder.append(",noops=").append(noops); + builder.append(",retries=").append(retries); } @Override @@ -207,6 +215,13 @@ public class BulkByScrollTask extends Task { return noops; } + /** + * Number of retries that had to be attempted due to rejected executions. + */ + public long getRetries() { + return retries; + } + private int checkPositive(int value, String name) { if (value < 0) { throw new IllegalArgumentException(name + " must be greater than 0 but was [" + value + "]"); @@ -249,4 +264,8 @@ public class BulkByScrollTask extends Task { void countVersionConflict() { versionConflicts.incrementAndGet(); } + + void countRetry() { + retries.incrementAndGet(); + } } diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AsyncBulkByScrollActionTests.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AsyncBulkByScrollActionTests.java index 79ed5b9f0f4..b724c4f5d36 100644 --- a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AsyncBulkByScrollActionTests.java +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AsyncBulkByScrollActionTests.java @@ -24,10 +24,15 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; @@ -35,6 +40,9 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.ReplicationRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.FilterClient; import org.elasticsearch.common.text.Text; @@ -44,6 +52,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; @@ -55,18 +64,26 @@ import org.junit.After; import org.junit.Before; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.emptyMap; import static org.apache.lucene.util.TestUtil.randomSimpleString; +import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff; +import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.emptyCollectionOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.iterableWithSize; public class AsyncBulkByScrollActionTests extends ESTestCase { - private MockClearScrollClient client; + private MyMockClient client; private ThreadPool threadPool; private DummyAbstractBulkByScrollRequest mainRequest; private SearchRequest firstSearchRequest; @@ -76,7 +93,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { @Before public void setupForTest() { - client = new MockClearScrollClient(new NoOpClient(getTestName())); + client = new MyMockClient(new NoOpClient(getTestName())); threadPool = new ThreadPool(getTestName()); mainRequest = new DummyAbstractBulkByScrollRequest(); firstSearchRequest = null; @@ -147,7 +164,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { long updated = 0; long deleted = 0; for (int batches = 0; batches < maxBatches; batches++) { - BulkItemResponse[] responses = new BulkItemResponse[randomIntBetween(0, 10000)]; + BulkItemResponse[] responses = new BulkItemResponse[randomIntBetween(0, 100)]; for (int i = 0; i < responses.length; i++) { ShardId shardId = new ShardId(new Index("name", "uid"), 0); String opType; @@ -244,6 +261,105 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class)); } + /** + * Mimicks script failures or general wrongness by implementers. + */ + public void testListenerReceiveBuildBulkExceptions() throws Exception { + DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction() { + @Override + protected BulkRequest buildBulk(Iterable docs) { + throw new RuntimeException("surprise"); + } + }; + InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap()); + InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] {hit}, 0, 0); + InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); + action.onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); + try { + listener.get(); + fail("Expected failure."); + } catch (ExecutionException e) { + assertThat(e.getCause(), instanceOf(RuntimeException.class)); + assertThat(e.getCause().getMessage(), equalTo("surprise")); + } + } + + /** + * Mimicks bulk rejections. These should be retried and eventually succeed. + */ + public void testBulkRejectionsRetryWithEnoughRetries() throws Exception { + int bulksToTry = randomIntBetween(1, 10); + long retryAttempts = 0; + for (int i = 0; i < bulksToTry; i++) { + retryAttempts += retryTestCase(false); + assertEquals(retryAttempts, task.getStatus().getRetries()); + } + } + + /** + * Mimicks bulk rejections. These should be retried but we fail anyway because we run out of retries. + */ + public void testBulkRejectionsRetryAndFailAnyway() throws Exception { + long retryAttempts = retryTestCase(true); + assertEquals(retryAttempts, task.getStatus().getRetries()); + } + + private long retryTestCase(boolean failWithRejection) throws Exception { + int totalFailures = randomIntBetween(1, mainRequest.getMaxRetries()); + int size = randomIntBetween(1, 100); + int retryAttempts = totalFailures - (failWithRejection ? 1 : 0); + + client.bulksToReject = client.bulksAttempts.get() + totalFailures; + /* + * When we get a successful bulk response we usually start the next scroll request but lets just intercept that so we don't have to + * deal with it. We just wait for it to happen. + */ + CountDownLatch successLatch = new CountDownLatch(1); + DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction() { + @Override + BackoffPolicy backoffPolicy() { + // Force a backoff time of 0 to prevent sleeping + return constantBackoff(timeValueMillis(0), retryAttempts); + } + + @Override + void startNextScrollRequest() { + successLatch.countDown(); + } + }; + BulkRequest request = new BulkRequest(); + for (int i = 0; i < size + 1; i++) { + request.add(new IndexRequest("index", "type", "id" + i)); + } + action.sendBulkRequest(request); + if (failWithRejection) { + BulkIndexByScrollResponse response = listener.get(); + assertThat(response.getIndexingFailures(), iterableWithSize(equalTo(1))); + assertEquals(response.getIndexingFailures().get(0).getStatus(), RestStatus.TOO_MANY_REQUESTS); + assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class)); + } else { + successLatch.await(10, TimeUnit.SECONDS); + } + return retryAttempts; + } + + /** + * The default retry time matches what we say it is in the javadoc for the request. + */ + public void testDefaultRetryTimes() { + Iterator policy = new DummyAbstractAsyncBulkByScrollAction().backoffPolicy().iterator(); + long millis = 0; + while (policy.hasNext()) { + millis += policy.next().millis(); + } + /* + * This is the total number of milliseconds that a reindex made with the default settings will backoff before attempting one final + * time. If that request is rejected then the whole process fails with a rejected exception. + */ + int defaultBackoffBeforeFailing = 24670; + assertEquals(defaultBackoffBeforeFailing, millis); + } + private class DummyAbstractAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollAction { public DummyAbstractAsyncBulkByScrollAction() { @@ -270,10 +386,13 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { } } - private static class MockClearScrollClient extends FilterClient { - private List scrollsCleared = new ArrayList<>(); + private static class MyMockClient extends FilterClient { + private final List scrollsCleared = new ArrayList<>(); + private final AtomicInteger bulksAttempts = new AtomicInteger(); - public MockClearScrollClient(Client in) { + private int bulksToReject = 0; + + public MyMockClient(Client in) { super(in); } @@ -288,6 +407,48 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { listener.onResponse((Response) new ClearScrollResponse(true, clearScroll.getScrollIds().size())); return; } + if (request instanceof BulkRequest) { + BulkRequest bulk = (BulkRequest) request; + int toReject; + if (bulksAttempts.incrementAndGet() > bulksToReject) { + toReject = -1; + } else { + toReject = randomIntBetween(0, bulk.requests().size() - 1); + } + BulkItemResponse[] responses = new BulkItemResponse[bulk.requests().size()]; + for (int i = 0; i < bulk.requests().size(); i++) { + ActionRequest item = bulk.requests().get(i); + String opType; + DocWriteResponse response; + ShardId shardId = new ShardId(new Index(((ReplicationRequest) item).index(), "uuid"), 0); + if (item instanceof IndexRequest) { + IndexRequest index = (IndexRequest) item; + opType = index.opType().lowercase(); + response = new IndexResponse(shardId, index.type(), index.id(), randomIntBetween(0, Integer.MAX_VALUE), + true); + } else if (item instanceof UpdateRequest) { + UpdateRequest update = (UpdateRequest) item; + opType = "update"; + response = new UpdateResponse(shardId, update.type(), update.id(), + randomIntBetween(0, Integer.MAX_VALUE), true); + } else if (item instanceof DeleteRequest) { + DeleteRequest delete = (DeleteRequest) item; + opType = "delete"; + response = new DeleteResponse(shardId, delete.type(), delete.id(), randomIntBetween(0, Integer.MAX_VALUE), + true); + } else { + throw new RuntimeException("Unknown request: " + item); + } + if (i == toReject) { + responses[i] = new BulkItemResponse(i, opType, + new Failure(response.getIndex(), response.getType(), response.getId(), new EsRejectedExecutionException())); + } else { + responses[i] = new BulkItemResponse(i, opType, response); + } + } + listener.onResponse((Response) new BulkResponse(responses, 1)); + return; + } super.doExecute(action, request, listener); } } diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/BulkByScrollTaskTests.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/BulkByScrollTaskTests.java index 7f01deca151..ab2ac9bf0e0 100644 --- a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/BulkByScrollTaskTests.java +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/BulkByScrollTaskTests.java @@ -101,12 +101,13 @@ public class BulkByScrollTaskTests extends ESTestCase { } public void testStatusHatesNegatives() { - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(-1, 0, 0, 0, 0, 0, 0)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, -1, 0, 0, 0, 0, 0)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, -1, 0, 0, 0, 0)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, -1, 0, 0, 0)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, -1, 0, 0)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, -1, 0)); - expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, -1)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(-1, 0, 0, 0, 0, 0, 0, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, -1, 0, 0, 0, 0, 0, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, -1, 0, 0, 0, 0, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, -1, 0, 0, 0, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, -1, 0, 0, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, -1, 0, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, -1, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, 0, -1)); } } diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/RoundTripTests.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/RoundTripTests.java index 1e1f7e5a99b..db25020ea4c 100644 --- a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/RoundTripTests.java +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/RoundTripTests.java @@ -88,6 +88,8 @@ public class RoundTripTests extends ESTestCase { assertEquals(request.getTimeout(), tripped.getTimeout()); assertEquals(request.getConsistency(), tripped.getConsistency()); assertEquals(request.getScript(), tripped.getScript()); + assertEquals(request.getRetryBackoffInitialTime(), tripped.getRetryBackoffInitialTime()); + assertEquals(request.getMaxRetries(), tripped.getMaxRetries()); } public void testBulkByTaskStatus() throws IOException { @@ -116,7 +118,7 @@ public class RoundTripTests extends ESTestCase { private BulkByScrollTask.Status randomStatus() { return new BulkByScrollTask.Status(randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), - randomPositiveInt(), randomPositiveLong(), randomPositiveLong()); + randomPositiveInt(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong()); } private List randomIndexingFailures() { @@ -185,8 +187,11 @@ public class RoundTripTests extends ESTestCase { private void assertTaskStatusEquals(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) { assertEquals(expected.getUpdated(), actual.getUpdated()); + assertEquals(expected.getCreated(), actual.getCreated()); + assertEquals(expected.getDeleted(), actual.getDeleted()); assertEquals(expected.getBatches(), actual.getBatches()); assertEquals(expected.getVersionConflicts(), actual.getVersionConflicts()); assertEquals(expected.getNoops(), actual.getNoops()); + assertEquals(expected.getRetries(), actual.getRetries()); } }