separate out IndexLifecycleService cluster-state change concerns (#33033)

Changes to the IndexLifecycleService were necessary since relying on
ClusterChangedEvents for a full picture of the cluster state's settings was
a mistake. It is not necessary that these events hold all settings, especially ones
that are set at node start-up.

Changes to main include:

- move poll interval updates to a SettingsUpdateConsumer
- move scheduler start/stop to a localMasterNodeListener
- keep triggerPolicies in clusterChanged

Changes to tests include:

- removal of some low-level state transition checks in the Service that no longer make sense
  since the changes are unconditionally specified in the appropriate listeners
- add integration tests for poll-interval updates
- add integration test assertions for verifying scheduler is started up correctly
This commit is contained in:
Tal Levy 2018-08-27 14:25:27 -07:00 committed by GitHub
parent 5783545222
commit cfe0acc83c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 94 additions and 216 deletions

View File

@ -15,12 +15,14 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
@ -39,9 +41,11 @@ import java.util.function.LongSupplier;
* A service which runs the {@link LifecyclePolicy}s associated with indexes.
*/
public class IndexLifecycleService extends AbstractComponent
implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable {
implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable, LocalNodeMasterListener {
private static final Logger logger = LogManager.getLogger(IndexLifecycleService.class);
private static final Set<String> IGNORE_ACTIONS_MAINTENANCE_REQUESTED = Collections.singleton(ShrinkAction.NAME);
private volatile boolean isMaster = false;
private volatile TimeValue pollInterval;
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
private final Clock clock;
@ -61,8 +65,12 @@ public class IndexLifecycleService extends AbstractComponent
this.scheduledJob = null;
this.policyRegistry = new PolicyStepsRegistry();
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, nowSupplier);
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
clusterService.addStateApplier(this);
clusterService.addListener(this);
clusterService.addLocalNodeMasterListener(this);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING,
this::updatePollInterval);
}
public ClusterState moveClusterStateToStep(ClusterState currentState, String indexName, StepKey currentStepKey, StepKey nextStepKey) {
@ -74,46 +82,54 @@ public class IndexLifecycleService extends AbstractComponent
return lifecycleRunner.moveClusterStateToFailedStep(currentState, indices);
}
@Override
public void onMaster() {
this.isMaster = true;
maybeScheduleJob();
}
@Override
public void offMaster() {
this.isMaster = false;
cancelJob();
}
@Override
public String executorName() {
return ThreadPool.Names.MANAGEMENT;
}
private void updatePollInterval(TimeValue newInterval) {
this.pollInterval = newInterval;
maybeScheduleJob();
}
// pkg-private for testing
SchedulerEngine getScheduler() {
return scheduler.get();
}
// pkg-private for testing
SchedulerEngine.Job getScheduledJob() {
return scheduledJob;
}
public LongSupplier getNowSupplier() {
return nowSupplier;
}
public PolicyStepsRegistry getPolicyRegistry() {
return policyRegistry;
private void maybeScheduleJob() {
if (this.isMaster) {
if (scheduler.get() == null) {
scheduler.set(new SchedulerEngine(settings, clock));
scheduler.get().register(this);
}
scheduledJob = new SchedulerEngine.Job(IndexLifecycle.NAME, new TimeValueSchedule(pollInterval));
scheduler.get().add(scheduledJob);
}
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
IndexLifecycleMetadata lifecycleMetadata = event.state().metaData().custom(IndexLifecycleMetadata.TYPE);
if (event.localNodeMaster() && lifecycleMetadata != null) {
TimeValue pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING
.get(event.state().getMetaData().settings());
TimeValue previousPollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING
.get(event.previousState().getMetaData().settings());
boolean pollIntervalSettingChanged = !pollInterval.equals(previousPollInterval);
if (scheduler.get() == null) { // metadata installed and scheduler should be kicked off. start your engines.
scheduler.set(new SchedulerEngine(settings, clock));
scheduler.get().register(this);
scheduleJob(pollInterval);
} else if (scheduledJob == null) {
scheduleJob(pollInterval);
} else if (pollIntervalSettingChanged) { // all engines are running, just need to update with latest interval
scheduleJob(pollInterval);
}
if (this.isMaster && lifecycleMetadata != null) {
triggerPolicies(event.state(), true);
} else {
cancelJob();
}
}
@ -140,15 +156,10 @@ public class IndexLifecycleService extends AbstractComponent
}
}
private void scheduleJob(TimeValue pollInterval) {
scheduledJob = new SchedulerEngine.Job(IndexLifecycle.NAME, new TimeValueSchedule(pollInterval));
scheduler.get().add(scheduledJob);
}
@Override
public void triggered(SchedulerEngine.Event event) {
if (event.getJobName().equals(IndexLifecycle.NAME)) {
logger.debug("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime());
logger.trace("job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime());
triggerPolicies(clusterService.state(), false);
}
}

View File

@ -136,6 +136,9 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
assertBusy(() -> {
assertEquals(true, client().admin().indices().prepareExists("test").get().isExists());
});
IndexLifecycleService indexLifecycleService = internalCluster().getInstance(IndexLifecycleService.class, server_1);
assertThat(indexLifecycleService.getScheduler().jobCount(), equalTo(1));
assertNotNull(indexLifecycleService.getScheduledJob());
assertBusy(() -> {
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get();
String step = settingsResponse.getSetting("test", "index.lifecycle.step");
@ -146,13 +149,25 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
public void testMasterDedicatedDataDedicated() throws Exception {
settings = Settings.builder().put(settings).put("index.lifecycle.test.complete", true).build();
// start master node
logger.info("Starting sever1");
internalCluster().startMasterOnlyNode();
logger.info("Starting master-only server1");
final String server_1 = internalCluster().startMasterOnlyNode();
// start data node
logger.info("Starting sever1");
logger.info("Starting data-only server2");
final String server_2 = internalCluster().startDataOnlyNode();
final String node2 = getLocalNodeId(server_2);
// check that the scheduler was started on the appropriate node
{
IndexLifecycleService indexLifecycleService = internalCluster().getInstance(IndexLifecycleService.class, server_1);
assertThat(indexLifecycleService.getScheduler().jobCount(), equalTo(1));
assertNotNull(indexLifecycleService.getScheduledJob());
}
{
IndexLifecycleService indexLifecycleService = internalCluster().getInstance(IndexLifecycleService.class, server_2);
assertNull(indexLifecycleService.getScheduler());
assertNull(indexLifecycleService.getScheduledJob());
}
logger.info("Creating lifecycle [test_lifecycle]");
PutLifecycleAction.Request putLifecycleRequest = new PutLifecycleAction.Request(lifecyclePolicy);
PutLifecycleAction.Response putLifecycleResponse = client().execute(PutLifecycleAction.INSTANCE, putLifecycleRequest).get();
@ -238,6 +253,27 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
});
}
public void testPollIntervalUpdate() {
TimeValue pollInterval = TimeValue.timeValueSeconds(randomLongBetween(1, 5));
final String server_1 = internalCluster().startMasterOnlyNode(
Settings.builder().put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, pollInterval.getStringRep()).build());
IndexLifecycleService indexLifecycleService = internalCluster().getInstance(IndexLifecycleService.class, server_1);
assertThat(indexLifecycleService.getScheduler().jobCount(), equalTo(1));
{
TimeValueSchedule schedule = (TimeValueSchedule) indexLifecycleService.getScheduledJob().getSchedule();
assertThat(schedule.getInterval(), equalTo(pollInterval));
}
// update the poll interval
TimeValue newPollInterval = TimeValue.timeValueHours(randomLongBetween(6, 10));
Settings newIntervalSettings = Settings.builder().put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, newPollInterval).build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(newIntervalSettings));
{
TimeValueSchedule schedule = (TimeValueSchedule) indexLifecycleService.getScheduledJob().getSchedule();
assertThat(schedule.getInterval(), equalTo(newPollInterval));
}
}
private String getLocalNodeId(String name) {
TransportService transportService = internalCluster().getInstance(TransportService.class, name);
String nodeId = transportService.getLocalNode().getId();

View File

@ -13,13 +13,13 @@ import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
@ -43,7 +43,6 @@ import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
@ -56,9 +55,6 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class IndexLifecycleServiceTests extends ESTestCase {
@ -87,6 +83,9 @@ public class IndexLifecycleServiceTests extends ESTestCase {
runnable.run();
return null;
}).when(executorService).execute(any());
Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s").build();
when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings,
Collections.singleton(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING)));
Client client = mock(Client.class);
AdminClient adminClient = mock(AdminClient.class);
@ -105,178 +104,6 @@ public class IndexLifecycleServiceTests extends ESTestCase {
indexLifecycleService.close();
}
public void testOnlyChangesStateOnMasterAndMetadataExists() {
boolean isMaster = randomBoolean();
String localNodeId = isMaster ? nodeId : nodeId + "not_master";
MetaData.Builder metaData = MetaData.builder()
.persistentSettings(settings(Version.CURRENT)
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(3)).build());
if (isMaster == false) {
metaData.putCustom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
}
ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(localNodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
ClusterChangedEvent event = new ClusterChangedEvent("_source", state, state);
indexLifecycleService.applyClusterState(event);
indexLifecycleService.clusterChanged(event);
verify(clusterService, times(1)).addListener(any());
verify(clusterService, times(1)).addStateApplier(any());
Mockito.verifyNoMoreInteractions(clusterService);
assertNull(indexLifecycleService.getScheduler());
}
public void testOnlyChangesStateOnMasterWhenMetadataChanges() {
int numPolicies = randomIntBetween(1, 5);
IndexLifecycleMetadata lifecycleMetadata = IndexLifecycleMetadataTests.createTestInstance(numPolicies, OperationMode.RUNNING);
IndexLifecycleMetadata newLifecycleMetadata = randomValueOtherThan(lifecycleMetadata,
() -> IndexLifecycleMetadataTests.createTestInstance(numPolicies, OperationMode.RUNNING));
MetaData previousMetadata = MetaData.builder()
.persistentSettings(settings(Version.CURRENT)
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(3)).build())
.putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata)
.build();
MetaData newMetaData = MetaData.builder(previousMetadata).putCustom(IndexLifecycleMetadata.TYPE, newLifecycleMetadata).build();
ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(previousMetadata)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
ClusterState newState = ClusterState.builder(previousState).metaData(newMetaData).build();
ClusterChangedEvent event = new ClusterChangedEvent("_source", previousState, previousState);
Mockito.reset(clusterService);
PolicyStepsRegistry policyStepsRegistry = indexLifecycleService.getPolicyRegistry();
indexLifecycleService.applyClusterState(event);
indexLifecycleService.clusterChanged(event);
Mockito.verifyZeroInteractions(clusterService);
assertNotNull(indexLifecycleService.getScheduler());
assertEquals(1, indexLifecycleService.getScheduler().jobCount());
assertNotNull(indexLifecycleService.getScheduledJob());
assertThat(policyStepsRegistry.getLifecyclePolicyMap().keySet(), equalTo(lifecycleMetadata.getPolicyMetadatas().keySet()));
event = new ClusterChangedEvent("_source", newState, previousState);
indexLifecycleService.applyClusterState(event);
assertThat(policyStepsRegistry.getLifecyclePolicyMap().keySet(), equalTo(newLifecycleMetadata.getPolicyMetadatas().keySet()));
}
public void testElectUnElectMaster() {
int numberOfPolicies = randomIntBetween(1, 5);
IndexLifecycleMetadata lifecycleMetadata = IndexLifecycleMetadataTests.createTestInstance(numberOfPolicies, OperationMode.RUNNING);
Map<String, LifecyclePolicyMetadata> expectedPolicyMap = lifecycleMetadata.getPolicyMetadatas();
MetaData metaData = MetaData.builder()
.persistentSettings(settings(Version.CURRENT)
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(3)).build())
.putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata)
.build();
// First check that when the node has never been master the scheduler
// and job are not set up
ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId + "not").masterNodeId(nodeId).add(masterNode).build())
.build();
ClusterChangedEvent event = new ClusterChangedEvent("_source", state, state);
indexLifecycleService.applyClusterState(event);
indexLifecycleService.clusterChanged(event);
verify(clusterService, times(1)).addListener(any());
verify(clusterService, times(1)).addStateApplier(any());
Mockito.verifyNoMoreInteractions(clusterService);
assertNull(indexLifecycleService.getScheduler());
assertNull(indexLifecycleService.getScheduledJob());
Mockito.reset(clusterService);
state = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
event = new ClusterChangedEvent("_source", state, state);
// Check that when the node is first elected as master it sets up
// the scheduler job and steps registry
indexLifecycleService.applyClusterState(event);
indexLifecycleService.clusterChanged(event);
Mockito.verifyZeroInteractions(clusterService);
assertNotNull(indexLifecycleService.getScheduler());
assertEquals(1, indexLifecycleService.getScheduler().jobCount());
assertNotNull(indexLifecycleService.getScheduledJob());
assertThat(indexLifecycleService.getPolicyRegistry().getLifecyclePolicyMap(), equalTo(expectedPolicyMap));
Mockito.reset(clusterService);
state = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId + "not").masterNodeId(nodeId).add(masterNode).build())
.build();
event = new ClusterChangedEvent("_source", state, state);
indexLifecycleService.applyClusterState(event);
// Check that when the node is un-elected as master it cancels the job and cleans up steps registry
indexLifecycleService.clusterChanged(event);
Mockito.verifyZeroInteractions(clusterService);
assertNotNull(indexLifecycleService.getScheduler());
assertEquals(0, indexLifecycleService.getScheduler().jobCount());
assertNull(indexLifecycleService.getScheduledJob());
assertThat(indexLifecycleService.getPolicyRegistry().getLifecyclePolicyMap(), equalTo(expectedPolicyMap));
Mockito.reset(clusterService);
state = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
event = new ClusterChangedEvent("_source", state, state);
// Check that when the node is re-elected as master it re-starts the job and populates the registry
indexLifecycleService.applyClusterState(event);
indexLifecycleService.clusterChanged(event);
Mockito.verifyZeroInteractions(clusterService);
assertNotNull(indexLifecycleService.getScheduler());
assertEquals(1, indexLifecycleService.getScheduler().jobCount());
assertNotNull(indexLifecycleService.getScheduledJob());
assertThat(indexLifecycleService.getPolicyRegistry().getLifecyclePolicyMap(), equalTo(expectedPolicyMap));
}
public void testSchedulerInitializationAndUpdate() {
TimeValue pollInterval = TimeValue.timeValueSeconds(randomIntBetween(1, 59));
MetaData metaData = MetaData.builder()
.putCustom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY)
.persistentSettings(settings(Version.CURRENT).build())
.build();
MetaData updatedPollMetaData = MetaData.builder(metaData).persistentSettings(settings(Version.CURRENT)
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), pollInterval).build())
.build();
ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
ClusterState currentState = ClusterState.builder(previousState)
.metaData(updatedPollMetaData)
.build();
ClusterChangedEvent event = new ClusterChangedEvent("_source", currentState, previousState);
ClusterChangedEvent noChangeEvent = new ClusterChangedEvent("_source", previousState, previousState);
indexLifecycleService.applyClusterState(noChangeEvent);
indexLifecycleService.clusterChanged(noChangeEvent);
assertThat(indexLifecycleService.getScheduler().jobCount(), equalTo(1));
assertThat(((TimeValueSchedule) indexLifecycleService.getScheduledJob().getSchedule()).getInterval(),
equalTo(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.getDefault(previousState.metaData().settings())));
indexLifecycleService.applyClusterState(event);
indexLifecycleService.clusterChanged(event);
assertThat(indexLifecycleService.getScheduler().jobCount(), equalTo(1));
assertThat(((TimeValueSchedule) indexLifecycleService.getScheduledJob().getSchedule()).getInterval(), equalTo(pollInterval));
noChangeEvent = new ClusterChangedEvent("_source", currentState, currentState);
indexLifecycleService.applyClusterState(noChangeEvent);
indexLifecycleService.clusterChanged(noChangeEvent);
assertThat(indexLifecycleService.getScheduler().jobCount(), equalTo(1));
assertThat(((TimeValueSchedule) indexLifecycleService.getScheduledJob().getSchedule()).getInterval(), equalTo(pollInterval));
verify(clusterService, times(1)).addListener(any());
verify(clusterService, times(1)).addStateApplier(any());
verify(clusterService, never()).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
Mockito.verifyNoMoreInteractions(clusterService);
}
public void testStoppedModeSkip() {
String policyName = randomAlphaOfLengthBetween(1, 20);
@ -302,6 +129,8 @@ public class IndexLifecycleServiceTests extends ESTestCase {
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
ClusterChangedEvent event = new ClusterChangedEvent("_source", currentState, ClusterState.EMPTY_STATE);
indexLifecycleService.applyClusterState(event);
indexLifecycleService.triggerPolicies(currentState, randomBoolean());
assertThat(mockStep.getExecuteCount(), equalTo(0L));
}
@ -341,8 +170,8 @@ public class IndexLifecycleServiceTests extends ESTestCase {
executedShrink.set(true);
return null;
}).when(clusterService).submitStateUpdateTask(anyString(), any(ExecuteStepsUpdateTask.class));
indexLifecycleService.applyClusterState(new ClusterChangedEvent("change", currentState, ClusterState.EMPTY_STATE));
indexLifecycleService.clusterChanged(event);
indexLifecycleService.applyClusterState(event);
indexLifecycleService.triggerPolicies(currentState, randomBoolean());
assertTrue(executedShrink.get());
}
@ -391,12 +220,14 @@ public class IndexLifecycleServiceTests extends ESTestCase {
return null;
}).when(clusterService).submitStateUpdateTask(anyString(), any(OperationModeUpdateTask.class));
indexLifecycleService.clusterChanged(event);
indexLifecycleService.applyClusterState(event);
indexLifecycleService.triggerPolicies(currentState, randomBoolean());
assertNull(ranPolicy.get());
assertTrue(moveToMaintenance.get());
}
public void testTriggeredDifferentJob() {
Mockito.reset(clusterService);
SchedulerEngine.Event schedulerEvent = new SchedulerEngine.Event("foo", randomLong(), randomLong());
indexLifecycleService.triggered(schedulerEvent);
Mockito.verifyZeroInteractions(indicesClient, clusterService);