moar changes

- set `indexSurvives` for Step
- extract the two ClusterStateUpdateTasks to separate classes

- simple Delete Policy works!
This commit is contained in:
Tal Levy 2018-03-29 16:23:57 -07:00
parent 74eaba2078
commit d2e87a66e5
11 changed files with 242 additions and 102 deletions

View File

@ -62,11 +62,6 @@ public class DeleteAction implements LifecycleAction {
return builder;
}
@Override
public boolean indexSurvives() {
return false;
}
@Override
public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey) {
Step.StepKey deleteStepKey = new Step.StepKey(phase, NAME, "delete-step");

View File

@ -15,8 +15,14 @@ public class DeleteAsyncActionStep extends AsyncActionStep {
super(key, nextStepKey, client);
}
@Override
public void performAction(Index index, Listener listener) {
getClient().admin().indices().prepareDelete(index.getName())
.execute(ActionListener.wrap(response -> listener.onResponse(true) , listener::onFailure));
}
@Override
public boolean indexSurvives() {
return false;
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
public class InitializePolicyContextStep extends ClusterStateActionStep {
InitializePolicyContextStep(Step.StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
}
@Override
public ClusterState performAction(Index index, ClusterState clusterState) {
Settings settings = clusterState.metaData().index(index).getSettings();
if (settings.hasValue(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE)) {
return clusterState;
}
ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState);
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
Settings.Builder indexSettings = Settings.builder().put(idxMeta.getSettings())
.put(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE, idxMeta.getCreationDate());
newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData()).put(IndexMetaData
.builder(clusterState.getMetaData().index(index))
.settings(indexSettings)));
return newClusterStateBuilder.build();
}
}

View File

@ -28,8 +28,4 @@ public interface LifecycleAction extends ToXContentObject, NamedWriteable {
* @return an ordered list of steps that represent the execution plan of the action
*/
List<Step> toSteps(Client client, String phase, @Nullable Step.StepKey nextStepKey);
default boolean indexSurvives() {
return true;
}
}

View File

@ -149,10 +149,6 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
public List<Step> toSteps(Client client, LongSupplier nowSupplier) {
List<Step> steps = new ArrayList<>();
List<Phase> orderedPhases = type.getOrderedPhases(phases);
logger.error("checking phases[" + orderedPhases.size() + "]");
for (Phase t : orderedPhases) {
logger.error(t);
}
ListIterator<Phase> phaseIterator = orderedPhases.listIterator(orderedPhases.size());
Step.StepKey lastStepKey = null;
// add steps for each phase, in reverse
@ -172,10 +168,18 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
}
}
Step.StepKey afterStepKey = new Step.StepKey(phase.getName(), null, "after");
steps.add(new PhaseAfterStep(nowSupplier, phase.getAfter(), afterStepKey, lastStepKey));
Step phaseAfterStep = new PhaseAfterStep(nowSupplier, phase.getAfter(), afterStepKey, lastStepKey);
steps.add(phaseAfterStep);
lastStepKey = phaseAfterStep.getKey();
}
steps.add(new InitializePolicyContextStep(new Step.StepKey("", "", ""), lastStepKey));
Collections.reverse(steps);
logger.error("STEP COUNT: " + steps.size());
for (Step step : steps) {
logger.error(step.getKey() + " -> " + step.getNextStepKey());
}
return steps;
}

View File

@ -5,13 +5,17 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import java.util.function.LongSupplier;
public class PhaseAfterStep extends ClusterStateWaitStep {
private static final Logger logger = ESLoggerFactory.getLogger(PhaseAfterStep.class);
private final TimeValue after;
private final LongSupplier nowSupplier;
@ -23,8 +27,16 @@ public class PhaseAfterStep extends ClusterStateWaitStep {
@Override
public boolean isConditionMet(Index index, ClusterState clusterState) {
long lifecycleDate = clusterState.metaData().settings()
IndexMetaData indexMetaData = clusterState.metaData().index(index);
logger.warn("checking phase[" + indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE) + "]"
+ " after[" + after + "]");
long lifecycleDate = indexMetaData.getSettings()
.getAsLong(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE, -1L);
if (lifecycleDate < 0) {
// TODO(talevy): make sure this setting is set before we find ourselves here
logger.warn("index-lifecycle-setting for index" + index.getName() + "] not set");
lifecycleDate = indexMetaData.getCreationDate();
}
return nowSupplier.getAsLong() >= lifecycleDate + after.getMillis();
}
}

View File

