Tighten assertions in BulkProcessorRetryIT

With this commit we check more precisely on the result of a bulk
request. It could either be ok, fail or be rejected due to resource
constraints. Previously, we have relied that by default we never
get rejected.

However, this is a valid condition even when retrying. With this
commit we check that we either retried often enough that we don't
get rejected *and* if we got rejected that we maxed out the number
of specified retries.
This commit is contained in:
Daniel Mitterdorfer 2015-12-29 11:40:33 +01:00
parent a49fe189b0
commit 46a4aa9704
1 changed files with 82 additions and 11 deletions

View File

@ -22,13 +22,13 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import java.util.Collections; import java.util.*;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -70,6 +70,7 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
} }
private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejectedExecutionExpected) throws Throwable { private void executeBulkRejectionLoad(BackoffPolicy backoffPolicy, boolean rejectedExecutionExpected) throws Throwable {
final CorrelatingBackoffPolicy internalPolicy = new CorrelatingBackoffPolicy(backoffPolicy);
int numberOfAsyncOps = randomIntBetween(600, 700); int numberOfAsyncOps = randomIntBetween(600, 700);
final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps); final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps);
final Set<Object> responses = Collections.newSetFromMap(new ConcurrentHashMap<>()); final Set<Object> responses = Collections.newSetFromMap(new ConcurrentHashMap<>());
@ -85,6 +86,7 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
@Override @Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
internalPolicy.logResponse(response);
responses.add(response); responses.add(response);
latch.countDown(); latch.countDown();
} }
@ -97,7 +99,7 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
}).setBulkActions(1) }).setBulkActions(1)
// zero means that we're in the sync case, more means that we're in the async case // zero means that we're in the sync case, more means that we're in the async case
.setConcurrentRequests(randomIntBetween(0, 100)) .setConcurrentRequests(randomIntBetween(0, 100))
.setBackoffPolicy(backoffPolicy) .setBackoffPolicy(internalPolicy)
.build(); .build();
indexDocs(bulkProcessor, numberOfAsyncOps); indexDocs(bulkProcessor, numberOfAsyncOps);
latch.await(10, TimeUnit.SECONDS); latch.await(10, TimeUnit.SECONDS);
@ -115,8 +117,14 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
Throwable rootCause = ExceptionsHelper.unwrapCause(failure.getCause()); Throwable rootCause = ExceptionsHelper.unwrapCause(failure.getCause());
if (rootCause instanceof EsRejectedExecutionException) { if (rootCause instanceof EsRejectedExecutionException) {
if (rejectedExecutionExpected == false) { if (rejectedExecutionExpected == false) {
// we're not expecting that we overwhelmed it even once Iterator<TimeValue> backoffState = internalPolicy.backoffStateFor(bulkResponse);
throw new AssertionError("Unexpected failure reason", rootCause); assertNotNull("backoffState is null (indicates a bulk request got rejected without retry)", backoffState);
if (backoffState.hasNext()) {
// we're not expecting that we overwhelmed it even once when we maxed out the number of retries
throw new AssertionError("Got rejected although backoff policy would allow more retries", rootCause);
} else {
logger.debug("We maxed out the number of bulk retries and got rejected (this is ok).");
}
} }
} else { } else {
throw new AssertionError("Unexpected failure", rootCause); throw new AssertionError("Unexpected failure", rootCause);
@ -134,12 +142,8 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
// validate we did not create any duplicates due to retries // validate we did not create any duplicates due to retries
Matcher<Long> searchResultCount; Matcher<Long> searchResultCount;
if (rejectedExecutionExpected) { // it is ok if we lost some index operations to rejected executions (which is possible even when backing off (although less likely)
// it is ok if we lost some index operations to rejected executions
searchResultCount = lessThanOrEqualTo((long) numberOfAsyncOps); searchResultCount = lessThanOrEqualTo((long) numberOfAsyncOps);
} else {
searchResultCount = equalTo((long) numberOfAsyncOps);
}
SearchResponse results = client() SearchResponse results = client()
.prepareSearch(INDEX_NAME) .prepareSearch(INDEX_NAME)
@ -161,4 +165,71 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
.request()); .request());
} }
} }
/**
* Internal helper class to correlate backoff states with bulk responses. This is needed to check whether we maxed out the number
* of retries but still got rejected (which is perfectly fine and can also happen from time to time under heavy load).
*
* This implementation relies on an implementation detail in Retry, namely that the bulk listener is notified on the same thread
* as the last call to the backoff policy's iterator. The advantage is that this is non-invasive to the rest of the production code.
*/
private static class CorrelatingBackoffPolicy extends BackoffPolicy {
private final Map<BulkResponse, Iterator<TimeValue>> correlations = new ConcurrentHashMap<>();
// this is intentionally *not* static final. We will only ever have one instance of this class per test case and want the
// thread local to be eligible for garbage collection right after the test to avoid leaks.
private final ThreadLocal<Iterator<TimeValue>> iterators = new ThreadLocal<>();
private final BackoffPolicy delegate;
private CorrelatingBackoffPolicy(BackoffPolicy delegate) {
this.delegate = delegate;
}
public Iterator<TimeValue> backoffStateFor(BulkResponse response) {
return correlations.get(response);
}
// Assumption: This method is called from the same thread as the last call to the internal iterator's #hasNext() / #next()
// see also Retry.AbstractRetryHandler#onResponse().
public void logResponse(BulkResponse response) {
Iterator<TimeValue> iterator = iterators.get();
// did we ever retry?
if (iterator != null) {
// we should correlate any iterator only once
iterators.remove();
correlations.put(response, iterator);
}
}
@Override
public Iterator<TimeValue> iterator() {
return new CorrelatingIterator(iterators, delegate.iterator());
}
private static class CorrelatingIterator implements Iterator<TimeValue> {
private final Iterator<TimeValue> delegate;
private final ThreadLocal<Iterator<TimeValue>> iterators;
private CorrelatingIterator(ThreadLocal<Iterator<TimeValue>> iterators, Iterator<TimeValue> delegate) {
this.iterators = iterators;
this.delegate = delegate;
}
@Override
public boolean hasNext() {
// update on every invocation as we might get rescheduled on a different thread. Unfortunately, there is a chance that
// we pollute the thread local map with stale values. Due to the implementation of Retry and the life cycle of the
// enclosing class CorrelatingBackoffPolicy this should not pose a major problem though.
iterators.set(this);
return delegate.hasNext();
}
@Override
public TimeValue next() {
// update on every invocation
iterators.set(this);
return delegate.next();
}
}
}
} }