[CCR] Check whether the rejected execution exception has the shutdown flag set (#33703)

and if so debug log it and otherwise rethrow.

This should fix a couple of test failures where during test teardown tests
failed due to uncaught exceptions being detected.
This commit is contained in:
Martijn van Groningen 2018-09-14 13:28:11 +02:00 committed by GitHub
parent b8fb83d7a4
commit 222f42274e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 2 deletions

View File

@ -23,6 +23,7 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
@ -90,8 +91,17 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
leaderClient = wrapClient(client, params); leaderClient = wrapClient(client, params);
} }
Client followerClient = wrapClient(client, params); Client followerClient = wrapClient(client, params);
BiConsumer<TimeValue, Runnable> scheduler = BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> {
(delay, command) -> threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command); try {
threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug("couldn't schedule command, executor is shutting down", e);
} else {
throw e;
}
}
};
return new ShardFollowNodeTask( return new ShardFollowNodeTask(
id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, scheduler, System::nanoTime) { id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, scheduler, System::nanoTime) {