@ -28,8 +28,8 @@ public abstract class Step {
return nextStepKey;
}
public boolean hasNextStep() {
return nextStepKey != null;
public boolean indexSurvives() {
return true;
}
public static class StepKey {

View File

@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import java.util.function.BiFunction;
import java.util.function.Function;
public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
private static final Logger logger = ESLoggerFactory.getLogger(ExecuteStepsUpdateTask.class);
private final Index index;
private final Step startStep;
private final Function<ClusterState, Step> getCurrentStepInClusterState;
private final Function<Step.StepKey, Step> getStepFromRegistry;
private final BiFunction<ClusterState, Step, ClusterState> moveClusterStateToNextStep;
public ExecuteStepsUpdateTask(Index index, Step startStep, Function<ClusterState, Step> getCurrentStepInClusterState,
BiFunction<ClusterState, Step, ClusterState> moveClusterStateToNextStep,
Function<Step.StepKey, Step> getStepFromRegistry) {
this.index = index;
this.startStep = startStep;
this.getCurrentStepInClusterState = getCurrentStepInClusterState;
this.moveClusterStateToNextStep = moveClusterStateToNextStep;
this.getStepFromRegistry = getStepFromRegistry;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Step currentStep = startStep;
if (currentStep.equals(getCurrentStepInClusterState.apply(currentState))) {
// We can do cluster state steps all together until we
// either get to a step that isn't a cluster state step or a
// cluster state wait step returns not completed
while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
if (currentStep instanceof ClusterStateActionStep) {
// cluster state action step so do the action and
// move
// the cluster state to the next step
currentState = ((ClusterStateActionStep) currentStep).performAction(index, currentState);
currentState = moveClusterStateToNextStep.apply(currentState, currentStep);
} else {
// cluster state wait step so evaluate the
// condition, if the condition is met move to the
// next step, if its not met return the current
// cluster state so it can be applied and we will
// wait for the next trigger to evaluate the
// condition again
boolean complete = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState);
if (complete) {
currentState = moveClusterStateToNextStep.apply(currentState, currentStep);
} else {
logger.warn("condition not met, returning existing state");
return currentState;
}
}
currentStep = getStepFromRegistry.apply(currentStep.getNextStepKey());
}
return currentState;
} else {
// either we are no longer the master or the step is now
// not the same as when we submitted the update task. In
// either case we don't want to do anything now
return currentState;
}
}
@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e); // NORELEASE implement error handling
}
}

View File

