Handle Rejections when Scheduling RetryableAction (#58033) (#58039)

Scheduling on the threadpool will throw if the scheduler is already
shut down. Handled by treating the rejection like any other non-retryable
exception.

Closes #58021
This commit is contained in:
Armin Braun 2020-06-12 15:23:02 +02:00 committed by GitHub
parent d6c8d9415d
commit a5a251d8c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 9 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -82,6 +83,7 @@ public abstract class RetryableAction<Response> {
if (localRetryTask != null) { if (localRetryTask != null) {
localRetryTask.cancel(); localRetryTask.cancel();
} }
onFinished();
finalListener.onFailure(e); finalListener.onFailure(e);
} }
} }
@ -112,6 +114,9 @@ public abstract class RetryableAction<Response> {
public abstract boolean shouldRetry(Exception e); public abstract boolean shouldRetry(Exception e);
public void onFinished() {
}
private class RetryingListener implements ActionListener<Response> { private class RetryingListener implements ActionListener<Response> {
private static final int MAX_EXCEPTIONS = 4; private static final int MAX_EXCEPTIONS = 4;
@ -127,6 +132,7 @@ public abstract class RetryableAction<Response> {
@Override @Override
public void onResponse(Response response) { public void onResponse(Response response) {
if (isDone.compareAndSet(false, true)) { if (isDone.compareAndSet(false, true)) {
onFinished();
finalListener.onResponse(response); finalListener.onResponse(response);
} }
} }
@ -138,10 +144,7 @@ public abstract class RetryableAction<Response> {
if (elapsedMillis >= timeoutMillis) { if (elapsedMillis >= timeoutMillis) {
logger.debug(() -> new ParameterizedMessage("retryable action timed out after {}", logger.debug(() -> new ParameterizedMessage("retryable action timed out after {}",
TimeValue.timeValueMillis(elapsedMillis)), e); TimeValue.timeValueMillis(elapsedMillis)), e);
addException(e); onFinalFailure(e);
if (isDone.compareAndSet(false, true)) {
finalListener.onFailure(buildFinalException());
}
} else { } else {
addException(e); addException(e);
@ -152,16 +155,25 @@ public abstract class RetryableAction<Response> {
if (isDone.get() == false) { if (isDone.get() == false) {
final TimeValue delay = TimeValue.timeValueMillis(delayMillis); final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e); logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);
try {
retryTask = threadPool.schedule(runnable, delay, executor); retryTask = threadPool.schedule(runnable, delay, executor);
} catch (EsRejectedExecutionException ree) {
onFinalFailure(ree);
}
} }
} }
} else { } else {
onFinalFailure(e);
}
}
private void onFinalFailure(Exception e) {
addException(e); addException(e);
if (isDone.compareAndSet(false, true)) { if (isDone.compareAndSet(false, true)) {
onFinished();
finalListener.onFailure(buildFinalException()); finalListener.onFailure(buildFinalException());
} }
} }
}
private Exception buildFinalException() { private Exception buildFinalException() {
final Exception topLevel = caughtExceptions.removeFirst(); final Exception topLevel = caughtExceptions.removeFirst();