cleanup tests

This commit is contained in:
Tal Levy 2017-10-31 17:39:25 -07:00
parent c393944dc0
commit dd6d192e25
4 changed files with 100 additions and 139 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.threadpool.ExecutorBuilder;
@ -33,6 +34,7 @@ import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
@ -41,7 +43,7 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
public class IndexLifecycle implements ActionPlugin {
public class IndexLifecycle extends Plugin {
private static final Logger logger = Loggers.getLogger(XPackPlugin.class);
public static final String NAME = "index_lifecycle";
public static final String BASE_PATH = "/_xpack/index_lifecycle/";
@ -80,6 +82,7 @@ public class IndexLifecycle implements ActionPlugin {
return modules;
}
@Override
public void onIndexModule(IndexModule indexModule) {
Index index = indexModule.getIndex();
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").error("onIndexModule: " + index.getName());
@ -89,85 +92,19 @@ public class IndexLifecycle implements ActionPlugin {
indexModule.addIndexEventListener(indexLifecycleInitialisationService.get());
}
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(LIFECYCLE_TIMESERIES_SETTING);
}
public Collection<Object> createComponents(InternalClient internalClient, ClusterService clusterService, Clock clock) {
indexLifecycleInitialisationService.set(new IndexLifecycleInitialisationService(settings, internalClient, clusterService, clock));
return Collections.singletonList(indexLifecycleInitialisationService.get());
}
public List<Setting<?>> getSettings() {
return Arrays.asList(LIFECYCLE_TIMESERIES_SETTING);
}
@Override
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
if (!enabled || tribeNodeClient) {
return Collections.emptyList();
}
return Arrays.asList(
// new RestLifecycleStatusAction(settings, restController)
);
public void close() throws IOException {
indexLifecycleInitialisationService.get().close();
}
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
if (!enabled) {
return Collections.emptyList();
}
return Arrays.asList(
// new ActionHandler<>(LifecycleStatusAction.INSTANCE, LifecycleStatusAction.TransportAction.class)
);
}
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
if (false == enabled || tribeNode || tribeNodeClient || transportClientMode) {
return Collections.emptyList();
}
FixedExecutorBuilder indexing = new FixedExecutorBuilder(settings, IndexLifecycle.THREAD_POOL_NAME, 4, 4,
"xpack.index_lifecycle.thread_pool");
return Collections.singletonList(indexing);
}
// public Collection<PersistentTasksExecutor<?>> getPersistentTasksExecutors(InternalClient client,
// ClusterService clusterService,
// SchedulerEngine schedulerEngine) {
// return Collections.singletonList(
// new IndexLifecycleTask.IndexLifecycleJobPersistentTasksExecutor(settings, client, clusterService, schedulerEngine));
// }
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(
// // Metadata
// new NamedWriteableRegistry.Entry(MetaData.Custom.class, "rollup", RollupMetadata::new),
// new NamedWriteableRegistry.Entry(NamedDiff.class, "rollup", RollupMetadata.RollupMetadataDiff::new),
//
// // Persistent action requests
// new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJobTask.TASK_NAME,
// RollupJob::new),
//
// // Task statuses
// new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new)
);
}
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return Arrays.asList(
// // Custom metadata
// new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("rollup"),
// parser -> RollupMetadata.METADATA_PARSER.parse(parser, null).build()),
//
// // Persistent action requests
// new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(RollupJobTask.TASK_NAME),
// parser -> RollupJob.Builder.fromXContent(parser).build())
//
// // Task statuses
// //new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(RollupJobStatus.NAME), RollupJobStatus::fromXContent)
);
}
}

View File

