From d2e87a66e5c49934b91da58ad4e20c4056e4ac02 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 29 Mar 2018 16:23:57 -0700 Subject: [PATCH] moar changes - set `indexSurvives` for Step - extract the two ClusterStateUpdateTasks to separate classes - simple Delete Policy works! --- .../core/indexlifecycle/DeleteAction.java | 5 - .../indexlifecycle/DeleteAsyncActionStep.java | 6 ++ .../InitializePolicyContextStep.java | 36 +++++++ .../core/indexlifecycle/LifecycleAction.java | 4 - .../core/indexlifecycle/LifecyclePolicy.java | 14 ++- .../core/indexlifecycle/PhaseAfterStep.java | 14 ++- .../xpack/core/indexlifecycle/Step.java | 4 +- .../ExecuteStepsUpdateTask.java | 83 ++++++++++++++ .../indexlifecycle/IndexLifecycleRunner.java | 101 +++--------------- .../MoveToNextStepUpdateTask.java | 74 +++++++++++++ .../indexlifecycle/PolicyStepsRegistry.java | 3 +- 11 files changed, 242 insertions(+), 102 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/InitializePolicyContextStep.java create mode 100644 x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java create mode 100644 x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTask.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAction.java index ff4e84ebc2b..145ee76957a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAction.java @@ -62,11 +62,6 @@ public class DeleteAction implements LifecycleAction { return builder; } - @Override - public boolean indexSurvives() { - return false; - } - @Override public List toSteps(Client client, String phase, Step.StepKey nextStepKey) { Step.StepKey deleteStepKey = new Step.StepKey(phase, NAME, "delete-step"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAsyncActionStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAsyncActionStep.java index fb44b4836ea..df89af4ee3d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAsyncActionStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteAsyncActionStep.java @@ -15,8 +15,14 @@ public class DeleteAsyncActionStep extends AsyncActionStep { super(key, nextStepKey, client); } + @Override public void performAction(Index index, Listener listener) { getClient().admin().indices().prepareDelete(index.getName()) .execute(ActionListener.wrap(response -> listener.onResponse(true) , listener::onFailure)); } + + @Override + public boolean indexSurvives() { + return false; + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/InitializePolicyContextStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/InitializePolicyContextStep.java new file mode 100644 index 00000000000..8fc30bce5e1 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/InitializePolicyContextStep.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; + +public class InitializePolicyContextStep extends ClusterStateActionStep { + + InitializePolicyContextStep(Step.StepKey key, StepKey nextStepKey) { + super(key, nextStepKey); + } + + @Override + public ClusterState performAction(Index index, ClusterState clusterState) { + Settings settings = clusterState.metaData().index(index).getSettings(); + if (settings.hasValue(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE)) { + return clusterState; + } + + ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState); + IndexMetaData idxMeta = clusterState.getMetaData().index(index); + Settings.Builder indexSettings = Settings.builder().put(idxMeta.getSettings()) + .put(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE, idxMeta.getCreationDate()); + newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData()).put(IndexMetaData + .builder(clusterState.getMetaData().index(index)) + .settings(indexSettings))); + return newClusterStateBuilder.build(); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleAction.java index 85428978425..007041511c2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleAction.java @@ -28,8 +28,4 @@ public interface LifecycleAction extends ToXContentObject, NamedWriteable { * @return an ordered list of steps that represent the execution plan of the action */ List toSteps(Client client, String phase, @Nullable Step.StepKey nextStepKey); - - default boolean indexSurvives() { - return true; - } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java index 60a86b8e2be..56f6ab55d91 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java @@ -149,10 +149,6 @@ public class LifecyclePolicy extends AbstractDiffable public List toSteps(Client client, LongSupplier nowSupplier) { List steps = new ArrayList<>(); List orderedPhases = type.getOrderedPhases(phases); - logger.error("checking phases[" + orderedPhases.size() + "]"); - for (Phase t : orderedPhases) { - logger.error(t); - } ListIterator phaseIterator = orderedPhases.listIterator(orderedPhases.size()); Step.StepKey lastStepKey = null; // add steps for each phase, in reverse @@ -172,10 +168,18 @@ public class LifecyclePolicy extends AbstractDiffable } } Step.StepKey afterStepKey = new Step.StepKey(phase.getName(), null, "after"); - steps.add(new PhaseAfterStep(nowSupplier, phase.getAfter(), afterStepKey, lastStepKey)); + Step phaseAfterStep = new PhaseAfterStep(nowSupplier, phase.getAfter(), afterStepKey, lastStepKey); + steps.add(phaseAfterStep); + lastStepKey = phaseAfterStep.getKey(); } + steps.add(new InitializePolicyContextStep(new Step.StepKey("", "", ""), lastStepKey)); + Collections.reverse(steps); + logger.error("STEP COUNT: " + steps.size()); + for (Step step : steps) { + logger.error(step.getKey() + " -> " + step.getNextStepKey()); + } return steps; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStep.java index c732e401677..b309a3a217a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStep.java @@ -5,13 +5,17 @@ */ package org.elasticsearch.xpack.core.indexlifecycle; +import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import java.util.function.LongSupplier; public class PhaseAfterStep extends ClusterStateWaitStep { + private static final Logger logger = ESLoggerFactory.getLogger(PhaseAfterStep.class); private final TimeValue after; private final LongSupplier nowSupplier; @@ -23,8 +27,16 @@ public class PhaseAfterStep extends ClusterStateWaitStep { @Override public boolean isConditionMet(Index index, ClusterState clusterState) { - long lifecycleDate = clusterState.metaData().settings() + IndexMetaData indexMetaData = clusterState.metaData().index(index); + logger.warn("checking phase[" + indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE) + "]" + + " after[" + after + "]"); + long lifecycleDate = indexMetaData.getSettings() .getAsLong(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE, -1L); + if (lifecycleDate < 0) { + // TODO(talevy): make sure this setting is set before we find ourselves here + logger.warn("index-lifecycle-setting for index" + index.getName() + "] not set"); + lifecycleDate = indexMetaData.getCreationDate(); + } return nowSupplier.getAsLong() >= lifecycleDate + after.getMillis(); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Step.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Step.java index b5106d0850d..e7f3092de42 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Step.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Step.java @@ -28,8 +28,8 @@ public abstract class Step { return nextStepKey; } - public boolean hasNextStep() { - return nextStepKey != null; + public boolean indexSurvives() { + return true; } public static class StepKey { diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java new file mode 100644 index 00000000000..36d4421dc9e --- /dev/null +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.indexlifecycle; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.index.Index; +import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep; +import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep; +import org.elasticsearch.xpack.core.indexlifecycle.Step; + +import java.util.function.BiFunction; +import java.util.function.Function; + +public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { + private static final Logger logger = ESLoggerFactory.getLogger(ExecuteStepsUpdateTask.class); + private final Index index; + private final Step startStep; + private final Function getCurrentStepInClusterState; + private final Function getStepFromRegistry; + private final BiFunction moveClusterStateToNextStep; + + public ExecuteStepsUpdateTask(Index index, Step startStep, Function getCurrentStepInClusterState, + BiFunction moveClusterStateToNextStep, + Function getStepFromRegistry) { + this.index = index; + this.startStep = startStep; + this.getCurrentStepInClusterState = getCurrentStepInClusterState; + this.moveClusterStateToNextStep = moveClusterStateToNextStep; + this.getStepFromRegistry = getStepFromRegistry; + } + + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + Step currentStep = startStep; + if (currentStep.equals(getCurrentStepInClusterState.apply(currentState))) { + // We can do cluster state steps all together until we + // either get to a step that isn't a cluster state step or a + // cluster state wait step returns not completed + while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { + if (currentStep instanceof ClusterStateActionStep) { + // cluster state action step so do the action and + // move + // the cluster state to the next step + currentState = ((ClusterStateActionStep) currentStep).performAction(index, currentState); + currentState = moveClusterStateToNextStep.apply(currentState, currentStep); + } else { + // cluster state wait step so evaluate the + // condition, if the condition is met move to the + // next step, if its not met return the current + // cluster state so it can be applied and we will + // wait for the next trigger to evaluate the + // condition again + boolean complete = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState); + if (complete) { + currentState = moveClusterStateToNextStep.apply(currentState, currentStep); + } else { + logger.warn("condition not met, returning existing state"); + return currentState; + } + } + currentStep = getStepFromRegistry.apply(currentStep.getNextStepKey()); + } + return currentState; + } else { + // either we are no longer the master or the step is now + // not the same as when we submitted the update task. In + // either case we don't want to do anything now + return currentState; + } + } + + @Override + public void onFailure(String source, Exception e) { + throw new RuntimeException(e); // NORELEASE implement error handling + } +} diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java index 479fab34ca2..ec2f1e353cb 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java @@ -5,11 +5,13 @@ */ package org.elasticsearch.xpack.indexlifecycle; +import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings.Builder; import org.elasticsearch.index.Index; @@ -22,7 +24,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.Step; import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; public class IndexLifecycleRunner { - + private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleRunner.class); private PolicyStepsRegistry stepRegistry; private ClusterService clusterService; @@ -33,6 +35,7 @@ public class IndexLifecycleRunner { public void runPolicy(String policy, Index index, Settings indexSettings, Cause cause) { Step currentStep = getCurrentStep(policy, indexSettings); + logger.warn("running policy with current-step[" + currentStep.getKey() + "]"); if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { if (cause != Cause.SCHEDULE_TRIGGER) { executeClusterStateSteps(index, policy, currentStep); @@ -43,6 +46,7 @@ public class IndexLifecycleRunner { @Override public void onResponse(boolean conditionMet) { + logger.error("cs-change-async-wait-callback. current-step:" + currentStep.getKey()); if (conditionMet) { moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey(), Cause.CALLBACK); } @@ -61,7 +65,8 @@ public class IndexLifecycleRunner { @Override public void onResponse(boolean complete) { - if (complete) { + logger.error("cs-change-async-action-callback. current-step:" + currentStep.getKey()); + if (complete && currentStep.indexSurvives()) { moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey(), Cause.CALLBACK); } } @@ -87,53 +92,10 @@ public class IndexLifecycleRunner { private void executeClusterStateSteps(Index index, String policy, Step step) { assert step instanceof ClusterStateActionStep || step instanceof ClusterStateWaitStep; - clusterService.submitStateUpdateTask("ILM", new ClusterStateUpdateTask() { - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - Step currentStep = step; - if (currentStep.equals(getCurrentStep(policy, currentState.getMetaData().index(index).getSettings()))) { - // We can do cluster state steps all together until we - // either get to a step that isn't a cluster state step or a - // cluster state wait step returns not completed - while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { - if (currentStep instanceof ClusterStateActionStep) { - // cluster state action step so do the action and - // move - // the cluster state to the next step - currentState = ((ClusterStateActionStep) currentStep).performAction(index, currentState); - currentState = moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey()); - } else { - // cluster state wait step so evaluate the - // condition, if the condition is met move to the - // next step, if its not met return the current - // cluster state so it can be applied and we will - // wait for the next trigger to evaluate the - // condition again - boolean complete = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState); - if (complete) { - currentState = moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey()); - } else { - return currentState; - } - } - currentStep = stepRegistry.getStep(policy, currentStep.getNextStepKey()); - } - return currentState; - } else { - // either we are no longer the master or the step is now - // not the same as when we submitted the update task. In - // either case we don't want to do anything now - return currentState; - } - } - - @Override - public void onFailure(String source, Exception e) { - throw new RuntimeException(e); // NORELEASE implement error handling - } - - }); + clusterService.submitStateUpdateTask("ILM", new ExecuteStepsUpdateTask(index, step, + (currentState) -> getCurrentStep(policy, currentState.getMetaData().index(index).getSettings()), + (currentState, currentStep) -> moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey()), + (stepKey) -> stepRegistry.getStep(policy, stepKey))); } private StepKey getCurrentStepKey(Settings indexSettings) { @@ -165,9 +127,6 @@ public class IndexLifecycleRunner { IndexMetaData idxMeta = clusterState.getMetaData().index(index); Builder indexSettings = Settings.builder().put(idxMeta.getSettings()).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase()) .put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName()); - if (indexSettings.keys().contains(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE) == false) { - indexSettings.put(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), idxMeta.getCreationDate()); - } newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData()).put(IndexMetaData .builder(clusterState.getMetaData().index(index)) .settings(indexSettings))); @@ -175,40 +134,14 @@ public class IndexLifecycleRunner { } private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey, Cause cause) { - clusterService.submitStateUpdateTask("ILM", new ClusterStateUpdateTask() { - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - Settings indexSettings = currentState.getMetaData().index(index).getSettings(); - if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings)) - && currentStepKey.equals(getCurrentStepKey(indexSettings))) { - return moveClusterStateToNextStep(index, currentState, nextStepKey); - } else { - // either the policy has changed or the step is now - // not the same as when we submitted the update task. In - // either case we don't want to do anything now - return currentState; - } - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - // if the new cluster state is different from the old one then - // we moved to the new step in the execute method so we should - // execute the next step - if (oldState != newState) { - runPolicy(index, newState, cause); - } - } - - @Override - public void onFailure(String source, Exception e) { - throw new RuntimeException(e); // NORELEASE implement error handling - } - }); + logger.error("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> " + + nextStepKey + ". because:" + cause.name()); + clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy, + currentStepKey, (c) -> moveClusterStateToNextStep(index, c, nextStepKey), + (s) -> getCurrentStepKey(s), (c) -> runPolicy(index, c, cause))); } - public static enum Cause { + public enum Cause { CLUSTER_STATE_CHANGE, SCHEDULE_TRIGGER, CALLBACK; } } diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTask.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTask.java new file mode 100644 index 00000000000..4fcd7f68857 --- /dev/null +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTask.java @@ -0,0 +1,74 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.indexlifecycle; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep; +import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; +import org.elasticsearch.xpack.core.indexlifecycle.Step; + +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask { + private static final Logger logger = ESLoggerFactory.getLogger(MoveToNextStepUpdateTask.class); + private final Index index; + private final String policy; + private final Step.StepKey currentStepKey; + private final Function moveClusterStateToNextStep; + private final Function getCurrentStepKey; + private final Consumer runPolicy; + + public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, + Function moveClusterStateToNextStep, + Function getCurrentStepKey, + Consumer runPolicy) { + this.index = index; + this.policy = policy; + this.currentStepKey = currentStepKey; + this.moveClusterStateToNextStep = moveClusterStateToNextStep; + this.getCurrentStepKey = getCurrentStepKey; + this.runPolicy = runPolicy; + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + Settings indexSettings = currentState.getMetaData().index(index).getSettings(); + if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings)) + && currentStepKey.equals(getCurrentStepKey.apply(indexSettings))) { + return moveClusterStateToNextStep.apply(currentState); + } else { + // either the policy has changed or the step is now + // not the same as when we submitted the update task. In + // either case we don't want to do anything now + return currentState; + } + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + // if the new cluster state is different from the old one then + // we moved to the new step in the execute method so we should + // execute the next step + if (oldState != newState) { + runPolicy.accept(newState); + } + } + + @Override + public void onFailure(String source, Exception e) { + throw new RuntimeException(e); // NORELEASE implement error handling + } + +} diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java index 8632acb439f..d37fb6581e5 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java @@ -40,7 +40,8 @@ public class PolicyStepsRegistry { @SuppressWarnings("unchecked") public void update(ClusterState currentState, Client client, LongSupplier nowSupplier) { IndexLifecycleMetadata meta = currentState.metaData().custom(IndexLifecycleMetadata.TYPE); - Diff> diff = DiffableUtils.diff(lifecyclePolicyMap, meta.getPolicies(), DiffableUtils.getStringKeySerializer()); + Diff> diff = DiffableUtils.diff(lifecyclePolicyMap, meta.getPolicies(), + DiffableUtils.getStringKeySerializer()); DiffableUtils.MapDiff> mapDiff = (DiffableUtils.MapDiff) diff; if (mapDiff.getUpserts().isEmpty() == false) { for (LifecyclePolicy policy : mapDiff.getUpserts().values()) {