add integration test to indexlifecyclemanagement
This test verifies that we have sufficient failover code so that a newly elected master re-registers old schedules and fires them off. All times are relative to the index creation date.
This commit is contained in:
parent
de4d87c6e6
commit
3455445d2c
|
@ -87,8 +87,9 @@ public class IndexLifecycle implements ActionPlugin {
|
|||
public void onIndexModule(IndexModule indexModule) {
|
||||
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").error("onIndexModule");
|
||||
Index index = indexModule.getIndex();
|
||||
long creationDate = settings.getAsLong("index.creation_date", -1L);
|
||||
indexModule.addSettingsUpdateConsumer(LIFECYCLE_TIMESERIES_SETTING,
|
||||
(Settings s) -> indexLifecycleInitialisationService.get().setLifecycleSettings(index, s));
|
||||
(Settings s) -> indexLifecycleInitialisationService.get().setLifecycleSettings(index, creationDate, s));
|
||||
indexModule.addIndexEventListener(indexLifecycleInitialisationService.get());
|
||||
}
|
||||
|
||||
|
|
|
@ -8,61 +8,65 @@ package org.elasticsearch.xpack.indexlifecycle;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.LocalNodeMasterListener;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.component.LifecycleListener;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.XPackPlugin;
|
||||
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
|
||||
import org.elasticsearch.xpack.security.InternalClient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Clock;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.LIFECYCLE_TIMESERIES_SETTING;
|
||||
|
||||
public class IndexLifecycleInitialisationService extends AbstractComponent implements ClusterStateListener, IndexEventListener, SchedulerEngine.Listener {
|
||||
public class IndexLifecycleInitialisationService extends AbstractLifecycleComponent implements LocalNodeMasterListener, IndexEventListener, SchedulerEngine.Listener {
|
||||
private static final Logger logger = ESLoggerFactory.getLogger(XPackPlugin.class);
|
||||
private SchedulerEngine scheduler;
|
||||
private InternalClient client;
|
||||
private ClusterService clusterService;
|
||||
private boolean isMaster;
|
||||
|
||||
public IndexLifecycleInitialisationService(Settings settings, InternalClient client, ClusterService clusterService, Clock clock) {
|
||||
super(settings);
|
||||
this.scheduler = new SchedulerEngine(clock);
|
||||
this.client = client;
|
||||
this.clusterService = clusterService;
|
||||
this.scheduler = new SchedulerEngine(clock);
|
||||
this.scheduler.register(this);
|
||||
clusterService.addListener(this);
|
||||
clusterService.addLifecycleListener(new LifecycleListener() {
|
||||
@Override
|
||||
public void beforeStart() {
|
||||
scheduler.start(Collections.emptyList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeStop() {
|
||||
super.beforeStop();
|
||||
scheduler.stop(); // should I just be an AbstractLifecycleComponent instead?
|
||||
}
|
||||
});
|
||||
clusterService.addLocalNodeMasterListener(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* This should kick-off some stuff in the scheduler
|
||||
* This is triggered every update to settings in the index
|
||||
* @param settings the settings to read the lifecycle details from
|
||||
*/
|
||||
public synchronized void setLifecycleSettings(Index index, Settings settings) {
|
||||
public synchronized void setLifecycleSettings(Index index, long creationDate, Settings settings) {
|
||||
if (isMaster == true) {
|
||||
registerIndexSchedule(index, creationDate, settings);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This does the heavy lifting of adding an index's lifecycle policy to the scheduler.
|
||||
* @param index The index to schedule a policy for
|
||||
* @param settings The `index.lifecycle.timeseries` settings object
|
||||
*/
|
||||
private void registerIndexSchedule(Index index, long creationDate, Settings settings) {
|
||||
// need to check that this isn't re-kicking an existing policy... diffs, etc.
|
||||
// this is where the genesis of index lifecycle management occurs... kick off the scheduling... all is valid!
|
||||
TimeValue deleteAfter = settings.getAsTime("delete.after", TimeValue.MINUS_ONE);
|
||||
SchedulerEngine.Schedule schedule = (startTime, now) -> {
|
||||
if (startTime == now) {
|
||||
return now + deleteAfter.getMillis();
|
||||
return creationDate + deleteAfter.getMillis();
|
||||
} else {
|
||||
return -1; // do not schedule another delete after already deleted
|
||||
}
|
||||
|
@ -71,22 +75,20 @@ public class IndexLifecycleInitialisationService extends AbstractComponent imple
|
|||
scheduler.add(new SchedulerEngine.Job(index.getName(), schedule));
|
||||
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN")
|
||||
.error("kicked off lifecycle job to be triggered in " + deleteAfter.getSeconds() + " seconds");
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before the index gets created. Note that this is also called
|
||||
* when the index is created on data nodes
|
||||
* @param index The index whose settings are to be validated
|
||||
* @param indexSettings The settings for the specified index
|
||||
*/
|
||||
public void beforeIndexCreated(Index index, Settings indexSettings) {
|
||||
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").error("validate setting before index is created");
|
||||
LIFECYCLE_TIMESERIES_SETTING.get(indexSettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").debug("cluster state changed: " + event.source());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggered(SchedulerEngine.Event event) {
|
||||
client.admin().indices().prepareDelete(event.getJobName()).execute(new ActionListener<DeleteIndexResponse>() {
|
||||
|
@ -101,4 +103,41 @@ public class IndexLifecycleInitialisationService extends AbstractComponent imple
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMaster() {
|
||||
isMaster = true;
|
||||
clusterService.state().getMetaData().getIndices().valuesIt()
|
||||
.forEachRemaining((idxMeta) -> {
|
||||
if (idxMeta.getSettings().getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey()).size() > 0) {
|
||||
registerIndexSchedule(idxMeta.getIndex(), idxMeta.getCreationDate(),
|
||||
idxMeta.getSettings().getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void offMaster() {
|
||||
isMaster = false;
|
||||
doStop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executorName() {
|
||||
return ThreadPool.Names.MANAGEMENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
scheduler.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws IOException {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,145 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import static org.elasticsearch.client.Requests.clusterHealthRequest;
|
||||
import static org.elasticsearch.client.Requests.createIndexRequest;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
||||
import static org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.core.IsNull.nullValue;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
|
||||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.reindex.ReindexPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.XPackPlugin;
|
||||
import org.elasticsearch.xpack.XPackSettings;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0)
|
||||
public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal));
|
||||
settings.put(XPackSettings.INDEX_LIFECYCLE_ENABLED.getKey(), true);
|
||||
settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
|
||||
return settings.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings transportClientSettings() {
|
||||
Settings.Builder settings = Settings.builder().put(super.transportClientSettings());
|
||||
settings.put(XPackSettings.INDEX_LIFECYCLE_ENABLED.getKey(), true);
|
||||
settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
|
||||
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
|
||||
return settings.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Arrays.asList(XPackPlugin.class, CommonAnalysisPlugin.class, ReindexPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||
return nodePlugins();
|
||||
}
|
||||
|
||||
public void test() throws IOException {
|
||||
Settings settings = Settings.builder()
|
||||
.put(indexSettings())
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put("index.lifecycle.timeseries.new", "baz")
|
||||
.put("index.lifecycle.timeseries.delete.after", "3s")
|
||||
.build();
|
||||
|
||||
// start one server
|
||||
logger.info("Starting sever1");
|
||||
final String server_1 = internalCluster().startNode();
|
||||
final String node1 = getLocalNodeId(server_1);
|
||||
|
||||
logger.info("Creating index [test]");
|
||||
CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest("test").settings(settings)).actionGet();
|
||||
assertAcked(createIndexResponse);
|
||||
|
||||
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
RoutingNode routingNodeEntry1 = clusterState.getRoutingNodes().node(node1);
|
||||
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1));
|
||||
|
||||
logger.info("Starting server2");
|
||||
// start another server
|
||||
String server_2 = internalCluster().startNode();
|
||||
|
||||
// first wait for 2 nodes in the cluster
|
||||
logger.info("Waiting for replicas to be assigned");
|
||||
ClusterHealthResponse clusterHealth =
|
||||
client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2")).actionGet();
|
||||
logger.info("Done Cluster Health, status {}", clusterHealth.getStatus());
|
||||
assertThat(clusterHealth.isTimedOut(), equalTo(false));
|
||||
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
|
||||
final String node2 = getLocalNodeId(server_2);
|
||||
|
||||
// explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join)
|
||||
client().admin().cluster().prepareReroute().execute().actionGet();
|
||||
|
||||
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForNoRelocatingShards(true)).actionGet();
|
||||
assertThat(clusterHealth.isTimedOut(), equalTo(false));
|
||||
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
|
||||
assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(2));
|
||||
assertThat(clusterHealth.getInitializingShards(), equalTo(0));
|
||||
assertThat(clusterHealth.getUnassignedShards(), equalTo(0));
|
||||
assertThat(clusterHealth.getRelocatingShards(), equalTo(0));
|
||||
assertThat(clusterHealth.getActiveShards(), equalTo(1));
|
||||
assertThat(clusterHealth.getActivePrimaryShards(), equalTo(1));
|
||||
|
||||
logger.info("Closing server1");
|
||||
// kill the first server
|
||||
internalCluster().stopCurrentMasterNode();
|
||||
// verify health
|
||||
logger.info("Running Cluster Health");
|
||||
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()
|
||||
.waitForNodes("1").waitForActiveShards(0)).actionGet();
|
||||
logger.info("Done Cluster Health, status {}", clusterHealth.getStatus());
|
||||
assertThat(clusterHealth.isTimedOut(), equalTo(false));
|
||||
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
|
||||
assertThat(clusterHealth.getNumberOfNodes(), equalTo(1));
|
||||
assertThat(clusterHealth.getActivePrimaryShards(), equalTo(0));
|
||||
expectThrows(IndexNotFoundException.class,
|
||||
() -> client().admin().indices().prepareGetIndex().addIndices("test").get());
|
||||
}
|
||||
|
||||
private String getLocalNodeId(String name) {
|
||||
TransportService transportService = internalCluster().getInstance(TransportService.class, name);
|
||||
String nodeId = transportService.getLocalNode().getId();
|
||||
assertThat(nodeId, not(nullValue()));
|
||||
return nodeId;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue