Rebuild step on PolicyStepsRegistry.getStep (#33780)

This moves away from caching a list of steps for a current phase, instead
rebuilding the necessary step from the phase JSON stored in the index's
metadata.

Relates to #29823
This commit is contained in:
Lee Hinman 2018-09-18 17:07:57 -06:00 committed by GitHub
parent 11a55d2307
commit 27dd25857b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 275 additions and 259 deletions

View File

@ -82,6 +82,55 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
return randomTimeseriesLifecyclePolicy(lifecycleName);
}
/**
* The same as {@link #randomTimeseriesLifecyclePolicy(String)} but ensures
* that the resulting policy has all valid phases and all valid actions.
*/
public static LifecyclePolicy randomTimeseriesLifecyclePolicyWithAllPhases(@Nullable String lifecycleName) {
List<String> phaseNames = TimeseriesLifecycleType.VALID_PHASES;
Map<String, Phase> phases = new HashMap<>(phaseNames.size());
Function<String, Set<String>> validActions = (phase) -> {
switch (phase) {
case "hot":
return TimeseriesLifecycleType.VALID_HOT_ACTIONS;
case "warm":
return TimeseriesLifecycleType.VALID_WARM_ACTIONS;
case "cold":
return TimeseriesLifecycleType.VALID_COLD_ACTIONS;
case "delete":
return TimeseriesLifecycleType.VALID_DELETE_ACTIONS;
default:
throw new IllegalArgumentException("invalid phase [" + phase + "]");
}};
Function<String, LifecycleAction> randomAction = (action) -> {
switch (action) {
case AllocateAction.NAME:
return AllocateActionTests.randomInstance();
case DeleteAction.NAME:
return new DeleteAction();
case ForceMergeAction.NAME:
return ForceMergeActionTests.randomInstance();
case ReadOnlyAction.NAME:
return new ReadOnlyAction();
case RolloverAction.NAME:
return RolloverActionTests.randomInstance();
case ShrinkAction.NAME:
return ShrinkActionTests.randomInstance();
default:
throw new IllegalArgumentException("invalid action [" + action + "]");
}};
for (String phase : phaseNames) {
TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
Map<String, LifecycleAction> actions = new HashMap<>();
Set<String> actionNames = validActions.apply(phase);
for (String action : actionNames) {
actions.put(action, randomAction.apply(action));
}
phases.put(phase, new Phase(phase, after, actions));
}
return new LifecyclePolicy(TimeseriesLifecycleType.INSTANCE, lifecycleName, phases);
}
public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String lifecycleName) {
List<String> phaseNames = randomSubsetOf(TimeseriesLifecycleType.VALID_PHASES);
Map<String, Phase> phases = new HashMap<>(phaseNames.size());

View File

@ -70,7 +70,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
// This index doesn't exist any more, there's nothing to execute currently
return currentState;
}
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, index,
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, indexMetaData,
indexMetaData.getSettings());
if (currentStep.equals(registeredCurrentStep)) {
// We can do cluster state steps all together until we
@ -118,7 +118,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
if (currentStep.getKey().getPhase().equals(currentStep.getNextStepKey().getPhase()) == false) {
return currentState;
}
currentStep = policyStepsRegistry.getStep(index, currentStep.getNextStepKey());
currentStep = policyStepsRegistry.getStep(indexMetaData, currentStep.getNextStepKey());
}
return currentState;
} else {

View File

@ -87,7 +87,7 @@ public class IndexLifecycleRunner {
+ LifecycleSettings.LIFECYCLE_SKIP + "== true");
return;
}
Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData.getIndex(), indexSettings);
Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, indexSettings);
if (currentStep == null) {
// This may happen in the case that there is invalid ilm-step index settings or the stepRegistry is out of
// sync with the current cluster state
@ -197,12 +197,12 @@ public class IndexLifecycleRunner {
}
}
static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, Index index, Settings indexSettings) {
static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, IndexMetaData indexMetaData, Settings indexSettings) {
StepKey currentStepKey = getCurrentStepKey(indexSettings);
if (currentStepKey == null) {
return stepRegistry.getFirstStep(policy);
} else {
return stepRegistry.getStep(index, currentStepKey);
return stepRegistry.getStep(indexMetaData, currentStepKey);
}
}

View File

