Fix BulkProcessor Retry ITs (#41338) (#41472)

* The test fails for the retry backoff enabled case because the retry handler in the bulk processor hasn't been adjusted to account for #40866 which now might lead to an outright rejection of the request instead of its items individually
   * Fixed by adding retry functionality to the top level request as well
* Also fixed the duplicate test for the HLRC that wasn't handling the non-backoff case yet the same way the non-client IT did
* closes #41324
This commit is contained in:
Armin Braun 2019-04-24 13:46:32 +02:00 committed by GitHub
parent 2e4ac178e2
commit 381b8e2ece
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 10 deletions

View File

@ -29,6 +29,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.RemoteTransportException;
import java.util.Collections;
import java.util.Iterator;
@ -56,7 +57,6 @@ public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase {
executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected);
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41324")
public void testBulkRejectionLoadWithBackoff() throws Throwable {
boolean rejectedExecutionExpected = false;
executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected);
@ -122,9 +122,14 @@ public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase {
}
}
} else {
Throwable t = (Throwable) response;
// we're not expecting any other errors
throw new AssertionError("Unexpected failure", t);
if (response instanceof RemoteTransportException
&& ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) {
// ignored, we exceeded the write queue size with dispatching the initial bulk request
} else {
Throwable t = (Throwable) response;
// we're not expecting any other errors
throw new AssertionError("Unexpected failure", t);
}
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
import java.util.ArrayList;
import java.util.Iterator;
@ -118,11 +119,15 @@ public class Retry {
@Override
public void onFailure(Exception e) {
try {
listener.onFailure(e);
} finally {
if (retryCancellable != null) {
retryCancellable.cancel();
if (e instanceof RemoteTransportException && ((RemoteTransportException) e).status() == RETRY_STATUS && backoff.hasNext()) {
retry(currentBulkRequest);
} else {
try {
listener.onFailure(e);
} finally {
if (retryCancellable != null) {
retryCancellable.cancel();
}
}
}
}

View File

@ -64,7 +64,6 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
executeBulkRejectionLoad(BackoffPolicy.noBackoff(), rejectedExecutionExpected);
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41324")
public void testBulkRejectionLoadWithBackoff() throws Throwable {
boolean rejectedExecutionExpected = false;
executeBulkRejectionLoad(BackoffPolicy.exponentialBackoff(), rejectedExecutionExpected);