diff --git a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/cleaner/CleanerService.java b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/cleaner/CleanerService.java index ff85ce73a43..040547d1aac 100644 --- a/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/cleaner/CleanerService.java +++ b/elasticsearch/x-pack/marvel/src/main/java/org/elasticsearch/marvel/cleaner/CleanerService.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.marvel.MarvelSettings; import org.elasticsearch.marvel.license.MarvelLicensee; @@ -31,8 +32,8 @@ public class CleanerService extends AbstractLifecycleComponent { private final ThreadPool threadPool; private final ExecutionScheduler executionScheduler; private final List listeners = new CopyOnWriteArrayList<>(); + private final IndicesCleaner runnable; - private volatile IndicesCleaner runnable; private volatile TimeValue globalRetention; CleanerService(Settings settings, ClusterSettings clusterSettings, MarvelLicensee licensee, ThreadPool threadPool, @@ -42,6 +43,7 @@ public class CleanerService extends AbstractLifecycleComponent { this.threadPool = threadPool; this.executionScheduler = executionScheduler; this.globalRetention = MarvelSettings.HISTORY_DURATION.get(settings); + this.runnable = new IndicesCleaner(); // the validation is performed by the setting's object itself clusterSettings.addSettingsUpdateConsumer(MarvelSettings.HISTORY_DURATION, this::setGlobalRetention); @@ -55,7 +57,6 @@ public class CleanerService extends AbstractLifecycleComponent { @Override protected void doStart() { logger.debug("starting cleaning service"); - this.runnable = new IndicesCleaner(); threadPool.schedule(executionScheduler.nextExecutionDelay(new DateTime(ISOChronology.getInstance())), executorName(), runnable); logger.debug("cleaning service started"); } @@ -64,15 +65,14 @@ public class CleanerService extends AbstractLifecycleComponent { protected void doStop() { logger.debug("stopping cleaning service"); listeners.clear(); + runnable.cancel(); + logger.debug("cleaning service stopped"); } @Override protected void doClose() { - logger.debug("closing cleaning service"); - if (runnable != null) { - runnable.cancel(); - } - logger.debug("cleaning service closed"); + // Cleaner runnable should have been cancelled in doStop() method, + // normally called just before this one } private String executorName() { @@ -196,7 +196,15 @@ public class CleanerService extends AbstractLifecycleComponent { logger.debug("scheduling next execution in [{}] seconds", delay.seconds()); - future = threadPool.schedule(delay, executorName(), this); + try { + future = threadPool.schedule(delay, executorName(), this); + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + logger.debug("couldn't schedule new execution of the cleaner, executor is shutting down", e); + } else { + throw e; + } + } } @Override @@ -212,7 +220,9 @@ public class CleanerService extends AbstractLifecycleComponent { * stopped. */ public void cancel() { - FutureUtils.cancel(future); + if (future.isCancelled() == false) { + FutureUtils.cancel(future); + } } }