@ -5,11 +5,13 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.index.Index;
@ -22,7 +24,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.Step;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
public class IndexLifecycleRunner {
private static final Logger logger = ESLoggerFactory.getLogger(IndexLifecycleRunner.class);
private PolicyStepsRegistry stepRegistry;
private ClusterService clusterService;
@ -33,6 +35,7 @@ public class IndexLifecycleRunner {
public void runPolicy(String policy, Index index, Settings indexSettings, Cause cause) {
Step currentStep = getCurrentStep(policy, indexSettings);
logger.warn("running policy with current-step[" + currentStep.getKey() + "]");
if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
if (cause != Cause.SCHEDULE_TRIGGER) {
executeClusterStateSteps(index, policy, currentStep);
@ -43,6 +46,7 @@ public class IndexLifecycleRunner {
@Override
public void onResponse(boolean conditionMet) {
logger.error("cs-change-async-wait-callback. current-step:" + currentStep.getKey());
if (conditionMet) {
moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey(), Cause.CALLBACK);
}
@ -61,7 +65,8 @@ public class IndexLifecycleRunner {
@Override
public void onResponse(boolean complete) {
if (complete) {
logger.error("cs-change-async-action-callback. current-step:" + currentStep.getKey());
if (complete && currentStep.indexSurvives()) {
moveToStep(index, policy, currentStep.getKey(), currentStep.getNextStepKey(), Cause.CALLBACK);
}
}
@ -87,53 +92,10 @@ public class IndexLifecycleRunner {
private void executeClusterStateSteps(Index index, String policy, Step step) {
assert step instanceof ClusterStateActionStep || step instanceof ClusterStateWaitStep;
clusterService.submitStateUpdateTask("ILM", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Step currentStep = step;
if (currentStep.equals(getCurrentStep(policy, currentState.getMetaData().index(index).getSettings()))) {
// We can do cluster state steps all together until we
// either get to a step that isn't a cluster state step or a
// cluster state wait step returns not completed
while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
if (currentStep instanceof ClusterStateActionStep) {
// cluster state action step so do the action and
// move
// the cluster state to the next step
currentState = ((ClusterStateActionStep) currentStep).performAction(index, currentState);
currentState = moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey());
} else {
// cluster state wait step so evaluate the
// condition, if the condition is met move to the
// next step, if its not met return the current
// cluster state so it can be applied and we will
// wait for the next trigger to evaluate the
// condition again
boolean complete = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState);
if (complete) {
currentState = moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey());
} else {
return currentState;
}
}
currentStep = stepRegistry.getStep(policy, currentStep.getNextStepKey());
}
return currentState;
} else {
// either we are no longer the master or the step is now
// not the same as when we submitted the update task. In
// either case we don't want to do anything now
return currentState;
}
}
@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e); // NORELEASE implement error handling
}
});
clusterService.submitStateUpdateTask("ILM", new ExecuteStepsUpdateTask(index, step,
(currentState) -> getCurrentStep(policy, currentState.getMetaData().index(index).getSettings()),
(currentState, currentStep) -> moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey()),
(stepKey) -> stepRegistry.getStep(policy, stepKey)));
}
private StepKey getCurrentStepKey(Settings indexSettings) {
@ -165,9 +127,6 @@ public class IndexLifecycleRunner {
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
Builder indexSettings = Settings.builder().put(idxMeta.getSettings()).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName());
if (indexSettings.keys().contains(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE) == false) {
indexSettings.put(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), idxMeta.getCreationDate());
}
newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData()).put(IndexMetaData
.builder(clusterState.getMetaData().index(index))
.settings(indexSettings)));
@ -175,40 +134,14 @@ public class IndexLifecycleRunner {
}
private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey, Cause cause) {
clusterService.submitStateUpdateTask("ILM", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Settings indexSettings = currentState.getMetaData().index(index).getSettings();
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
&& currentStepKey.equals(getCurrentStepKey(indexSettings))) {
return moveClusterStateToNextStep(index, currentState, nextStepKey);
} else {
// either the policy has changed or the step is now
// not the same as when we submitted the update task. In
// either case we don't want to do anything now
return currentState;
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// if the new cluster state is different from the old one then
// we moved to the new step in the execute method so we should
// execute the next step
if (oldState != newState) {
runPolicy(index, newState, cause);
}
}
@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e); // NORELEASE implement error handling
}
});
logger.error("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> "
+ nextStepKey + ". because:" + cause.name());
clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy,
currentStepKey, (c) -> moveClusterStateToNextStep(index, c, nextStepKey),
(s) -> getCurrentStepKey(s), (c) -> runPolicy(index, c, cause)));
}
public static enum Cause {
public enum Cause {
CLUSTER_STATE_CHANGE, SCHEDULE_TRIGGER, CALLBACK;
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
private static final Logger logger = ESLoggerFactory.getLogger(MoveToNextStepUpdateTask.class);
private final Index index;
private final String policy;
private final Step.StepKey currentStepKey;
private final Function<ClusterState, ClusterState> moveClusterStateToNextStep;
private final Function<Settings, Step.StepKey> getCurrentStepKey;
private final Consumer<ClusterState> runPolicy;
public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey,
Function<ClusterState, ClusterState> moveClusterStateToNextStep,
Function<Settings, Step.StepKey> getCurrentStepKey,
Consumer<ClusterState> runPolicy) {
this.index = index;
this.policy = policy;
this.currentStepKey = currentStepKey;
this.moveClusterStateToNextStep = moveClusterStateToNextStep;
this.getCurrentStepKey = getCurrentStepKey;
this.runPolicy = runPolicy;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
Settings indexSettings = currentState.getMetaData().index(index).getSettings();
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
&& currentStepKey.equals(getCurrentStepKey.apply(indexSettings))) {
return moveClusterStateToNextStep.apply(currentState);
} else {
// either the policy has changed or the step is now
// not the same as when we submitted the update task. In
// either case we don't want to do anything now
return currentState;
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// if the new cluster state is different from the old one then
// we moved to the new step in the execute method so we should
// execute the next step
if (oldState != newState) {
runPolicy.accept(newState);
}
}
@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e); // NORELEASE implement error handling
}
}

View File

@ -40,7 +40,8 @@ public class PolicyStepsRegistry {
@SuppressWarnings("unchecked")
public void update(ClusterState currentState, Client client, LongSupplier nowSupplier) {
IndexLifecycleMetadata meta = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
Diff<Map<String, LifecyclePolicy>> diff = DiffableUtils.diff(lifecyclePolicyMap, meta.getPolicies(), DiffableUtils.getStringKeySerializer());
Diff<Map<String, LifecyclePolicy>> diff = DiffableUtils.diff(lifecyclePolicyMap, meta.getPolicies(),
DiffableUtils.getStringKeySerializer());
DiffableUtils.MapDiff<String, LifecyclePolicy, DiffableUtils.KeySerializer<String>> mapDiff = (DiffableUtils.MapDiff) diff;
if (mapDiff.getUpserts().isEmpty() == false) {
for (LifecyclePolicy policy : mapDiff.getUpserts().values()) {