diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncActionStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncActionStep.java new file mode 100644 index 00000000000..7172ddf497c --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncActionStep.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.client.Client; +import org.elasticsearch.index.Index; + +public abstract class AsyncActionStep extends Step { + + private Client client; + + public AsyncActionStep(String name, String action, String phase, StepKey nextStepKey, Client client) { + super(name, action, phase, nextStepKey); + this.client = client; + } + + protected Client getClient() { + return client; + } + + public abstract void performAction(Index index, Listener listener); + + public static interface Listener { + + void onResponse(boolean complete); + + void onFailure(Exception e); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncWaitStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncWaitStep.java new file mode 100644 index 00000000000..2430680970e --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncWaitStep.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.client.Client; +import org.elasticsearch.index.Index; + +public abstract class AsyncWaitStep extends Step { + + private Client client; + + public AsyncWaitStep(String name, String action, String phase, StepKey nextStepKey, Client client) { + super(name, action, phase, nextStepKey); + this.client = client; + } + + protected Client getClient() { + return client; + } + + public abstract void evaluateCondition(Index index, Listener listener); + + public static interface Listener { + + void onResponse(boolean conditionMet); + + void onFailure(Exception e); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClientStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClientStep.java deleted file mode 100644 index 179a1edefef..00000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClientStep.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.indexlifecycle; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.xpack.core.indexlifecycle.Step; -import org.elasticsearch.xpack.core.indexlifecycle.StepResult; - -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -public class ClientStep extends Step { - - private final RequestBuilder requestBuilder; - private final Function checkComplete; - private final Function checkSuccess; - private Exception returnedException; - private boolean returnedSuccess; - - public ClientStep(String name, String action, String phase, String index, StepKey nextStepKey, RequestBuilder requestBuilder, - Function checkComplete, Function checkSuccess) { - super(name, action, phase, nextStepKey); - this.requestBuilder = requestBuilder; - this.checkComplete = checkComplete; - this.checkSuccess = checkSuccess; - this.returnedException = null; - this.returnedSuccess = false; - } -} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateActionStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateActionStep.java new file mode 100644 index 00000000000..987fd77fa1a --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateActionStep.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.index.Index; + +public abstract class ClusterStateActionStep extends Step { + + public ClusterStateActionStep(String name, String action, String phase, StepKey nextStepKey) { + super(name, action, phase, nextStepKey); + } + + public abstract ClusterState performAction(Index index, ClusterState clusterState); + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateUpdateStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateUpdateStep.java deleted file mode 100644 index eab7ac564e9..00000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateUpdateStep.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.indexlifecycle; - -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.index.Index; -import org.elasticsearch.xpack.core.indexlifecycle.Step; -import org.elasticsearch.xpack.core.indexlifecycle.StepResult; - -import java.util.function.Function; -import java.util.function.LongSupplier; - -public class ClusterStateUpdateStep extends Step { - private final Function updateTask; - - public ClusterStateUpdateStep(String name, String index, String phase, String action, StepKey nextStepKey, Function updateTask) { - super(name, action, phase, nextStepKey); - this.updateTask = updateTask; - } - - public StepResult execute(ClusterService clusterService, ClusterState currentState, Index index, Client client, LongSupplier nowSupplier) { - ClusterState updated = null; - try { - updated = updateTask.apply(currentState); - updated = updateStateWithNextStep(updated, nowSupplier, index); - return new StepResult("done!", null, updated, true, true); - } catch (Exception e) { - return new StepResult("something went wrong", e, updated, true, true); - } - } -} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateWaitStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateWaitStep.java new file mode 100644 index 00000000000..187ce62d454 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateWaitStep.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.index.Index; + +public abstract class ClusterStateWaitStep extends Step { + + public ClusterStateWaitStep(String name, String action, String phase, StepKey nextStepKey) { + super(name, action, phase, nextStepKey); + } + + public abstract boolean isConditionMet(Index index, ClusterState clusterState); + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ConditionalWaitStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ConditionalWaitStep.java deleted file mode 100644 index fe675ccc8b9..00000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ConditionalWaitStep.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.indexlifecycle; - -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.index.Index; -import org.elasticsearch.xpack.core.indexlifecycle.Step; -import org.elasticsearch.xpack.core.indexlifecycle.StepResult; - -import java.util.function.Function; -import java.util.function.LongSupplier; - -public class ConditionalWaitStep extends Step { - private final Function condition; - - public ConditionalWaitStep(String name, String phase, String action, StepKey nextStepKey, Function condition) { - super(name, action, phase, nextStepKey); - this.condition = condition; - } - - @Override - public StepResult execute(ClusterService clusterService, ClusterState currentState, Index index, Client client, LongSupplier nowSupplier) { - boolean isComplete = condition.apply(currentState); - return new StepResult(String.valueOf(isComplete), null, currentState, true, isComplete); - } -} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java index e3c2e337610..768b3c4773a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java @@ -31,7 +31,7 @@ public class LifecycleSettings { Setting.Property.Dynamic, Setting.Property.IndexScope); public static final Setting LIFECYCLE_ACTION_SETTING = Setting.simpleString(LIFECYCLE_ACTION, Setting.Property.Dynamic, Setting.Property.IndexScope); - public static final Setting LIFECYCLE_STEP_SETTING = Setting.simpleString(LIFECYCLE_ACTION, + public static final Setting LIFECYCLE_STEP_SETTING = Setting.simpleString(LIFECYCLE_STEP, Setting.Property.Dynamic, Setting.Property.IndexScope); public static final Setting LIFECYCLE_INDEX_CREATION_DATE_SETTING = Setting.longSetting(LIFECYCLE_INDEX_CREATION_DATE, -1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope); @@ -39,6 +39,6 @@ public class LifecycleSettings { -1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope); public static final Setting LIFECYCLE_ACTION_TIME_SETTING = Setting.longSetting(LIFECYCLE_ACTION_TIME, -1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope); - public static final Setting LIFECYCLE_STEP_TIME_SETTING = Setting.longSetting(LIFECYCLE_ACTION_TIME, + public static final Setting LIFECYCLE_STEP_TIME_SETTING = Setting.longSetting(LIFECYCLE_STEP_TIME, -1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Step.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Step.java index 57de63d3778..3d341153934 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Step.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Step.java @@ -5,21 +5,12 @@ */ package org.elasticsearch.xpack.core.indexlifecycle; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.Index; - import java.util.Objects; -import java.util.function.LongSupplier; /** * A {@link LifecycleAction} which deletes the index. */ -public class Step { +public abstract class Step { private final String name; private final String action; private final String phase; @@ -52,63 +43,6 @@ public class Step { return nextStepKey != null; } - /** - * Executes this step and updates the cluster state with the next step to run - * - * @param currentState - * @param client - * @param nowSupplier - * @return - */ - public StepResult execute(ClusterService clusterService, ClusterState currentState, Index index, Client client, LongSupplier nowSupplier) { -// Example: Delete -// -// client.admin().indices().prepareDelete(index.getName()).execute(new ActionListener() { -// @Override -// public void onResponse(DeleteIndexResponse deleteIndexResponse) { -// if (deleteIndexResponse.isAcknowledged()) { -// submitUpdateNextStepTask(clusterService, nowSupplier, index); -// } -// } -// -// @Override -// public void onFailure(Exception e) { -// -// } -// }); - throw new UnsupportedOperationException("implement me"); - } - - protected void submitUpdateNextStepTask(ClusterService clusterService, LongSupplier nowSupplier, Index index) { - clusterService.submitStateUpdateTask("update-next-step", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return updateStateWithNextStep(currentState, nowSupplier, index); - } - - @Override - public void onFailure(String source, Exception e) { - - } - }); - } - - protected ClusterState updateStateWithNextStep(ClusterState currentState, LongSupplier nowSupplier, Index index) { - long now = nowSupplier.getAsLong(); - // fetch details about next step to run and update the cluster state with this information - Settings newLifecyclePhaseSettings = Settings.builder() - .put(LifecycleSettings.LIFECYCLE_PHASE, nextStepKey.getPhase()) - .put(LifecycleSettings.LIFECYCLE_PHASE_TIME, now) - .put(LifecycleSettings.LIFECYCLE_ACTION_TIME, now) - .put(LifecycleSettings.LIFECYCLE_ACTION, nextStepKey.getAction()) - .put(LifecycleSettings.LIFECYCLE_STEP_TIME, now) - .put(LifecycleSettings.LIFECYCLE_STEP, nextStepKey.getName()) - .build(); - return ClusterState.builder(currentState) - .metaData(MetaData.builder(currentState.metaData()) - .updateSettings(newLifecyclePhaseSettings, index.getName())).build(); - } - public static class StepKey { private final String phase; diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java new file mode 100644 index 00000000000..84e19c482df --- /dev/null +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java @@ -0,0 +1,216 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.indexlifecycle; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.Settings.Builder; +import org.elasticsearch.index.Index; +import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep; +import org.elasticsearch.xpack.core.indexlifecycle.AsyncWaitStep; +import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep; +import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; +import org.elasticsearch.xpack.core.indexlifecycle.Step; +import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; + +public class IndexLifecycleRunner { + + private PolicyStepsRegistry stepRegistry; + private ClusterService clusterService; + + public IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ClusterService clusterService) { + this.stepRegistry = stepRegistry; + this.clusterService = clusterService; + } + + public void runPolicy(String policy, Index index, Settings indexSettings, Cause cause) { + Step currentStep = getCurrentStep(policy, indexSettings); + if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { + if (cause != Cause.SCHEDULE_TRIGGER) { + executeClusterStateSteps(index, policy, currentStep); + } + } else if (currentStep instanceof AsyncWaitStep) { + if (cause != Cause.CLUSTER_STATE_CHANGE) { + ((AsyncWaitStep) currentStep).evaluateCondition(index, new AsyncWaitStep.Listener() { + + @Override + public void onResponse(boolean conditionMet) { + if (conditionMet) { + moveToStep(index, policy, new StepKey(currentStep.getPhase(), currentStep.getAction(), currentStep.getName()), + currentStep.getNextStepKey(), Cause.CALLBACK); + } + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); // NORELEASE implement error handling + } + + }); + } + } else if (currentStep instanceof AsyncActionStep) { + if (cause != Cause.CLUSTER_STATE_CHANGE) { + ((AsyncActionStep) currentStep).performAction(index, new AsyncActionStep.Listener() { + + @Override + public void onResponse(boolean complete) { + if (complete) { + moveToStep(index, policy, new StepKey(currentStep.getPhase(), currentStep.getAction(), currentStep.getName()), + currentStep.getNextStepKey(), Cause.CALLBACK); + } + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); // NORELEASE implement error handling + } + }); + } + } else { + throw new IllegalStateException( + "Step with key [" + currentStep.getName() + "] is not a recognised type: [" + currentStep.getClass().getName() + "]"); + } + } + + private void runPolicy(Index index, ClusterState clusterState, Cause cause) { + IndexMetaData indexMetaData = clusterState.getMetaData().index(index); + Settings indexSettings = indexMetaData.getSettings(); + String policy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings); + runPolicy(policy, index, indexSettings, cause); + } + + private void executeClusterStateSteps(Index index, String policy, Step step) { + assert step instanceof ClusterStateActionStep || step instanceof ClusterStateWaitStep; + clusterService.submitStateUpdateTask("ILM", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + Step currentStep = step; + if (currentStep.equals(getCurrentStep(policy, currentState.getMetaData().index(index).getSettings()))) { + // We can do cluster state steps all together until we + // either get to a step that isn't a cluster state step or a + // cluster state wait step returns not completed + while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { + if (currentStep instanceof ClusterStateActionStep) { + // cluster state action step so do the action and + // move + // the cluster state to the next step + currentState = ((ClusterStateActionStep) currentStep).performAction(index, currentState); + currentState = moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey()); + } else { + // cluster state wait step so evaluate the + // condition, if the condition is met move to the + // next step, if its not met return the current + // cluster state so it can be applied and we will + // wait for the next trigger to evaluate the + // condition again + boolean complete = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState); + if (complete) { + currentState = moveClusterStateToNextStep(index, currentState, currentStep.getNextStepKey()); + } else { + return currentState; + } + } + currentStep = stepRegistry.getStep(policy, currentStep.getNextStepKey()); + } + return currentState; + } else { + // either we are no longer the master or the step is now + // not the same as when we submitted the update task. In + // either case we don't want to do anything now + return currentState; + } + } + + @Override + public void onFailure(String source, Exception e) { + throw new RuntimeException(e); // NORELEASE implement error handling + } + + }); + } + + private StepKey getCurrentStepKey(Settings indexSettings) { + String currentPhase = LifecycleSettings.LIFECYCLE_PHASE_SETTING.get(indexSettings); + String currentAction = LifecycleSettings.LIFECYCLE_ACTION_SETTING.get(indexSettings); + String currentStep = LifecycleSettings.LIFECYCLE_STEP_SETTING.get(indexSettings); + if (currentStep == null) { + assert currentPhase == null : "Current phase is not null: " + currentPhase; + assert currentAction == null : "Current action is not null: " + currentAction; + return null; + } else { + assert currentPhase != null; + assert currentAction != null; + return new StepKey(currentPhase, currentAction, currentStep); + } + } + + private Step getCurrentStep(String policy, Settings indexSettings) { + StepKey currentStepKey = getCurrentStepKey(indexSettings); + if (currentStepKey == null) { + return stepRegistry.getFirstStep(policy); + } else { + return stepRegistry.getStep(policy, currentStepKey); + } + } + + private ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey nextStep) { + ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState); + IndexMetaData idxMeta = clusterState.getMetaData().index(index); + Builder indexSettings = Settings.builder().put(idxMeta.getSettings()).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase()) + .put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName()); + if (indexSettings.keys().contains(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE) == false) { + indexSettings.put(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), idxMeta.getCreationDate()); + } + newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData()).put(IndexMetaData + .builder(clusterState.getMetaData().index(index)) + .settings(indexSettings))); + return newClusterStateBuilder.build(); + } + + private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey, Cause cause) { + clusterService.submitStateUpdateTask("ILM", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + Settings indexSettings = currentState.getMetaData().index(index).getSettings(); + if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings)) + && currentStepKey.equals(getCurrentStepKey(indexSettings))) { + return moveClusterStateToNextStep(index, currentState, nextStepKey); + } else { + // either the policy has changed or the step is now + // not the same as when we submitted the update task. In + // either case we don't want to do anything now + return currentState; + } + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + // if the new cluster state is different from the old one then + // we moved to the new step in the execute method so we should + // execute the next step + if (oldState != newState) { + runPolicy(index, newState, cause); + } + } + + @Override + public void onFailure(String source, Exception e) { + throw new RuntimeException(e); // NORELEASE implement error handling + } + }); + } + + public static enum Cause { + CLUSTER_STATE_CHANGE, SCHEDULE_TRIGGER, CALLBACK; + } +} diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index 144d69e06b3..820bf8a9f34 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -12,7 +12,6 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -20,13 +19,11 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.Index; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; -import org.elasticsearch.xpack.core.indexlifecycle.Step; -import org.elasticsearch.xpack.core.indexlifecycle.StepResult; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleRunner.Cause; import java.io.Closeable; import java.time.Clock; @@ -48,6 +45,7 @@ public class IndexLifecycleService extends AbstractComponent private ThreadPool threadPool; private LongSupplier nowSupplier; private SchedulerEngine.Job scheduledJob; + private IndexLifecycleRunner lifecycleRunner; public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock, ThreadPool threadPool, LongSupplier nowSupplier) { @@ -59,6 +57,7 @@ public class IndexLifecycleService extends AbstractComponent this.nowSupplier = nowSupplier; this.scheduledJob = null; this.policyRegistry = new PolicyStepsRegistry(); + this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService); clusterService.addListener(this); } @@ -71,7 +70,6 @@ public class IndexLifecycleService extends AbstractComponent } @Override - @SuppressWarnings("unchecked") public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster()) { // only act if we are master, otherwise keep idle until elected IndexLifecycleMetadata lifecycleMetadata = event.state().metaData().custom(IndexLifecycleMetadata.TYPE); @@ -99,6 +97,8 @@ public class IndexLifecycleService extends AbstractComponent } else if (pollIntervalSettingChanged) { // all engines are running, just need to update with latest interval scheduleJob(pollInterval); } + + triggerPolicies(event.state(), Cause.CLUSTER_STATE_CHANGE); } else { cancelJob(); } @@ -116,67 +116,11 @@ public class IndexLifecycleService extends AbstractComponent scheduler.get().add(scheduledJob); } - public void triggerPolicies() { - // loop through all indices in cluster state and filter for ones that are - // managed by the Index Lifecycle Service they have a index.lifecycle.name setting - // associated to a policy - ClusterState clusterState = clusterService.state(); - clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> { - String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()); - if (Strings.isNullOrEmpty(policyName) == false) { - clusterService.submitStateUpdateTask("index-lifecycle-" + policyName, new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - // ensure date is set - currentState = putLifecycleDate(currentState, idxMeta); - long lifecycleDate = currentState.metaData().settings() - .getAsLong(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE, -1L); - // get current phase, action, step - String phase = currentState.metaData().settings().get(LifecycleSettings.LIFECYCLE_PHASE); - String action = currentState.metaData().settings().get(LifecycleSettings.LIFECYCLE_ACTION); - String stepName = currentState.metaData().settings().get(LifecycleSettings.LIFECYCLE_STEP); - // returns current step to execute. If settings are null, then the first step to be executed in - // this policy is returned. - Step currentStep = policyRegistry.getStep(policyName, new Step.StepKey(phase, action, stepName)); - return executeStepUntilAsync(policyName, currentStep, clusterState, client, nowSupplier, idxMeta.getIndex()); - } - - @Override - public void onFailure(String source, Exception e) { - - } - }); - } - }); - } - - /** - * executes the given step, and then all proceeding steps, until it is necessary to exit the - * cluster-state thread and let any wait-condition or asynchronous action progress externally - * - * TODO(colin): should steps execute themselves and execute `nextStep` internally? - * - * @param startStep The current step that has either not been executed, or not completed before - * @return the new ClusterState - */ - private ClusterState executeStepUntilAsync(String policyName, Step startStep, ClusterState currentState, Client client, LongSupplier nowSupplier, Index index) { - StepResult result = startStep.execute(clusterService, currentState, index, client, nowSupplier); - while (result.isComplete() && result.indexSurvived() && startStep.hasNextStep()) { - currentState = result.getClusterState(); - startStep = policyRegistry.getStep(policyName, startStep.getNextStepKey()); - result = startStep.execute(clusterService, currentState, index, client, nowSupplier); - } - if (result.isComplete()) { - currentState = result.getClusterState(); - } - return currentState; - } - @Override public void triggered(SchedulerEngine.Event event) { if (event.getJobName().equals(IndexLifecycle.NAME)) { logger.info("Job triggered: " + event.getJobName() + ", " + event.getScheduledTime() + ", " + event.getTriggeredTime()); - triggerPolicies(); + triggerPolicies(clusterService.state(), Cause.SCHEDULE_TRIGGER); } } @@ -198,18 +142,17 @@ public class IndexLifecycleService extends AbstractComponent } })); } - - private ClusterState putLifecycleDate(ClusterState clusterState, IndexMetaData idxMeta) { - if (idxMeta.getSettings().hasValue(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE)) { - return clusterState; - } else { - ClusterState.Builder builder = new ClusterState.Builder(clusterState); - MetaData.Builder metadataBuilder = MetaData.builder(clusterState.metaData()); - Settings settings = Settings.builder() - .put(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE_SETTING.getKey(), idxMeta.getCreationDate()).build(); - metadataBuilder.updateSettings(settings, idxMeta.getIndex().getName()); - return builder.metaData(metadataBuilder.build()).build(); - } + + public void triggerPolicies(ClusterState clusterState, Cause cause) { + // loop through all indices in cluster state and filter for ones that are + // managed by the Index Lifecycle Service they have a index.lifecycle.name setting + // associated to a policy + clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> { + String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()); + if (Strings.isNullOrEmpty(policyName) == false) { + lifecycleRunner.runPolicy(policyName, idxMeta.getIndex(), idxMeta.getSettings(), cause); + } + }); } @Override diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java index 81af7fbc9f3..ce3f4fd3594 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java @@ -78,6 +78,9 @@ public class PolicyStepsRegistry { return step; } + public Step getFirstStep(String policy) { + return firstStepMap.get(policy); + } @Override public int hashCode() { diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java index 20c09224caa..dca13e46564 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java @@ -5,35 +5,22 @@ */ package org.elasticsearch.xpack.indexlifecycle; -import org.elasticsearch.Version; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterModule; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.Index; import org.elasticsearch.test.AbstractSerializingTestCase; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateUpdateStep; -import org.elasticsearch.xpack.core.indexlifecycle.ConditionalWaitStep; import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType; import org.elasticsearch.xpack.core.indexlifecycle.MockAction; import org.elasticsearch.xpack.core.indexlifecycle.Phase; -import org.elasticsearch.xpack.core.indexlifecycle.PhaseAfterStep; -import org.elasticsearch.xpack.core.indexlifecycle.Step; -import org.elasticsearch.xpack.core.indexlifecycle.StepResult; -import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; import org.elasticsearch.xpack.core.indexlifecycle.TestLifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; import java.io.IOException; import java.util.ArrayList; @@ -43,9 +30,6 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.function.LongSupplier; - -import static org.mockito.Mockito.mock; public class LifecyclePolicyTests extends AbstractSerializingTestCase {