From 72a601c47f0082400dc2a83a34affbc3f7b15ed1 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 30 Oct 2019 09:46:35 -0600 Subject: [PATCH] =?UTF-8?q?[7.x]=20Don't=20schedule=20SLM=20jobs=20when=20?= =?UTF-8?q?services=20have=20been=20stopped=E2=80=A6=20(#48692)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This adds a guard for the SLM lifecycle and retention service that prevents new jobs from being scheduled once the service has been stopped. Previous if the node were shut down the service would be stopped, but a cluster state or local master election would cause a job to attempt to be scheduled. This could lead to an uncaught `RejectedExecutionException`. Resolves #47749 --- .../xpack/slm/SnapshotLifecycleService.java | 10 +++++++++- .../xpack/slm/SnapshotRetentionService.java | 8 ++++++-- .../xpack/slm/SnapshotLifecycleServiceTests.java | 7 +++++++ .../xpack/slm/SnapshotRetentionServiceTests.java | 6 ++++++ 4 files changed, 28 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java index c0f8e651587..4529621dfcc 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java @@ -30,6 +30,7 @@ import java.time.Clock; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -48,6 +49,7 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea private final ClusterService clusterService; private final SnapshotLifecycleTask snapshotTask; private final Map scheduledTasks = ConcurrentCollections.newConcurrentMap(); + private final AtomicBoolean running = new AtomicBoolean(true); private volatile boolean isMaster = false; public SnapshotLifecycleService(Settings settings, @@ -160,6 +162,10 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea * the same version of a policy has already been scheduled it does not overwrite the job. */ public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy) { + if (this.running.get() == false) { + return; + } + final String jobId = getJobId(snapshotLifecyclePolicy); final Pattern existingJobPattern = Pattern.compile(snapshotLifecyclePolicy.getPolicy().getId() + JOB_PATTERN_SUFFIX); @@ -237,6 +243,8 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea @Override public void close() { - this.scheduler.stop(); + if (this.running.compareAndSet(true, false)) { + this.scheduler.stop(); + } } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java index 235df846b48..0eefbbf92ef 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; import java.io.Closeable; import java.time.Clock; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; /** @@ -38,6 +39,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea private final SchedulerEngine scheduler; private final SnapshotRetentionTask retentionTask; private final Clock clock; + private final AtomicBoolean running = new AtomicBoolean(true); private volatile String slmRetentionSchedule; private volatile boolean isMaster = false; @@ -81,7 +83,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea private void rescheduleRetentionJob() { final String schedule = this.slmRetentionSchedule; - if (this.isMaster && Strings.hasText(schedule)) { + if (this.running.get() && this.isMaster && Strings.hasText(schedule)) { final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID, new CronSchedule(schedule)); logger.debug("scheduling SLM retention job for [{}]", schedule); @@ -113,6 +115,8 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea @Override public void close() { - this.scheduler.stop(); + if (this.running.compareAndSet(true, false)) { + this.scheduler.stop(); + } } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java index 2a8868c480c..8336e104821 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java @@ -133,6 +133,13 @@ public class SnapshotLifecycleServiceTests extends ESTestCase { // Since the service is stopped, jobs should have been cancelled assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet())); + // No jobs should be scheduled when service is closed + state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats())); + sls.close(); + sls.onMaster(); + sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState)); + assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet())); + threadPool.shutdownNow(); } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java index 4a08c5d7917..1915a76fe1d 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java @@ -66,6 +66,12 @@ public class SnapshotRetentionServiceTests extends ESTestCase { service.setUpdateSchedule(""); assertThat(service.getScheduler().jobCount(), equalTo(0)); + + // Service should not scheduled any jobs once closed + service.close(); + service.onMaster(); + assertThat(service.getScheduler().jobCount(), equalTo(0)); + threadPool.shutdownNow(); } }