From dd6d192e25b8c7a9467e172e2a101d3ca24b3281 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 31 Oct 2017 17:39:25 -0700 Subject: [PATCH] cleanup tests --- .../xpack/indexlifecycle/IndexLifecycle.java | 85 +++--------------- .../IndexLifecycleInitialisationService.java | 65 +++++++------- .../xpack/indexlifecycle/Phase.java | 3 + .../IndexLifecycleInitialisationIT.java | 86 ++++++++++++------- 4 files changed, 100 insertions(+), 139 deletions(-) diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index 16aa04479dd..695937763e1 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.threadpool.ExecutorBuilder; @@ -33,6 +34,7 @@ import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.security.InternalClient; +import java.io.IOException; import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; @@ -41,7 +43,7 @@ import java.util.Collections; import java.util.List; import java.util.function.Supplier; -public class IndexLifecycle implements ActionPlugin { +public class IndexLifecycle extends Plugin { private static final Logger logger = Loggers.getLogger(XPackPlugin.class); public static final String NAME = "index_lifecycle"; public static final String BASE_PATH = "/_xpack/index_lifecycle/"; @@ -80,6 +82,7 @@ public class IndexLifecycle implements ActionPlugin { return modules; } + @Override public void onIndexModule(IndexModule indexModule) { Index index = indexModule.getIndex(); ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").error("onIndexModule: " + index.getName()); @@ -89,85 +92,19 @@ public class IndexLifecycle implements ActionPlugin { indexModule.addIndexEventListener(indexLifecycleInitialisationService.get()); } + @Override + public List> getSettings() { + return Arrays.asList(LIFECYCLE_TIMESERIES_SETTING); + } + public Collection createComponents(InternalClient internalClient, ClusterService clusterService, Clock clock) { indexLifecycleInitialisationService.set(new IndexLifecycleInitialisationService(settings, internalClient, clusterService, clock)); return Collections.singletonList(indexLifecycleInitialisationService.get()); } - public List> getSettings() { - return Arrays.asList(LIFECYCLE_TIMESERIES_SETTING); - } - @Override - public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, - IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, - IndexNameExpressionResolver indexNameExpressionResolver, - Supplier nodesInCluster) { - if (!enabled || tribeNodeClient) { - return Collections.emptyList(); - } - - return Arrays.asList( -// new RestLifecycleStatusAction(settings, restController) - ); - + public void close() throws IOException { + indexLifecycleInitialisationService.get().close(); } - @Override - public List> getActions() { - if (!enabled) { - return Collections.emptyList(); - } - return Arrays.asList( -// new ActionHandler<>(LifecycleStatusAction.INSTANCE, LifecycleStatusAction.TransportAction.class) - ); - } - - public List> getExecutorBuilders(Settings settings) { - if (false == enabled || tribeNode || tribeNodeClient || transportClientMode) { - return Collections.emptyList(); - } - - FixedExecutorBuilder indexing = new FixedExecutorBuilder(settings, IndexLifecycle.THREAD_POOL_NAME, 4, 4, - "xpack.index_lifecycle.thread_pool"); - - return Collections.singletonList(indexing); - } - -// public Collection> getPersistentTasksExecutors(InternalClient client, -// ClusterService clusterService, -// SchedulerEngine schedulerEngine) { -// return Collections.singletonList( -// new IndexLifecycleTask.IndexLifecycleJobPersistentTasksExecutor(settings, client, clusterService, schedulerEngine)); -// } - - public List getNamedWriteables() { - return Arrays.asList( -// // Metadata -// new NamedWriteableRegistry.Entry(MetaData.Custom.class, "rollup", RollupMetadata::new), -// new NamedWriteableRegistry.Entry(NamedDiff.class, "rollup", RollupMetadata.RollupMetadataDiff::new), -// -// // Persistent action requests -// new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJobTask.TASK_NAME, -// RollupJob::new), -// -// // Task statuses -// new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new) - ); - } - - public List getNamedXContent() { - return Arrays.asList( -// // Custom metadata -// new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("rollup"), -// parser -> RollupMetadata.METADATA_PARSER.parse(parser, null).build()), -// -// // Persistent action requests -// new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(RollupJobTask.TASK_NAME), -// parser -> RollupJob.Builder.fromXContent(parser).build()) -// -// // Task statuses -// //new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(RollupJobStatus.NAME), RollupJobStatus::fromXContent) - ); - } } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationService.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationService.java index 3ff46058208..9006b9c326c 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationService.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationService.java @@ -6,26 +6,31 @@ package org.elasticsearch.xpack.indexlifecycle; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.scheduler.SchedulerEngine; import org.elasticsearch.xpack.security.InternalClient; +import java.io.Closeable; import java.io.IOException; import java.time.Clock; import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.LIFECYCLE_TIMESERIES_SETTING; -public class IndexLifecycleInitialisationService extends AbstractLifecycleComponent implements LocalNodeMasterListener, IndexEventListener, SchedulerEngine.Listener { +public class IndexLifecycleInitialisationService extends AbstractComponent implements LocalNodeMasterListener, IndexEventListener, SchedulerEngine.Listener, Closeable { private static final Logger logger = ESLoggerFactory.getLogger(XPackPlugin.class); - private SchedulerEngine scheduler; + + private final SetOnce scheduler = new SetOnce<>(); + private final Clock clock; private InternalClient client; private ClusterService clusterService; private boolean isMaster; @@ -34,7 +39,7 @@ public class IndexLifecycleInitialisationService extends AbstractLifecycleCompon super(settings); this.client = client; this.clusterService = clusterService; - this.scheduler = new SchedulerEngine(clock); + this.clock = clock; clusterService.addLocalNodeMasterListener(this); } @@ -44,36 +49,34 @@ public class IndexLifecycleInitialisationService extends AbstractLifecycleCompon * @param settings the settings to read the lifecycle details from */ public synchronized void setLifecycleSettings(Index index, long creationDate, Settings settings) { - if (isMaster == true) { + if (isMaster) { IndexLifecycleSettings lifecycleSettings = new IndexLifecycleSettings(index, creationDate, settings, client); - registerIndexSchedule(lifecycleSettings); + lifecycleSettings.schedulePhases(scheduler.get()); } } - /** - * This does the heavy lifting of adding an index's lifecycle policy to the scheduler. - * @param lifecycleSettings The index lifecycle settings object - */ - private void registerIndexSchedule(IndexLifecycleSettings lifecycleSettings) { - // need to check that this isn't re-kicking an existing policy... diffs, etc. - // this is where the genesis of index lifecycle management occurs... kick off the scheduling... all is valid! - - // TODO: scheduler needs to know which index's settings are being updated... - lifecycleSettings.schedulePhases(scheduler); - - } - /** * Called before the index gets created. Note that this is also called * when the index is created on data nodes * @param index The index whose settings are to be validated * @param indexSettings The settings for the specified index */ + @Override public void beforeIndexCreated(Index index, Settings indexSettings) { ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").error("validate setting before index is created"); LIFECYCLE_TIMESERIES_SETTING.get(indexSettings); } + @Override + public void afterIndexCreated(IndexService indexService) { + Settings indexSettings = indexService.getIndexSettings().getSettings(); + Settings lifecycleSettings = (Settings) LIFECYCLE_TIMESERIES_SETTING.get(indexSettings); + if (isMaster && lifecycleSettings.size() > 0) { + setLifecycleSettings(indexService.index(), indexService.getMetaData().getCreationDate(), + indexSettings.getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey())); + } + } + @Override public void triggered(SchedulerEngine.Event event) { throw new UnsupportedOperationException(); @@ -82,12 +85,12 @@ public class IndexLifecycleInitialisationService extends AbstractLifecycleCompon @Override public void onMaster() { isMaster = true; + scheduler.set(new SchedulerEngine(clock)); clusterService.state().getMetaData().getIndices().valuesIt() .forEachRemaining((idxMeta) -> { if (idxMeta.getSettings().getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey()).size() > 0) { - IndexLifecycleSettings lifecycleSettings = new IndexLifecycleSettings(idxMeta.getIndex(), idxMeta.getCreationDate(), - idxMeta.getSettings().getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey()), client); - registerIndexSchedule(lifecycleSettings); + setLifecycleSettings(idxMeta.getIndex(), idxMeta.getCreationDate(), + idxMeta.getSettings().getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey())); } }); } @@ -95,7 +98,7 @@ public class IndexLifecycleInitialisationService extends AbstractLifecycleCompon @Override public void offMaster() { isMaster = false; - doStop(); + // when is this called? } @Override @@ -104,16 +107,10 @@ public class IndexLifecycleInitialisationService extends AbstractLifecycleCompon } @Override - protected void doStop() { - scheduler.stop(); - } - - @Override - protected void doStart() { - } - - @Override - protected void doClose() throws IOException { - + public void close() throws IOException { + SchedulerEngine engine = scheduler.get(); + if (engine != null) { + engine.stop(); + } } } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/Phase.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/Phase.java index d348dc38f5b..73cb9cadf9c 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/Phase.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/Phase.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.indexlifecycle; import org.elasticsearch.client.Client; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; @@ -66,6 +67,8 @@ public class Phase extends SchedulerEngine.Job implements ToXContentObject, Sche System.out.println(settings); TimeValue after = AFTER_SETTING.get(settings); SchedulerEngine.Schedule schedule = (startTime, now) -> { + ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN") + .error("calculating schedule with creationTime:" + creationDate + ", and now:" + now); if (startTime == now) { return creationDate + after.getMillis(); } else { diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java index 760add4ac58..bea040383a1 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java @@ -12,7 +12,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -20,8 +19,8 @@ import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackSettings; +import org.junit.Before; -import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -37,6 +36,8 @@ import static org.hamcrest.core.IsNull.nullValue; @ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class IndexLifecycleInitialisationIT extends ESIntegTestCase { + private Settings settings; + @Override protected Settings nodeSettings(int nodeOrdinal) { Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal)); @@ -46,6 +47,7 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase { settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); + settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); return settings.build(); } @@ -58,6 +60,7 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase { settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); + settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); return settings.build(); } @@ -71,16 +74,58 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase { return nodePlugins(); } - public void test() throws IOException { - Settings settings = Settings.builder() + @Before + public void init() { + settings = Settings.builder() .put(indexSettings()) .put(SETTING_NUMBER_OF_SHARDS, 1) .put(SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.lifecycle.timeseries.new.after", "1s") + .put("index.lifecycle.timeseries.new.after", "1s") .put("index.lifecycle.timeseries.delete.after", "3s") - .put("index.lifecycle.timeseries.delete.actions.delete.what", "me") + .put("index.lifecycle.timeseries.delete.actions.delete.what", "me") .build(); + } + public void testSingleNodeCluster() throws Exception { + // start master node + logger.info("Starting sever1"); + final String server_1 = internalCluster().startNode(); + final String node1 = getLocalNodeId(server_1); + logger.info("Creating index [test]"); + CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest("test").settings(settings)).actionGet(); + assertAcked(createIndexResponse); + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + RoutingNode routingNodeEntry1 = clusterState.getRoutingNodes().node(node1); + assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1)); + assertBusy(() -> { + assertEquals(false, client().admin().indices().prepareExists("test").get().isExists()); + }); + } + + public void testMasterDedicatedDataDedicated() throws Exception { + // start master node + logger.info("Starting sever1"); + final String server_1 = internalCluster().startMasterOnlyNode(); + final String node1 = getLocalNodeId(server_1); + // start data node + logger.info("Starting sever1"); + final String server_2 = internalCluster().startDataOnlyNode(); + final String node2 = getLocalNodeId(server_2); + + logger.info("Creating index [test]"); + CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest("test").settings(settings)).actionGet(); + assertAcked(createIndexResponse); + + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + RoutingNode routingNodeEntry1 = clusterState.getRoutingNodes().node(node2); + assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1)); + + assertBusy(() -> { + assertEquals(false, client().admin().indices().prepareExists("test").get().isExists()); + }); + } + + public void testMasterFailover() throws Exception { // start one server logger.info("Starting sever1"); final String server_1 = internalCluster().startNode(); @@ -105,35 +150,14 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase { logger.info("Done Cluster Health, status {}", clusterHealth.getStatus()); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - final String node2 = getLocalNodeId(server_2); - - // explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join) - client().admin().cluster().prepareReroute().execute().actionGet(); - - clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForNoRelocatingShards(true)).actionGet(); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(2)); - assertThat(clusterHealth.getInitializingShards(), equalTo(0)); - assertThat(clusterHealth.getUnassignedShards(), equalTo(0)); - assertThat(clusterHealth.getRelocatingShards(), equalTo(0)); - assertThat(clusterHealth.getActiveShards(), equalTo(1)); - assertThat(clusterHealth.getActivePrimaryShards(), equalTo(1)); logger.info("Closing server1"); // kill the first server internalCluster().stopCurrentMasterNode(); - // verify health - logger.info("Running Cluster Health"); - clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus() - .waitForNodes("1").waitForActiveShards(0)).actionGet(); - logger.info("Done Cluster Health, status {}", clusterHealth.getStatus()); - assertThat(clusterHealth.isTimedOut(), equalTo(false)); - assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - assertThat(clusterHealth.getNumberOfNodes(), equalTo(1)); - assertThat(clusterHealth.getActivePrimaryShards(), equalTo(0)); - expectThrows(IndexNotFoundException.class, - () -> client().admin().indices().prepareGetIndex().addIndices("test").get()); + + assertBusy(() -> { + assertEquals(false, client().admin().indices().prepareExists("test").get().isExists()); + }); } private String getLocalNodeId(String name) {