Fix race in SLM master/cluster state listeners (#59896)

This change fixes two possible race conditions in SLM related to
how local master changes and cluster state events are observed. When
implementing the `LocalNodeMasterListener` interface, it is only
recommended to execute on a separate threadpool if the operations are
heavy and would block the cluster state thread. SLM specified that the
listeners should run in the Snapshot thread pool, but the operations
in the listener were lightweight. This had the side effect of causing
master changes to be delayed if the Snapshot threads were all busy and
could also potentially cause the `onMaster` and `offMaster` calls to
race if both were queued and then executed concurrently. Additionally,
the `SnapshotLifecycleService` is a `ClusterStateListener` as well, and
there is currently no ordering guarantee between
`LocalNodeMasterListener`s and `ClusterStateListener`s, so this could
lead to incorrect behavior.

The resolution for these two issues is that the
`SnapshotRetentionService` now specifies the `SAME` executor for its
implementation of the `LocalNodeMasterListener` interface. The
`SnapshotLifecycleService` is no longer a `LocalNodeMasterListener` and
instead tracks local master changes in its `ClusterStateListener`.

Backport of #59801
This commit is contained in:
Jay Modi 2020-07-20 09:59:46 -06:00 committed by GitHub
parent 96a5284484
commit 515b53d297
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 103 additions and 74 deletions

View File

@ -978,8 +978,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
assertTrue(indexExists(shrunkenIndex));
assertTrue(aliasExists(shrunkenIndex, index));
assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
});
}, 30, TimeUnit.SECONDS);
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53612")

View File

@ -210,9 +210,11 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
clusterService));
snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
() -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock()));
snapshotLifecycleService.get().init();
snapshotRetentionService.set(new SnapshotRetentionService(settings,
() -> new SnapshotRetentionTask(client, clusterService, System::nanoTime, snapshotHistoryStore.get(), threadPool),
clusterService, getClock()));
getClock()));
snapshotRetentionService.get().init(clusterService);
components.addAll(Arrays.asList(snapshotLifecycleService.get(), snapshotHistoryStore.get(), snapshotRetentionService.get()));
return components;

View File

