Merge pull request elastic/elasticsearch#1902 from tlrx/catch-rejected-exception
Monitoring: Catch EsRejectedExecutionException when rescheduling the cleaner Original commit: elastic/x-pack-elasticsearch@6ba20d5b8a
This commit is contained in:
commit
3672e06343
|
@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
import org.elasticsearch.marvel.MarvelSettings;
|
||||
import org.elasticsearch.marvel.license.MarvelLicensee;
|
||||
|
@ -31,8 +32,8 @@ public class CleanerService extends AbstractLifecycleComponent<CleanerService> {
|
|||
private final ThreadPool threadPool;
|
||||
private final ExecutionScheduler executionScheduler;
|
||||
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
|
||||
private final IndicesCleaner runnable;
|
||||
|
||||
private volatile IndicesCleaner runnable;
|
||||
private volatile TimeValue globalRetention;
|
||||
|
||||
CleanerService(Settings settings, ClusterSettings clusterSettings, MarvelLicensee licensee, ThreadPool threadPool,
|
||||
|
@ -42,6 +43,7 @@ public class CleanerService extends AbstractLifecycleComponent<CleanerService> {
|
|||
this.threadPool = threadPool;
|
||||
this.executionScheduler = executionScheduler;
|
||||
this.globalRetention = MarvelSettings.HISTORY_DURATION.get(settings);
|
||||
this.runnable = new IndicesCleaner();
|
||||
|
||||
// the validation is performed by the setting's object itself
|
||||
clusterSettings.addSettingsUpdateConsumer(MarvelSettings.HISTORY_DURATION, this::setGlobalRetention);
|
||||
|
@ -55,7 +57,6 @@ public class CleanerService extends AbstractLifecycleComponent<CleanerService> {
|
|||
@Override
|
||||
protected void doStart() {
|
||||
logger.debug("starting cleaning service");
|
||||
this.runnable = new IndicesCleaner();
|
||||
threadPool.schedule(executionScheduler.nextExecutionDelay(new DateTime(ISOChronology.getInstance())), executorName(), runnable);
|
||||
logger.debug("cleaning service started");
|
||||
}
|
||||
|
@ -64,15 +65,14 @@ public class CleanerService extends AbstractLifecycleComponent<CleanerService> {
|
|||
protected void doStop() {
|
||||
logger.debug("stopping cleaning service");
|
||||
listeners.clear();
|
||||
runnable.cancel();
|
||||
logger.debug("cleaning service stopped");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
logger.debug("closing cleaning service");
|
||||
if (runnable != null) {
|
||||
runnable.cancel();
|
||||
}
|
||||
logger.debug("cleaning service closed");
|
||||
// Cleaner runnable should have been cancelled in doStop() method,
|
||||
// normally called just before this one
|
||||
}
|
||||
|
||||
private String executorName() {
|
||||
|
@ -196,7 +196,15 @@ public class CleanerService extends AbstractLifecycleComponent<CleanerService> {
|
|||
|
||||
logger.debug("scheduling next execution in [{}] seconds", delay.seconds());
|
||||
|
||||
try {
|
||||
future = threadPool.schedule(delay, executorName(), this);
|
||||
} catch (EsRejectedExecutionException e) {
|
||||
if (e.isExecutorShutdown()) {
|
||||
logger.debug("couldn't schedule new execution of the cleaner, executor is shutting down", e);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -212,9 +220,11 @@ public class CleanerService extends AbstractLifecycleComponent<CleanerService> {
|
|||
* stopped.
|
||||
*/
|
||||
public void cancel() {
|
||||
if (future.isCancelled() == false) {
|
||||
FutureUtils.cancel(future);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface ExecutionScheduler {
|
||||
|
||||
|
|
Loading…
Reference in New Issue