@ -140,11 +140,6 @@ public class IndexLifecycleService extends AbstractComponent
public void applyClusterState(ClusterChangedEvent event) {
if (event.localNodeMaster()) { // only act if we are master, otherwise
// keep idle until elected
// Since indices keep their current phase's details even if the policy changes, it's possible for a deleted index to have a
// policy, and then be re-created with the same name, so here we remove indices that have been delete so they don't waste memory
if (event.indicesDeleted().isEmpty() == false) {
policyRegistry.removeIndices(event.indicesDeleted());
}
if (event.state().metaData().custom(IndexLifecycleMetadata.TYPE) != null) {
policyRegistry.update(event.state());
}

View File

@ -5,9 +5,9 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
@ -19,6 +19,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.Index;
@ -53,26 +54,18 @@ public class PolicyStepsRegistry {
private final Map<String, Step> firstStepMap;
// keeps track of a mapping from policy/step-name to respective Step, the key is policy name
private final Map<String, Map<Step.StepKey, Step>> stepMap;
// A map of index to a list of compiled steps for the current phase
private final Map<Index, List<Step>> indexPhaseSteps;
private final NamedXContentRegistry xContentRegistry;
public PolicyStepsRegistry(NamedXContentRegistry xContentRegistry, Client client) {
this.lifecyclePolicyMap = new TreeMap<>();
this.firstStepMap = new HashMap<>();
this.stepMap = new HashMap<>();
this.indexPhaseSteps = new HashMap<>();
this.xContentRegistry = xContentRegistry;
this.client = client;
this(new TreeMap<>(), new HashMap<>(), new HashMap<>(), xContentRegistry, client);
}
PolicyStepsRegistry(SortedMap<String, LifecyclePolicyMetadata> lifecyclePolicyMap,
Map<String, Step> firstStepMap, Map<String, Map<Step.StepKey, Step>> stepMap,
Map<Index, List<Step>> indexPhaseSteps, NamedXContentRegistry xContentRegistry, Client client) {
NamedXContentRegistry xContentRegistry, Client client) {
this.lifecyclePolicyMap = lifecyclePolicyMap;
this.firstStepMap = firstStepMap;
this.stepMap = stepMap;
this.indexPhaseSteps = indexPhaseSteps;
this.xContentRegistry = xContentRegistry;
this.client = client;
}
@ -89,17 +82,6 @@ public class PolicyStepsRegistry {
return stepMap;
}
/**
* Remove phase step lists for indices that have been deleted
* @param indices a list of indices that have been deleted
*/
public void removeIndices(List<Index> indices) {
indices.forEach(index -> {
logger.trace("removing cached phase steps for deleted index [{}]", index.getName());
indexPhaseSteps.remove(index);
});
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public void update(ClusterState clusterState) {
final IndexLifecycleMetadata meta = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE);
@ -146,31 +128,15 @@ public class PolicyStepsRegistry {
assert ErrorStep.NAME.equals(step.getKey().getName()) == false : "unexpected error step in policy";
stepMapForPolicy.put(step.getKey(), step);
}
logger.trace("updating cached steps for [{}] policy, new steps: {}",
policyMetadata.getName(), stepMapForPolicy.keySet());
stepMap.put(policyMetadata.getName(), stepMapForPolicy);
}
}
}
}
for (ObjectCursor<IndexMetaData> imd : clusterState.metaData().getIndices().values()) {
final Index index = imd.value.getIndex();
final String policy = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
if (policy == null || lifecyclePolicyMap.containsKey(policy) == false) {
indexPhaseSteps.remove(index);
} else {
final List<Step> currentSteps = indexPhaseSteps.get(index);
// Get the current steps' phase, if there are steps stored
final String existingPhase = (currentSteps == null || currentSteps.size() == 0) ?
"_none_" : currentSteps.get(0).getKey().getPhase();
// Retrieve the current phase, defaulting to "new" if no phase is set
final String currentPhase = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE,
InitializePolicyContextStep.INITIALIZATION_PHASE);
if (existingPhase.equals(currentPhase) == false) {
logger.debug("index [{}] has transitioned phases [{} -> {}], rebuilding step list",
index, existingPhase, currentPhase);
// parse existing phase steps from the phase definition in the index settings
String phaseDef = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION,
InitializePolicyContextStep.INITIALIZATION_PHASE);
private List<Step> parseStepsFromPhase(String policy, String currentPhase, String phaseDef) throws IOException {
final PhaseExecutionInfo phaseExecutionInfo;
LifecyclePolicy currentPolicy = lifecyclePolicyMap.get(policy).getPolicy();
final LifecyclePolicy policyToExecute;
@ -183,10 +149,6 @@ public class PolicyStepsRegistry {
try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase);
} catch (IOException e) {
logger.error("failed to configure phase [" + currentPhase + "] for index [" + index.getName() + "]", e);
indexPhaseSteps.remove(index);
continue;
}
Map<String, Phase> phaseMap = new HashMap<>(currentPolicy.getPhases());
if (phaseExecutionInfo.getPhase() != null) {
@ -206,43 +168,46 @@ public class PolicyStepsRegistry {
.filter(e -> e.getKey().getPhase().equals(currentPhase))
.collect(Collectors.toList());
}
indexPhaseSteps.put(index, phaseSteps);
}
}
}
logger.trace("parsed steps for policy [{}] in phase [{}], definition: [{}], steps: [{}]",
policy, currentPhase, phaseDef, phaseSteps);
return phaseSteps;
}
/**
* returns the {@link Step} that matches the index name and
* stepkey specified. This is used by {@link ClusterState}
* readers that know the current policy and step by name
* as String values in the cluster state.
* @param index the index to get the step for
* @param stepKey the key to the requested {@link Step}
* @return the step for the given stepkey or null if the step was not found
*/
@Nullable
public Step getStep(final Index index, final Step.StepKey stepKey) {
public Step getStep(final IndexMetaData indexMetaData, final Step.StepKey stepKey) {
if (ErrorStep.NAME.equals(stepKey.getName())) {
return new ErrorStep(new Step.StepKey(stepKey.getPhase(), stepKey.getAction(), ErrorStep.NAME));
}
if (indexPhaseSteps.get(index) == null) {
return null;
final String phase = stepKey.getPhase();
final String policyName = indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
final Index index = indexMetaData.getIndex();
if (policyName == null) {
throw new IllegalArgumentException("failed to retrieve step " + stepKey + " as index [" + index.getName() + "] has no policy");
}
if (logger.isTraceEnabled()) {
logger.trace("[{}]: retrieving step [{}], found: [{}]\nall steps for this phase: [{}]", index, stepKey,
indexPhaseSteps.get(index).stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null),
indexPhaseSteps.get(index));
} else if (logger.isDebugEnabled()) {
logger.debug("[{}]: retrieving step [{}], found: [{}]", index, stepKey,
indexPhaseSteps.get(index).stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null));
// parse phase steps from the phase definition in the index settings
final String phaseJson = indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION,
InitializePolicyContextStep.INITIALIZATION_PHASE);
final List<Step> phaseSteps;
try {
phaseSteps = parseStepsFromPhase(policyName, phase, phaseJson);
} catch (IOException e) {
throw new ElasticsearchException("failed to load cached steps for " + stepKey, e);
} catch (XContentParseException parseErr) {
throw new XContentParseException(parseErr.getLocation(),
"failed to load cached steps for " + stepKey + " from [" + phaseJson + "]", parseErr);
}
assert indexPhaseSteps.get(index).stream().allMatch(step -> step.getKey().getPhase().equals(stepKey.getPhase())) :
"expected all steps for [" + index + "] to be in phase [" + stepKey.getPhase() +
"] but they were not, steps: " + indexPhaseSteps.get(index);
return indexPhaseSteps.get(index).stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null);
assert phaseSteps.stream().allMatch(step -> step.getKey().getPhase().equals(phase)) :
"expected phase steps loaded from phase definition for [" + index.getName() + "] to be in phase [" + phase +
"] but they were not, steps: " + phaseSteps;
// Return the step that matches the given stepKey or else null if we couldn't find it
return phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null);
}
/**

View File

@ -60,6 +60,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
private String allClusterPolicyName;
private String invalidPolicyName;
private Index index;
private IndexMetaData indexMetaData;
private MockClusterStateActionStep firstStep;
private MockClusterStateWaitStep secondStep;
private MockClusterStateWaitStep allClusterSecondStep;
@ -104,10 +105,10 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
indexName = randomAlphaOfLength(5);
lifecycleMetadata = new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING);
setupIndexPolicy(mixedPolicyName);
indexMetaData = setupIndexPolicy(mixedPolicyName);
}
private void setupIndexPolicy(String policyName) {
private IndexMetaData setupIndexPolicy(String policyName) {
// Reset the index to use the "allClusterPolicyName"
IndexMetaData indexMetadata = IndexMetaData.builder(indexName)
.settings(settings(Version.CURRENT)
@ -131,11 +132,12 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
policyStepsRegistry.update(clusterState);
return indexMetadata;
}
public void testNeverExecuteNonClusterStateStep() throws IOException {
setStateToKey(thirdStepKey);
Step startStep = policyStepsRegistry.getStep(index, thirdStepKey);
Step startStep = policyStepsRegistry.getStep(indexMetaData, thirdStepKey);
long now = randomNonNegativeLong();
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
assertThat(task.execute(clusterState), sameInstance(clusterState));
@ -143,7 +145,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
public void testExecuteUntilFirstNonClusterStateStep() throws IOException {
setStateToKey(secondStepKey);
Step startStep = policyStepsRegistry.getStep(index, secondStepKey);
Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey);
long now = randomNonNegativeLong();
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
ClusterState newState = task.execute(clusterState);
@ -177,7 +179,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
public void testExecuteIncompleteWaitStepNoInfo() throws IOException {
secondStep.setWillComplete(false);
setStateToKey(secondStepKey);
Step startStep = policyStepsRegistry.getStep(index, secondStepKey);
Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey);
long now = randomNonNegativeLong();
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
ClusterState newState = task.execute(clusterState);
@ -195,7 +197,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
RandomStepInfo stepInfo = new RandomStepInfo(() -> randomAlphaOfLength(10));
secondStep.expectedInfo(stepInfo);
setStateToKey(secondStepKey);
Step startStep = policyStepsRegistry.getStep(index, secondStepKey);
Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey);
long now = randomNonNegativeLong();
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
ClusterState newState = task.execute(clusterState);
@ -211,7 +213,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
public void testOnFailure() throws IOException {
setStateToKey(secondStepKey);
Step startStep = policyStepsRegistry.getStep(index, secondStepKey);
Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey);
long now = randomNonNegativeLong();
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
Exception expectedException = new RuntimeException();

View File

@ -7,11 +7,13 @@ package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
@ -39,6 +41,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.MockAction;
import org.elasticsearch.xpack.core.indexlifecycle.MockStep;
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
import org.elasticsearch.xpack.core.indexlifecycle.PhaseExecutionInfo;
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
@ -64,6 +67,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
public class IndexLifecycleRunnerTests extends ESTestCase {
private static final NamedXContentRegistry REGISTRY = new NamedXContentRegistry(new IndexLifecycle(Settings.EMPTY).getNamedXContent());
private PolicyStepsRegistry createOneStepPolicyStepRegistry(String policyName, Step step) {
return createOneStepPolicyStepRegistry(policyName, step, "test");
@ -82,7 +86,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
steps.add(step);
Index index = new Index(indexName, indexName + "uuid");
indexSteps.put(index, steps);
return new PolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, indexSteps, NamedXContentRegistry.EMPTY, null);
return new PolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, NamedXContentRegistry.EMPTY, null);
}
public void testRunPolicyTerminalPolicyStep() {
@ -435,123 +439,48 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
}
public void testGetCurrentStep() {
SortedMap<String, LifecyclePolicyMetadata> lifecyclePolicyMap = null; // Not used in the methods tested here
String policyName = "policy_1";
String otherPolicyName = "other_policy";
String policyName = "policy";
StepKey firstStepKey = new StepKey("phase_1", "action_1", "step_1");
StepKey secondStepKey = new StepKey("phase_1", "action_1", "step_2");
StepKey thirdStepKey = new StepKey("phase_1", "action_2", "step_1");
StepKey fourthStepKey = new StepKey("phase_2", "action_1", "step_1");
StepKey otherPolicyFirstStepKey = new StepKey("phase_1", "action_1", "step_1");
StepKey otherPolicySecondStepKey = new StepKey("phase_1", "action_1", "step_2");
Step firstStep = new MockStep(firstStepKey, secondStepKey);
Step secondStep = new MockStep(secondStepKey, thirdStepKey);
Step thirdStep = new MockStep(thirdStepKey, fourthStepKey);
Step fourthStep = new MockStep(fourthStepKey, null);
Step otherPolicyFirstStep = new MockStep(firstStepKey, secondStepKey);
Step otherPolicySecondStep = new MockStep(secondStepKey, thirdStepKey);
Map<String, Step> firstStepMap = new HashMap<>();
firstStepMap.put(policyName, firstStep);
firstStepMap.put(otherPolicyName, otherPolicyFirstStep);
Map<String, Map<StepKey, Step>> stepMap = new HashMap<>();
Map<StepKey, Step> policySteps = new HashMap<>();
policySteps.put(firstStepKey, firstStep);
policySteps.put(secondStepKey, secondStep);
policySteps.put(thirdStepKey, thirdStep);
policySteps.put(fourthStepKey, fourthStep);
stepMap.put(policyName, policySteps);
Map<StepKey, Step> otherPolicySteps = new HashMap<>();
otherPolicySteps.put(otherPolicyFirstStepKey, otherPolicyFirstStep);
otherPolicySteps.put(otherPolicySecondStepKey, otherPolicySecondStep);
stepMap.put(otherPolicyName, otherPolicySteps);
Map<Index, List<Step>> indexSteps = new HashMap<>();
List<Step> phase1Steps = new ArrayList<>();
phase1Steps.add(firstStep);
phase1Steps.add(secondStep);
phase1Steps.add(thirdStep);
Index index = new Index("test", "uuid");
indexSteps.put(index, phase1Steps);
PolicyStepsRegistry registry = new PolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, indexSteps,
NamedXContentRegistry.EMPTY, null);
Settings indexSettings = Settings.EMPTY;
Step actualStep = IndexLifecycleRunner.getCurrentStep(registry, policyName, index, indexSettings);
assertSame(firstStep, actualStep);
indexSettings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_PHASE, "phase_1")
.put(LifecycleSettings.LIFECYCLE_ACTION, "action_1")
.put(LifecycleSettings.LIFECYCLE_STEP, "step_1")
Step.StepKey MOCK_STEP_KEY = new Step.StepKey("mock", "mock", "mock");
Client client = mock(Client.class);
Mockito.when(client.settings()).thenReturn(Settings.EMPTY);
LifecyclePolicy policy = LifecyclePolicyTests.randomTimeseriesLifecyclePolicyWithAllPhases(policyName);
LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(policy, Collections.emptyMap(), 1, randomNonNegativeLong());
String phaseName = randomFrom(policy.getPhases().keySet());
Phase phase = policy.getPhases().get(phaseName);
PhaseExecutionInfo pei = new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong());
String phaseJson = Strings.toString(pei);
LifecycleAction action = randomFrom(phase.getActions().values());
Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY));
Settings indexSettings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put(LifecycleSettings.LIFECYCLE_NAME, policyName)
.put(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION, phaseJson)
.put(LifecycleSettings.LIFECYCLE_PHASE, step.getKey().getPhase())
.put(LifecycleSettings.LIFECYCLE_ACTION, step.getKey().getAction())
.put(LifecycleSettings.LIFECYCLE_STEP, step.getKey().getName())
.build();
actualStep = IndexLifecycleRunner.getCurrentStep(registry, policyName, index, indexSettings);
assertSame(firstStep, actualStep);
IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(indexSettings).build();
SortedMap<String, LifecyclePolicyMetadata> metas = new TreeMap<>();
metas.put(policyName, policyMetadata);
PolicyStepsRegistry registry = new PolicyStepsRegistry(metas, firstStepMap, stepMap, REGISTRY, client);
indexSettings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_PHASE, "phase_1")
.put(LifecycleSettings.LIFECYCLE_ACTION, "action_1")
.put(LifecycleSettings.LIFECYCLE_STEP, "step_2")
.build();
actualStep = IndexLifecycleRunner.getCurrentStep(registry, policyName, index, indexSettings);
assertSame(secondStep, actualStep);
// First step is retrieved because there are no settings for the index
Step stepFromNoSettings = IndexLifecycleRunner.getCurrentStep(registry, policy.getName(), indexMetaData, Settings.EMPTY);
assertEquals(firstStep, stepFromNoSettings);
indexSettings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_PHASE, "phase_1")
.put(LifecycleSettings.LIFECYCLE_ACTION, "action_2")
.put(LifecycleSettings.LIFECYCLE_STEP, "step_1")
.build();
actualStep = IndexLifecycleRunner.getCurrentStep(registry, policyName, index, indexSettings);
assertSame(thirdStep, actualStep);
// Switch to phase_2
// TODO: it'd be nice if we used the actual registry.update method for this
indexSteps.clear();
indexSteps.put(index, Collections.singletonList(fourthStep));
registry = new PolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, indexSteps, NamedXContentRegistry.EMPTY, null);
indexSettings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_PHASE, "phase_2")
.put(LifecycleSettings.LIFECYCLE_ACTION, "action_1")
.put(LifecycleSettings.LIFECYCLE_STEP, "step_1")
.build();
actualStep = IndexLifecycleRunner.getCurrentStep(registry, policyName, index, indexSettings);
assertSame(fourthStep, actualStep);
indexSettings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_PHASE, "phase_2")
.put(LifecycleSettings.LIFECYCLE_ACTION, "action_1")
.put(LifecycleSettings.LIFECYCLE_STEP, "step_1")
.build();
actualStep = IndexLifecycleRunner.getCurrentStep(registry, policyName, index, indexSettings);
assertSame(fourthStep, actualStep);
// Back to phase_1
indexSteps.clear();
indexSteps.put(index, phase1Steps);
registry = new PolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, indexSteps, NamedXContentRegistry.EMPTY, null);
indexSettings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_PHASE, "phase_1")
.put(LifecycleSettings.LIFECYCLE_ACTION, "action_1")
.put(LifecycleSettings.LIFECYCLE_STEP, "step_1")
.build();
actualStep = IndexLifecycleRunner.getCurrentStep(registry, otherPolicyName, index, indexSettings);
assertEquals(otherPolicyFirstStep, actualStep);
indexSettings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_PHASE, "phase_1")
.put(LifecycleSettings.LIFECYCLE_ACTION, "action_1")
.put(LifecycleSettings.LIFECYCLE_STEP, "step_2")
.build();
actualStep = IndexLifecycleRunner.getCurrentStep(registry, otherPolicyName, index, indexSettings);
assertEquals(otherPolicySecondStep, actualStep);
Settings invalidIndexSettings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_PHASE, "phase_1")
.put(LifecycleSettings.LIFECYCLE_ACTION, "action_1")
.put(LifecycleSettings.LIFECYCLE_STEP, "step_3")
.build();
assertNull(IndexLifecycleRunner.getCurrentStep(registry, policyName, index, invalidIndexSettings));
assertNull(IndexLifecycleRunner.getCurrentStep(registry, "policy_does_not_exist", new Index("test","bad"), invalidIndexSettings));
// The step that was written into the metadata is retrieved
Step currentStep = IndexLifecycleRunner.getCurrentStep(registry, policy.getName(), indexMetaData, indexSettings);
assertEquals(step.getKey(), currentStep.getKey());
}
public void testMoveClusterStateToNextStep() {
@ -1085,7 +1014,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
Map<String, Map<StepKey, Step>> stepMap = Collections.singletonMap(policyName, policySteps);
Map<Index, List<Step>> indexSteps = Collections.singletonMap(index, Collections.singletonList(step));
PolicyStepsRegistry policyStepsRegistry = new PolicyStepsRegistry(lifecyclePolicyMap, firstStepMap,
stepMap, indexSteps, NamedXContentRegistry.EMPTY, null);
stepMap, NamedXContentRegistry.EMPTY, null);
ClusterService clusterService = mock(ClusterService.class);
final AtomicLong now = new AtomicLong(5);
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyStepsRegistry, clusterService, now::get);

View File

@ -23,17 +23,19 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.Index;
import org.elasticsearch.node.Node;
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.InitializePolicyContextStep;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyTests;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.indexlifecycle.MockStep;
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
import org.elasticsearch.xpack.core.indexlifecycle.PhaseExecutionInfo;
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkStep;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
@ -43,18 +45,28 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
public class PolicyStepsRegistryTests extends ESTestCase {
private static final Step.StepKey MOCK_STEP_KEY = new Step.StepKey("mock", "mock", "mock");
private static final NamedXContentRegistry REGISTRY = new NamedXContentRegistry(new IndexLifecycle(Settings.EMPTY).getNamedXContent());
private IndexMetaData emptyMetaData(Index index) {
return IndexMetaData.builder(index.getName()).settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
}
public void testGetFirstStep() {
String policyName = randomAlphaOfLengthBetween(2, 10);
Step expectedFirstStep = new MockStep(MOCK_STEP_KEY, null);
Map<String, Step> firstStepMap = Collections.singletonMap(policyName, expectedFirstStep);
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, firstStepMap, null, null, NamedXContentRegistry.EMPTY, null);
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, firstStepMap, null, NamedXContentRegistry.EMPTY, null);
Step actualFirstStep = registry.getFirstStep(policyName);
assertThat(actualFirstStep, sameInstance(expectedFirstStep));
}
@ -63,18 +75,36 @@ public class PolicyStepsRegistryTests extends ESTestCase {
String policyName = randomAlphaOfLengthBetween(2, 10);
Step expectedFirstStep = new MockStep(MOCK_STEP_KEY, null);
Map<String, Step> firstStepMap = Collections.singletonMap(policyName, expectedFirstStep);
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, firstStepMap, null, null, NamedXContentRegistry.EMPTY, null);
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, firstStepMap, null, NamedXContentRegistry.EMPTY, null);
Step actualFirstStep = registry.getFirstStep(policyName + "unknown");
assertNull(actualFirstStep);
}
public void testGetStep() {
Step expectedStep = new MockStep(MOCK_STEP_KEY, null);
Index index = new Index("test", "uuid");
Map<Index, List<Step>> indexSteps = Collections.singletonMap(index, Collections.singletonList(expectedStep));
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, null, null, indexSteps, NamedXContentRegistry.EMPTY, null);
Step actualStep = registry.getStep(index, MOCK_STEP_KEY);
assertThat(actualStep, sameInstance(expectedStep));
Client client = mock(Client.class);
Mockito.when(client.settings()).thenReturn(Settings.EMPTY);
LifecyclePolicy policy = LifecyclePolicyTests.randomTimeseriesLifecyclePolicyWithAllPhases("policy");
LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(policy, Collections.emptyMap(), 1, randomNonNegativeLong());
String phaseName = randomFrom(policy.getPhases().keySet());
Phase phase = policy.getPhases().get(phaseName);
PhaseExecutionInfo pei = new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong());
String phaseJson = Strings.toString(pei);
LifecycleAction action = randomFrom(phase.getActions().values());
Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY));
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put(LifecycleSettings.LIFECYCLE_NAME, "policy")
.put(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION, phaseJson)
.build())
.build();
SortedMap<String, LifecyclePolicyMetadata> metas = new TreeMap<>();
metas.put("policy", policyMetadata);
PolicyStepsRegistry registry = new PolicyStepsRegistry(metas, null, null, REGISTRY, client);
Step actualStep = registry.getStep(indexMetaData, step.getKey());
assertThat(actualStep.getKey(), equalTo(step.getKey()));
}
public void testGetStepErrorStep() {
@ -82,29 +112,71 @@ public class PolicyStepsRegistryTests extends ESTestCase {
Step expectedStep = new ErrorStep(errorStepKey);
Index index = new Index("test", "uuid");
Map<Index, List<Step>> indexSteps = Collections.singletonMap(index, Collections.singletonList(expectedStep));
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, null, null, indexSteps, NamedXContentRegistry.EMPTY, null);
Step actualStep = registry.getStep(index, errorStepKey);
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, null, null, NamedXContentRegistry.EMPTY, null);
Step actualStep = registry.getStep(emptyMetaData(index), errorStepKey);
assertThat(actualStep, equalTo(expectedStep));
}
public void testGetStepUnknownPolicy() {
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, null, null, Collections.emptyMap(), NamedXContentRegistry.EMPTY, null);
assertNull(registry.getStep(new Index("test", "uuid"), MOCK_STEP_KEY));
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, null, null, NamedXContentRegistry.EMPTY, null);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> registry.getStep(emptyMetaData(new Index("test", "uuid")), MOCK_STEP_KEY));
assertThat(e.getMessage(),
containsString("failed to retrieve step {\"phase\":\"mock\",\"action\":\"mock\",\"name\":\"mock\"}" +
" as index [test] has no policy"));
}
public void testGetStepForIndexWithNoPhaseGetsInitializationStep() {
Client client = mock(Client.class);
Mockito.when(client.settings()).thenReturn(Settings.EMPTY);
LifecyclePolicy policy = LifecyclePolicyTests.randomTimeseriesLifecyclePolicy("policy");
LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(policy, Collections.emptyMap(), 1, randomNonNegativeLong());
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put(LifecycleSettings.LIFECYCLE_NAME, "policy")
.build())
.build();
SortedMap<String, LifecyclePolicyMetadata> metas = new TreeMap<>();
metas.put("policy", policyMetadata);
PolicyStepsRegistry registry = new PolicyStepsRegistry(metas, null, null, REGISTRY, client);
Step step = registry.getStep(indexMetaData, InitializePolicyContextStep.KEY);
assertNotNull(step);
}
public void testGetStepUnknownStepKey() {
Step expectedStep = new MockStep(MOCK_STEP_KEY, null);
Index index = new Index("test", "uuid");
Map<Index, List<Step>> indexSteps = Collections.singletonMap(index, Collections.singletonList(expectedStep));
PolicyStepsRegistry registry = new PolicyStepsRegistry(null, null, null, indexSteps, NamedXContentRegistry.EMPTY, null);
Step.StepKey unknownStepKey = new Step.StepKey(MOCK_STEP_KEY.getPhase(),
MOCK_STEP_KEY.getAction(),MOCK_STEP_KEY.getName() + "not");
assertNull(registry.getStep(index, unknownStepKey));
Client client = mock(Client.class);
Mockito.when(client.settings()).thenReturn(Settings.EMPTY);
LifecyclePolicy policy = LifecyclePolicyTests.randomTimeseriesLifecyclePolicyWithAllPhases("policy");
LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(policy, Collections.emptyMap(), 1, randomNonNegativeLong());
String phaseName = randomFrom(policy.getPhases().keySet());
Phase phase = policy.getPhases().get(phaseName);
PhaseExecutionInfo pei = new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong());
String phaseJson = Strings.toString(pei);
LifecycleAction action = randomFrom(phase.getActions().values());
Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY));
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put(LifecycleSettings.LIFECYCLE_NAME, "policy")
.put(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION, phaseJson)
.build())
.build();
SortedMap<String, LifecyclePolicyMetadata> metas = new TreeMap<>();
metas.put("policy", policyMetadata);
PolicyStepsRegistry registry = new PolicyStepsRegistry(metas, null, null, REGISTRY, client);
Step actualStep = registry.getStep(indexMetaData,
new Step.StepKey(step.getKey().getPhase(), step.getKey().getAction(), step.getKey().getName() + "-bad"));
assertNull(actualStep);
}
public void testUpdateFromNothingToSomethingToNothing() throws Exception {
Index index = new Index("test", "uuid");
Client client = Mockito.mock(Client.class);
Client client = mock(Client.class);
Mockito.when(client.settings()).thenReturn(Settings.EMPTY);
String policyName = randomAlphaOfLength(5);
LifecyclePolicy newPolicy = LifecyclePolicyTests.randomTestLifecyclePolicy(policyName);
@ -169,7 +241,7 @@ public class PolicyStepsRegistryTests extends ESTestCase {
.build();
registry.update(currentState);
assertThat(registeredStepsForPolicy.get(step.getKey()), equalTo(step));
assertThat(registry.getStep(index, step.getKey()), equalTo(step));
assertThat(registry.getStep(metaData.index(index), step.getKey()), equalTo(step));
}
Map<String, LifecyclePolicyMetadata> registryPolicyMap = registry.getLifecyclePolicyMap();
@ -193,7 +265,7 @@ public class PolicyStepsRegistryTests extends ESTestCase {
}
public void testUpdateChangedPolicy() {
Client client = Mockito.mock(Client.class);
Client client = mock(Client.class);
Mockito.when(client.settings()).thenReturn(Settings.EMPTY);
String policyName = randomAlphaOfLengthBetween(5, 10);
LifecyclePolicy newPolicy = LifecyclePolicyTests.randomTestLifecyclePolicy(policyName);
@ -234,12 +306,15 @@ public class PolicyStepsRegistryTests extends ESTestCase {
public void testUpdatePolicyButNoPhaseChangeIndexStepsDontChange() throws Exception {
Index index = new Index("test", "uuid");
Client client = Mockito.mock(Client.class);
Client client = mock(Client.class);
Mockito.when(client.settings()).thenReturn(Settings.EMPTY);
String policyName = randomAlphaOfLength(5);
Map<String, LifecycleAction> actions = new HashMap<>();
actions.put("shrink", new ShrinkAction(1));
Map<String, Phase> phases = new HashMap<>();
Phase warmPhase = new Phase("warm", TimeValue.ZERO, actions);
PhaseExecutionInfo pei = new PhaseExecutionInfo(policyName, warmPhase, 1, randomNonNegativeLong());
String phaseJson = Strings.toString(pei);
phases.put("warm", new Phase("warm", TimeValue.ZERO, actions));
LifecyclePolicy newPolicy = new LifecyclePolicy(policyName, phases);
// Modify the policy
@ -269,7 +344,8 @@ public class PolicyStepsRegistryTests extends ESTestCase {
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT.id)
.put(LifecycleSettings.LIFECYCLE_NAME, policyName)
.put(LifecycleSettings.LIFECYCLE_PHASE, "warm")))
.put(LifecycleSettings.LIFECYCLE_PHASE, "warm")
.put(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION, phaseJson)))
.build();
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
@ -287,7 +363,7 @@ public class PolicyStepsRegistryTests extends ESTestCase {
.build();
// start with empty registry
PolicyStepsRegistry registry = new PolicyStepsRegistry(NamedXContentRegistry.EMPTY, client);
PolicyStepsRegistry registry = new PolicyStepsRegistry(REGISTRY, client);
// add new policy
registry.update(currentState);
@ -296,7 +372,7 @@ public class PolicyStepsRegistryTests extends ESTestCase {
Step shrinkStep = registeredStepsForPolicy.entrySet().stream()
.filter(e -> e.getKey().getPhase().equals("warm") && e.getKey().getName().equals("shrink"))
.findFirst().get().getValue();
Step gotStep = registry.getStep(index, shrinkStep.getKey());
Step gotStep = registry.getStep(metaData.index(index), shrinkStep.getKey());
assertThat(((ShrinkStep) shrinkStep).getNumberOfShards(), equalTo(1));
assertThat(((ShrinkStep) gotStep).getNumberOfShards(), equalTo(1));
@ -322,7 +398,7 @@ public class PolicyStepsRegistryTests extends ESTestCase {
shrinkStep = registeredStepsForPolicy.entrySet().stream()
.filter(e -> e.getKey().getPhase().equals("warm") && e.getKey().getName().equals("shrink"))
.findFirst().get().getValue();
gotStep = registry.getStep(index, shrinkStep.getKey());
gotStep = registry.getStep(metaData.index(index), shrinkStep.getKey());
assertThat(((ShrinkStep) shrinkStep).getNumberOfShards(), equalTo(2));
assertThat(((ShrinkStep) gotStep).getNumberOfShards(), equalTo(1));
}