[7.x] Don't schedule SLM jobs when services have been stopped… (#48692)
This adds a guard to the SLM lifecycle and retention services that prevents new jobs from being scheduled once the service has been stopped. Previously, if the node was shut down the service would be stopped, but a subsequent cluster state update or local master election could still attempt to schedule a job. This could lead to an uncaught `RejectedExecutionException`. Resolves #47749
This commit is contained in:
parent
13043219ac
commit
72a601c47f
|
@ -30,6 +30,7 @@ import java.time.Clock;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -48,6 +49,7 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
private final SnapshotLifecycleTask snapshotTask;
|
private final SnapshotLifecycleTask snapshotTask;
|
||||||
private final Map<String, SchedulerEngine.Job> scheduledTasks = ConcurrentCollections.newConcurrentMap();
|
private final Map<String, SchedulerEngine.Job> scheduledTasks = ConcurrentCollections.newConcurrentMap();
|
||||||
|
private final AtomicBoolean running = new AtomicBoolean(true);
|
||||||
private volatile boolean isMaster = false;
|
private volatile boolean isMaster = false;
|
||||||
|
|
||||||
public SnapshotLifecycleService(Settings settings,
|
public SnapshotLifecycleService(Settings settings,
|
||||||
|
@ -160,6 +162,10 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
|
||||||
* the same version of a policy has already been scheduled it does not overwrite the job.
|
* the same version of a policy has already been scheduled it does not overwrite the job.
|
||||||
*/
|
*/
|
||||||
public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy) {
|
public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy) {
|
||||||
|
if (this.running.get() == false) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
final String jobId = getJobId(snapshotLifecyclePolicy);
|
final String jobId = getJobId(snapshotLifecyclePolicy);
|
||||||
final Pattern existingJobPattern = Pattern.compile(snapshotLifecyclePolicy.getPolicy().getId() + JOB_PATTERN_SUFFIX);
|
final Pattern existingJobPattern = Pattern.compile(snapshotLifecyclePolicy.getPolicy().getId() + JOB_PATTERN_SUFFIX);
|
||||||
|
|
||||||
|
@ -237,6 +243,8 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
this.scheduler.stop();
|
if (this.running.compareAndSet(true, false)) {
|
||||||
|
this.scheduler.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.time.Clock;
|
import java.time.Clock;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -38,6 +39,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
|
||||||
private final SchedulerEngine scheduler;
|
private final SchedulerEngine scheduler;
|
||||||
private final SnapshotRetentionTask retentionTask;
|
private final SnapshotRetentionTask retentionTask;
|
||||||
private final Clock clock;
|
private final Clock clock;
|
||||||
|
private final AtomicBoolean running = new AtomicBoolean(true);
|
||||||
|
|
||||||
private volatile String slmRetentionSchedule;
|
private volatile String slmRetentionSchedule;
|
||||||
private volatile boolean isMaster = false;
|
private volatile boolean isMaster = false;
|
||||||
|
@ -81,7 +83,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
|
||||||
|
|
||||||
private void rescheduleRetentionJob() {
|
private void rescheduleRetentionJob() {
|
||||||
final String schedule = this.slmRetentionSchedule;
|
final String schedule = this.slmRetentionSchedule;
|
||||||
if (this.isMaster && Strings.hasText(schedule)) {
|
if (this.running.get() && this.isMaster && Strings.hasText(schedule)) {
|
||||||
final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID,
|
final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID,
|
||||||
new CronSchedule(schedule));
|
new CronSchedule(schedule));
|
||||||
logger.debug("scheduling SLM retention job for [{}]", schedule);
|
logger.debug("scheduling SLM retention job for [{}]", schedule);
|
||||||
|
@ -113,6 +115,8 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
this.scheduler.stop();
|
if (this.running.compareAndSet(true, false)) {
|
||||||
|
this.scheduler.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,6 +133,13 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
|
||||||
// Since the service is stopped, jobs should have been cancelled
|
// Since the service is stopped, jobs should have been cancelled
|
||||||
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
|
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
|
||||||
|
|
||||||
|
// No jobs should be scheduled when service is closed
|
||||||
|
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()));
|
||||||
|
sls.close();
|
||||||
|
sls.onMaster();
|
||||||
|
sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState));
|
||||||
|
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
|
||||||
|
|
||||||
threadPool.shutdownNow();
|
threadPool.shutdownNow();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,6 +66,12 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
|
||||||
|
|
||||||
service.setUpdateSchedule("");
|
service.setUpdateSchedule("");
|
||||||
assertThat(service.getScheduler().jobCount(), equalTo(0));
|
assertThat(service.getScheduler().jobCount(), equalTo(0));
|
||||||
|
|
||||||
|
// Service should not schedule any jobs once closed
|
||||||
|
service.close();
|
||||||
|
service.onMaster();
|
||||||
|
assertThat(service.getScheduler().jobCount(), equalTo(0));
|
||||||
|
|
||||||
threadPool.shutdownNow();
|
threadPool.shutdownNow();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue