Handle RejectedExecutionException in ShardFollowTasksExecutor (#65648) (#65653)

Follow-up to #65415. We can't have this exception bubble up in an exception
handler any longer due to the new assertion so we must handle it here.
This commit is contained in:
Armin Braun 2020-12-01 06:51:05 +01:00 committed by GitHub
parent 6bbeedc932
commit 16642f1c74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 7 additions and 1 deletions

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
@ -544,7 +545,12 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
if (ShardFollowNodeTask.shouldRetry(e)) {
logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number",
shardFollowNodeTask), e);
threadPool.schedule(() -> nodeOperation(task, params, state), params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME);
try {
threadPool.schedule(() -> nodeOperation(task, params, state), params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME);
} catch (EsRejectedExecutionException rex) {
rex.addSuppressed(e);
shardFollowNodeTask.onFatalFailure(rex);
}
} else {
shardFollowNodeTask.onFatalFailure(e);
}