diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncWaitStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncWaitStep.java index 5f2e2793e05..f6c968cfae4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncWaitStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncWaitStep.java @@ -6,8 +6,8 @@ package org.elasticsearch.xpack.core.indexlifecycle; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.index.Index; public abstract class AsyncWaitStep extends Step { @@ -22,7 +22,7 @@ public abstract class AsyncWaitStep extends Step { return client; } - public abstract void evaluateCondition(Index index, Listener listener); + public abstract void evaluateCondition(IndexMetaData indexMetaData, Listener listener); public interface Listener { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java index 1b55e82ea00..8fd9098c3e3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java @@ -219,9 +219,9 @@ public class LifecyclePolicy extends AbstractDiffable steps.add(new InitializePolicyContextStep(InitializePolicyContextStep.KEY, lastStepKey)); Collections.reverse(steps); - logger.debug("STEP COUNT: " + steps.size()); + logger.trace("STEP COUNT: " + steps.size()); for (Step step : steps) { - logger.debug(step.getKey() + " -> " + step.getNextStepKey()); + logger.trace(step.getKey() + " -> " + step.getNextStepKey()); } return steps; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java index c4aa7d079a7..f95d4ffbf61 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java @@ -8,16 +8,18 @@ package org.elasticsearch.xpack.core.indexlifecycle; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import java.io.IOException; import java.util.Locale; import java.util.Objects; -public class RolloverStep extends AsyncActionStep { +public class RolloverStep extends AsyncWaitStep { public static final String NAME = "attempt_rollover"; private ByteSizeValue maxSize; @@ -33,7 +35,7 @@ public class RolloverStep extends AsyncActionStep { } @Override - public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { + public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) { String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetaData.getSettings()); if (Strings.isNullOrEmpty(rolloverAlias)) { @@ -54,7 +56,7 @@ public class RolloverStep extends AsyncActionStep { rolloverRequest.addMaxIndexDocsCondition(maxDocs); } getClient().admin().indices().rolloverIndex(rolloverRequest, - ActionListener.wrap(response -> listener.onResponse(response.isRolledOver()), listener::onFailure)); + ActionListener.wrap(response -> listener.onResponse(response.isRolledOver(), new EmptyInfo()), listener::onFailure)); } ByteSizeValue getMaxSize() { @@ -89,4 +91,13 @@ public class RolloverStep extends AsyncActionStep { Objects.equals(maxDocs, other.maxDocs); } + // We currently have no information to provide for this AsyncWaitStep, so this is an empty object + private class EmptyInfo implements ToXContentObject { + private EmptyInfo() {} + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder; + } + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java index f775022cf85..0d706dca104 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java @@ -8,12 +8,12 @@ package org.elasticsearch.xpack.core.indexlifecycle; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.Index; import java.io.IOException; import java.util.Arrays; @@ -38,12 +38,14 @@ public class SegmentCountStep extends AsyncWaitStep { } @Override - public void evaluateCondition(Index index, Listener listener) { - getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> { - long numberShardsLeftToMerge = StreamSupport.stream(response.getIndices().get(index.getName()).spliterator(), false) - .filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)).count(); - listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge)); - }, listener::onFailure)); + public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) { + getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetaData.getIndex().getName()), + ActionListener.wrap(response -> { + long numberShardsLeftToMerge = + StreamSupport.stream(response.getIndices().get(indexMetaData.getIndex().getName()).spliterator(), false) + .filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)).count(); + listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge)); + }, listener::onFailure)); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStepTests.java index c2445c57db2..a35daa1a01d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStepTests.java @@ -21,7 +21,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep.Listener; +import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; import org.junit.Before; import org.mockito.Mockito; @@ -148,10 +148,10 @@ public class RolloverStepTests extends AbstractStepTestCase { }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); SetOnce actionCompleted = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() { @Override - public void onResponse(boolean complete) { + public void onResponse(boolean complete, ToXContentObject obj) { actionCompleted.set(complete); } @@ -205,10 +205,10 @@ public class RolloverStepTests extends AbstractStepTestCase { }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); SetOnce actionCompleted = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() { @Override - public void onResponse(boolean complete) { + public void onResponse(boolean complete, ToXContentObject obj) { actionCompleted.set(complete); } @@ -263,10 +263,10 @@ public class RolloverStepTests extends AbstractStepTestCase { }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); SetOnce exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() { @Override - public void onResponse(boolean complete) { + public void onResponse(boolean complete, ToXContentObject obj) { throw new AssertionError("Unexpected method call"); } @@ -292,9 +292,9 @@ public class RolloverStepTests extends AbstractStepTestCase { RolloverStep step = createRandomInstance(); SetOnce exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() { @Override - public void onResponse(boolean complete) { + public void onResponse(boolean complete, ToXContentObject obj) { throw new AssertionError("Unexpected method call"); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java index faa63feeed1..ae0551020fb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.indexlifecycle; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.segments.IndexSegments; import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; @@ -14,6 +15,8 @@ import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.Segment; @@ -41,6 +44,15 @@ public class SegmentCountStepTests extends AbstractStepTestCase conditionInfo = new SetOnce<>(); SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments); - step.evaluateCondition(index, new AsyncWaitStep.Listener() { + step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject info) { conditionMetResult.set(conditionMet); @@ -166,7 +178,7 @@ public class SegmentCountStepTests extends AbstractStepTestCase conditionInfo = new SetOnce<>(); SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments); - step.evaluateCondition(index, new AsyncWaitStep.Listener() { + step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject info) { conditionMetResult.set(conditionMet); @@ -206,7 +218,7 @@ public class SegmentCountStepTests extends AbstractStepTestCase exceptionThrown = new SetOnce<>(); SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments); - step.evaluateCondition(index, new AsyncWaitStep.Listener() { + step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject info) { throw new AssertionError("unexpected method call"); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java index a3ca355135e..bc5317b6057 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep; import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState; import org.elasticsearch.xpack.core.indexlifecycle.Step; +import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep; import java.io.IOException; import java.util.function.LongSupplier; @@ -28,15 +29,18 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { private final Index index; private final Step startStep; private final PolicyStepsRegistry policyStepsRegistry; + private final IndexLifecycleRunner lifecycleRunner; private LongSupplier nowSupplier; + private Step.StepKey nextStepKey = null; public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry, - LongSupplier nowSupplier) { + IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) { this.policy = policy; this.index = index; this.startStep = startStep; this.policyStepsRegistry = policyStepsRegistry; this.nowSupplier = nowSupplier; + this.lifecycleRunner = lifecycleRunner; } String getPolicy() { @@ -63,7 +67,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { * @throws IOException if any exceptions occur */ @Override - public ClusterState execute(ClusterState currentState) throws IOException { + public ClusterState execute(final ClusterState currentState) throws IOException { Step currentStep = startStep; IndexMetaData indexMetaData = currentState.metaData().index(index); if (indexMetaData == null) { @@ -74,22 +78,24 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, indexMetaData, LifecycleExecutionState.fromIndexMetadata(indexMetaData)); if (currentStep.equals(registeredCurrentStep)) { + ClusterState state = currentState; // We can do cluster state steps all together until we // either get to a step that isn't a cluster state step or a // cluster state wait step returns not completed while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { + nextStepKey = currentStep.getNextStepKey(); if (currentStep instanceof ClusterStateActionStep) { // cluster state action step so do the action and - // move - // the cluster state to the next step + // move the cluster state to the next step logger.trace("[{}] performing cluster state action ({}) [{}], next: [{}]", index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); - currentState = ((ClusterStateActionStep) currentStep).performAction(index, currentState); + state = ((ClusterStateActionStep) currentStep).performAction(index, state); if (currentStep.getNextStepKey() == null) { - return currentState; - } - currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(), + return state; + } else { + state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(), currentStep.getNextStepKey(), nowSupplier); + } } else { // cluster state wait step so evaluate the // condition, if the condition is met move to the @@ -99,29 +105,34 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { // condition again logger.trace("[{}] waiting for cluster state step condition ({}) [{}], next: [{}]", index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); - ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState); + ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state); if (result.isComplete()) { if (currentStep.getNextStepKey() == null) { - return currentState; - } - currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(), + return state; + } else { + state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(), currentStep.getNextStepKey(), nowSupplier); + } } else { - logger.debug("condition not met, returning existing state"); + logger.debug("[{}] condition not met ({}), returning existing state", index.getName(), currentStep.getKey()); ToXContentObject stepInfo = result.getInfomationContext(); if (stepInfo == null) { - return currentState; + return state; } else { - return IndexLifecycleRunner.addStepInfoToClusterState(index, currentState, stepInfo); + return IndexLifecycleRunner.addStepInfoToClusterState(index, state, stepInfo); } } } + // There are actions we need to take in the event a phase + // transition happens, so even if we would continue in the while + // loop, if we are about to go into a new phase, return so that + // other processing can occur if (currentStep.getKey().getPhase().equals(currentStep.getNextStepKey().getPhase()) == false) { - return currentState; + return state; } currentStep = policyStepsRegistry.getStep(indexMetaData, currentStep.getNextStepKey()); } - return currentState; + return state; } else { // either we are no longer the master or the step is now // not the same as when we submitted the update task. In @@ -130,6 +141,19 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { } } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (oldState.equals(newState) == false) { + IndexMetaData indexMetaData = newState.metaData().index(index); + if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) { + // After the cluster state has been processed and we have moved + // to a new step, we need to conditionally execute the step iff + // it is an `AsyncAction` so that it is executed exactly once. + lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey); + } + } + } + @Override public void onFailure(String source, Exception e) { throw new ElasticsearchException( diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java index df4471cd8de..bdc259535d9 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java @@ -82,100 +82,146 @@ public class IndexLifecycleRunner { return now >= lifecycleDate + after.getMillis(); } - public void runPolicy(String policy, IndexMetaData indexMetaData, ClusterState currentState, - boolean fromClusterStateChange) { + /** + * Run the current step, only if it is an asynchronous wait step. These + * wait criteria are checked periodically from the ILM scheduler + */ + public void runPeriodicStep(String policy, IndexMetaData indexMetaData) { Settings indexSettings = indexMetaData.getSettings(); + String index = indexMetaData.getIndex().getName(); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexSettings)) { - logger.info("skipping policy [" + policy + "] for index [" + indexMetaData.getIndex().getName() + "]." - + LifecycleSettings.LIFECYCLE_SKIP + "== true"); + logger.info("skipping policy [{}] for index [{}]: {} == true", policy, index, LifecycleSettings.LIFECYCLE_SKIP); return; } Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, lifecycleState); if (currentStep == null) { - // This may happen in the case that there is invalid ilm-step index settings or the stepRegistry is out of - // sync with the current cluster state - logger.warn("current step [" + getCurrentStepKey(lifecycleState) + "] for index [" + indexMetaData.getIndex().getName() - + "] with policy [" + policy + "] is not recognized"); + logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized", + getCurrentStepKey(lifecycleState), index, policy); return; } - logger.debug("running policy with current-step [" + currentStep.getKey() + "]"); + if (currentStep instanceof TerminalPolicyStep) { - logger.debug("policy [" + policy + "] for index [" + indexMetaData.getIndex().getName() + "] complete, skipping execution"); + logger.debug("policy [{}] for index [{}] complete, skipping execution", policy, index); return; } else if (currentStep instanceof ErrorStep) { - logger.debug( - "policy [" + policy + "] for index [" + indexMetaData.getIndex().getName() + "] on an error step, skipping execution"); + logger.debug("policy [{}] for index [{}] on an error step, skipping execution", policy, index); return; - } else if (currentStep instanceof PhaseCompleteStep) { + } + + // Only phase changing and async wait steps should be run through periodic polling + if (currentStep instanceof PhaseCompleteStep) { // Only proceed to the next step if enough time has elapsed to go into the next phase if (isReadyToTransitionToThisPhase(policy, indexMetaData, currentStep.getNextStepKey().getPhase())) { moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); } - return; - } - - if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { - executeClusterStateSteps(indexMetaData.getIndex(), policy, currentStep); } else if (currentStep instanceof AsyncWaitStep) { - if (fromClusterStateChange == false) { - ((AsyncWaitStep) currentStep).evaluateCondition(indexMetaData.getIndex(), new AsyncWaitStep.Listener() { + logger.debug("running periodic policy with current-step [{}]", currentStep.getKey()); + ((AsyncWaitStep) currentStep).evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() { - @Override - public void onResponse(boolean conditionMet, ToXContentObject stepInfo) { - logger.debug("cs-change-async-wait-callback. current-step:" + currentStep.getKey()); - if (conditionMet) { - moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); - } else if (stepInfo != null) { - setStepInfo(indexMetaData.getIndex(), policy, currentStep.getKey(), stepInfo); - } + @Override + public void onResponse(boolean conditionMet, ToXContentObject stepInfo) { + logger.debug("cs-change-async-wait-callback, current-step: " + currentStep.getKey()); + if (conditionMet) { + moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + } else if (stepInfo != null) { + setStepInfo(indexMetaData.getIndex(), policy, currentStep.getKey(), stepInfo); } + } - @Override - public void onFailure(Exception e) { - moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); - } - - }); - } - } else if (currentStep instanceof AsyncActionStep) { - if (fromClusterStateChange == false) { - ((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, new AsyncActionStep.Listener() { - - @Override - public void onResponse(boolean complete) { - logger.debug("cs-change-async-action-callback. current-step:" + currentStep.getKey()); - if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { - moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); - } - } - - @Override - public void onFailure(Exception e) { - moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); - } - }); - } + @Override + public void onFailure(Exception e) { + moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); + } + }); } else { - throw new IllegalStateException( - "Step with key [" + currentStep.getKey() + "] is not a recognised type: [" + currentStep.getClass().getName() + "]"); + logger.trace("ignoring non periodic step execution from step transition [{}]", currentStep.getKey()); } } - private void runPolicy(IndexMetaData indexMetaData, ClusterState currentState) { - if (indexMetaData == null) { - // This index doesn't exist any more, there's nothing to execute + /** + * If the current step (matching the expected step key) is an asynchronous action step, run it + */ + public void maybeRunAsyncAction(ClusterState currentState, IndexMetaData indexMetaData, String policy, StepKey expectedStepKey) { + Settings indexSettings = indexMetaData.getSettings(); + String index = indexMetaData.getIndex().getName(); + LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); + if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexSettings)) { + logger.info("skipping policy [{}] for index [{}]: {} == true", policy, index, LifecycleSettings.LIFECYCLE_SKIP); return; } - Settings indexSettings = indexMetaData.getSettings(); - String policy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings); - runPolicy(policy, indexMetaData, currentState, false); + Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, lifecycleState); + if (currentStep == null) { + logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized", + getCurrentStepKey(lifecycleState), index, policy); + return; + } + + if (currentStep.getKey().equals(expectedStepKey) == false) { + throw new IllegalStateException("expected index [" + indexMetaData.getIndex().getName() + "] with policy [" + policy + + "] to have current step consistent with provided step key (" + expectedStepKey + ") but it was " + currentStep.getKey()); + } + if (currentStep instanceof AsyncActionStep) { + logger.debug("running policy with async action step [{}]", currentStep.getKey()); + ((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, new AsyncActionStep.Listener() { + + @Override + public void onResponse(boolean complete) { + logger.debug("cs-change-async-action-callback, current-step: [{}]", currentStep.getKey()); + if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { + moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + } + } + + @Override + public void onFailure(Exception e) { + moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); + } + }); + } else { + logger.trace("ignoring non async action step execution from step transition [{}]", currentStep.getKey()); + } } - private void executeClusterStateSteps(Index index, String policy, Step step) { - assert step instanceof ClusterStateActionStep || step instanceof ClusterStateWaitStep; - clusterService.submitStateUpdateTask("ilm-execute-cluster-state-steps", - new ExecuteStepsUpdateTask(policy, index, step, stepRegistry, nowSupplier)); + /** + * Run the current step that either waits for index age, or updates/waits-on cluster state. + * Invoked after the cluster state has been changed + */ + public void runPolicyAfterStateChange(String policy, IndexMetaData indexMetaData) { + Settings indexSettings = indexMetaData.getSettings(); + String index = indexMetaData.getIndex().getName(); + LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); + if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexSettings)) { + logger.info("skipping policy [{}] for index [{}]: {} == true", policy, index, LifecycleSettings.LIFECYCLE_SKIP); + return; + } + Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, lifecycleState); + if (currentStep == null) { + logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized", + getCurrentStepKey(lifecycleState), index, policy); + return; + } + + if (currentStep instanceof TerminalPolicyStep) { + logger.debug("policy [{}] for index [{}] complete, skipping execution", policy, index); + return; + } else if (currentStep instanceof ErrorStep) { + logger.debug("policy [{}] for index [{}] on an error step, skipping execution", policy, index); + return; + } + + if (currentStep instanceof PhaseCompleteStep) { + // Only proceed to the next step if enough time has elapsed to go into the next phase + if (isReadyToTransitionToThisPhase(policy, indexMetaData, currentStep.getNextStepKey().getPhase())) { + moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + } + } else if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { + logger.debug("[{}] running policy with current-step [{}]", indexMetaData.getIndex().getName(), currentStep.getKey()); + clusterService.submitStateUpdateTask("ilm-execute-cluster-state-steps", + new ExecuteStepsUpdateTask(policy, indexMetaData.getIndex(), currentStep, stepRegistry, this, nowSupplier)); + } else { + logger.trace("ignoring step execution from cluster state change event [{}]", currentStep.getKey()); + } } /** @@ -384,8 +430,14 @@ public class IndexLifecycleRunner { private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey) { logger.debug("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> " + nextStepKey); - clusterService.submitStateUpdateTask("ilm-move-to-step", new MoveToNextStepUpdateTask(index, policy, currentStepKey, - nextStepKey, nowSupplier)); + clusterService.submitStateUpdateTask("ilm-move-to-step", + new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, nowSupplier, clusterState -> + { + IndexMetaData indexMetaData = clusterState.metaData().index(index); + if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) { + maybeRunAsyncAction(clusterState, indexMetaData, policy, nextStepKey); + } + })); } private void moveToErrorStep(Index index, String policy, StepKey currentStepKey, Exception e) { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index a96020b9958..026d1c7aeef 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -89,6 +89,39 @@ public class IndexLifecycleService extends AbstractComponent public void onMaster() { this.isMaster = true; maybeScheduleJob(); + + ClusterState clusterState = clusterService.state(); + IndexLifecycleMetadata currentMetadata = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE); + if (currentMetadata != null) { + OperationMode currentMode = currentMetadata.getOperationMode(); + if (OperationMode.STOPPED.equals(currentMode)) { + return; + } + + boolean safeToStop = true; // true until proven false by a run policy + + // If we just became master, we need to kick off any async actions that + // may have not been run due to master rollover + for (ObjectCursor cursor : clusterState.metaData().indices().values()) { + IndexMetaData idxMeta = cursor.value; + String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()); + if (Strings.isNullOrEmpty(policyName) == false) { + StepKey stepKey = IndexLifecycleRunner.getCurrentStepKey(LifecycleExecutionState.fromIndexMetadata(idxMeta)); + if (OperationMode.STOPPING == currentMode && + stepKey != null && + IGNORE_ACTIONS_MAINTENANCE_REQUESTED.contains(stepKey.getAction()) == false) { + logger.info("skipping policy [{}] for index [{}]. stopping Index Lifecycle execution", + policyName, idxMeta.getIndex().getName()); + continue; + } + lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey); + safeToStop = false; // proven false! + } + } + if (safeToStop && OperationMode.STOPPING == currentMode) { + submitOperationModeUpdate(OperationMode.STOPPED); + } + } } @Override @@ -199,7 +232,11 @@ public class IndexLifecycleService extends AbstractComponent + "]. stopping Index Lifecycle execution"); continue; } - lifecycleRunner.runPolicy(policyName, idxMeta, clusterState, fromClusterStateChange); + if (fromClusterStateChange) { + lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta); + } else { + lifecycleRunner.runPeriodicStep(policyName, idxMeta); + } safeToStop = false; // proven false! } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTask.java index f7aabce7778..750fd1af5da 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTask.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.indexlifecycle; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -15,22 +17,27 @@ import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; import org.elasticsearch.xpack.core.indexlifecycle.Step; +import java.util.function.Consumer; import java.util.function.LongSupplier; public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask { + private static final Logger logger = LogManager.getLogger(MoveToNextStepUpdateTask.class); + private final Index index; private final String policy; private final Step.StepKey currentStepKey; private final Step.StepKey nextStepKey; private final LongSupplier nowSupplier; + private final Consumer stateChangeConsumer; public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey nextStepKey, - LongSupplier nowSupplier) { + LongSupplier nowSupplier, Consumer stateChangeConsumer) { this.index = index; this.policy = policy; this.currentStepKey = currentStepKey; this.nextStepKey = nextStepKey; this.nowSupplier = nowSupplier; + this.stateChangeConsumer = stateChangeConsumer; } Index getIndex() { @@ -60,6 +67,7 @@ public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask { LifecycleExecutionState indexILMData = LifecycleExecutionState.fromIndexMetadata(currentState.getMetaData().index(index)); if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings)) && currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexILMData))) { + logger.trace("moving [{}] to next step ({})", index.getName(), nextStepKey); return IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStepKey, nextStepKey, nowSupplier); } else { // either the policy has changed or the step is now @@ -69,6 +77,13 @@ public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask { } } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (oldState.equals(newState) == false) { + stateChangeConsumer.accept(newState); + } + } + @Override public void onFailure(String source, Exception e) { throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName() + "] failed trying to move from step [" diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java index 89b8c9b4319..4baf7022e8d 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java @@ -140,7 +140,11 @@ public class PolicyStepsRegistry { private List parseStepsFromPhase(String policy, String currentPhase, String phaseDef) throws IOException { final PhaseExecutionInfo phaseExecutionInfo; - LifecyclePolicy currentPolicy = lifecyclePolicyMap.get(policy).getPolicy(); + LifecyclePolicyMetadata policyMetadata = lifecyclePolicyMap.get(policy); + if (policyMetadata == null) { + throw new IllegalStateException("unable to parse steps for policy [" + policy + "] as it doesn't exist"); + } + LifecyclePolicy currentPolicy = policyMetadata.getPolicy(); final LifecyclePolicy policyToExecute; if (InitializePolicyContextStep.INITIALIZATION_PHASE.equals(phaseDef) || TerminalPolicyStep.COMPLETED_PHASE.equals(phaseDef)) { @@ -200,7 +204,7 @@ public class PolicyStepsRegistry { throw new ElasticsearchException("failed to load cached steps for " + stepKey, e); } catch (XContentParseException parseErr) { throw new XContentParseException(parseErr.getLocation(), - "failed to load cached steps for " + stepKey + " from [" + phaseJson + "]", parseErr); + "failed to load steps for " + stepKey + " from [" + phaseJson + "]", parseErr); } assert phaseSteps.stream().allMatch(step -> step.getKey().getPhase().equals(phase)) : diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTaskTests.java index a10afb631eb..42900236a52 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTaskTests.java @@ -144,7 +144,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase { setStateToKey(thirdStepKey); Step startStep = policyStepsRegistry.getStep(indexMetaData, thirdStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); assertThat(task.execute(clusterState), sameInstance(clusterState)); } @@ -152,7 +152,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase { setStateToKey(secondStepKey); Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); ClusterState newState = task.execute(clusterState); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState); @@ -180,7 +180,8 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase { Step invalidStep = new MockClusterStateActionStep(firstStepKey, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(invalidPolicyName, index, invalidStep, policyStepsRegistry, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(invalidPolicyName, index, + invalidStep, policyStepsRegistry, null, () -> now); ClusterState newState = task.execute(clusterState); assertSame(newState, clusterState); } @@ -190,7 +191,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase { setStateToKey(secondStepKey); Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); ClusterState newState = task.execute(clusterState); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState); @@ -209,7 +210,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase { setStateToKey(secondStepKey); Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); ClusterState newState = task.execute(clusterState); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState); @@ -225,7 +226,7 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase { setStateToKey(secondStepKey); Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); Exception expectedException = new RuntimeException(); ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> task.onFailure(randomAlphaOfLength(10), expectedException)); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java index 604ba29145c..f31f2dafa76 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java @@ -12,6 +12,8 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; @@ -24,7 +26,10 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.Index; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexlifecycle.AbstractStepTestCase; import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep; import org.elasticsearch.xpack.core.indexlifecycle.AsyncWaitStep; @@ -59,7 +64,10 @@ import java.util.Map; import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; @@ -69,14 +77,46 @@ import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; public class IndexLifecycleRunnerTests extends ESTestCase { - private static final NamedXContentRegistry REGISTRY = new NamedXContentRegistry(new IndexLifecycle(Settings.EMPTY).getNamedXContent()); + private static final NamedXContentRegistry REGISTRY; - private PolicyStepsRegistry createOneStepPolicyStepRegistry(String policyName, Step step) { + static { + List entries = new ArrayList<>(new IndexLifecycle(Settings.EMPTY).getNamedXContent()); + REGISTRY = new NamedXContentRegistry(entries); + } + + /** A real policy steps registry where getStep can be overridden so that JSON doesn't have to be parsed */ + private class MockPolicyStepsRegistry extends PolicyStepsRegistry { + private BiFunction fn = null; + + MockPolicyStepsRegistry(SortedMap lifecyclePolicyMap, Map firstStepMap, + Map> stepMap, NamedXContentRegistry xContentRegistry, Client client) { + super(lifecyclePolicyMap, firstStepMap, stepMap, xContentRegistry, client); + } + + public void setResolver(BiFunction fn) { + this.fn = fn; + } + + @Override + public Step getStep(IndexMetaData indexMetaData, StepKey stepKey) { + if (fn == null) { + logger.info("--> retrieving step {}", stepKey); + return super.getStep(indexMetaData, stepKey); + } else { + logger.info("--> returning mock step"); + return fn.apply(indexMetaData, stepKey); + } + } + } + + private MockPolicyStepsRegistry createOneStepPolicyStepRegistry(String policyName, Step step) { return createOneStepPolicyStepRegistry(policyName, step, "test"); } - private PolicyStepsRegistry createOneStepPolicyStepRegistry(String policyName, Step step, String indexName) { - SortedMap lifecyclePolicyMap = null; // Not used in this test + private MockPolicyStepsRegistry createOneStepPolicyStepRegistry(String policyName, Step step, String indexName) { + LifecyclePolicy policy = new LifecyclePolicy(policyName, new HashMap<>()); + SortedMap lifecyclePolicyMap = new TreeMap<>(); + lifecyclePolicyMap.put(policyName, new LifecyclePolicyMetadata(policy, new HashMap<>(), 1, 1)); Map firstStepMap = new HashMap<>(); firstStepMap.put(policyName, step); Map> stepMap = new HashMap<>(); @@ -88,7 +128,9 @@ public class IndexLifecycleRunnerTests extends ESTestCase { steps.add(step); Index index = new Index(indexName, indexName + "uuid"); indexSteps.put(index, steps); - return new PolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, NamedXContentRegistry.EMPTY, null); + Client client = mock(Client.class); + Mockito.when(client.settings()).thenReturn(Settings.EMPTY); + return new MockPolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, REGISTRY, client); } public void testRunPolicyTerminalPolicyStep() { @@ -100,7 +142,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, false); + runner.runPolicyAfterStateChange(policyName, indexMetaData); Mockito.verifyZeroInteractions(clusterService); } @@ -120,11 +162,284 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .putCustom(ILM_CUSTOM_METADATA_KEY, newState.build().asMap()) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, false); + runner.runPolicyAfterStateChange(policyName, indexMetaData); Mockito.verifyZeroInteractions(clusterService); } + public void testRunStateChangePolicyWithNoNextStep() throws Exception { + String policyName = "foo"; + StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); + MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null); + PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); + ThreadPool threadPool = new TestThreadPool("name"); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .build(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + DiscoveryNode node = clusterService.localNode(); + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); + ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder() + .put(indexMetaData, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm)) + .nodes(DiscoveryNodes.builder() + .add(node) + .masterNodeId(node.getId()) + .localNodeId(node.getId())) + .build(); + ClusterServiceUtils.setState(clusterService, state); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + + ClusterState before = clusterService.state(); + CountDownLatch latch = new CountDownLatch(1); + step.setLatch(latch); + runner.runPolicyAfterStateChange(policyName, indexMetaData); + + latch.await(5, TimeUnit.SECONDS); + ClusterState after = clusterService.state(); + + assertEquals(before, after); + assertThat(step.getExecuteCount(), equalTo(1L)); + clusterService.close(); + threadPool.shutdownNow(); + } + + public void testRunStateChangePolicyWithNextStep() throws Exception { + String policyName = "foo"; + StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); + StepKey nextStepKey = new StepKey("phase", "action", "next_cluster_state_action_step"); + MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, nextStepKey); + MockClusterStateActionStep nextStep = new MockClusterStateActionStep(nextStepKey, null); + MockPolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); + stepRegistry.setResolver((i, k) -> { + if (stepKey.equals(k)) { + return step; + } else if (nextStepKey.equals(k)) { + return nextStep; + } else { + fail("should not try to retrieve different step"); + return null; + } + }); + ThreadPool threadPool = new TestThreadPool("name"); + LifecycleExecutionState les = LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("cluster_state_action_step") + .build(); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, les.asMap()) + .build(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + DiscoveryNode node = clusterService.localNode(); + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); + ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder() + .put(indexMetaData, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm)) + .nodes(DiscoveryNodes.builder() + .add(node) + .masterNodeId(node.getId()) + .localNodeId(node.getId())) + .build(); + ClusterServiceUtils.setState(clusterService, state); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + + ClusterState before = clusterService.state(); + CountDownLatch latch = new CountDownLatch(1); + step.setLatch(latch); + runner.runPolicyAfterStateChange(policyName, indexMetaData); + + latch.await(5, TimeUnit.SECONDS); + ClusterState after = clusterService.state(); + + assertEquals(before, after); + assertThat(step.getExecuteCount(), equalTo(1L)); + clusterService.close(); + threadPool.shutdownNow(); + } + + public void testRunAsyncActionDoesNotRun() { + String policyName = "foo"; + StepKey stepKey = new StepKey("phase", "action", "async_action_step"); + MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); + PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); + ThreadPool threadPool = new TestThreadPool("name"); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .build(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + DiscoveryNode node = clusterService.localNode(); + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); + ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder() + .put(indexMetaData, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm)) + .nodes(DiscoveryNodes.builder() + .add(node) + .masterNodeId(node.getId()) + .localNodeId(node.getId())) + .build(); + ClusterServiceUtils.setState(clusterService, state); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + + ClusterState before = clusterService.state(); + // State changes should not run AsyncAction steps + runner.runPolicyAfterStateChange(policyName, indexMetaData); + + ClusterState after = clusterService.state(); + + assertEquals(before, after); + assertThat(step.getExecuteCount(), equalTo(0L)); + clusterService.close(); + threadPool.shutdownNow(); + } + + public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception { + String policyName = "foo"; + StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); + StepKey nextStepKey = new StepKey("phase", "action", "async_action_step"); + MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, nextStepKey); + MockAsyncActionStep nextStep = new MockAsyncActionStep(nextStepKey, null); + MockPolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); + stepRegistry.setResolver((i, k) -> { + if (stepKey.equals(k)) { + return step; + } else if (nextStepKey.equals(k)) { + return nextStep; + } else { + fail("should not try to retrieve different step"); + return null; + } + }); + ThreadPool threadPool = new TestThreadPool("name"); + LifecycleExecutionState les = LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("cluster_state_action_step") + .build(); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, les.asMap()) + .build(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + DiscoveryNode node = clusterService.localNode(); + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); + ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder() + .put(indexMetaData, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm)) + .nodes(DiscoveryNodes.builder() + .add(node) + .masterNodeId(node.getId()) + .localNodeId(node.getId())) + .build(); + logger.info("--> state: {}", state); + ClusterServiceUtils.setState(clusterService, state); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + + ClusterState before = clusterService.state(); + CountDownLatch latch = new CountDownLatch(1); + step.setLatch(latch); + runner.runPolicyAfterStateChange(policyName, indexMetaData); + + // Wait for the cluster state action step + latch.await(5, TimeUnit.SECONDS); + + CountDownLatch asyncLatch = new CountDownLatch(1); + nextStep.setLatch(asyncLatch); + + // Wait for the async action step + asyncLatch.await(5, TimeUnit.SECONDS); + ClusterState after = clusterService.state(); + + assertNotEquals(before, after); + assertThat(step.getExecuteCount(), equalTo(1L)); + assertThat(nextStep.getExecuteCount(), equalTo(1L)); + clusterService.close(); + threadPool.shutdownNow(); + } + + public void testRunPeriodicStep() throws Exception { + String policyName = "foo"; + StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); + StepKey nextStepKey = new StepKey("phase", "action", "async_action_step"); + MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, nextStepKey); + MockAsyncWaitStep nextStep = new MockAsyncWaitStep(nextStepKey, null); + MockPolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); + stepRegistry.setResolver((i, k) -> { + if (stepKey.equals(k)) { + return step; + } else if (nextStepKey.equals(k)) { + return nextStep; + } else { + fail("should not try to retrieve different step"); + return null; + } + }); + ThreadPool threadPool = new TestThreadPool("name"); + LifecycleExecutionState les = LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("cluster_state_action_step") + .build(); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, les.asMap()) + .build(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + DiscoveryNode node = clusterService.localNode(); + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); + ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder() + .put(indexMetaData, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm)) + .nodes(DiscoveryNodes.builder() + .add(node) + .masterNodeId(node.getId()) + .localNodeId(node.getId())) + .build(); + logger.info("--> state: {}", state); + ClusterServiceUtils.setState(clusterService, state); + IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); + + ClusterState before = clusterService.state(); + CountDownLatch latch = new CountDownLatch(1); + step.setLatch(latch); + runner.runPeriodicStep(policyName, indexMetaData); + latch.await(5, TimeUnit.SECONDS); + + ClusterState after = clusterService.state(); + + assertEquals(before, after); + assertThat(step.getExecuteCount(), equalTo(1L)); + assertThat(nextStep.getExecuteCount(), equalTo(0L)); + clusterService.close(); + threadPool.shutdownNow(); + } + public void testRunPolicyClusterStateActionStep() { String policyName = "cluster_state_action_policy"; StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); @@ -135,7 +450,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, randomBoolean()); + runner.runPolicyAfterStateChange(policyName, indexMetaData); Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-execute-cluster-state-steps"), Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetaData.getIndex(), policyName, step))); @@ -153,87 +468,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase { IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, randomBoolean()); + runner.runPolicyAfterStateChange(policyName, indexMetaData); Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-execute-cluster-state-steps"), Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetaData.getIndex(), policyName, step))); Mockito.verifyNoMoreInteractions(clusterService); } - public void testRunPolicyAsyncActionStepCompletes() { - String policyName = "async_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_action_step"); - MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); - step.setWillComplete(true); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - - runner.runPolicy(policyName, indexMetaData, null, false); - - assertEquals(1, step.getExecuteCount()); - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-move-to-step"), - Mockito.argThat(new MoveToNextStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, null))); - Mockito.verifyNoMoreInteractions(clusterService); - } - - public void testRunPolicyAsyncActionStepCompletesIndexDestroyed() { - String policyName = "async_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_action_step"); - MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); - step.setWillComplete(true); - step.setIndexSurvives(false); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - - runner.runPolicy(policyName, indexMetaData, null, false); - - assertEquals(1, step.getExecuteCount()); - Mockito.verifyZeroInteractions(clusterService); - } - - public void testRunPolicyAsyncActionStepNotComplete() { - String policyName = "async_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_action_step"); - MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); - step.setWillComplete(false); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - - runner.runPolicy(policyName, indexMetaData, null, false); - - assertEquals(1, step.getExecuteCount()); - Mockito.verifyZeroInteractions(clusterService); - } - - public void testRunPolicyAsyncActionStepFails() { - String policyName = "async_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_action_step"); - MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); - Exception expectedException = new RuntimeException(); - step.setException(expectedException); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - - runner.runPolicy(policyName, indexMetaData, null, false); - - assertEquals(1, step.getExecuteCount()); - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-move-to-error-step"), - Mockito.argThat(new MoveToErrorStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, expectedException))); - Mockito.verifyNoMoreInteractions(clusterService); - } - public void testRunPolicyAsyncActionStepClusterStateChangeIgnored() { String policyName = "async_action_policy"; StepKey stepKey = new StepKey("phase", "action", "async_action_step"); @@ -246,91 +487,12 @@ public class IndexLifecycleRunnerTests extends ESTestCase { IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, true); + runner.runPolicyAfterStateChange(policyName, indexMetaData); assertEquals(0, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); } - public void testRunPolicyAsyncWaitStepCompletes() { - String policyName = "async_wait_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); - MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null); - step.setWillComplete(true); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - - runner.runPolicy(policyName, indexMetaData, null, false); - - assertEquals(1, step.getExecuteCount()); - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-move-to-step"), - Mockito.argThat(new MoveToNextStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, null))); - Mockito.verifyNoMoreInteractions(clusterService); - } - - public void testRunPolicyAsyncWaitStepNotComplete() { - String policyName = "async_wait_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); - MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null); - RandomStepInfo stepInfo = new RandomStepInfo(() -> randomAlphaOfLength(10)); - step.expectedInfo(stepInfo); - step.setWillComplete(false); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - - runner.runPolicy(policyName, indexMetaData, null, false); - - assertEquals(1, step.getExecuteCount()); - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-set-step-info"), - Mockito.argThat(new SetStepInfoUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, stepInfo))); - Mockito.verifyNoMoreInteractions(clusterService); - } - - public void testRunPolicyAsyncWaitStepNotCompleteNoStepInfo() { - String policyName = "async_wait_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); - MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null); - RandomStepInfo stepInfo = null; - step.expectedInfo(stepInfo); - step.setWillComplete(false); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - - runner.runPolicy(policyName, indexMetaData, null, false); - - assertEquals(1, step.getExecuteCount()); - Mockito.verifyZeroInteractions(clusterService); - } - - public void testRunPolicyAsyncWaitStepFails() { - String policyName = "async_wait_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); - MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null); - Exception expectedException = new RuntimeException(); - step.setException(expectedException); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - - runner.runPolicy(policyName, indexMetaData, null, false); - - assertEquals(1, step.getExecuteCount()); - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-move-to-error-step"), - Mockito.argThat(new MoveToErrorStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, expectedException))); - Mockito.verifyNoMoreInteractions(clusterService); - } - public void testRunPolicyAsyncWaitStepClusterStateChangeIgnored() { String policyName = "async_wait_policy"; StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); @@ -343,7 +505,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, true); + runner.runPolicyAfterStateChange(policyName, indexMetaData); assertEquals(0, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); @@ -357,24 +519,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); // verify that no exception is thrown - runner.runPolicy(policyName, indexMetaData, null, randomBoolean()); - Mockito.verifyZeroInteractions(clusterService); - } - - public void testRunPolicyUnknownStepType() { - String policyName = "cluster_state_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); - MockStep step = new MockStep(stepKey, null); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - - IllegalStateException exception = expectThrows(IllegalStateException.class, - () -> runner.runPolicy(policyName, indexMetaData, null, randomBoolean())); - assertEquals("Step with key [" + stepKey + "] is not a recognised type: [" + step.getClass().getName() + "]", - exception.getMessage()); + runner.runPolicyAfterStateChange(policyName, indexMetaData); Mockito.verifyZeroInteractions(clusterService); } @@ -606,14 +751,14 @@ public class IndexLifecycleRunnerTests extends ESTestCase { public void testValidatedMoveClusterStateToNextStepWithoutPolicy() { String indexName = "my_index"; - String policyName = randomBoolean() ? null : ""; + String policyName = "policy"; StepKey currentStepKey = new StepKey("current_phase", "current_action", "current_step"); StepKey nextStepKey = new StepKey("next_phase", "next_action", "next_step"); long now = randomNonNegativeLong(); Step step = new MockStep(nextStepKey, nextStepKey); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyName); + Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, randomBoolean() ? "" : null); LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder(); lifecycleState.setPhase(currentStepKey.getPhase()); lifecycleState.setAction(currentStepKey.getAction()); @@ -731,7 +876,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { () -> runner.moveClusterStateToFailedStep(clusterState, new String[] { invalidIndexName })); assertThat(exception.getMessage(), equalTo("index [" + invalidIndexName + "] does not exist")); } -// + public void testMoveClusterStateToFailedStepInvalidPolicySetting() { String indexName = "my_index"; String[] indices = new String[] { indexName }; @@ -811,7 +956,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policy, step); ClusterService clusterService = mock(ClusterService.class); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - runner.runPolicy(policy, clusterState.metaData().index(index), clusterState, randomBoolean()); + runner.runPolicyAfterStateChange(policy, clusterState.metaData().index(index)); Mockito.verifyZeroInteractions(clusterService); } @@ -1211,6 +1356,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { private boolean willComplete; private boolean indexSurvives = true; private long executeCount = 0; + private CountDownLatch latch; MockAsyncActionStep(StepKey key, StepKey nextStepKey) { super(key, nextStepKey, null); @@ -1237,9 +1383,16 @@ public class IndexLifecycleRunnerTests extends ESTestCase { return executeCount; } + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } + @Override public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { executeCount++; + if (latch != null) { + latch.countDown(); + } if (exception == null) { listener.onResponse(willComplete); } else { @@ -1255,6 +1408,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { private boolean willComplete; private long executeCount = 0; private ToXContentObject expectedInfo = null; + private CountDownLatch latch; MockAsyncWaitStep(StepKey key, StepKey nextStepKey) { super(key, nextStepKey, null); @@ -1276,9 +1430,16 @@ public class IndexLifecycleRunnerTests extends ESTestCase { return executeCount; } + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } + @Override - public void evaluateCondition(Index index, Listener listener) { + public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) { executeCount++; + if (latch != null) { + latch.countDown(); + } if (exception == null) { listener.onResponse(willComplete, expectedInfo); } else { @@ -1292,6 +1453,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase { private RuntimeException exception; private long executeCount = 0; + private CountDownLatch latch; MockClusterStateActionStep(StepKey key, StepKey nextStepKey) { super(key, nextStepKey); @@ -1301,6 +1463,10 @@ public class IndexLifecycleRunnerTests extends ESTestCase { this.exception = exception; } + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } + public long getExecuteCount() { return executeCount; } @@ -1308,6 +1474,9 @@ public class IndexLifecycleRunnerTests extends ESTestCase { @Override public ClusterState performAction(Index index, ClusterState clusterState) { executeCount++; + if (latch != null) { + latch.countDown(); + } if (exception != null) { throw exception; } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java index a1ea3c2cd7f..13fe9c1c690 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java @@ -177,7 +177,7 @@ public class IndexLifecycleServiceTests extends ESTestCase { return null; }).when(clusterService).submitStateUpdateTask(anyString(), any(ExecuteStepsUpdateTask.class)); indexLifecycleService.applyClusterState(event); - indexLifecycleService.triggerPolicies(currentState, randomBoolean()); + indexLifecycleService.triggerPolicies(currentState, true); assertTrue(executedShrink.get()); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTaskTests.java index 16c4e332177..f166bba25c9 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTaskTests.java @@ -15,8 +15,8 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState; import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyMetadata; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyTests; @@ -28,6 +28,7 @@ import org.junit.Before; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY; import static org.hamcrest.Matchers.equalTo; @@ -68,7 +69,9 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase { setStateToKey(currentStepKey, now); - MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now); + AtomicBoolean changed = new AtomicBoolean(false); + MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, + () -> now, state -> changed.set(true)); ClusterState newState = task.execute(clusterState); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState); @@ -77,6 +80,7 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase { assertThat(lifecycleState.getActionTime(), equalTo(now)); assertThat(lifecycleState.getStepTime(), equalTo(now)); task.clusterStateProcessed("source", clusterState, newState); + assertTrue(changed.get()); } public void testExecuteDifferentCurrentStep() { @@ -84,7 +88,7 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase { StepKey notCurrentStepKey = new StepKey("not-current", "not-current", "not-current"); long now = randomNonNegativeLong(); setStateToKey(notCurrentStepKey, now); - MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now); + MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, null); ClusterState newState = task.execute(clusterState); assertSame(newState, clusterState); } @@ -94,7 +98,7 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase { long now = randomNonNegativeLong(); setStateToKey(currentStepKey, now); setStatePolicy("not-" + policy); - MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now); + MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, null); ClusterState newState = task.execute(clusterState); assertSame(newState, clusterState); } @@ -108,7 +112,8 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase { setStateToKey(currentStepKey, now); SetOnce changed = new SetOnce<>(); - MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, invalidNextStep, () -> now); + MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, + invalidNextStep, () -> now, s -> changed.set(true)); ClusterState newState = task.execute(clusterState); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState); @@ -117,6 +122,7 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase { assertThat(lifecycleState.getActionTime(), equalTo(now)); assertThat(lifecycleState.getStepTime(), equalTo(now)); task.clusterStateProcessed("source", clusterState, newState); + assertTrue(changed.get()); } public void testOnFailure() { @@ -126,7 +132,7 @@ public class MoveToNextStepUpdateTaskTests extends ESTestCase { setStateToKey(currentStepKey, now); - MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now); + MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now, state -> {}); Exception expectedException = new RuntimeException(); ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> task.onFailure(randomAlphaOfLength(10), expectedException));