diff --git a/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java b/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java index aa804ba76d1..4629cd2e268 100644 --- a/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -82,6 +83,7 @@ public abstract class RetryableAction { if (localRetryTask != null) { localRetryTask.cancel(); } + onFinished(); finalListener.onFailure(e); } } @@ -112,6 +114,9 @@ public abstract class RetryableAction { public abstract boolean shouldRetry(Exception e); + public void onFinished() { + } + private class RetryingListener implements ActionListener { private static final int MAX_EXCEPTIONS = 4; @@ -127,6 +132,7 @@ public abstract class RetryableAction { @Override public void onResponse(Response response) { if (isDone.compareAndSet(false, true)) { + onFinished(); finalListener.onResponse(response); } } @@ -138,10 +144,7 @@ public abstract class RetryableAction { if (elapsedMillis >= timeoutMillis) { logger.debug(() -> new ParameterizedMessage("retryable action timed out after {}", TimeValue.timeValueMillis(elapsedMillis)), e); - addException(e); - if (isDone.compareAndSet(false, true)) { - finalListener.onFailure(buildFinalException()); - } + onFinalFailure(e); } else { addException(e); @@ -152,14 +155,23 @@ public abstract class RetryableAction { if (isDone.get() == false) { final TimeValue delay = TimeValue.timeValueMillis(delayMillis); logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e); - retryTask = threadPool.schedule(runnable, delay, executor); + try { + retryTask = threadPool.schedule(runnable, delay, executor); + } catch (EsRejectedExecutionException ree) { + onFinalFailure(ree); + } } } } else { - addException(e); - if (isDone.compareAndSet(false,true)) { - finalListener.onFailure(buildFinalException()); - } + onFinalFailure(e); + } + } + + private void onFinalFailure(Exception e) { + addException(e); + if (isDone.compareAndSet(false, true)) { + onFinished(); + finalListener.onFailure(buildFinalException()); } }