begin making sense of types

This commit is contained in:
Tal Levy 2018-03-26 18:48:59 -07:00
parent 083c563cf6
commit f429fc0b3e
6 changed files with 44 additions and 174 deletions

View File

@ -5,22 +5,26 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import org.elasticsearch.xpack.core.indexlifecycle.StepResult;
import java.util.function.Function;
import java.util.function.LongSupplier;
public class ConditionalWaitStep extends Step {
private final Function<ClusterState, Boolean> condition;
public ConditionalWaitStep(String name, String index, String phase, String action, Function<ClusterState, Boolean> condition) {
super(name, action, phase, index);
public ConditionalWaitStep(String name, String phase, String action, Step nextStep, Function<ClusterState, Boolean> condition) {
super(name, action, phase, nextStep);
this.condition = condition;
}
@Override
public StepResult execute(ClusterState currentState) {
public StepResult execute(ClusterService clusterService, ClusterState currentState, Index index, Client client, LongSupplier nowSupplier) {
boolean isComplete = condition.apply(currentState);
return new StepResult(String.valueOf(isComplete), null, currentState, true, isComplete);
}

View File

@ -20,7 +20,7 @@ import java.util.function.LongSupplier;
*/
public interface LifecycleAction extends ToXContentObject, NamedWriteable {
List<Step> toSteps(String phase, Index index, Client client, ThreadPool threadPool, LongSupplier nowSupplier);
List<Step> toSteps(String phase);
default boolean indexSurvives() {
return true;

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -85,8 +86,7 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
}
this.name = name;
this.phases = phases;
// TODO(talevy): return validation
//this.type.validate(phases.values());
this.type.validate(phases.values());
}
/**
@ -145,8 +145,14 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
}
public List<Step> toSteps() {
// TODO(talevy): make real with types
return Collections.emptyList();
List<Step> steps = new ArrayList<>();
for (Phase phase : type.getOrderedPhases(phases)) {
for (LifecycleAction action : type.getOrderedActions(phase)) {
// TODO(talevy): correctly set `nextStep` between actions and phases
steps.addAll(action.toSteps(phase.getName()));
}
}
return steps;
}
@Override
@ -171,23 +177,4 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
public String toString() {
return Strings.toString(this, true, true);
}
/**
* Reference to a method that determines which {@link LifecycleAction} to
* execute next after a specific action.
*
* <p>
* Concrete {@link LifecyclePolicy} classes will implement this to help
* determine their specific ordering of actions for the phases they allow.
*/
@FunctionalInterface
interface NextActionProvider {
/**
* @param current
* The current action which is being or was executed
* @return the action following {@code current} to execute
*/
LifecycleAction next(LifecycleAction current);
}
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public interface LifecycleType extends NamedWriteable {
@ -16,17 +17,10 @@ public interface LifecycleType extends NamedWriteable {
/**
* @return the first phase of this policy to execute
*/
Phase getFirstPhase(Map<String, Phase> phases);
List<Phase> getOrderedPhases(Map<String, Phase> phases);
List<LifecycleAction> getOrderedActions(Phase phase);
/**
* @param currentPhase
* the current phase that is or was just executed
* @return the next phase after <code>currentPhase</code> to be execute. If
* it is `null`, the first phase to be executed is returned. If it
* is the last phase, then no next phase is to be executed and
* `null` is returned.
*/
Phase nextPhase(Map<String, Phase> phases, @Nullable Phase currentPhase);
/**
* validates whether the specified <code>phases</code> are valid for this

View File

@ -76,7 +76,7 @@ public class Phase implements ToXContentObject, Writeable {
* a {@link Map} of the {@link LifecycleAction}s to run when
* during his {@link Phase}. The keys in this map are the associated
* action names. The order of these actions is defined
* by the {@link LifecyclePolicy.NextActionProvider}.
* by the {@link LifecycleType}
*/
public Phase(String name, TimeValue after, Map<String, LifecycleAction> actions) {
this.name = name;
@ -136,14 +136,6 @@ public class Phase implements ToXContentObject, Writeable {
return actions;
}
public List<Step> toSteps(Index index, long indexLifecycleCreationDate, Client client, ThreadPool threadPool, LongSupplier nowSupplier) {
// TODO(talevy) phase needs to know indexLifecycleStartTime
PhaseAfterStep phaseAfterStep = new PhaseAfterStep(
"phase_start", index.getName(), getName(),after, null);
return Stream.concat(Stream.of(phaseAfterStep), actions.values().stream()
.flatMap(a -> a.toSteps(name, index, client, threadPool, nowSupplier).stream())).collect(Collectors.toList());
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

View File

@ -12,12 +12,14 @@ import org.elasticsearch.common.util.set.Sets;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@ -56,139 +58,30 @@ public class TimeseriesLifecycleType implements LifecycleType {
return TYPE;
}
@Override
public Phase getFirstPhase(Map<String, Phase> phases) {
Phase firstPhase = phases.get("hot");
if (firstPhase == null) {
firstPhase = phases.get("warm");
}
if (firstPhase == null) {
firstPhase = phases.get("cold");
}
if (firstPhase == null) {
firstPhase = phases.get("delete");
}
return firstPhase;
public List<Phase> getOrderedPhases(Map<String, Phase> phases) {
return VALID_PHASES.stream().map(p -> phases.getOrDefault(p, null)).collect(Collectors.toList());
}
@Override
public Phase nextPhase(Map<String, Phase> phases, @Nullable Phase currentPhase) {
if (currentPhase == null) {
return getFirstPhase(phases);
}
// VALID_PHASES is in order of execution
boolean readyToSetNext = false;
for (String phaseName : VALID_PHASES) {
if (readyToSetNext && phases.containsKey(phaseName)) {
return phases.get(phaseName);
}
if (phaseName.equals(currentPhase.getName())) {
readyToSetNext = true;
public List<LifecycleAction> getOrderedActions(Phase phase) {
Map<String, LifecycleAction> actions = phase.getActions();
switch (phase.getName()) {
case "hot":
return Stream.of(RolloverAction.NAME).map(a -> actions.getOrDefault(a, null))
.filter(Objects::nonNull).collect(Collectors.toList());
case "warm":
return Stream.of(AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, ReplicasAction.NAME)
.map(a -> actions.getOrDefault(a, null)).filter(Objects::nonNull).collect(Collectors.toList());
case "cold":
return Stream.of(ReplicasAction.NAME, AllocateAction.NAME).map(a -> actions.getOrDefault(a, null))
.filter(Objects::nonNull).collect(Collectors.toList());
case "delete":
return Stream.of(DeleteAction.NAME).map(a -> actions.getOrDefault(a, null))
.filter(Objects::nonNull).collect(Collectors.toList());
default:
return Collections.emptyList();
}
}
return null;
}
// /**
// * This action provider returns an ordering for the actions within each of the four timeseries phases.
// * Hot Phase:
// * The Hot Phase only supports the {@link RolloverAction} and so that is the only action to order
// * Warm Phase:
// * The Warm Phase executes the supported actions in a slightly complicated order for the sake of
// * optimization. Assuming the {@link ReplicasAction} is specified, it will run first or last depending
// * on whether it increases, decreases, or keeps the existing replica count. If number-of-replicas is
// * kept the same, or reduced, then {@link ReplicasAction} is executed first, otherwise, it is last.
// * So the ordering looks something like this:
// * - {@link ReplicasAction} (if action.number_of_replicas lte idxMeta.number_of_replicas)
// * - {@link AllocateAction}
// * - {@link ShrinkAction}
// * - {@link ForceMergeAction}
// * - {@link ReplicasAction} (if action.number_of_replicas gt idxMeta.number_of_replicas)
// *
// * NORELEASE: there may exist further optimizations to this when {@link ShrinkAction} is specified.
// *
// * @param context the index lifecycle context for this phase at the time of execution
// * @param phase the current phase for which to provide an action provider
// * @return the {@link LifecyclePolicy.NextActionProvider} for {@code phase}.
// */
// @Override
// public LifecyclePolicy.NextActionProvider getActionProvider(IndexLifecycleContext context, Phase phase) {
// Map<String, LifecycleAction> actions = phase.getActions();
// switch (phase.getName()) {
// case "hot":
// // The hot-phase only has one action, either start with it, or return null. Simple as that!
// return (action) -> (action == null) ? actions.getOrDefault(RolloverAction.NAME, null) : null;
// case "warm":
// return (action) -> {
// ReplicasAction replicasAction = (ReplicasAction) actions.getOrDefault(ReplicasAction.NAME, null);
// boolean replicaActionFirst = replicasAction != null
// && replicasAction.getNumberOfReplicas() <= context.getNumberOfReplicas();
// if (action == null) {
// if (replicaActionFirst) {
// return replicasAction;
// }
// return Stream.of(AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, ReplicasAction.NAME)
// .map(a -> actions.getOrDefault(a, null))
// .filter(Objects::nonNull).findFirst().orElse(null);
// } else if (action instanceof ReplicasAction) {
// if (replicaActionFirst) {
// return Stream.of(AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME)
// .map(a -> actions.getOrDefault(a, null))
// .filter(Objects::nonNull).findFirst().orElse(null);
// }
// } else if (action instanceof AllocateAction) {
// return Stream.of(ShrinkAction.NAME, ForceMergeAction.NAME, ReplicasAction.NAME)
// .map(a -> actions.getOrDefault(a, null))
// .filter(Objects::nonNull).findFirst().orElse(null);
// } else if (action instanceof ShrinkAction) {
// return Stream.of(ForceMergeAction.NAME, ReplicasAction.NAME)
// .map(a -> actions.getOrDefault(a, null))
// .filter(Objects::nonNull).findFirst().orElse(null);
// } else if (action instanceof ForceMergeAction) {
// if (replicaActionFirst == false) {
// return replicasAction;
// }
// }
// return null;
// };
// case "cold":
// return (action) -> {
// ReplicasAction replicasAction = (ReplicasAction) actions.getOrDefault(ReplicasAction.NAME, null);
// LifecycleAction allocateAction = actions.getOrDefault(AllocateAction.NAME, null);
// boolean replicaActionFirst = replicasAction != null
// && replicasAction.getNumberOfReplicas() <= context.getNumberOfReplicas();
// if (action == null) {
// if (replicaActionFirst) {
// return replicasAction;
// } else if (allocateAction != null) {
// return allocateAction;
// }
// return replicasAction;
// } else if (action instanceof ReplicasAction) {
// if (replicaActionFirst) {
// return allocateAction;
// }
// } else if (action instanceof AllocateAction) {
// if (replicaActionFirst == false) {
// return replicasAction;
// }
// }
// return null;
// };
// case "delete":
// return (action) -> {
// if (action == null) {
// return actions.getOrDefault(DeleteAction.NAME, null);
// }
// return null;
// };
// default:
// throw new IllegalArgumentException("phase [" + phase.getName() + "] is invalid for policy [timeseries]");
// }
// }
@Override
public void validate(Collection<Phase> phases) {
Set<String> allowedPhases = new HashSet<>(VALID_PHASES);