@ -11,12 +11,10 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.xpack.core.scheduler.CronSchedule;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
@ -40,7 +38,7 @@ import java.util.stream.Collectors;
* {@link SnapshotLifecycleTask}. It reacts to new policies in the cluster state by scheduling a
* task according to the policy's schedule.
*/
public class SnapshotLifecycleService implements LocalNodeMasterListener, Closeable, ClusterStateListener {
public class SnapshotLifecycleService implements Closeable, ClusterStateListener {
private static final Logger logger = LogManager.getLogger(SnapshotLifecycleService.class);
private static final String JOB_PATTERN_SUFFIX = "-\\d+$";
@ -59,12 +57,31 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
this.scheduler = new SchedulerEngine(settings, clock);
this.clusterService = clusterService;
this.snapshotTask = taskSupplier.get();
clusterService.addLocalNodeMasterListener(this); // TODO: change this not to use 'this'
}
/**
* Initializer method to avoid the publication of a self reference in the constructor.
*/
public void init() {
clusterService.addListener(this);
}
@Override
public void clusterChanged(final ClusterChangedEvent event) {
// Instead of using a LocalNodeMasterListener to track master changes, this service will
// track them here to avoid conditions where master listener events run after other
// listeners that depend on what happened in the master listener
final boolean prevIsMaster = this.isMaster;
if (prevIsMaster != event.localNodeMaster()) {
this.isMaster = event.localNodeMaster();
if (this.isMaster) {
scheduler.register(snapshotTask);
} else {
scheduler.unregister(snapshotTask);
cancelSnapshotJobs();
}
}
if (this.isMaster) {
final ClusterState state = event.state();
@ -83,25 +100,6 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
}
}
@Override
public void onMaster() {
this.isMaster = true;
scheduler.register(snapshotTask);
final ClusterState state = clusterService.state();
if (slmStoppedOrStopping(state)) {
// SLM is currently stopped, so don't schedule jobs
return;
}
scheduleSnapshotJobs(state);
}
@Override
public void offMaster() {
this.isMaster = false;
scheduler.unregister(snapshotTask);
cancelSnapshotJobs();
}
// Only used for testing
SchedulerEngine getScheduler() {
return this.scheduler;
@ -236,11 +234,6 @@ public class SnapshotLifecycleService implements LocalNodeMasterListener, Closea
.orElseThrow(() -> new IllegalArgumentException("no such repository [" + repository + "]"));
}
@Override
public String executorName() {
return ThreadPool.Names.SNAPSHOT;
}
@Override
public void close() {
if (this.running.compareAndSet(true, false)) {

View File

@ -46,13 +46,18 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
public SnapshotRetentionService(Settings settings,
Supplier<SnapshotRetentionTask> taskSupplier,
ClusterService clusterService,
Clock clock) {
this.clock = clock;
this.scheduler = new SchedulerEngine(settings, clock);
this.retentionTask = taskSupplier.get();
this.scheduler.register(this.retentionTask);
this.slmRetentionSchedule = LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING.get(settings);
}
/**
* Initializer method to avoid the publication of a self reference in the constructor.
*/
public void init(ClusterService clusterService) {
clusterService.addLocalNodeMasterListener(this);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING,
this::setUpdateSchedule);
@ -110,7 +115,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
@Override
public String executorName() {
return ThreadPool.Names.SNAPSHOT;
return ThreadPool.Names.SAME;
}
@Override

View File

@ -6,14 +6,18 @@
package org.elasticsearch.xpack.slm;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
@ -100,8 +104,7 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(initialState, threadPool);
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
() -> new FakeSnapshotTask(e -> logger.info("triggered")), clusterService, clock)) {
sls.offMaster();
sls.init();
SnapshotLifecyclePolicyMetadata newPolicy = SnapshotLifecyclePolicyMetadata.builder()
.setPolicy(createPolicy("foo", "*/1 * * * * ?"))
@ -121,26 +124,30 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
// Since the service does not think it is master, it should not be triggered or scheduled
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
sls.onMaster();
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("initial-1")));
ClusterState prevState = state;
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()), true);
sls.clusterChanged(new ClusterChangedEvent("2", state, prevState));
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2")));
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPING, new SnapshotLifecycleStats()));
sls.clusterChanged(new ClusterChangedEvent("2", state, emptyState));
prevState = state;
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPING, new SnapshotLifecycleStats()), true);
sls.clusterChanged(new ClusterChangedEvent("3", state, prevState));
// Since the service is stopping, jobs should have been cancelled
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPED, new SnapshotLifecycleStats()));
sls.clusterChanged(new ClusterChangedEvent("3", state, emptyState));
prevState = state;
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPED, new SnapshotLifecycleStats()), true);
sls.clusterChanged(new ClusterChangedEvent("4", state, prevState));
// Since the service is stopped, jobs should have been cancelled
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
// No jobs should be scheduled when service is closed
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()));
prevState = state;
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()), true);
sls.close();
sls.onMaster();
sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState));
sls.clusterChanged(new ClusterChangedEvent("5", state, prevState));
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
} finally {
threadPool.shutdownNow();
@ -160,11 +167,12 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
() -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
sls.offMaster();
sls.init();
SnapshotLifecycleMetadata snapMeta =
new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats());
ClusterState previousState = createState(snapMeta);
ClusterState state = createState(snapMeta, false);
sls.clusterChanged(new ClusterChangedEvent("1", state, ClusterState.EMPTY_STATE));
Map<String, SnapshotLifecyclePolicyMetadata> policies = new HashMap<>();
SnapshotLifecyclePolicyMetadata policy = SnapshotLifecyclePolicyMetadata.builder()
@ -174,8 +182,9 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
.build();
policies.put(policy.getPolicy().getId(), policy);
snapMeta = new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats());
ClusterState state = createState(snapMeta);
ClusterChangedEvent event = new ClusterChangedEvent("1", state, previousState);
ClusterState previousState = state;
state = createState(snapMeta, false);
ClusterChangedEvent event = new ClusterChangedEvent("2", state, previousState);
trigger.set(e -> {
fail("trigger should not be invoked");
});
@ -185,8 +194,10 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
// Change the service to think it's on the master node, events should be scheduled now
sls.onMaster();
trigger.set(e -> triggerCount.incrementAndGet());
previousState = state;
state = createState(snapMeta, true);
event = new ClusterChangedEvent("3", state, previousState);
sls.clusterChanged(event);
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-1")));
@ -202,8 +213,8 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
.setModifiedDate(2)
.build();
policies.put(policy.getPolicy().getId(), newPolicy);
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()));
event = new ClusterChangedEvent("2", state, previousState);
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()), true);
event = new ClusterChangedEvent("4", state, previousState);
sls.clusterChanged(event);
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2")));
@ -226,9 +237,9 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
final int currentCount2 = triggerCount.get();
previousState = state;
// Create a state simulating the policy being deleted
state =
createState(new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats()));
event = new ClusterChangedEvent("2", state, previousState);
state = createState(
new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats()), true);
event = new ClusterChangedEvent("5", state, previousState);
sls.clusterChanged(event);
clock.fastForwardSeconds(2);
@ -246,8 +257,8 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
policies.put(policy.getPolicy().getId(), policy);
snapMeta = new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats());
previousState = state;
state = createState(snapMeta);
event = new ClusterChangedEvent("1", state, previousState);
state = createState(snapMeta, true);
event = new ClusterChangedEvent("6", state, previousState);
trigger.set(e -> triggerCount.incrementAndGet());
sls.clusterChanged(event);
clock.fastForwardSeconds(2);
@ -257,7 +268,10 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-3")));
// Signify becoming non-master, the jobs should all be cancelled
sls.offMaster();
previousState = state;
state = createState(snapMeta, false);
event = new ClusterChangedEvent("7", state, previousState);
sls.clusterChanged(event);
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
} finally {
threadPool.shutdownNow();
@ -276,11 +290,13 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
() -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
sls.onMaster();
sls.init();
SnapshotLifecycleMetadata snapMeta =
new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats());
ClusterState previousState = createState(snapMeta);
ClusterState state = createState(snapMeta, true);
ClusterChangedEvent event = new ClusterChangedEvent("1", state, ClusterState.EMPTY_STATE);
sls.clusterChanged(event);
Map<String, SnapshotLifecyclePolicyMetadata> policies = new HashMap<>();
SnapshotLifecyclePolicyMetadata policy = SnapshotLifecyclePolicyMetadata.builder()
@ -291,13 +307,13 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
.build();
policies.put(policy.getPolicy().getId(), policy);
snapMeta = new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats());
ClusterState state = createState(snapMeta);
ClusterChangedEvent event = new ClusterChangedEvent("1", state, previousState);
ClusterState previousState = state;
state = createState(snapMeta, true);
event = new ClusterChangedEvent("2", state, previousState);
sls.clusterChanged(event);
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2-1")));
previousState = state;
SnapshotLifecyclePolicyMetadata secondPolicy = SnapshotLifecyclePolicyMetadata.builder()
.setPolicy(createPolicy("foo-1", "45 * * * * ?"))
.setHeaders(Collections.emptyMap())
@ -306,13 +322,17 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
.build();
policies.put(secondPolicy.getPolicy().getId(), secondPolicy);
snapMeta = new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats());
state = createState(snapMeta);
event = new ClusterChangedEvent("2", state, previousState);
previousState = state;
state = createState(snapMeta, true);
event = new ClusterChangedEvent("3", state, previousState);
sls.clusterChanged(event);
assertThat(sls.getScheduler().scheduledJobIds(), containsInAnyOrder("foo-2-1", "foo-1-2"));
sls.offMaster();
previousState = state;
state = createState(snapMeta, false);
event = new ClusterChangedEvent("4", state, previousState);
sls.clusterChanged(event);
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
} finally {
threadPool.shutdownNow();
@ -336,10 +356,20 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
}
public ClusterState createState(SnapshotLifecycleMetadata snapMeta) {
return createState(snapMeta, false);
}
public ClusterState createState(SnapshotLifecycleMetadata snapMeta, boolean localNodeMaster) {
Metadata metadata = Metadata.builder()
.putCustom(SnapshotLifecycleMetadata.TYPE, snapMeta)
.build();
final DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder()
.add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9300), "local"))
.add(new DiscoveryNode("remote", new TransportAddress(TransportAddress.META_ADDRESS, 9301), Version.CURRENT))
.localNodeId("local")
.masterNodeId(localNodeMaster ? "local" : "remote");
return ClusterState.builder(new ClusterName("cluster"))
.nodes(discoveryNodesBuilder)
.metadata(metadata)
.build();
}

View File

@ -51,8 +51,8 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
ThreadPool threadPool = new TestThreadPool("test");
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
FakeRetentionTask::new, clusterService, clock)) {
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY, FakeRetentionTask::new, clock)) {
service.init(clusterService);
assertThat(service.getScheduler().jobCount(), equalTo(0));
service.onMaster();
@ -82,23 +82,23 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT);
ClockMock clock = new ClockMock();
AtomicInteger invoked = new AtomicInteger(0);
ThreadPool threadPool = new TestThreadPool("test");
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
() -> new FakeRetentionTask(event -> {
assertThat(event.getJobName(), equalTo(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID));
invoked.incrementAndGet();
}), clusterService, clock)) {
}), clock)) {
service.init(clusterService);
service.onMaster();
service.triggerRetention();
assertThat(invoked.get(), equalTo(1));
service.offMaster();
service.triggerRetention();
assertThat(invoked.get(), equalTo(1));
service.onMaster();
service.triggerRetention();
assertThat(invoked.get(), equalTo(2));