diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java new file mode 100644 index 00000000000..597d35a9996 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java @@ -0,0 +1,219 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client; + +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.MultiGetRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.RestStatus; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase { + + private static final String INDEX_NAME = "index"; + private static final String TYPE_NAME = "type"; + + private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) { + return BulkProcessor.builder(highLevelClient()::bulkAsync, listener); + } + + public void testBulkRejectionLoadWithoutBackoff() throws Exception { + boolean rejectedExecutionExpected = true; + executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected); + } + + public void testBulkRejectionLoadWithBackoff() throws Throwable { + boolean rejectedExecutionExpected = false; + executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected); + } + + private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejectedExecutionExpected) throws Exception { + final CorrelatingBackoffPolicy internalPolicy = new CorrelatingBackoffPolicy(backoffPolicy); + final int numberOfAsyncOps = randomIntBetween(600, 700); + final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps); + final Set responses = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + BulkProcessor bulkProcessor = initBulkProcessorBuilder(new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + internalPolicy.logResponse(response); + responses.add(response); + latch.countDown(); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + responses.add(failure); + latch.countDown(); + } + }).setBulkActions(1) + .setConcurrentRequests(randomIntBetween(0, 100)) + .setBackoffPolicy(internalPolicy) + .build(); + + MultiGetRequest multiGetRequest = indexDocs(bulkProcessor, numberOfAsyncOps); + latch.await(10, TimeUnit.SECONDS); + bulkProcessor.close(); + + assertEquals(responses.size(), numberOfAsyncOps); + + boolean rejectedAfterAllRetries = false; + for (Object response : responses) { + if (response instanceof BulkResponse) { + BulkResponse bulkResponse = (BulkResponse) response; + for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { + if (bulkItemResponse.isFailed()) { + BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); + if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { + if (rejectedExecutionExpected == false) { + Iterator backoffState = internalPolicy.backoffStateFor(bulkResponse); + assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState); + if (backoffState.hasNext()) { + // we're not expecting that we overwhelmed it even once when we maxed out the number of retries + throw new AssertionError("Got rejected although backoff policy would allow more retries", + failure.getCause()); + } else { + rejectedAfterAllRetries = true; + logger.debug("We maxed out the number of bulk retries and got rejected (this is ok)."); + } + } + } else { + throw new AssertionError("Unexpected failure with status: " + failure.getStatus()); + } + } + } + } else { + Throwable t = (Throwable) response; + // we're not expecting any other errors + throw new AssertionError("Unexpected failure", t); + } + } + + highLevelClient().indices().refresh(new RefreshRequest()); + int multiGetResponsesCount = highLevelClient().multiGet(multiGetRequest).getResponses().length; + + if (rejectedExecutionExpected) { + assertThat(multiGetResponsesCount, lessThanOrEqualTo(numberOfAsyncOps)); + } else if (rejectedAfterAllRetries) { + assertThat(multiGetResponsesCount, lessThan(numberOfAsyncOps)); + } else { + assertThat(multiGetResponsesCount, equalTo(numberOfAsyncOps)); + } + + } + + private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) { + MultiGetRequest multiGetRequest = new MultiGetRequest(); + for (int i = 1; i <= numDocs; i++) { + processor.add(new IndexRequest(INDEX_NAME, TYPE_NAME, Integer.toString(i)) + .source(XContentType.JSON, "field", randomRealisticUnicodeOfCodepointLengthBetween(1, 30))); + multiGetRequest.add(INDEX_NAME, TYPE_NAME, Integer.toString(i)); + } + return multiGetRequest; + } + + /** + * Internal helper class to correlate backoff states with bulk responses. This is needed to check whether we maxed out the number + * of retries but still got rejected (which is perfectly fine and can also happen from time to time under heavy load). + * + * This implementation relies on an implementation detail in Retry, namely that the bulk listener is notified on the same thread + * as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code. + */ + private static class CorrelatingBackoffPolicy extends BackoffPolicy { + private final Map> correlations = new ConcurrentHashMap<>(); + // this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the + // thread local to be eligible for garbage collection right after the test to avoid leaks. + private final ThreadLocal> iterators = new ThreadLocal<>(); + + private final BackoffPolicy delegate; + + private CorrelatingBackoffPolicy(BackoffPolicy delegate) { + this.delegate = delegate; + } + + public Iterator backoffStateFor(BulkResponse response) { + return correlations.get(response); + } + + // Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next() + // see also Retry.AbstractRetryHandler#onResponse(). + public void logResponse(BulkResponse response) { + Iterator iterator = iterators.get(); + // did we ever retry? + if (iterator != null) { + // we should correlate any iterator only once + iterators.remove(); + correlations.put(response, iterator); + } + } + + @Override + public Iterator iterator() { + return new CorrelatingIterator(iterators, delegate.iterator()); + } + + private static class CorrelatingIterator implements Iterator { + private final Iterator delegate; + private final ThreadLocal> iterators; + + private CorrelatingIterator(ThreadLocal> iterators, Iterator delegate) { + this.iterators = iterators; + this.delegate = delegate; + } + + @Override + public boolean hasNext() { + // update on every invocation as we might get rescheduled on a different thread. Unfortunately, there is a chance that + // we pollute the thread local map with stale values. Due to the implementation of Retry and the life cycle of the + // enclosing class CorrelatingBackoffPolicy this should not pose a major problem though. + iterators.set(this); + return delegate.hasNext(); + } + + @Override + public TimeValue next() { + // update on every invocation + iterators.set(this); + return delegate.next(); + } + } + } +} diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index 8350d866150..5bd83b6c19a 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -32,7 +32,6 @@ import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.Retry; -import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.ParentTaskAssigningClient; @@ -41,7 +40,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.IndexFieldMapper; @@ -49,6 +47,7 @@ import org.elasticsearch.index.mapper.RoutingFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.TypeFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; @@ -75,8 +74,8 @@ import static java.lang.Math.min; import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableList; import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff; -import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; +import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES; import static org.elasticsearch.rest.RestStatus.CONFLICT; import static org.elasticsearch.search.sort.SortBuilders.fieldSort; @@ -139,7 +138,7 @@ public abstract class AbstractAsyncBulkByScrollAction 0 ? concurrentRequests : 1); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/Retry.java b/server/src/main/java/org/elasticsearch/action/bulk/Retry.java index 9985d23b9ba..75a1a2d5f8d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/Retry.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/Retry.java @@ -19,13 +19,13 @@ package org.elasticsearch.action.bulk; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -40,12 +40,10 @@ import java.util.function.Predicate; * Encapsulates synchronous and asynchronous retry logic. */ public class Retry { - private final Class retryOnThrowable; private final BackoffPolicy backoffPolicy; private final Scheduler scheduler; - public Retry(Class retryOnThrowable, BackoffPolicy backoffPolicy, Scheduler scheduler) { - this.retryOnThrowable = retryOnThrowable; + public Retry(BackoffPolicy backoffPolicy, Scheduler scheduler) { this.backoffPolicy = backoffPolicy; this.scheduler = scheduler; } @@ -60,7 +58,7 @@ public class Retry { */ public void withBackoff(BiConsumer> consumer, BulkRequest bulkRequest, ActionListener listener, Settings settings) { - RetryHandler r = new RetryHandler(retryOnThrowable, backoffPolicy, consumer, listener, settings, scheduler); + RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, settings, scheduler); r.execute(bulkRequest); } @@ -81,12 +79,13 @@ public class Retry { } static class RetryHandler implements ActionListener { + private static final RestStatus RETRY_STATUS = RestStatus.TOO_MANY_REQUESTS; + private final Logger logger; private final Scheduler scheduler; private final BiConsumer> consumer; private final ActionListener listener; private final Iterator backoff; - private final Class retryOnThrowable; // Access only when holding a client-side lock, see also #addResponses() private final List responses = new ArrayList<>(); private final long startTimestampNanos; @@ -95,10 +94,8 @@ public class Retry { private volatile BulkRequest currentBulkRequest; private volatile ScheduledFuture scheduledRequestFuture; - RetryHandler(Class retryOnThrowable, BackoffPolicy backoffPolicy, - BiConsumer> consumer, ActionListener listener, - Settings settings, Scheduler scheduler) { - this.retryOnThrowable = retryOnThrowable; + RetryHandler(BackoffPolicy backoffPolicy, BiConsumer> consumer, + ActionListener listener, Settings settings, Scheduler scheduler) { this.backoff = backoffPolicy.iterator(); this.consumer = consumer; this.listener = listener; @@ -160,9 +157,8 @@ public class Retry { } for (BulkItemResponse bulkItemResponse : bulkItemResponses) { if (bulkItemResponse.isFailed()) { - final Throwable cause = bulkItemResponse.getFailure().getCause(); - final Throwable rootCause = ExceptionsHelper.unwrapCause(cause); - if (!rootCause.getClass().equals(retryOnThrowable)) { + final RestStatus status = bulkItemResponse.status(); + if (status != RETRY_STATUS) { return false; } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 4b96f3d1754..f1731083ae3 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -18,15 +18,13 @@ */ package org.elasticsearch.action.bulk; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; -import org.hamcrest.Matcher; import java.util.Collections; import java.util.Iterator; @@ -38,6 +36,7 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2) @@ -108,26 +107,28 @@ public class BulkProcessorRetryIT extends ESIntegTestCase { assertThat(responses.size(), equalTo(numberOfAsyncOps)); // validate all responses + boolean rejectedAfterAllRetries = false; for (Object response : responses) { if (response instanceof BulkResponse) { BulkResponse bulkResponse = (BulkResponse) response; for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); - Throwable rootCause = ExceptionsHelper.unwrapCause(failure.getCause()); - if (rootCause instanceof EsRejectedExecutionException) { + if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { if (rejectedExecutionExpected == false) { Iterator backoffState = internalPolicy.backoffStateFor(bulkResponse); assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState); if (backoffState.hasNext()) { // we're not expecting that we overwhelmed it even once when we maxed out the number of retries - throw new AssertionError("Got rejected although backoff policy would allow more retries", rootCause); + throw new AssertionError("Got rejected although backoff policy would allow more retries", + failure.getCause()); } else { + rejectedAfterAllRetries = true; logger.debug("We maxed out the number of bulk retries and got rejected (this is ok)."); } } } else { - throw new AssertionError("Unexpected failure", rootCause); + throw new AssertionError("Unexpected failure status: " + failure.getStatus()); } } } @@ -140,18 +141,20 @@ public class BulkProcessorRetryIT extends ESIntegTestCase { client().admin().indices().refresh(new RefreshRequest()).get(); - // validate we did not create any duplicates due to retries - Matcher searchResultCount; - // it is ok if we lost some index operations to rejected executions (which is possible even when backing off (although less likely) - searchResultCount = lessThanOrEqualTo((long) numberOfAsyncOps); - SearchResponse results = client() .prepareSearch(INDEX_NAME) .setTypes(TYPE_NAME) .setQuery(QueryBuilders.matchAllQuery()) .setSize(0) .get(); - assertThat(results.getHits().getTotalHits(), searchResultCount); + + if (rejectedExecutionExpected) { + assertThat((int) results.getHits().getTotalHits(), lessThanOrEqualTo(numberOfAsyncOps)); + } else if (rejectedAfterAllRetries) { + assertThat((int) results.getHits().getTotalHits(), lessThan(numberOfAsyncOps)); + } else { + assertThat((int) results.getHits().getTotalHits(), equalTo(numberOfAsyncOps)); + } } private static void indexDocs(BulkProcessor processor, int numDocs) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java index 136097a2926..320f11ff6d0 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/RetryTests.java @@ -84,7 +84,7 @@ public class RetryTests extends ESTestCase { BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL); BulkRequest bulkRequest = createBulkRequest(); - BulkResponse response = new Retry(EsRejectedExecutionException.class, backoff, bulkClient.threadPool()) + BulkResponse response = new Retry(backoff, bulkClient.threadPool()) .withBackoff(bulkClient::bulk, bulkRequest, bulkClient.settings()) .actionGet(); @@ -96,7 +96,7 @@ public class RetryTests extends ESTestCase { BackoffPolicy backoff = BackoffPolicy.constantBackoff(DELAY, CALLS_TO_FAIL - 1); BulkRequest bulkRequest = createBulkRequest(); - BulkResponse response = new Retry(EsRejectedExecutionException.class, backoff, bulkClient.threadPool()) + BulkResponse response = new Retry(backoff, bulkClient.threadPool()) .withBackoff(bulkClient::bulk, bulkRequest, bulkClient.settings()) .actionGet(); @@ -109,7 +109,7 @@ public class RetryTests extends ESTestCase { AssertingListener listener = new AssertingListener(); BulkRequest bulkRequest = createBulkRequest(); - Retry retry = new Retry(EsRejectedExecutionException.class, backoff, bulkClient.threadPool()); + Retry retry = new Retry(backoff, bulkClient.threadPool()); retry.withBackoff(bulkClient::bulk, bulkRequest, listener, bulkClient.settings()); listener.awaitCallbacksCalled(); @@ -124,7 +124,7 @@ public class RetryTests extends ESTestCase { AssertingListener listener = new AssertingListener(); BulkRequest bulkRequest = createBulkRequest(); - Retry retry = new Retry(EsRejectedExecutionException.class, backoff, bulkClient.threadPool()); + Retry retry = new Retry(backoff, bulkClient.threadPool()); retry.withBackoff(bulkClient::bulk, bulkRequest, listener, bulkClient.settings()); listener.awaitCallbacksCalled();