From 3455445d2c3a4642286067f5a81880193764c137 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 30 Oct 2017 17:51:01 -0700 Subject: [PATCH] add integration test to indexlifecyclemanagement This test verifies that we have sufficient failover code so that a newly elected master re-registers old schedules and fires them off. All times are relative to the index creation date. --- .../xpack/indexlifecycle/IndexLifecycle.java | 3 +- .../IndexLifecycleInitialisationService.java | 89 ++++++++--- .../IndexLifecycleInitialisationIT.java | 145 ++++++++++++++++++ 3 files changed, 211 insertions(+), 26 deletions(-) create mode 100644 x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index 5b39f7c6495..343f068075f 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -87,8 +87,9 @@ public class IndexLifecycle implements ActionPlugin { public void onIndexModule(IndexModule indexModule) { ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").error("onIndexModule"); Index index = indexModule.getIndex(); + long creationDate = settings.getAsLong("index.creation_date", -1L); indexModule.addSettingsUpdateConsumer(LIFECYCLE_TIMESERIES_SETTING, - (Settings s) -> indexLifecycleInitialisationService.get().setLifecycleSettings(index, s)); + (Settings s) -> indexLifecycleInitialisationService.get().setLifecycleSettings(index, creationDate, s)); indexModule.addIndexEventListener(indexLifecycleInitialisationService.get()); } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationService.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationService.java index 8160a236e85..7ab0dce9a3d 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationService.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationService.java @@ -8,61 +8,65 @@ package org.elasticsearch.xpack.indexlifecycle; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.scheduler.SchedulerEngine; import org.elasticsearch.xpack.security.InternalClient; +import java.io.IOException; import java.time.Clock; import java.util.Collections; import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.LIFECYCLE_TIMESERIES_SETTING; -public class IndexLifecycleInitialisationService extends AbstractComponent implements ClusterStateListener, IndexEventListener, SchedulerEngine.Listener { +public class IndexLifecycleInitialisationService extends AbstractLifecycleComponent implements LocalNodeMasterListener, IndexEventListener, SchedulerEngine.Listener { private static final Logger logger = ESLoggerFactory.getLogger(XPackPlugin.class); private SchedulerEngine scheduler; private InternalClient client; + private ClusterService clusterService; + private boolean isMaster; public IndexLifecycleInitialisationService(Settings settings, InternalClient client, ClusterService clusterService, Clock clock) { super(settings); - this.scheduler = new SchedulerEngine(clock); this.client = client; + this.clusterService = clusterService; + this.scheduler = new SchedulerEngine(clock); this.scheduler.register(this); - clusterService.addListener(this); - clusterService.addLifecycleListener(new LifecycleListener() { - @Override - public void beforeStart() { - scheduler.start(Collections.emptyList()); - } - - @Override - public void beforeStop() { - super.beforeStop(); - scheduler.stop(); // should I just be an AbstractLifecycleComponent instead? - } - }); + clusterService.addLocalNodeMasterListener(this); } /** * This should kick-off some stuff in the scheduler + * This is triggered every update to settings in the index * @param settings the settings to read the lifecycle details from */ - public synchronized void setLifecycleSettings(Index index, Settings settings) { + public synchronized void setLifecycleSettings(Index index, long creationDate, Settings settings) { + if (isMaster == true) { + registerIndexSchedule(index, creationDate, settings); + } + } + + /** + * This does the heavy lifting of adding an index's lifecycle policy to the scheduler. + * @param index The index to schedule a policy for + * @param settings The `index.lifecycle.timeseries` settings object + */ + private void registerIndexSchedule(Index index, long creationDate, Settings settings) { // need to check that this isn't re-kicking an existing policy... diffs, etc. // this is where the genesis of index lifecycle management occurs... kick off the scheduling... all is valid! TimeValue deleteAfter = settings.getAsTime("delete.after", TimeValue.MINUS_ONE); SchedulerEngine.Schedule schedule = (startTime, now) -> { if (startTime == now) { - return now + deleteAfter.getMillis(); + return creationDate + deleteAfter.getMillis(); } else { return -1; // do not schedule another delete after already deleted } @@ -71,22 +75,20 @@ public class IndexLifecycleInitialisationService extends AbstractComponent imple scheduler.add(new SchedulerEngine.Job(index.getName(), schedule)); ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN") .error("kicked off lifecycle job to be triggered in " + deleteAfter.getSeconds() + " seconds"); + } /** * Called before the index gets created. Note that this is also called * when the index is created on data nodes + * @param index The index whose settings are to be validated + * @param indexSettings The settings for the specified index */ public void beforeIndexCreated(Index index, Settings indexSettings) { ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").error("validate setting before index is created"); LIFECYCLE_TIMESERIES_SETTING.get(indexSettings); } - @Override - public void clusterChanged(ClusterChangedEvent event) { - ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").debug("cluster state changed: " + event.source()); - } - @Override public void triggered(SchedulerEngine.Event event) { client.admin().indices().prepareDelete(event.getJobName()).execute(new ActionListener() { @@ -101,4 +103,41 @@ public class IndexLifecycleInitialisationService extends AbstractComponent imple } }); } + + @Override + public void onMaster() { + isMaster = true; + clusterService.state().getMetaData().getIndices().valuesIt() + .forEachRemaining((idxMeta) -> { + if (idxMeta.getSettings().getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey()).size() > 0) { + registerIndexSchedule(idxMeta.getIndex(), idxMeta.getCreationDate(), + idxMeta.getSettings().getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey())); + } + }); + } + + @Override + public void offMaster() { + isMaster = false; + doStop(); + } + + @Override + public String executorName() { + return ThreadPool.Names.MANAGEMENT; + } + + @Override + protected void doStop() { + scheduler.stop(); + } + + @Override + protected void doStart() { + } + + @Override + protected void doClose() throws IOException { + + } } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java new file mode 100644 index 00000000000..7dca0b86000 --- /dev/null +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java @@ -0,0 +1,145 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.indexlifecycle; + +import static org.elasticsearch.client.Requests.clusterHealthRequest; +import static org.elasticsearch.client.Requests.createIndexRequest; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.test.ESIntegTestCase.Scope; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.IsNull.nullValue; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.analysis.common.CommonAnalysisPlugin; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.XPackSettings; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +@ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0) +public class IndexLifecycleInitialisationIT extends ESIntegTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal)); + settings.put(XPackSettings.INDEX_LIFECYCLE_ENABLED.getKey(), true); + settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); + settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); + settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); + return settings.build(); + } + + @Override + protected Settings transportClientSettings() { + Settings.Builder settings = Settings.builder().put(super.transportClientSettings()); + settings.put(XPackSettings.INDEX_LIFECYCLE_ENABLED.getKey(), true); + settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); + settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); + settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); + return settings.build(); + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(XPackPlugin.class, CommonAnalysisPlugin.class, ReindexPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + public void test() throws IOException { + Settings settings = Settings.builder() + .put(indexSettings()) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.lifecycle.timeseries.new", "baz") + .put("index.lifecycle.timeseries.delete.after", "3s") + .build(); + + // start one server + logger.info("Starting sever1"); + final String server_1 = internalCluster().startNode(); + final String node1 = getLocalNodeId(server_1); + + logger.info("Creating index [test]"); + CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest("test").settings(settings)).actionGet(); + assertAcked(createIndexResponse); + + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + RoutingNode routingNodeEntry1 = clusterState.getRoutingNodes().node(node1); + assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1)); + + logger.info("Starting server2"); + // start another server + String server_2 = internalCluster().startNode(); + + // first wait for 2 nodes in the cluster + logger.info("Waiting for replicas to be assigned"); + ClusterHealthResponse clusterHealth = + client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet(); + logger.info("Done Cluster Health, status {}", clusterHealth.getStatus()); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + final String node2 = getLocalNodeId(server_2); + + // explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join) + client().admin().cluster().prepareReroute().execute().actionGet(); + + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForNoRelocatingShards(true)).actionGet(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(2)); + assertThat(clusterHealth.getInitializingShards(), equalTo(0)); + assertThat(clusterHealth.getUnassignedShards(), equalTo(0)); + assertThat(clusterHealth.getRelocatingShards(), equalTo(0)); + assertThat(clusterHealth.getActiveShards(), equalTo(1)); + assertThat(clusterHealth.getActivePrimaryShards(), equalTo(1)); + + logger.info("Closing server1"); + // kill the first server + internalCluster().stopCurrentMasterNode(); + // verify health + logger.info("Running Cluster Health"); + clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus() + .waitForNodes("1").waitForActiveShards(0)).actionGet(); + logger.info("Done Cluster Health, status {}", clusterHealth.getStatus()); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + assertThat(clusterHealth.getNumberOfNodes(), equalTo(1)); + assertThat(clusterHealth.getActivePrimaryShards(), equalTo(0)); + expectThrows(IndexNotFoundException.class, + () -> client().admin().indices().prepareGetIndex().addIndices("test").get()); + } + + private String getLocalNodeId(String name) { + TransportService transportService = internalCluster().getInstance(TransportService.class, name); + String nodeId = transportService.getLocalNode().getId(); + assertThat(nodeId, not(nullValue())); + return nodeId; + } +}