From f429fc0b3ed9c651d1b3477b4aaa13911c34a294 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 26 Mar 2018 18:48:59 -0700 Subject: [PATCH] begin making sense of types --- .../indexlifecycle/ConditionalWaitStep.java | 10 +- .../core/indexlifecycle/LifecycleAction.java | 2 +- .../core/indexlifecycle/LifecyclePolicy.java | 33 ++-- .../core/indexlifecycle/LifecycleType.java | 14 +- .../xpack/core/indexlifecycle/Phase.java | 10 +- .../TimeseriesLifecycleType.java | 149 +++--------------- 6 files changed, 44 insertions(+), 174 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ConditionalWaitStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ConditionalWaitStep.java index ac057aec7a9..25d2e666c2f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ConditionalWaitStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ConditionalWaitStep.java @@ -5,22 +5,26 @@ */ package org.elasticsearch.xpack.core.indexlifecycle; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.index.Index; import org.elasticsearch.xpack.core.indexlifecycle.Step; import org.elasticsearch.xpack.core.indexlifecycle.StepResult; import java.util.function.Function; +import java.util.function.LongSupplier; public class ConditionalWaitStep extends Step { private final Function condition; - public ConditionalWaitStep(String name, String index, String phase, String action, Function condition) { - super(name, action, phase, index); + public ConditionalWaitStep(String name, String phase, String action, Step nextStep, Function condition) { + super(name, action, phase, nextStep); this.condition = condition; } @Override - public StepResult execute(ClusterState currentState) { + public StepResult execute(ClusterService clusterService, ClusterState currentState, Index index, Client client, LongSupplier nowSupplier) { boolean isComplete = condition.apply(currentState); return new StepResult(String.valueOf(isComplete), null, currentState, true, isComplete); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleAction.java index ed346114ca3..931a6f58a2d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleAction.java @@ -20,7 +20,7 @@ import java.util.function.LongSupplier; */ public interface LifecycleAction extends ToXContentObject, NamedWriteable { - List toSteps(String phase, Index index, Client client, ThreadPool threadPool, LongSupplier nowSupplier); + List toSteps(String phase); default boolean indexSurvives() { return true; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java index 2f50b1d5628..a48d6808388 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -85,8 +86,7 @@ public class LifecyclePolicy extends AbstractDiffable } this.name = name; this.phases = phases; - // TODO(talevy): return validation - //this.type.validate(phases.values()); + this.type.validate(phases.values()); } /** @@ -145,8 +145,14 @@ public class LifecyclePolicy extends AbstractDiffable } public List toSteps() { - // TODO(talevy): make real with types - return Collections.emptyList(); + List steps = new ArrayList<>(); + for (Phase phase : type.getOrderedPhases(phases)) { + for (LifecycleAction action : type.getOrderedActions(phase)) { + // TODO(talevy): correctly set `nextStep` between actions and phases + steps.addAll(action.toSteps(phase.getName())); + } + } + return steps; } @Override @@ -171,23 +177,4 @@ public class LifecyclePolicy extends AbstractDiffable public String toString() { return Strings.toString(this, true, true); } - - /** - * Reference to a method that determines which {@link LifecycleAction} to - * execute next after a specific action. - * - *

- * Concrete {@link LifecyclePolicy} classes will implement this to help - * determine their specific ordering of actions for the phases they allow. - */ - @FunctionalInterface - interface NextActionProvider { - - /** - * @param current - * The current action which is being or was executed - * @return the action following {@code current} to execute - */ - LifecycleAction next(LifecycleAction current); - } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleType.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleType.java index f7b48e07fd3..d7961fddbf3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleType.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleType.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.NamedWriteable; import java.util.Collection; +import java.util.List; import java.util.Map; public interface LifecycleType extends NamedWriteable { @@ -16,17 +17,10 @@ public interface LifecycleType extends NamedWriteable { /** * @return the first phase of this policy to execute */ - Phase getFirstPhase(Map phases); + List getOrderedPhases(Map phases); + + List getOrderedActions(Phase phase); - /** - * @param currentPhase - * the current phase that is or was just executed - * @return the next phase after currentPhase to be execute. If - * it is `null`, the first phase to be executed is returned. If it - * is the last phase, then no next phase is to be executed and - * `null` is returned. - */ - Phase nextPhase(Map phases, @Nullable Phase currentPhase); /** * validates whether the specified phases are valid for this diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Phase.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Phase.java index 9bb6fdb0a48..64c7b6fd0bf 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Phase.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/Phase.java @@ -76,7 +76,7 @@ public class Phase implements ToXContentObject, Writeable { * a {@link Map} of the {@link LifecycleAction}s to run when * during his {@link Phase}. The keys in this map are the associated * action names. The order of these actions is defined - * by the {@link LifecyclePolicy.NextActionProvider}. + * by the {@link LifecycleType} */ public Phase(String name, TimeValue after, Map actions) { this.name = name; @@ -136,14 +136,6 @@ public class Phase implements ToXContentObject, Writeable { return actions; } - public List toSteps(Index index, long indexLifecycleCreationDate, Client client, ThreadPool threadPool, LongSupplier nowSupplier) { - // TODO(talevy) phase needs to know indexLifecycleStartTime - PhaseAfterStep phaseAfterStep = new PhaseAfterStep( - "phase_start", index.getName(), getName(),after, null); - return Stream.concat(Stream.of(phaseAfterStep), actions.values().stream() - .flatMap(a -> a.toSteps(name, index, client, threadPool, nowSupplier).stream())).collect(Collectors.toList()); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/TimeseriesLifecycleType.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/TimeseriesLifecycleType.java index e78d1e1731d..5cb2941790a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/TimeseriesLifecycleType.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/TimeseriesLifecycleType.java @@ -12,12 +12,14 @@ import org.elasticsearch.common.util.set.Sets; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -56,139 +58,30 @@ public class TimeseriesLifecycleType implements LifecycleType { return TYPE; } - @Override - public Phase getFirstPhase(Map phases) { - Phase firstPhase = phases.get("hot"); - if (firstPhase == null) { - firstPhase = phases.get("warm"); - } - if (firstPhase == null) { - firstPhase = phases.get("cold"); - } - if (firstPhase == null) { - firstPhase = phases.get("delete"); - } - return firstPhase; + public List getOrderedPhases(Map phases) { + return VALID_PHASES.stream().map(p -> phases.getOrDefault(p, null)).collect(Collectors.toList()); } - @Override - public Phase nextPhase(Map phases, @Nullable Phase currentPhase) { - if (currentPhase == null) { - return getFirstPhase(phases); + public List getOrderedActions(Phase phase) { + Map actions = phase.getActions(); + switch (phase.getName()) { + case "hot": + return Stream.of(RolloverAction.NAME).map(a -> actions.getOrDefault(a, null)) + .filter(Objects::nonNull).collect(Collectors.toList()); + case "warm": + return Stream.of(AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, ReplicasAction.NAME) + .map(a -> actions.getOrDefault(a, null)).filter(Objects::nonNull).collect(Collectors.toList()); + case "cold": + return Stream.of(ReplicasAction.NAME, AllocateAction.NAME).map(a -> actions.getOrDefault(a, null)) + .filter(Objects::nonNull).collect(Collectors.toList()); + case "delete": + return Stream.of(DeleteAction.NAME).map(a -> actions.getOrDefault(a, null)) + .filter(Objects::nonNull).collect(Collectors.toList()); + default: + return Collections.emptyList(); } - - // VALID_PHASES is in order of execution - boolean readyToSetNext = false; - for (String phaseName : VALID_PHASES) { - if (readyToSetNext && phases.containsKey(phaseName)) { - return phases.get(phaseName); - } - if (phaseName.equals(currentPhase.getName())) { - readyToSetNext = true; - } - } - - return null; } -// /** -// * This action provider returns an ordering for the actions within each of the four timeseries phases. -// * Hot Phase: -// * The Hot Phase only supports the {@link RolloverAction} and so that is the only action to order -// * Warm Phase: -// * The Warm Phase executes the supported actions in a slightly complicated order for the sake of -// * optimization. Assuming the {@link ReplicasAction} is specified, it will run first or last depending -// * on whether it increases, decreases, or keeps the existing replica count. If number-of-replicas is -// * kept the same, or reduced, then {@link ReplicasAction} is executed first, otherwise, it is last. -// * So the ordering looks something like this: -// * - {@link ReplicasAction} (if action.number_of_replicas lte idxMeta.number_of_replicas) -// * - {@link AllocateAction} -// * - {@link ShrinkAction} -// * - {@link ForceMergeAction} -// * - {@link ReplicasAction} (if action.number_of_replicas gt idxMeta.number_of_replicas) -// * -// * NORELEASE: there may exist further optimizations to this when {@link ShrinkAction} is specified. -// * -// * @param context the index lifecycle context for this phase at the time of execution -// * @param phase the current phase for which to provide an action provider -// * @return the {@link LifecyclePolicy.NextActionProvider} for {@code phase}. -// */ -// @Override -// public LifecyclePolicy.NextActionProvider getActionProvider(IndexLifecycleContext context, Phase phase) { -// Map actions = phase.getActions(); -// switch (phase.getName()) { -// case "hot": -// // The hot-phase only has one action, either start with it, or return null. Simple as that! -// return (action) -> (action == null) ? actions.getOrDefault(RolloverAction.NAME, null) : null; -// case "warm": -// return (action) -> { -// ReplicasAction replicasAction = (ReplicasAction) actions.getOrDefault(ReplicasAction.NAME, null); -// boolean replicaActionFirst = replicasAction != null -// && replicasAction.getNumberOfReplicas() <= context.getNumberOfReplicas(); -// if (action == null) { -// if (replicaActionFirst) { -// return replicasAction; -// } -// return Stream.of(AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, ReplicasAction.NAME) -// .map(a -> actions.getOrDefault(a, null)) -// .filter(Objects::nonNull).findFirst().orElse(null); -// } else if (action instanceof ReplicasAction) { -// if (replicaActionFirst) { -// return Stream.of(AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME) -// .map(a -> actions.getOrDefault(a, null)) -// .filter(Objects::nonNull).findFirst().orElse(null); -// } -// } else if (action instanceof AllocateAction) { -// return Stream.of(ShrinkAction.NAME, ForceMergeAction.NAME, ReplicasAction.NAME) -// .map(a -> actions.getOrDefault(a, null)) -// .filter(Objects::nonNull).findFirst().orElse(null); -// } else if (action instanceof ShrinkAction) { -// return Stream.of(ForceMergeAction.NAME, ReplicasAction.NAME) -// .map(a -> actions.getOrDefault(a, null)) -// .filter(Objects::nonNull).findFirst().orElse(null); -// } else if (action instanceof ForceMergeAction) { -// if (replicaActionFirst == false) { -// return replicasAction; -// } -// } -// return null; -// }; -// case "cold": -// return (action) -> { -// ReplicasAction replicasAction = (ReplicasAction) actions.getOrDefault(ReplicasAction.NAME, null); -// LifecycleAction allocateAction = actions.getOrDefault(AllocateAction.NAME, null); -// boolean replicaActionFirst = replicasAction != null -// && replicasAction.getNumberOfReplicas() <= context.getNumberOfReplicas(); -// if (action == null) { -// if (replicaActionFirst) { -// return replicasAction; -// } else if (allocateAction != null) { -// return allocateAction; -// } -// return replicasAction; -// } else if (action instanceof ReplicasAction) { -// if (replicaActionFirst) { -// return allocateAction; -// } -// } else if (action instanceof AllocateAction) { -// if (replicaActionFirst == false) { -// return replicasAction; -// } -// } -// return null; -// }; -// case "delete": -// return (action) -> { -// if (action == null) { -// return actions.getOrDefault(DeleteAction.NAME, null); -// } -// return null; -// }; -// default: -// throw new IllegalArgumentException("phase [" + phase.getName() + "] is invalid for policy [timeseries]"); -// } -// } - @Override public void validate(Collection phases) { Set allowedPhases = new HashSet<>(VALID_PHASES);