@ -6,26 +6,31 @@
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.security.InternalClient;
import java.io.Closeable;
import java.io.IOException;
import java.time.Clock;
import static org.elasticsearch.xpack.indexlifecycle.IndexLifecycle.LIFECYCLE_TIMESERIES_SETTING;
public class IndexLifecycleInitialisationService extends AbstractLifecycleComponent implements LocalNodeMasterListener, IndexEventListener, SchedulerEngine.Listener {
public class IndexLifecycleInitialisationService extends AbstractComponent implements LocalNodeMasterListener, IndexEventListener, SchedulerEngine.Listener, Closeable {
private static final Logger logger = ESLoggerFactory.getLogger(XPackPlugin.class);
private SchedulerEngine scheduler;
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
private final Clock clock;
private InternalClient client;
private ClusterService clusterService;
private boolean isMaster;
@ -34,7 +39,7 @@ public class IndexLifecycleInitialisationService extends AbstractLifecycleCompon
super(settings);
this.client = client;
this.clusterService = clusterService;
this.scheduler = new SchedulerEngine(clock);
this.clock = clock;
clusterService.addLocalNodeMasterListener(this);
}
@ -44,36 +49,34 @@ public class IndexLifecycleInitialisationService extends AbstractLifecycleCompon
* @param settings the settings to read the lifecycle details from
*/
public synchronized void setLifecycleSettings(Index index, long creationDate, Settings settings) {
if (isMaster == true) {
if (isMaster) {
IndexLifecycleSettings lifecycleSettings = new IndexLifecycleSettings(index, creationDate, settings, client);
registerIndexSchedule(lifecycleSettings);
lifecycleSettings.schedulePhases(scheduler.get());
}
}
/**
* This does the heavy lifting of adding an index's lifecycle policy to the scheduler.
* @param lifecycleSettings The index lifecycle settings object
*/
private void registerIndexSchedule(IndexLifecycleSettings lifecycleSettings) {
// need to check that this isn't re-kicking an existing policy... diffs, etc.
// this is where the genesis of index lifecycle management occurs... kick off the scheduling... all is valid!
// TODO: scheduler needs to know which index's settings are being updated...
lifecycleSettings.schedulePhases(scheduler);
}
/**
* Called before the index gets created. Note that this is also called
* when the index is created on data nodes
* @param index The index whose settings are to be validated
* @param indexSettings The settings for the specified index
*/
@Override
public void beforeIndexCreated(Index index, Settings indexSettings) {
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN").error("validate setting before index is created");
LIFECYCLE_TIMESERIES_SETTING.get(indexSettings);
}
@Override
public void afterIndexCreated(IndexService indexService) {
Settings indexSettings = indexService.getIndexSettings().getSettings();
Settings lifecycleSettings = (Settings) LIFECYCLE_TIMESERIES_SETTING.get(indexSettings);
if (isMaster && lifecycleSettings.size() > 0) {
setLifecycleSettings(indexService.index(), indexService.getMetaData().getCreationDate(),
indexSettings.getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey()));
}
}
@Override
public void triggered(SchedulerEngine.Event event) {
throw new UnsupportedOperationException();
@ -82,12 +85,12 @@ public class IndexLifecycleInitialisationService extends AbstractLifecycleCompon
@Override
public void onMaster() {
isMaster = true;
scheduler.set(new SchedulerEngine(clock));
clusterService.state().getMetaData().getIndices().valuesIt()
.forEachRemaining((idxMeta) -> {
if (idxMeta.getSettings().getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey()).size() > 0) {
IndexLifecycleSettings lifecycleSettings = new IndexLifecycleSettings(idxMeta.getIndex(), idxMeta.getCreationDate(),
idxMeta.getSettings().getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey()), client);
registerIndexSchedule(lifecycleSettings);
setLifecycleSettings(idxMeta.getIndex(), idxMeta.getCreationDate(),
idxMeta.getSettings().getByPrefix(LIFECYCLE_TIMESERIES_SETTING.getKey()));
}
});
}
@ -95,7 +98,7 @@ public class IndexLifecycleInitialisationService extends AbstractLifecycleCompon
@Override
public void offMaster() {
isMaster = false;
doStop();
// when is this called?
}
@Override
@ -104,16 +107,10 @@ public class IndexLifecycleInitialisationService extends AbstractLifecycleCompon
}
@Override
protected void doStop() {
scheduler.stop();
}
@Override
protected void doStart() {
}
@Override
protected void doClose() throws IOException {
public void close() throws IOException {
SchedulerEngine engine = scheduler.get();
if (engine != null) {
engine.stop();
}
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@ -66,6 +67,8 @@ public class Phase extends SchedulerEngine.Job implements ToXContentObject, Sche
System.out.println(settings);
TimeValue after = AFTER_SETTING.get(settings);
SchedulerEngine.Schedule schedule = (startTime, now) -> {
ESLoggerFactory.getLogger("INDEX-LIFECYCLE-PLUGIN")
.error("calculating schedule with creationTime:" + creationDate + ", and now:" + now);
if (startTime == now) {
return creationDate + after.getMillis();
} else {

View File

@ -12,7 +12,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@ -20,8 +19,8 @@ import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@ -37,6 +36,8 @@ import static org.hamcrest.core.IsNull.nullValue;
@ESIntegTestCase.ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
private Settings settings;
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal));
@ -46,6 +47,7 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
return settings.build();
}
@ -58,6 +60,7 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false);
settings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
return settings.build();
}
@ -71,16 +74,58 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
return nodePlugins();
}
public void test() throws IOException {
Settings settings = Settings.builder()
@Before
public void init() {
settings = Settings.builder()
.put(indexSettings())
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.lifecycle.timeseries.new.after", "1s")
.put("index.lifecycle.timeseries.new.after", "1s")
.put("index.lifecycle.timeseries.delete.after", "3s")
.put("index.lifecycle.timeseries.delete.actions.delete.what", "me")
.put("index.lifecycle.timeseries.delete.actions.delete.what", "me")
.build();
}
public void testSingleNodeCluster() throws Exception {
// start master node
logger.info("Starting sever1");
final String server_1 = internalCluster().startNode();
final String node1 = getLocalNodeId(server_1);
logger.info("Creating index [test]");
CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest("test").settings(settings)).actionGet();
assertAcked(createIndexResponse);
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
RoutingNode routingNodeEntry1 = clusterState.getRoutingNodes().node(node1);
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1));
assertBusy(() -> {
assertEquals(false, client().admin().indices().prepareExists("test").get().isExists());
});
}
public void testMasterDedicatedDataDedicated() throws Exception {
// start master node
logger.info("Starting sever1");
final String server_1 = internalCluster().startMasterOnlyNode();
final String node1 = getLocalNodeId(server_1);
// start data node
logger.info("Starting sever1");
final String server_2 = internalCluster().startDataOnlyNode();
final String node2 = getLocalNodeId(server_2);
logger.info("Creating index [test]");
CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest("test").settings(settings)).actionGet();
assertAcked(createIndexResponse);
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
RoutingNode routingNodeEntry1 = clusterState.getRoutingNodes().node(node2);
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1));
assertBusy(() -> {
assertEquals(false, client().admin().indices().prepareExists("test").get().isExists());
});
}
public void testMasterFailover() throws Exception {
// start one server
logger.info("Starting sever1");
final String server_1 = internalCluster().startNode();
@ -105,35 +150,14 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
logger.info("Done Cluster Health, status {}", clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
final String node2 = getLocalNodeId(server_2);
// explicitly call reroute, so shards will get relocated to the new node (we delay it in ES in case other nodes join)
client().admin().cluster().prepareReroute().execute().actionGet();
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus().waitForNodes("2").waitForNoRelocatingShards(true)).actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.getNumberOfDataNodes(), equalTo(2));
assertThat(clusterHealth.getInitializingShards(), equalTo(0));
assertThat(clusterHealth.getUnassignedShards(), equalTo(0));
assertThat(clusterHealth.getRelocatingShards(), equalTo(0));
assertThat(clusterHealth.getActiveShards(), equalTo(1));
assertThat(clusterHealth.getActivePrimaryShards(), equalTo(1));
logger.info("Closing server1");
// kill the first server
internalCluster().stopCurrentMasterNode();
// verify health
logger.info("Running Cluster Health");
clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForGreenStatus()
.waitForNodes("1").waitForActiveShards(0)).actionGet();
logger.info("Done Cluster Health, status {}", clusterHealth.getStatus());
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealth.getNumberOfNodes(), equalTo(1));
assertThat(clusterHealth.getActivePrimaryShards(), equalTo(0));
expectThrows(IndexNotFoundException.class,
() -> client().admin().indices().prepareGetIndex().addIndices("test").get());
assertBusy(() -> {
assertEquals(false, client().admin().indices().prepareExists("test").get().isExists());
});
}
private String getLocalNodeId(String name) {