mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 18:35:25 +00:00
re-enable ILM integration tests and fix policyRegistry update bug (#32108)
This PR re-introduces our ILM integration tests with mock steps that we can control in the tests. These tests uncovered a bug where the policy-steps-registry was not being updated on newly elected masters when there were no cluster-state changes to ILM metadata. The fix layed out cleans up the registry/runner when a node is un-elected as master. It re-assigns the class variables so that the existing runner/registry instances that may be running can continue to do so in other threads, potentially.
This commit is contained in:
parent
9a0f6b2c38
commit
304304fe41
@ -45,11 +45,11 @@ public class IndexLifecycleService extends AbstractComponent
|
||||
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
|
||||
private final Clock clock;
|
||||
private final PolicyStepsRegistry policyRegistry;
|
||||
private final IndexLifecycleRunner lifecycleRunner;
|
||||
private Client client;
|
||||
private ClusterService clusterService;
|
||||
private LongSupplier nowSupplier;
|
||||
private SchedulerEngine.Job scheduledJob;
|
||||
private IndexLifecycleRunner lifecycleRunner;
|
||||
|
||||
public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock, LongSupplier nowSupplier) {
|
||||
super(settings);
|
||||
@ -121,9 +121,9 @@ public class IndexLifecycleService extends AbstractComponent
|
||||
if (event.localNodeMaster()) { // only act if we are master, otherwise
|
||||
// keep idle until elected
|
||||
IndexLifecycleMetadata lifecycleMetadata = event.state().metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
if (lifecycleMetadata != null && event.changedCustomMetaDataSet().contains(IndexLifecycleMetadata.TYPE)) {
|
||||
if (lifecycleMetadata != null) {
|
||||
// update policy steps registry
|
||||
policyRegistry.update(event.state(), client, nowSupplier);
|
||||
policyRegistry.update(lifecycleMetadata, client, nowSupplier);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -61,8 +61,7 @@ public class PolicyStepsRegistry {
|
||||
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public void update(ClusterState currentState, Client client, LongSupplier nowSupplier) {
|
||||
IndexLifecycleMetadata meta = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
public void update(IndexLifecycleMetadata meta, Client client, LongSupplier nowSupplier) {
|
||||
assert meta != null : "IndexLifecycleMetadata cannot be null when updating the policy steps registry";
|
||||
|
||||
Diff<Map<String, LifecyclePolicyMetadata>> diff = DiffableUtils.diff(lifecyclePolicyMap, meta.getPolicyMetadatas(),
|
||||
|
@ -104,10 +104,10 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, "init"))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
index = indexMetadata.getIndex();
|
||||
|
||||
IndexLifecycleMetadata lifecycleMetadata = new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING);
|
||||
MetaData metaData = MetaData.builder()
|
||||
.persistentSettings(settings(Version.CURRENT).build())
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING))
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata)
|
||||
.put(IndexMetaData.builder(indexMetadata))
|
||||
.build();
|
||||
String nodeId = randomAlphaOfLength(10);
|
||||
@ -118,7 +118,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
|
||||
.metaData(metaData)
|
||||
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
|
||||
.build();
|
||||
policyStepsRegistry.update(clusterState, client, () -> 0L);
|
||||
policyStepsRegistry.update(lifecycleMetadata, client, () -> 0L);
|
||||
}
|
||||
|
||||
public void testExecuteAllUntilEndOfPolicy() throws IOException {
|
||||
|
@ -7,32 +7,44 @@ package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ForceMergeAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.MockAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.client.Requests.clusterHealthRequest;
|
||||
import static org.elasticsearch.client.Requests.createIndexRequest;
|
||||
@ -83,7 +95,7 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Arrays.asList(LocalStateCompositeXPackPlugin.class, IndexLifecycle.class);
|
||||
return Arrays.asList(LocalStateCompositeXPackPlugin.class, IndexLifecycle.class, TestILMPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -95,17 +107,16 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
|
||||
public void init() {
|
||||
settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 0).put(LifecycleSettings.LIFECYCLE_NAME, "test").build();
|
||||
Map<String, Phase> phases = new HashMap<>();
|
||||
|
||||
Map<String, LifecycleAction> warmPhaseActions = Collections.singletonMap(ForceMergeAction.NAME, new ForceMergeAction(10000));
|
||||
phases.put("warm", new Phase("warm", TimeValue.timeValueSeconds(2), warmPhaseActions));
|
||||
|
||||
Map<String, LifecycleAction> deletePhaseActions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
|
||||
phases.put("delete", new Phase("delete", TimeValue.timeValueSeconds(3), deletePhaseActions));
|
||||
lifecyclePolicy = new LifecyclePolicy(TimeseriesLifecycleType.INSTANCE, "test", phases);
|
||||
List<Step> steps = new ArrayList<>();
|
||||
Step.StepKey key = new Step.StepKey("mock", ObservableAction.NAME, ObservableClusterStateWaitStep.NAME);
|
||||
steps.add(new ObservableClusterStateWaitStep(key, TerminalPolicyStep.KEY));
|
||||
Map<String, LifecycleAction> actions = Collections.singletonMap(ObservableAction.NAME, new ObservableAction(steps, true));
|
||||
Map<String, Phase> phases = Collections.singletonMap("mock", new Phase("mock", TimeValue.timeValueSeconds(0), actions));
|
||||
lifecyclePolicy = new LifecyclePolicy(LockableLifecycleType.INSTANCE, "test", phases);
|
||||
}
|
||||
|
||||
public void testSingleNodeCluster() throws Exception {
|
||||
settings = Settings.builder().put(settings).put("index.lifecycle.test.complete", true).build();
|
||||
// start master node
|
||||
logger.info("Starting server1");
|
||||
final String server_1 = internalCluster().startNode();
|
||||
@ -122,13 +133,17 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
|
||||
RoutingNode routingNodeEntry1 = clusterState.getRoutingNodes().node(node1);
|
||||
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1));
|
||||
assertBusy(() -> {
|
||||
assertEquals(false, client().admin().indices().prepareExists("test").get().isExists());
|
||||
assertEquals(true, client().admin().indices().prepareExists("test").get().isExists());
|
||||
});
|
||||
assertBusy(() -> {
|
||||
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get();
|
||||
String step = settingsResponse.getSetting("test", "index.lifecycle.step");
|
||||
assertThat(step, equalTo(TerminalPolicyStep.KEY.getName()));
|
||||
});
|
||||
}
|
||||
|
||||
// NORELEASE re-enable when we re-visit integration testing
|
||||
@AwaitsFix(bugUrl = "Fails because of timing")
|
||||
public void testMasterDedicatedDataDedicated() throws Exception {
|
||||
settings = Settings.builder().put(settings).put("index.lifecycle.test.complete", true).build();
|
||||
// start master node
|
||||
logger.info("Starting sever1");
|
||||
internalCluster().startMasterOnlyNode();
|
||||
@ -151,12 +166,15 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
|
||||
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1));
|
||||
|
||||
assertBusy(() -> {
|
||||
assertEquals(false, client().admin().indices().prepareExists("test").get().isExists());
|
||||
assertEquals(true, client().admin().indices().prepareExists("test").get().isExists());
|
||||
});
|
||||
assertBusy(() -> {
|
||||
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get();
|
||||
String step = settingsResponse.getSetting("test", "index.lifecycle.step");
|
||||
assertThat(step, equalTo(TerminalPolicyStep.KEY.getName()));
|
||||
});
|
||||
}
|
||||
|
||||
// NORELEASE re-enable when force merge action bug is fixed
|
||||
@AwaitsFix(bugUrl = "Fails because force merge action expect shards to be assigned")
|
||||
public void testMasterFailover() throws Exception {
|
||||
// start one server
|
||||
logger.info("Starting sever1");
|
||||
@ -189,12 +207,33 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
|
||||
assertThat(clusterHealth.isTimedOut(), equalTo(false));
|
||||
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
|
||||
|
||||
// check step in progress in lifecycle
|
||||
assertBusy(() -> {
|
||||
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get();
|
||||
String step = settingsResponse.getSetting("test", "index.lifecycle.step");
|
||||
assertThat(step, equalTo(ObservableClusterStateWaitStep.NAME));
|
||||
});
|
||||
|
||||
|
||||
logger.info("Closing server1");
|
||||
// kill the first server
|
||||
internalCluster().stopCurrentMasterNode();
|
||||
|
||||
// check that index lifecycle picked back up where it
|
||||
assertBusy(() -> {
|
||||
assertEquals(false, client().admin().indices().prepareExists("test").get().isExists());
|
||||
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get();
|
||||
String step = settingsResponse.getSetting("test", "index.lifecycle.step");
|
||||
assertThat(step, equalTo(ObservableClusterStateWaitStep.NAME));
|
||||
});
|
||||
|
||||
// complete the step
|
||||
client().admin().indices().prepareUpdateSettings("test")
|
||||
.setSettings(Collections.singletonMap("index.lifecycle.test.complete", true)).get();
|
||||
|
||||
assertBusy(() -> {
|
||||
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get();
|
||||
String step = settingsResponse.getSetting("test", "index.lifecycle.step");
|
||||
assertThat(step, equalTo(TerminalPolicyStep.KEY.getName()));
|
||||
});
|
||||
}
|
||||
|
||||
@ -204,4 +243,86 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
|
||||
assertThat(nodeId, not(nullValue()));
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
public static class TestILMPlugin extends Plugin {
|
||||
public TestILMPlugin() {
|
||||
}
|
||||
|
||||
public List<Setting<?>> getSettings() {
|
||||
final Setting<Boolean> COMPLETE_SETTING = Setting.boolSetting("index.lifecycle.test.complete", false,
|
||||
Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
return Collections.singletonList(COMPLETE_SETTING);
|
||||
}
|
||||
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
|
||||
return Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleType.class, LockableLifecycleType.TYPE,
|
||||
(in) -> LockableLifecycleType.INSTANCE),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ObservableAction.NAME, ObservableAction::readObservableAction),
|
||||
new NamedWriteableRegistry.Entry(ObservableClusterStateWaitStep.class, ObservableClusterStateWaitStep.NAME,
|
||||
ObservableClusterStateWaitStep::new));
|
||||
}
|
||||
}
|
||||
|
||||
public static class ObservableClusterStateWaitStep extends ClusterStateWaitStep implements NamedWriteable {
|
||||
public static final String NAME = "observable_cluster_state_action";
|
||||
|
||||
public ObservableClusterStateWaitStep(StepKey current, StepKey next) {
|
||||
super(current, next);
|
||||
}
|
||||
|
||||
public ObservableClusterStateWaitStep(StreamInput in) throws IOException {
|
||||
this(new StepKey(in.readString(), in.readString(), in.readString()), readOptionalNextStepKey(in));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(getKey().getPhase());
|
||||
out.writeString(getKey().getAction());
|
||||
out.writeString(getKey().getName());
|
||||
boolean hasNextStep = getNextStepKey() != null;
|
||||
out.writeBoolean(hasNextStep);
|
||||
if (hasNextStep) {
|
||||
out.writeString(getNextStepKey().getPhase());
|
||||
out.writeString(getNextStepKey().getAction());
|
||||
out.writeString(getNextStepKey().getName());
|
||||
}
|
||||
}
|
||||
|
||||
private static StepKey readOptionalNextStepKey(StreamInput in) throws IOException {
|
||||
if (in.readBoolean()) {
|
||||
return new StepKey(in.readString(), in.readString(), in.readString());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result isConditionMet(Index index, ClusterState clusterState) {
|
||||
boolean complete = clusterState.metaData().index("test").getSettings()
|
||||
.getAsBoolean("index.lifecycle.test.complete", false);
|
||||
return new Result(complete, null);
|
||||
}
|
||||
}
|
||||
|
||||
public static class ObservableAction extends MockAction {
|
||||
|
||||
ObservableAction(List<Step> steps, boolean safe) {
|
||||
super(steps, safe);
|
||||
}
|
||||
|
||||
public static ObservableAction readObservableAction(StreamInput in) throws IOException {
|
||||
List<Step> steps = in.readList(ObservableClusterStateWaitStep::new);
|
||||
boolean safe = in.readBoolean();
|
||||
return new ObservableAction(steps, safe);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeList(getSteps().stream().map(s -> (ObservableClusterStateWaitStep) s).collect(Collectors.toList()));
|
||||
out.writeBoolean(isSafeAction());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -42,25 +42,7 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
|
||||
|
||||
@Override
|
||||
protected IndexLifecycleMetadata createTestInstance() {
|
||||
int numPolicies = randomInt(5);
|
||||
SortedMap<String, LifecyclePolicyMetadata> policies = new TreeMap<>();
|
||||
for (int i = 0; i < numPolicies; i++) {
|
||||
int numberPhases = randomInt(5);
|
||||
Map<String, Phase> phases = new HashMap<>(numberPhases);
|
||||
for (int j = 0; j < numberPhases; j++) {
|
||||
TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
|
||||
Map<String, LifecycleAction> actions = Collections.emptyMap();
|
||||
if (randomBoolean()) {
|
||||
actions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
|
||||
}
|
||||
String phaseName = randomAlphaOfLength(10);
|
||||
phases.put(phaseName, new Phase(phaseName, after, actions));
|
||||
}
|
||||
String policyName = randomAlphaOfLength(10);
|
||||
policies.put(policyName, new LifecyclePolicyMetadata(new LifecyclePolicy(TestLifecycleType.INSTANCE, policyName, phases),
|
||||
Collections.emptyMap()));
|
||||
}
|
||||
return new IndexLifecycleMetadata(policies, randomFrom(OperationMode.values()));
|
||||
return createTestInstance(randomInt(5), randomFrom(OperationMode.values()));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -123,4 +105,24 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
|
||||
assertEquals(MetaData.ALL_CONTEXTS, createTestInstance().context());
|
||||
}
|
||||
|
||||
public static IndexLifecycleMetadata createTestInstance(int numPolicies, OperationMode mode) {
|
||||
SortedMap<String, LifecyclePolicyMetadata> policies = new TreeMap<>();
|
||||
for (int i = 0; i < numPolicies; i++) {
|
||||
int numberPhases = randomInt(5);
|
||||
Map<String, Phase> phases = new HashMap<>(numberPhases);
|
||||
for (int j = 0; j < numberPhases; j++) {
|
||||
TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
|
||||
Map<String, LifecycleAction> actions = Collections.emptyMap();
|
||||
if (randomBoolean()) {
|
||||
actions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
|
||||
}
|
||||
String phaseName = randomAlphaOfLength(10);
|
||||
phases.put(phaseName, new Phase(phaseName, after, actions));
|
||||
}
|
||||
String policyName = randomAlphaOfLength(10);
|
||||
policies.put(policyName, new LifecyclePolicyMetadata(new LifecyclePolicy(TestLifecycleType.INSTANCE, policyName, phases),
|
||||
Collections.emptyMap()));
|
||||
}
|
||||
return new IndexLifecycleMetadata(policies, mode);
|
||||
}
|
||||
}
|
||||
|
@ -44,6 +44,7 @@ import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
@ -127,11 +128,48 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
||||
assertNull(indexLifecycleService.getScheduler());
|
||||
}
|
||||
|
||||
public void testElectUnElectMaster() throws Exception {
|
||||
public void testOnlyChangesStateOnMasterWhenMetadataChanges() {
|
||||
int numPolicies = randomIntBetween(1, 5);
|
||||
IndexLifecycleMetadata lifecycleMetadata = IndexLifecycleMetadataTests.createTestInstance(numPolicies, OperationMode.RUNNING);
|
||||
IndexLifecycleMetadata newLifecycleMetadata = randomValueOtherThan(lifecycleMetadata,
|
||||
() -> IndexLifecycleMetadataTests.createTestInstance(numPolicies, OperationMode.RUNNING));
|
||||
MetaData previousMetadata = MetaData.builder()
|
||||
.persistentSettings(settings(Version.CURRENT)
|
||||
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(3)).build())
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata)
|
||||
.build();
|
||||
MetaData newMetaData = MetaData.builder(previousMetadata).putCustom(IndexLifecycleMetadata.TYPE, newLifecycleMetadata).build();
|
||||
|
||||
ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metaData(previousMetadata)
|
||||
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
|
||||
.build();
|
||||
ClusterState newState = ClusterState.builder(previousState).metaData(newMetaData).build();
|
||||
ClusterChangedEvent event = new ClusterChangedEvent("_source", previousState, previousState);
|
||||
|
||||
Mockito.reset(clusterService);
|
||||
PolicyStepsRegistry policyStepsRegistry = indexLifecycleService.getPolicyRegistry();
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
indexLifecycleService.clusterChanged(event);
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
assertNotNull(indexLifecycleService.getScheduler());
|
||||
assertEquals(1, indexLifecycleService.getScheduler().jobCount());
|
||||
assertNotNull(indexLifecycleService.getScheduledJob());
|
||||
assertThat(policyStepsRegistry.getLifecyclePolicyMap().keySet(), equalTo(lifecycleMetadata.getPolicyMetadatas().keySet()));
|
||||
|
||||
event = new ClusterChangedEvent("_source", newState, previousState);
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
assertThat(policyStepsRegistry.getLifecyclePolicyMap().keySet(), equalTo(newLifecycleMetadata.getPolicyMetadatas().keySet()));
|
||||
}
|
||||
|
||||
public void testElectUnElectMaster() {
|
||||
int numberOfPolicies = randomIntBetween(1, 5);
|
||||
IndexLifecycleMetadata lifecycleMetadata = IndexLifecycleMetadataTests.createTestInstance(numberOfPolicies, OperationMode.RUNNING);
|
||||
Map<String, LifecyclePolicyMetadata> expectedPolicyMap = lifecycleMetadata.getPolicyMetadatas();
|
||||
MetaData metaData = MetaData.builder()
|
||||
.persistentSettings(settings(Version.CURRENT)
|
||||
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(3)).build())
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY)
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata)
|
||||
.build();
|
||||
|
||||
// First check that when the node has never been master the scheduler
|
||||
@ -158,13 +196,14 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
||||
event = new ClusterChangedEvent("_source", state, state);
|
||||
|
||||
// Check that when the node is first elected as master it sets up
|
||||
// the scheduler and job
|
||||
// the scheduler job and steps registry
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
indexLifecycleService.clusterChanged(event);
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
assertNotNull(indexLifecycleService.getScheduler());
|
||||
assertEquals(1, indexLifecycleService.getScheduler().jobCount());
|
||||
assertNotNull(indexLifecycleService.getScheduledJob());
|
||||
assertThat(indexLifecycleService.getPolicyRegistry().getLifecyclePolicyMap(), equalTo(expectedPolicyMap));
|
||||
|
||||
Mockito.reset(clusterService);
|
||||
state = ClusterState.builder(ClusterName.DEFAULT)
|
||||
@ -173,13 +212,14 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
||||
.build();
|
||||
event = new ClusterChangedEvent("_source", state, state);
|
||||
|
||||
// Check that when the node is un-elected as master it cancels the job
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
// Check that when the node is un-elected as master it cancels the job and cleans up steps registry
|
||||
indexLifecycleService.clusterChanged(event);
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
assertNotNull(indexLifecycleService.getScheduler());
|
||||
assertEquals(0, indexLifecycleService.getScheduler().jobCount());
|
||||
assertNull(indexLifecycleService.getScheduledJob());
|
||||
assertThat(indexLifecycleService.getPolicyRegistry().getLifecyclePolicyMap(), equalTo(expectedPolicyMap));
|
||||
|
||||
Mockito.reset(clusterService);
|
||||
state = ClusterState.builder(ClusterName.DEFAULT)
|
||||
@ -188,13 +228,14 @@ public class IndexLifecycleServiceTests extends ESTestCase {
|
||||
.build();
|
||||
event = new ClusterChangedEvent("_source", state, state);
|
||||
|
||||
// Check that when the node is re-elected as master it cancels the job
|
||||
// Check that when the node is re-elected as master it re-starts the job and populates the registry
|
||||
indexLifecycleService.applyClusterState(event);
|
||||
indexLifecycleService.clusterChanged(event);
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
assertNotNull(indexLifecycleService.getScheduler());
|
||||
assertEquals(1, indexLifecycleService.getScheduler().jobCount());
|
||||
assertNotNull(indexLifecycleService.getScheduledJob());
|
||||
assertThat(indexLifecycleService.getPolicyRegistry().getLifecyclePolicyMap(), equalTo(expectedPolicyMap));
|
||||
}
|
||||
|
||||
public void testSchedulerInitializationAndUpdate() {
|
||||
|
@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This {@link LifecycleType} is used for encapsulating test policies
|
||||
* used in integration tests where the underlying {@link LifecycleAction}s are
|
||||
* able to communicate with the test
|
||||
*/
|
||||
public class LockableLifecycleType implements LifecycleType {
|
||||
public static final String TYPE = "lockable";
|
||||
public static final LockableLifecycleType INSTANCE = new LockableLifecycleType();
|
||||
|
||||
@Override
|
||||
public List<Phase> getOrderedPhases(Map<String, Phase> phases) {
|
||||
return new ArrayList<>(phases.values());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNextPhaseName(String currentPhaseName, Map<String, Phase> phases) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPreviousPhaseName(String currentPhaseName, Map<String, Phase> phases) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<LifecycleAction> getOrderedActions(Phase phase) {
|
||||
return new ArrayList<>(phase.getActions().values());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNextActionName(String currentActionName, Phase phase) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validate(Collection<Phase> phases) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) {
|
||||
}
|
||||
}
|
@ -105,9 +105,10 @@ public class PolicyStepsRegistryTests extends ESTestCase {
|
||||
}
|
||||
Map<String, LifecyclePolicyMetadata> policyMap = Collections.singletonMap(newPolicy.getName(),
|
||||
new LifecyclePolicyMetadata(newPolicy, headers));
|
||||
IndexLifecycleMetadata lifecycleMetadata = new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING);
|
||||
MetaData metaData = MetaData.builder()
|
||||
.persistentSettings(settings(Version.CURRENT).build())
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING))
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata)
|
||||
.build();
|
||||
String nodeId = randomAlphaOfLength(10);
|
||||
DiscoveryNode masterNode = DiscoveryNode.createLocal(settings(Version.CURRENT)
|
||||
@ -122,7 +123,7 @@ public class PolicyStepsRegistryTests extends ESTestCase {
|
||||
PolicyStepsRegistry registry = new PolicyStepsRegistry();
|
||||
|
||||
// add new policy
|
||||
registry.update(currentState, client, () -> 0L);
|
||||
registry.update(lifecycleMetadata, client, () -> 0L);
|
||||
|
||||
assertThat(registry.getFirstStep(newPolicy.getName()), equalTo(policySteps.get(0)));
|
||||
assertThat(registry.getLifecyclePolicyMap().size(), equalTo(1));
|
||||
@ -140,18 +141,18 @@ public class PolicyStepsRegistryTests extends ESTestCase {
|
||||
Map<String, LifecyclePolicyMetadata> registryPolicyMap = registry.getLifecyclePolicyMap();
|
||||
Map<String, Step> registryFirstStepMap = registry.getFirstStepMap();
|
||||
Map<String, Map<Step.StepKey, Step>> registryStepMap = registry.getStepMap();
|
||||
registry.update(currentState, client, () -> 0L);
|
||||
registry.update(lifecycleMetadata, client, () -> 0L);
|
||||
assertThat(registry.getLifecyclePolicyMap(), equalTo(registryPolicyMap));
|
||||
assertThat(registry.getFirstStepMap(), equalTo(registryFirstStepMap));
|
||||
assertThat(registry.getStepMap(), equalTo(registryStepMap));
|
||||
|
||||
// remove policy
|
||||
lifecycleMetadata = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING);
|
||||
currentState = ClusterState.builder(currentState)
|
||||
.metaData(
|
||||
MetaData.builder(metaData)
|
||||
.putCustom(IndexLifecycleMetadata.TYPE,
|
||||
new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING))).build();
|
||||
registry.update(currentState, client, () -> 0L);
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata)).build();
|
||||
registry.update(lifecycleMetadata, client, () -> 0L);
|
||||
assertTrue(registry.getLifecyclePolicyMap().isEmpty());
|
||||
assertTrue(registry.getFirstStepMap().isEmpty());
|
||||
assertTrue(registry.getStepMap().isEmpty());
|
||||
@ -169,9 +170,10 @@ public class PolicyStepsRegistryTests extends ESTestCase {
|
||||
}
|
||||
Map<String, LifecyclePolicyMetadata> policyMap = Collections.singletonMap(newPolicy.getName(),
|
||||
new LifecyclePolicyMetadata(newPolicy, headers));
|
||||
IndexLifecycleMetadata lifecycleMetadata = new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING);
|
||||
MetaData metaData = MetaData.builder()
|
||||
.persistentSettings(settings(Version.CURRENT).build())
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING))
|
||||
.putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata)
|
||||
.build();
|
||||
String nodeId = randomAlphaOfLength(10);
|
||||
DiscoveryNode masterNode = DiscoveryNode.createLocal(settings(Version.CURRENT)
|
||||
@ -183,18 +185,15 @@ public class PolicyStepsRegistryTests extends ESTestCase {
|
||||
.build();
|
||||
PolicyStepsRegistry registry = new PolicyStepsRegistry();
|
||||
// add new policy
|
||||
registry.update(currentState, client, () -> 0L);
|
||||
registry.update(lifecycleMetadata, client, () -> 0L);
|
||||
|
||||
// swap out policy
|
||||
newPolicy = LifecyclePolicyTests.randomLifecyclePolicy(policyName);
|
||||
lifecycleMetadata = new IndexLifecycleMetadata(Collections.singletonMap(policyName,
|
||||
new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap())), OperationMode.RUNNING);
|
||||
currentState = ClusterState.builder(currentState)
|
||||
.metaData(
|
||||
MetaData.builder(metaData)
|
||||
.putCustom(IndexLifecycleMetadata.TYPE,
|
||||
new IndexLifecycleMetadata(Collections.singletonMap(policyName,
|
||||
new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap())), OperationMode.RUNNING)))
|
||||
.build();
|
||||
registry.update(currentState, client, () -> 0L);
|
||||
.metaData(MetaData.builder(metaData).putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata)).build();
|
||||
registry.update(lifecycleMetadata, client, () -> 0L);
|
||||
// TODO(talevy): assert changes... right now we do not support updates to policies. will require internal cleanup
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user