diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClientStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClientStep.java index c0f191c9c80..7474af79635 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClientStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClientStep.java @@ -23,36 +23,13 @@ public class ClientStep checkComplete, Function checkSuccess) { - super(name, action, phase, index); + super(name, action, phase, nextStep); this.requestBuilder = requestBuilder; this.checkComplete = checkComplete; this.checkSuccess = checkSuccess; this.returnedException = null; this.returnedSuccess = false; } - - @Override - public StepResult execute(ClusterState currentState) { - if (checkComplete.apply(currentState)) { - return new StepResult("client-complete", null, currentState, true, true); - } else { - requestBuilder.execute(new ActionListener() { - @Override - public void onResponse(Response r) { - if (checkSuccess.apply(r)) { - returnedSuccess = true; - } - // IndexLifecycleService.triggerPolicies() - } - - @Override - public void onFailure(Exception e) { - returnedException = e; - } - }); - return new StepResult("client-in-progress", null, currentState, true, false); - } - } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateUpdateStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateUpdateStep.java index ae49c903f93..4fc63424fc1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateUpdateStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ClusterStateUpdateStep.java @@ -5,32 +5,29 @@ */ package org.elasticsearch.xpack.core.indexlifecycle; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.indexlifecycle.Step; import org.elasticsearch.xpack.core.indexlifecycle.StepResult; import java.util.function.Function; +import java.util.function.LongSupplier; public class ClusterStateUpdateStep extends Step { - private final String name; - private final String index; - private final String phase; - private final String action; private final Function updateTask; - public ClusterStateUpdateStep(String name, String index, String phase, String action, Function updateTask) { - super(name, action, phase, index); - this.name = name; - this.index = index; - this.phase = phase; - this.action = action; + public ClusterStateUpdateStep(String name, String index, String phase, String action, Step nextStep, Function updateTask) { + super(name, action, phase, nextStep); this.updateTask = updateTask; } - public StepResult execute(ClusterState clusterState) { + public StepResult execute(ClusterService clusterService, ClusterState currentState, Index index, Client client, LongSupplier nowSupplier) { ClusterState updated = null; try { - updated = updateTask.apply(clusterState); + updated = updateTask.apply(currentState); + updated = updateStateWithNextStep(updated, nowSupplier, index); return new StepResult("done!", null, updated, true, true); } catch (Exception e) { return new StepResult("something went wrong", e, updated, true, true); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java index 8d1e71e3ecf..2f50b1d5628 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java @@ -144,49 +144,9 @@ public class LifecyclePolicy extends AbstractDiffable return builder; } - public StepResult execute(List steps, ClusterState currentState, IndexMetaData indexMetaData, Client client, LongSupplier nowSupplier) { - StepResult lastStepResult = null; - ClusterState updatedState = currentState; - for (int i = getNextStepIdx(steps, indexMetaData); i < steps.size(); i++) { - lastStepResult = steps.get(i).execute(updatedState); - if (lastStepResult.isComplete() && lastStepResult.indexSurvived()) { - if (i < steps.size() - 1) { - Step nextStep = steps.get(i + 1); - long now = nowSupplier.getAsLong(); - // fetch details about next step to run and update the cluster state with this information - Settings newLifecyclePhaseSettings = Settings.builder() - .put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase()) - .put(LifecycleSettings.LIFECYCLE_PHASE_TIME, now) - .put(LifecycleSettings.LIFECYCLE_ACTION_TIME, now) - .put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()) - .put(LifecycleSettings.LIFECYCLE_STEP_TIME, now) - .put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName()) - .build(); - updatedState = ClusterState.builder(lastStepResult.getClusterState()) - .metaData(MetaData.builder(lastStepResult.getClusterState().metaData()) - .updateSettings(newLifecyclePhaseSettings)).build(); - lastStepResult = new StepResult(lastStepResult, updatedState); - } - } else { - break; - } - } - - return lastStepResult; - } - - private int getNextStepIdx(List steps, IndexMetaData indexMetaData) { - String step = indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_STEP); - if (step == null) { - return 0; - } - for (int i = 0; i < steps.size(); i++) { - if (steps.get(i).getName().equals(step)) { - return i; - } - } - - return steps.size(); + public List toSteps() { + // TODO(talevy): make real with types + return Collections.emptyList(); } @Override @@ -229,7 +189,5 @@ public class LifecyclePolicy extends AbstractDiffable * @return the action following {@code current} to execute */ LifecycleAction next(LifecycleAction current); - } - } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Phase.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Phase.java index bab8b28aa6a..9bb6fdb0a48 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Phase.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Phase.java @@ -138,8 +138,8 @@ public class Phase implements ToXContentObject, Writeable { public List toSteps(Index index, long indexLifecycleCreationDate, Client client, ThreadPool threadPool, LongSupplier nowSupplier) { // TODO(talevy) phase needs to know indexLifecycleStartTime - PhaseAfterStep phaseAfterStep = new PhaseAfterStep(threadPool, indexLifecycleCreationDate, nowSupplier, after, - "phase_start", index.getName(), getName(), null); + PhaseAfterStep phaseAfterStep = new PhaseAfterStep( + "phase_start", index.getName(), getName(),after, null); return Stream.concat(Stream.of(phaseAfterStep), actions.values().stream() .flatMap(a -> a.toSteps(name, index, client, threadPool, nowSupplier).stream())).collect(Collectors.toList()); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStep.java index 99265a6c837..1f7bb25067c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PhaseAfterStep.java @@ -5,45 +5,13 @@ */ package org.elasticsearch.xpack.core.indexlifecycle; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.indexlifecycle.Step; -import org.elasticsearch.xpack.core.indexlifecycle.StepResult; - -import java.util.concurrent.CompletableFuture; -import java.util.function.BooleanSupplier; -import java.util.function.LongSupplier; public class PhaseAfterStep extends Step { - private final CompletableFuture timeUp; - private final ThreadPool threadPool; - private final long indexCreationDate; - private final LongSupplier nowSupplier; private final TimeValue after; - public PhaseAfterStep(ThreadPool threadPool, long indexCreationDate, LongSupplier nowSupplier, TimeValue after, String name, - String index, String phase, String action) { - super(name, action, phase, index); - this.threadPool = threadPool; - this.indexCreationDate = indexCreationDate; - this.nowSupplier = nowSupplier; - this.timeUp = new CompletableFuture<>(); + public PhaseAfterStep(String phase, String action, String name, TimeValue after, Step nextStep) { + super(name, action, phase, nextStep); this.after = after; } - - public StepResult execute(ClusterState currentState) { - LongSupplier elapsed = () -> nowSupplier.getAsLong() - indexCreationDate; - BooleanSupplier isReady = () -> after.getSeconds() <= elapsed.getAsLong(); - if (isReady.getAsBoolean()) { - return new StepResult("phase starting", null, currentState, true, true); - } else { - threadPool.schedule(TimeValue.timeValueSeconds(elapsed.getAsLong()), ThreadPool.Names.GENERIC, () -> { - if (after.getSeconds() <= elapsed.getAsLong()) { - // IndexLifecycleService.triggerPolicies() - } - }); - return new StepResult("phase-check-rescheduled", null, currentState, true, false); - } - } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkAction.java index 77350546f3c..2cd9dcc58b1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ShrinkAction.java @@ -153,7 +153,7 @@ public class ShrinkAction implements LifecycleAction { currentState -> { // check that shrunken index was already created, if so, no need to both client IndexMetaData shrunkMetaData = currentState.metaData().index(shrunkenIndexName); - boolean isSuccessful = shrunkMetaData != null && shrunkenIndexName.equals(IndexMetaData.INDEX_SHRINK_SOURCE_NAME + return shrunkMetaData != null && shrunkenIndexName.equals(IndexMetaData.INDEX_SHRINK_SOURCE_NAME .get(shrunkMetaData.getSettings())); }, ResizeResponse::isAcknowledged); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Step.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Step.java index 55b1ada51a2..9b9c3b8eae1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Step.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Step.java @@ -5,22 +5,30 @@ */ package org.elasticsearch.xpack.core.indexlifecycle; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; + +import java.util.function.LongSupplier; /** * A {@link LifecycleAction} which deletes the index. */ -public abstract class Step { +public class Step { private final String name; private final String action; private final String phase; - private final String index; + private final Step nextStep; - public Step(String name, String action, String phase, String index) { + public Step(String name, String action, String phase, Step nextStep) { this.name = name; this.action = action; this.phase = phase; - this.index = index; + this.nextStep = nextStep; } public String getName() { @@ -35,9 +43,68 @@ public abstract class Step { return phase; } - public String getIndex() { - return index; + public Step getNextStep() { + return nextStep; } - public abstract StepResult execute(ClusterState currentState); + public boolean hasNextStep() { + return nextStep != null; + } + + /** + * Executes this step and updates the cluster state with the next step to run + * + * @param currentState + * @param client + * @param nowSupplier + * @return + */ + public StepResult execute(ClusterService clusterService, ClusterState currentState, Index index, Client client, LongSupplier nowSupplier) { +// Example: Delete +// +// client.admin().indices().prepareDelete(index.getName()).execute(new ActionListener() { +// @Override +// public void onResponse(DeleteIndexResponse deleteIndexResponse) { +// if (deleteIndexResponse.isAcknowledged()) { +// submitUpdateNextStepTask(clusterService, nowSupplier, index); +// } +// } +// +// @Override +// public void onFailure(Exception e) { +// +// } +// }); + throw new UnsupportedOperationException("implement me"); + } + + protected void submitUpdateNextStepTask(ClusterService clusterService, LongSupplier nowSupplier, Index index) { + clusterService.submitStateUpdateTask("update-next-step", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return updateStateWithNextStep(currentState, nowSupplier, index); + } + + @Override + public void onFailure(String source, Exception e) { + + } + }); + } + + protected ClusterState updateStateWithNextStep(ClusterState currentState, LongSupplier nowSupplier, Index index) { + long now = nowSupplier.getAsLong(); + // fetch details about next step to run and update the cluster state with this information + Settings newLifecyclePhaseSettings = Settings.builder() + .put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase()) + .put(LifecycleSettings.LIFECYCLE_PHASE_TIME, now) + .put(LifecycleSettings.LIFECYCLE_ACTION_TIME, now) + .put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()) + .put(LifecycleSettings.LIFECYCLE_STEP_TIME, now) + .put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName()) + .build(); + return ClusterState.builder(currentState) + .metaData(MetaData.builder(currentState.metaData()) + .updateSettings(newLifecyclePhaseSettings, index.getName())).build(); + } } diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index 92d35b6d094..ef74c3a3a65 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; @@ -24,6 +25,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; @@ -34,7 +36,10 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import java.io.Closeable; import java.time.Clock; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.concurrent.CompletableFuture; import java.util.function.LongSupplier; @@ -49,6 +54,7 @@ public class IndexLifecycleService extends AbstractComponent private final SetOnce scheduler = new SetOnce<>(); private final Clock clock; + private final PolicyStepsRegistry policyRegistry; private Client client; private ClusterService clusterService; private ThreadPool threadPool; @@ -64,6 +70,7 @@ public class IndexLifecycleService extends AbstractComponent this.threadPool = threadPool; this.nowSupplier = nowSupplier; this.scheduledJob = null; + this.policyRegistry = new PolicyStepsRegistry(); clusterService.addListener(this); } @@ -76,10 +83,10 @@ public class IndexLifecycleService extends AbstractComponent } @Override + @SuppressWarnings("unchecked") public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster()) { // only act if we are master, otherwise keep idle until elected IndexLifecycleMetadata lifecycleMetadata = event.state().metaData().custom(IndexLifecycleMetadata.TYPE); - TimeValue pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING .get(event.state().getMetaData().settings()); TimeValue previousPollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING @@ -87,6 +94,11 @@ public class IndexLifecycleService extends AbstractComponent boolean pollIntervalSettingChanged = !pollInterval.equals(previousPollInterval); + if (lifecycleMetadata != null) { + // update policy steps registry + policyRegistry.update(event.state()); + } + if (lifecycleMetadata == null) { // no lifecycle metadata, install initial empty metadata state lifecycleMetadata = new IndexLifecycleMetadata(Collections.emptySortedMap()); installMetadata(lifecycleMetadata); @@ -116,9 +128,7 @@ public class IndexLifecycleService extends AbstractComponent scheduler.get().add(scheduledJob); } - public synchronized void triggerPolicies() { - IndexLifecycleMetadata indexLifecycleMetadata = clusterService.state().metaData().custom(IndexLifecycleMetadata.TYPE); - SortedMap policies = indexLifecycleMetadata.getPolicies(); + public void triggerPolicies() { // loop through all indices in cluster state and filter for ones that are // managed by the Index Lifecycle Service they have a index.lifecycle.name setting // associated to a policy @@ -126,10 +136,6 @@ public class IndexLifecycleService extends AbstractComponent clusterState.metaData().indices().valuesIt().forEachRemaining((idxMeta) -> { String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()); if (Strings.isNullOrEmpty(policyName) == false) { - LifecyclePolicy policy = policies.get(policyName); - // fetch step - // check whether complete - // if complete, launch next task clusterService.submitStateUpdateTask("index-lifecycle-" + policyName, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -137,12 +143,15 @@ public class IndexLifecycleService extends AbstractComponent currentState = putLifecycleDate(currentState, idxMeta); long lifecycleDate = currentState.metaData().settings() .getAsLong(LifecycleSettings.LIFECYCLE_INDEX_CREATION_DATE, -1L); - // TODO(talevy): make real - List steps = policy.getPhases().values().stream() - .flatMap(p -> p.toSteps(idxMeta.getIndex(), lifecycleDate, client, threadPool, nowSupplier).stream()) - .collect(Collectors.toList()); - StepResult result = policy.execute(steps, currentState, idxMeta, client, nowSupplier); - return result.getClusterState(); + // get current phase, action, step + String phase = currentState.metaData().settings().get(LifecycleSettings.LIFECYCLE_PHASE); + String action = currentState.metaData().settings().get(LifecycleSettings.LIFECYCLE_ACTION); + String stepName = currentState.metaData().settings().get(LifecycleSettings.LIFECYCLE_STEP); + // returns current step to execute. If settings are null, then the first step to be executed in + // this policy is returned. + Step currentStep = policyRegistry.getStep(policyName, phase, action, stepName); + ClusterState newClusterState = executeStepUntilAsync(currentStep, clusterState, client, nowSupplier, idxMeta.getIndex()); + return currentState; } @Override @@ -154,6 +163,28 @@ public class IndexLifecycleService extends AbstractComponent }); } + /** + * executes the given step, and then all proceeding steps, until it is necessary to exit the + * cluster-state thread and let any wait-condition or asynchronous action progress externally + * + * TODO(colin): should steps execute themselves and execute `nextStep` internally? + * + * @param startStep The current step that has either not been executed, or not completed before + * @return the new ClusterState + */ + private ClusterState executeStepUntilAsync(Step startStep, ClusterState currentState, Client client, LongSupplier nowSupplier, Index index) { + StepResult result = startStep.execute(clusterService, currentState, index, client, nowSupplier); + while (result.isComplete() && result.indexSurvived() && startStep.hasNextStep()) { + currentState = result.getClusterState(); + startStep = startStep.getNextStep(); + result = startStep.execute(clusterService, currentState, index, client, nowSupplier); + } + if (result.isComplete()) { + currentState = result.getClusterState(); + } + return currentState; + } + @Override public void triggered(SchedulerEngine.Event event) { if (event.getJobName().equals(IndexLifecycle.NAME)) { diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java new file mode 100644 index 00000000000..b943b86984b --- /dev/null +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.indexlifecycle; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.DiffableUtils; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; +import org.elasticsearch.xpack.core.indexlifecycle.Step; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +public class PolicyStepsRegistry { + + + // keeps track of existing policies in the cluster state + SortedMap lifecyclePolicyMap; + // keeps track of what the first step in a policy is + Map firstStepMap; + // keeps track of a mapping from step-name to respective Step + Map stepMap; + + public PolicyStepsRegistry() { + this.lifecyclePolicyMap = new TreeMap<>(); + this.firstStepMap = new HashMap<>(); + this.stepMap = new HashMap<>(); + } + + @SuppressWarnings("unchecked") + public void update(ClusterState currentState) { + IndexLifecycleMetadata meta = currentState.metaData().custom(IndexLifecycleMetadata.TYPE); + Diff> diff = DiffableUtils.diff(lifecyclePolicyMap, meta.getPolicies(), DiffableUtils.getStringKeySerializer()); + DiffableUtils.MapDiff> mapDiff = (DiffableUtils.MapDiff) diff; + if (mapDiff.getUpserts().isEmpty() == false) { + for (LifecyclePolicy policy : mapDiff.getUpserts().values()) { + lifecyclePolicyMap.put(policy.getName(), policy); + List policyAsSteps = policy.toSteps(); + if (policyAsSteps.isEmpty() == false) { + firstStepMap.put(policy.getName(), policyAsSteps.get(0)); + for (Step step : policyAsSteps) { + stepMap.put(new StepKey(step.getPhase(), step.getAction(), step.getName()), step); + } + } + } + } + + for (String deletedPolicyName : mapDiff.getDeletes()) { + LifecyclePolicy policy = lifecyclePolicyMap.remove(deletedPolicyName); + Step next = firstStepMap.remove(deletedPolicyName); + while (next.hasNextStep()) { + next = stepMap.remove(next.getNextStep()); + } + } + } + + public Lifecycle + + /** + * returns the {@link Step} that matches the name and + * policy specified. This is used by {@link ClusterState} + * readers that know the current policy and step by name + * as String values in the cluster state. + * @param policy the policy from which to fetch the associated steps from + * @param phase the phase the requested step is run in + * @param action the action the requested step is run in + * @param name the name of the requested step + * @return + */ + public Step getStep(String policy, @Nullable String phase, @Nullable String action, @Nullable String name) { + Step step = stepMap.get(new StepKey(phase, action, name)); + if (step == null) { + step = firstStepMap.get(policy); + } + return step; + } + + + + @Override + public int hashCode() { + return Objects.hash(lifecyclePolicyMap, firstStepMap, stepMap); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + PolicyStepsRegistry other = (PolicyStepsRegistry) obj; + return Objects.equals(lifecyclePolicyMap, other.lifecyclePolicyMap) + && Objects.equals(firstStepMap, other.firstStepMap) && Objects.equals(stepMap, other.stepMap); + } + + public class StepKey { + private final String phase; + + private final String action; + private final String name; + + public StepKey(String phase, String action, String name) { + this.phase = phase; + this.action = action; + this.name = name; + } + + public String getPhase() { + return phase; + } + + public String getAction() { + return action; + } + + public String getName() { + return name; + } + + @Override + public int hashCode() { + return Objects.hash(phase, action, name); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + StepKey other = (StepKey) obj; + return Objects.equals(phase, other.phase) && Objects.equals(action, other.action) && Objects.equals(name, other.name); + } + } +} diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java index 3a94fea8a36..20c09224caa 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java @@ -127,35 +127,35 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase 0L; - Client client = mock(Client.class); - Step phaseAfterStep = new PhaseAfterStep(threadPool, 0L, nowSupplier, - TimeValue.timeValueSeconds(0L), "name-0", "index", "phase", "mock_action"); - Step updateStep = new ClusterStateUpdateStep("name-1", "index", "phase", "mock_action", - (state) -> state); - Step waitStep = new ConditionalWaitStep("name-2", "index", "phase", "mock_action", - (state) -> true); - indexName = randomAlphaOfLengthBetween(1, 20); - lifecycleName = randomAlphaOfLengthBetween(1, 20); - Map phases = new LinkedHashMap<>(); - firstAction = new MockAction(Arrays.asList(phaseAfterStep, updateStep, waitStep)); - Map actions = Collections.singletonMap(MockAction.NAME, firstAction); - firstPhase = new Phase("phase", null, actions); - phases.put(firstPhase.getName(), firstPhase); - policy = new LifecyclePolicy(TestLifecycleType.INSTANCE, lifecycleName, null); - - List phaseSteps = firstPhase.toSteps(new Index(indexName, indexName), 0L, - client, threadPool, nowSupplier); - - ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) - .metaData(MetaData.builder().put( - IndexMetaData.builder("index") - .settings(settings(Version.CURRENT)) - .numberOfShards(1).numberOfReplicas(1))).build(); - - StepResult result = policy.execute(phaseSteps, clusterState, clusterState.metaData().index("index"), client, nowSupplier); - - threadPool.shutdown(); +// ThreadPool threadPool = new TestThreadPool("test"); +// LongSupplier nowSupplier = () -> 0L; +// Client client = mock(Client.class); +// Step phaseAfterStep = new PhaseAfterStep(threadPool, 0L, nowSupplier, +// TimeValue.timeValueSeconds(0L), "name-0", "index", "phase", "mock_action"); +// Step updateStep = new ClusterStateUpdateStep("name-1", "index", "phase", "mock_action", +// (state) -> state); +// Step waitStep = new ConditionalWaitStep("name-2", "index", "phase", "mock_action", +// (state) -> true); +// indexName = randomAlphaOfLengthBetween(1, 20); +// lifecycleName = randomAlphaOfLengthBetween(1, 20); +// Map phases = new LinkedHashMap<>(); +// firstAction = new MockAction(Arrays.asList(phaseAfterStep, updateStep, waitStep)); +// Map actions = Collections.singletonMap(MockAction.NAME, firstAction); +// firstPhase = new Phase("phase", null, actions); +// phases.put(firstPhase.getName(), firstPhase); +// policy = new LifecyclePolicy(TestLifecycleType.INSTANCE, lifecycleName, null); +// +// List phaseSteps = firstPhase.toSteps(new Index(indexName, indexName), 0L, +// client, threadPool, nowSupplier); +// +// ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) +// .metaData(MetaData.builder().put( +// IndexMetaData.builder("index") +// .settings(settings(Version.CURRENT)) +// .numberOfShards(1).numberOfReplicas(1))).build(); +// +// StepResult result = policy.execute(phaseSteps, clusterState, clusterState.metaData().index("index"), client, nowSupplier); +// +// threadPool.shutdown(); } }