initial attempt towards an ordered execution plan for timeseries phases (#3248)

* initial attempt towards an ordered execution plan for timeseries phases

* respond to feedback and fix bug
This commit is contained in:
Tal Levy 2017-12-07 11:59:31 -08:00 committed by GitHub
parent ae7e4882e4
commit 288515ddc5
20 changed files with 572 additions and 435 deletions

View File

@ -48,6 +48,11 @@ public interface IndexLifecycleContext {
*/ */
String getLifecycleTarget(); String getLifecycleTarget();
/**
* @return the current index context's replica count.
*/
int getNumberOfReplicas();
/** /**
* Determines whether the target is able to move to the provided * Determines whether the target is able to move to the provided
* {@link Phase} * {@link Phase}

View File

@ -20,6 +20,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.indexlifecycle.ObjectParserUtils;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
@ -30,6 +31,7 @@ import java.util.Objects;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
public class IndexLifecycleMetadata implements MetaData.Custom { public class IndexLifecycleMetadata implements MetaData.Custom {
public static final String TYPE = "index_lifecycle"; public static final String TYPE = "index_lifecycle";
public static final ParseField POLICIES_FIELD = new ParseField("policies"); public static final ParseField POLICIES_FIELD = new ParseField("policies");
@ -38,7 +40,8 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
public static final IndexLifecycleMetadata EMPTY_METADATA = new IndexLifecycleMetadata(Collections.emptySortedMap(), 3); public static final IndexLifecycleMetadata EMPTY_METADATA = new IndexLifecycleMetadata(Collections.emptySortedMap(), 3);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static final ConstructingObjectParser<IndexLifecycleMetadata, NamedXContentRegistry> PARSER = new ConstructingObjectParser<>( public static final ConstructingObjectParser<IndexLifecycleMetadata, NamedXContentRegistry> PARSER = new ConstructingObjectParser<>(
TYPE, a -> new IndexLifecycleMetadata(convertListToMapValues((List<LifecyclePolicy>) a[0]), (long) a[1])); TYPE, a -> new IndexLifecycleMetadata(
ObjectParserUtils.convertListToMapValues(LifecyclePolicy::getName, (List<LifecyclePolicy>) a[0]), (long) a[1]));
static { static {
PARSER.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, n) -> LifecyclePolicy.parse(p, new Tuple<>(n, c)), PARSER.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, n) -> LifecyclePolicy.parse(p, new Tuple<>(n, c)),
v -> { v -> {
@ -95,14 +98,6 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
return builder; return builder;
} }
private static SortedMap<String, LifecyclePolicy> convertListToMapValues(List<LifecyclePolicy> list) {
SortedMap<String, LifecyclePolicy> map = new TreeMap<>();
for (LifecyclePolicy policy : list) {
map.put(policy.getName(), policy);
}
return map;
}
@Override @Override
public Version getMinimalSupportedVersion() { public Version getMinimalSupportedVersion() {
return Version.V_7_0_0_alpha1; return Version.V_7_0_0_alpha1;

View File

@ -67,6 +67,11 @@ public class InternalIndexLifecycleContext implements IndexLifecycleContext {
return idxMeta.getIndex().getName(); return idxMeta.getIndex().getName();
} }
@Override
public int getNumberOfReplicas() {
return idxMeta.getNumberOfReplicas();
}
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
long now = nowSupplier.getAsLong(); long now = nowSupplier.getAsLong();

View File

@ -138,7 +138,7 @@ public abstract class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
public void onSuccess() { public void onSuccess() {
logger.info("Successfully initialised phase [" + nextPhaseName + "] for index [" + indexName + "]"); logger.info("Successfully initialised phase [" + nextPhaseName + "] for index [" + indexName + "]");
// We might as well execute the phase now rather than waiting for execute to be called again // We might as well execute the phase now rather than waiting for execute to be called again
nextPhase.execute(context); nextPhase.execute(context, getActionProvider(context, nextPhase));
} }
@Override @Override
@ -153,7 +153,7 @@ public abstract class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
throw new IllegalStateException("Current phase [" + currentPhaseName + "] not found in lifecycle [" throw new IllegalStateException("Current phase [" + currentPhaseName + "] not found in lifecycle ["
+ getName() + "] for index [" + indexName + "]"); + getName() + "] for index [" + indexName + "]");
} else { } else {
currentPhase.execute(context); currentPhase.execute(context, getActionProvider(context, currentPhase));
} }
} }
} }
@ -208,6 +208,34 @@ public abstract class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
*/ */
protected abstract String getType(); protected abstract String getType();
/**
* @param context the index lifecycle context for this phase at the time of execution
* @param phase the current phase for which to provide an action provider
* @return the action provider
*/
protected abstract NextActionProvider getActionProvider(IndexLifecycleContext context, Phase phase);
/**
* Reference to a method that determines which {@link LifecycleAction} to execute next after
* a specific action.
*
* <p>
* Concrete {@link LifecyclePolicy} classes will implement this to help determine their specific
* ordering of actions for the phases they allow.
* <pre><code>
* </code></pre>
*/
@FunctionalInterface
interface NextActionProvider {
/**
* @param current The current action which is being or was executed
* @return the action following {@code current} to execute
*/
LifecycleAction next(LifecycleAction current);
}
/** /**
* This class is here to assist in creating a context from which the specific LifecyclePolicy sub-classes can inherit * This class is here to assist in creating a context from which the specific LifecyclePolicy sub-classes can inherit
* all the previously parsed values * all the previously parsed values
@ -251,4 +279,5 @@ public abstract class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
return phases; return phases;
} }
} }
} }

View File

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.common.io.stream.NamedWriteable;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;
/**
* NOCOMMIT: these utility methods should be brought into the core API for parsing namedWriteables
*/
public class ObjectParserUtils {
public static <V extends NamedWriteable> SortedMap<String, V> convertListToMapValues(Function<V, String> keyFunction,
List<V> list) {
SortedMap<String, V> map = new TreeMap<>();
for (V namedWriteable : list) {
map.put(keyFunction.apply(namedWriteable), namedWriteable);
}
return map;
}
}

View File

@ -24,7 +24,12 @@ import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleContext.Listener;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import static org.elasticsearch.xpack.indexlifecycle.ObjectParserUtils.convertListToMapValues;
/** /**
* Represents set of {@link LifecycleAction}s which should be executed at a * Represents set of {@link LifecycleAction}s which should be executed at a
@ -40,7 +45,8 @@ public class Phase implements ToXContentObject, Writeable {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static final ConstructingObjectParser<Phase, Tuple<String, NamedXContentRegistry>> PARSER = new ConstructingObjectParser<>( private static final ConstructingObjectParser<Phase, Tuple<String, NamedXContentRegistry>> PARSER = new ConstructingObjectParser<>(
"phase", false, (a, c) -> new Phase(c.v1(), (TimeValue) a[0], (List<LifecycleAction>) a[1])); "phase", false, (a, c) -> new Phase(c.v1(), (TimeValue) a[0],
convertListToMapValues(LifecycleAction::getWriteableName, (List<LifecycleAction>) a[1])));
static { static {
PARSER.declareField(ConstructingObjectParser.constructorArg(), PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), AFTER_FIELD.getPreferredName()), AFTER_FIELD, ValueType.VALUE); (p, c) -> TimeValue.parseTimeValue(p.text(), AFTER_FIELD.getPreferredName()), AFTER_FIELD, ValueType.VALUE);
@ -55,7 +61,7 @@ public class Phase implements ToXContentObject, Writeable {
} }
private String name; private String name;
private List<LifecycleAction> actions; private Map<String, LifecycleAction> actions;
private TimeValue after; private TimeValue after;
/** /**
@ -65,11 +71,12 @@ public class Phase implements ToXContentObject, Writeable {
* the age of the index when the index should move to this * the age of the index when the index should move to this
* {@link Phase}. * {@link Phase}.
* @param actions * @param actions
* a {@link List} of the {@link LifecycleAction}s to run when * a {@link Map} of the {@link LifecycleAction}s to run when
* during his {@link Phase}. The order of this list defines the * during his {@link Phase}. The keys in this map are the associated
* order in which the {@link LifecycleAction}s will be run. * action names. The order of these actions is defined
* by the {@link LifecyclePolicy.NextActionProvider}.
*/ */
public Phase(String name, TimeValue after, List<LifecycleAction> actions) { public Phase(String name, TimeValue after, Map<String, LifecycleAction> actions) {
this.name = name; this.name = name;
this.after = after; this.after = after;
this.actions = actions; this.actions = actions;
@ -81,14 +88,23 @@ public class Phase implements ToXContentObject, Writeable {
public Phase(StreamInput in) throws IOException { public Phase(StreamInput in) throws IOException {
this.name = in.readString(); this.name = in.readString();
this.after = new TimeValue(in); this.after = new TimeValue(in);
this.actions = in.readNamedWriteableList(LifecycleAction.class); int size = in.readVInt();
TreeMap<String, LifecycleAction> actions = new TreeMap<>();
for (int i = 0; i < size; i++) {
actions.put(in.readString(), in.readNamedWriteable(LifecycleAction.class));
}
this.actions = actions;
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeString(name); out.writeString(name);
after.writeTo(out); after.writeTo(out);
out.writeNamedWriteableList(actions); out.writeVInt(actions.size());
for (Map.Entry<String, LifecycleAction> entry : actions.entrySet()) {
out.writeString(entry.getKey());
out.writeNamedWriteable(entry.getValue());
}
} }
/** /**
@ -107,11 +123,10 @@ public class Phase implements ToXContentObject, Writeable {
} }
/** /**
* @return a {@link List} of the {@link LifecycleAction}s to run when during * @return a {@link Map} of the {@link LifecycleAction}s to run when during
* his {@link Phase}. The order of this list defines the order in * his {@link Phase}.
* which the {@link LifecycleAction}s will be run.
*/ */
public List<LifecycleAction> getActions() { public Map<String, LifecycleAction> getActions() {
return actions; return actions;
} }
@ -122,8 +137,10 @@ public class Phase implements ToXContentObject, Writeable {
* @param context * @param context
* the {@link IndexLifecycleContext} to use to execute the * the {@link IndexLifecycleContext} to use to execute the
* {@link Phase}. * {@link Phase}.
* @param nextActionProvider
* the next action provider
*/ */
protected void execute(IndexLifecycleContext context) { protected void execute(IndexLifecycleContext context, LifecyclePolicy.NextActionProvider nextActionProvider) {
String currentActionName = context.getAction(); String currentActionName = context.getAction();
String indexName = context.getLifecycleTarget(); String indexName = context.getLifecycleTarget();
if (Strings.isNullOrEmpty(currentActionName)) { if (Strings.isNullOrEmpty(currentActionName)) {
@ -135,7 +152,7 @@ public class Phase implements ToXContentObject, Writeable {
firstAction = null; firstAction = null;
firstActionName = PHASE_COMPLETED; firstActionName = PHASE_COMPLETED;
} else { } else {
firstAction = actions.get(0); firstAction = nextActionProvider.next(null);
firstActionName = firstAction.getWriteableName(); firstActionName = firstAction.getWriteableName();
} }
// Set the action on the context to this first action so we know where we are next time we execute // Set the action on the context to this first action so we know where we are next time we execute
@ -146,7 +163,7 @@ public class Phase implements ToXContentObject, Writeable {
logger.info("Successfully initialised action [" + firstActionName + "] for index [" + indexName + "]"); logger.info("Successfully initialised action [" + firstActionName + "] for index [" + indexName + "]");
// Now execute the action unless its PHASE_COMPLETED // Now execute the action unless its PHASE_COMPLETED
if (firstActionName.equals(PHASE_COMPLETED) == false) { if (firstActionName.equals(PHASE_COMPLETED) == false) {
executeAction(context, indexName, 0, firstAction); executeAction(context, indexName, firstAction, nextActionProvider);
} }
} }
@ -159,24 +176,19 @@ public class Phase implements ToXContentObject, Writeable {
}); });
} else if (currentActionName.equals(PHASE_COMPLETED) == false) { } else if (currentActionName.equals(PHASE_COMPLETED) == false) {
// We have an action name and its not PHASE COMPLETED so we need to execute the action // We have an action name and its not PHASE COMPLETED so we need to execute the action
int currentActionIndex = -1; // First find the action in the actions map.
// First find the action in the actions list. if (actions.containsKey(currentActionName) == false) {
for (int i = 0; i < actions.size(); i++) {
if (actions.get(i).getWriteableName().equals(currentActionName)) {
currentActionIndex = i;
break;
}
}
if (currentActionIndex == -1) {
throw new IllegalStateException("Current action [" + currentActionName + "] not found in phase [" throw new IllegalStateException("Current action [" + currentActionName + "] not found in phase ["
+ getName() + "] for index [" + indexName + "]"); + getName() + "] for index [" + indexName + "]");
} }
LifecycleAction currentAction = actions.get(currentActionIndex); LifecycleAction currentAction = actions.get(currentActionName);
executeAction(context, indexName, currentActionIndex, currentAction); // then execute the action
executeAction(context, indexName, currentAction, nextActionProvider);
} }
} }
private void executeAction(IndexLifecycleContext context, String indexName, int actionIndex, LifecycleAction action) { private void executeAction(IndexLifecycleContext context, String indexName, LifecycleAction action,
LifecyclePolicy.NextActionProvider nextActionProvider) {
String actionName = action.getWriteableName(); String actionName = action.getWriteableName();
context.executeAction(action, new LifecycleAction.Listener() { context.executeAction(action, new LifecycleAction.Listener() {
@ -185,7 +197,7 @@ public class Phase implements ToXContentObject, Writeable {
if (completed) { if (completed) {
logger.info("Action [" + actionName + "] for index [" + indexName + "] complete, moving to next action"); logger.info("Action [" + actionName + "] for index [" + indexName + "] complete, moving to next action");
// Since we completed the current action move to the next action // Since we completed the current action move to the next action
moveToAction(context, indexName, actionIndex + 1); moveToAction(context, indexName, action, nextActionProvider);
} else { } else {
logger.info("Action [" + actionName + "] for index [" + indexName + "] executed sucessfully but is not yet complete"); logger.info("Action [" + actionName + "] for index [" + indexName + "] executed sucessfully but is not yet complete");
} }
@ -198,9 +210,10 @@ public class Phase implements ToXContentObject, Writeable {
}); });
} }
private void moveToAction(IndexLifecycleContext context, String indexName, final int nextActionIndex) { private void moveToAction(IndexLifecycleContext context, String indexName, LifecycleAction currentAction,
if (nextActionIndex < actions.size()) { LifecyclePolicy.NextActionProvider nextActionProvider) {
LifecycleAction nextAction = actions.get(nextActionIndex); LifecycleAction nextAction = nextActionProvider.next(currentAction);
if (nextAction != null) {
context.setAction(nextAction.getWriteableName(), new Listener() { context.setAction(nextAction.getWriteableName(), new Listener() {
@Override @Override
@ -208,7 +221,7 @@ public class Phase implements ToXContentObject, Writeable {
logger.info("Successfully initialised action [" + nextAction.getWriteableName() + "] in phase [" + getName() logger.info("Successfully initialised action [" + nextAction.getWriteableName() + "] in phase [" + getName()
+ "] for index [" + indexName + "]"); + "] for index [" + indexName + "]");
// We might as well execute the new action now rather than waiting for execute to be called again // We might as well execute the new action now rather than waiting for execute to be called again
execute(context); execute(context, nextActionProvider);
} }
@Override @Override
@ -238,11 +251,7 @@ public class Phase implements ToXContentObject, Writeable {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
builder.field(AFTER_FIELD.getPreferredName(), after.seconds() + "s"); // Need a better way to get a parsable format out here builder.field(AFTER_FIELD.getPreferredName(), after.seconds() + "s"); // Need a better way to get a parsable format out here
builder.startObject(ACTIONS_FIELD.getPreferredName()); builder.field(ACTIONS_FIELD.getPreferredName(), actions);
for (LifecycleAction action : actions) {
builder.field(action.getWriteableName(), action);
}
builder.endObject();
builder.endObject(); builder.endObject();
return builder; return builder;
} }

View File

@ -10,16 +10,21 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import java.io.IOException; import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
/** /**
* A {@link LifecycleAction} that changes the number of replicas for the index. * A {@link LifecycleAction} that changes the number of replicas for the index.
@ -28,20 +33,31 @@ public class ReplicasAction implements LifecycleAction {
public static final String NAME = "replicas"; public static final String NAME = "replicas";
private static final Logger logger = ESLoggerFactory.getLogger(ReplicasAction.class); private static final Logger logger = ESLoggerFactory.getLogger(ReplicasAction.class);
private static final ObjectParser<ReplicasAction, Void> PARSER = new ObjectParser<>(NAME, ReplicasAction::new); public static final ParseField NUMBER_OF_REPLICAS_FIELD = new ParseField("number_of_replicas");
private static final ConstructingObjectParser<ReplicasAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
false, a -> new ReplicasAction((Integer) a[0]));
static {
PARSER.declareInt(constructorArg(), NUMBER_OF_REPLICAS_FIELD);
}
private int numberOfReplicas;
public static ReplicasAction parse(XContentParser parser) { public static ReplicasAction parse(XContentParser parser) {
return PARSER.apply(parser, null); return PARSER.apply(parser, null);
} }
public ReplicasAction() { public ReplicasAction(int numberOfReplicas) {
this.numberOfReplicas = numberOfReplicas;
} }
public ReplicasAction(StreamInput in) throws IOException { public ReplicasAction(StreamInput in) throws IOException {
this.numberOfReplicas = in.readVInt();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(numberOfReplicas);
} }
@Override @Override
@ -52,6 +68,7 @@ public class ReplicasAction implements LifecycleAction {
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
builder.field(NUMBER_OF_REPLICAS_FIELD.getPreferredName(), numberOfReplicas);
builder.endObject(); builder.endObject();
return builder; return builder;
} }
@ -62,9 +79,13 @@ public class ReplicasAction implements LifecycleAction {
listener.onSuccess(true); listener.onSuccess(true);
} }
public int getNumberOfReplicas() {
return numberOfReplicas;
}
@Override @Override
public int hashCode() { public int hashCode() {
return 1; return Objects.hashCode(numberOfReplicas);
} }
@Override @Override
@ -75,7 +96,8 @@ public class ReplicasAction implements LifecycleAction {
if (obj.getClass() != getClass()) { if (obj.getClass() != getClass()) {
return false; return false;
} }
return true; ReplicasAction other = (ReplicasAction) obj;
return Objects.equals(numberOfReplicas, other.numberOfReplicas);
} }
@Override @Override

View File

@ -5,7 +5,6 @@
*/ */
package org.elasticsearch.xpack.indexlifecycle; package org.elasticsearch.xpack.indexlifecycle;
import com.google.common.collect.Maps;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
@ -14,10 +13,13 @@ import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.stream.Stream;
/** /**
* Represents the lifecycle of an index from creation to deletion. A * Represents the lifecycle of an index from creation to deletion. A
@ -99,10 +101,96 @@ public class TimeseriesLifecyclePolicy extends LifecyclePolicy {
return null; return null;
} }
/**
* This action provider returns an ordering for the actions within each of the four timeseries phases.
* Hot Phase:
* The Hot Phase only supports the {@link RolloverAction} and so that is the only action to order
* Warm Phase:
* The Warm Phase executes the supported actions in a slightly complicated order for the sake of
* optimization. Assuming the {@link ReplicasAction} is specified, it will run first or last depending
* on whether it increases, decreases, or keeps the existing replica count. If number-of-replicas is
* kept the same, or reduced, then {@link ReplicasAction} is executed first, otherwise, it is last.
* So the ordering looks something like this:
* - {@link ReplicasAction} (if action.number_of_replicas <= idxMeta.number_of_replicas)
* - {@link AllocateAction}
* - {@link ShrinkAction}
* - {@link ForceMergeAction}
* - {@link ReplicasAction} (if action.number_of_replicas > idxMeta.number_of_replicas)
*
* NOCOMMIT: there may exist further optimizations to this when {@link ShrinkAction} is specified.
*
* @param context the index lifecycle context for this phase at the time of execution
* @param phase the current phase for which to provide an action provider
* @return the {@link org.elasticsearch.xpack.indexlifecycle.LifecyclePolicy.NextActionProvider} for {@code phase}.
*/
@Override
protected NextActionProvider getActionProvider(IndexLifecycleContext context, Phase phase) {
Map<String, LifecycleAction> actions = phase.getActions();
switch (phase.getName()) {
case "hot":
// The hot-phase only has one action, either start with it, or return null. Simple as that!
return (action) -> (action == null) ? actions.getOrDefault(RolloverAction.NAME, null) : null;
case "warm":
return (action) -> {
ReplicasAction replicasAction = (ReplicasAction) actions.getOrDefault(ReplicasAction.NAME, null);
if (action == null) {
if (replicasAction != null && replicasAction.getNumberOfReplicas() <= context.getNumberOfReplicas()) {
return replicasAction;
}
return Stream.of(AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, ReplicasAction.NAME)
.map(a -> actions.getOrDefault(a, null))
.filter(Objects::nonNull).findFirst().orElse(null);
} else if (action instanceof ReplicasAction) {
return Stream.of(AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME)
.map(a -> actions.getOrDefault(a, null))
.filter(Objects::nonNull).findFirst().orElse(null);
} else if (action instanceof AllocateAction) {
return Stream.of(ShrinkAction.NAME, ForceMergeAction.NAME, ReplicasAction.NAME)
.map(a -> actions.getOrDefault(a, null))
.filter(Objects::nonNull).findFirst().orElse(null);
} else if (action instanceof ShrinkAction) {
return Stream.of(ForceMergeAction.NAME, ReplicasAction.NAME)
.map(a -> actions.getOrDefault(a, null))
.filter(Objects::nonNull).findFirst().orElse(null);
} else if (action instanceof ForceMergeAction) {
return replicasAction;
}
return null;
};
case "cold":
return (action) -> {
ReplicasAction replicasAction = (ReplicasAction) actions.getOrDefault(ReplicasAction.NAME, null);
LifecycleAction allocateAction = actions.getOrDefault(AllocateAction.NAME, null);
if (action == null) {
if (replicasAction != null && replicasAction.getNumberOfReplicas() <= context.getNumberOfReplicas()) {
return replicasAction;
} else if (allocateAction != null) {
return allocateAction;
}
return replicasAction;
} else if (action instanceof ReplicasAction) {
return allocateAction;
} else if (action instanceof AllocateAction) {
return replicasAction;
}
return null;
};
case "delete":
return (action) -> {
if (action == null) {
return actions.getOrDefault(DeleteAction.NAME, null);
}
return null;
};
default:
throw new IllegalArgumentException("phase [" + phase.getName() + "] is invalid for policy [timeseries]");
}
}
@Override @Override
public void validate(Collection<Phase> phases) { public void validate(Collection<Phase> phases) {
Set<String> allowedPhases = new HashSet<>(VALID_PHASES); Set<String> allowedPhases = new HashSet<>(VALID_PHASES);
Map<String, Set<String>> allowedActions = Maps.newHashMapWithExpectedSize(allowedPhases.size()); Map<String, Set<String>> allowedActions = new HashMap<>(allowedPhases.size());
allowedActions.put("hot", VALID_HOT_ACTIONS); allowedActions.put("hot", VALID_HOT_ACTIONS);
allowedActions.put("warm", VALID_WARM_ACTIONS); allowedActions.put("warm", VALID_WARM_ACTIONS);
allowedActions.put("cold", VALID_COLD_ACTIONS); allowedActions.put("cold", VALID_COLD_ACTIONS);
@ -111,9 +199,9 @@ public class TimeseriesLifecyclePolicy extends LifecyclePolicy {
if (allowedPhases.contains(phase.getName()) == false) { if (allowedPhases.contains(phase.getName()) == false) {
throw new IllegalArgumentException("Timeseries lifecycle does not support phase [" + phase.getName() + "]"); throw new IllegalArgumentException("Timeseries lifecycle does not support phase [" + phase.getName() + "]");
} }
phase.getActions().forEach(action -> { phase.getActions().forEach((actionName, action) -> {
if (allowedActions.get(phase.getName()).contains(action.getWriteableName()) == false) { if (allowedActions.get(phase.getName()).contains(actionName) == false) {
throw new IllegalArgumentException("invalid action [" + action.getWriteableName() + "] " + throw new IllegalArgumentException("invalid action [" + actionName + "] " +
"defined in phase [" + phase.getName() +"]"); "defined in phase [" + phase.getName() +"]");
} }
}); });

View File

@ -85,7 +85,7 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
public void init() { public void init() {
settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1) settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0).put("index.lifecycle.name", "test").build(); .put(SETTING_NUMBER_OF_REPLICAS, 0).put("index.lifecycle.name", "test").build();
List<LifecycleAction> deletePhaseActions = Collections.singletonList(new DeleteAction()); Map<String, LifecycleAction> deletePhaseActions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
Map<String, Phase> phases = Collections.singletonMap("delete", new Phase("delete", Map<String, Phase> phases = Collections.singletonMap("delete", new Phase("delete",
TimeValue.timeValueSeconds(3), deletePhaseActions)); TimeValue.timeValueSeconds(3), deletePhaseActions));
lifecyclePolicy = new TimeseriesLifecyclePolicy("test", phases); lifecyclePolicy = new TimeseriesLifecyclePolicy("test", phases);

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
@ -48,9 +49,9 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
List<Phase> phases = new ArrayList<>(numberPhases); List<Phase> phases = new ArrayList<>(numberPhases);
for (int j = 0; j < numberPhases; j++) { for (int j = 0; j < numberPhases; j++) {
TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after"); TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
List<LifecycleAction> actions = new ArrayList<>(); Map<String, LifecycleAction> actions = Collections.emptyMap();
if (randomBoolean()) { if (randomBoolean()) {
actions.add(new DeleteAction()); actions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
} }
phases.add(new Phase(randomAlphaOfLength(10), after, actions)); phases.add(new Phase(randomAlphaOfLength(10), after, actions));
} }

View File

@ -434,7 +434,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> now); InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> now);
Phase phase = new Phase("test_phase", after, Collections.emptyList()); Phase phase = new Phase("test_phase", after, Collections.emptyMap());
assertFalse(context.canExecute(phase)); assertFalse(context.canExecute(phase));
} }
@ -450,7 +450,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> now); InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> now);
Phase phase = new Phase("test_phase", after, Collections.emptyList()); Phase phase = new Phase("test_phase", after, Collections.emptyMap());
assertTrue(context.canExecute(phase)); assertTrue(context.canExecute(phase));
} }
@ -466,7 +466,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> now); InternalIndexLifecycleContext context = new InternalIndexLifecycleContext(idxMeta, null, () -> now);
Phase phase = new Phase("test_phase", after, Collections.emptyList()); Phase phase = new Phase("test_phase", after, Collections.emptyMap());
assertTrue(context.canExecute(phase)); assertTrue(context.canExecute(phase));
} }

View File

@ -10,35 +10,47 @@ import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
public class LifecyclePolicyTests extends ESTestCase { public class LifecyclePolicyTests extends ESTestCase {
public void testExecuteNewIndexBeforeTrigger() throws Exception { private String indexName;
String indexName = randomAlphaOfLengthBetween(1, 20); private String lifecycleName;
String lifecycleName = randomAlphaOfLengthBetween(1, 20); private MockAction firstAction;
List<Phase> phases = new ArrayList<>(); private MockAction secondAction;
List<LifecycleAction> actions = new ArrayList<>(); private MockAction thirdAction;
MockAction firstAction = new MockAction(); private Phase firstPhase;
actions.add(firstAction); private Phase secondPhase;
TimeValue after = TimeValue.timeValueSeconds(0); private Phase thirdPhase;
Phase firstPhase = new Phase("first_phase", after, actions); private LifecyclePolicy policy;
phases.add(firstPhase);
actions = new ArrayList<>();
MockAction secondAction = new MockAction();
actions.add(secondAction);
after = TimeValue.timeValueSeconds(10);
Phase secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
actions = new ArrayList<>();
MockAction thirdAction = new MockAction();
actions.add(thirdAction);
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "") { @Before
public void setupPolicy() {
indexName = randomAlphaOfLengthBetween(1, 20);
lifecycleName = randomAlphaOfLengthBetween(1, 20);
List<Phase> phases = new ArrayList<>();
firstAction = new MockAction();
Map<String, LifecycleAction> actions = Collections.singletonMap(MockAction.NAME, firstAction);
TimeValue after = TimeValue.timeValueSeconds(0);
firstPhase = new Phase("first_phase", after, actions);
phases.add(firstPhase);
secondAction = new MockAction();
actions = Collections.singletonMap(MockAction.NAME, secondAction);
after = TimeValue.timeValueSeconds(10);
secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
thirdAction = new MockAction();
actions = Collections.singletonMap(MockAction.NAME, thirdAction);
after = TimeValue.timeValueSeconds(20);
thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
policy = new TestLifecyclePolicy(lifecycleName, phases);
}
public void testExecuteNewIndexBeforeTrigger() throws Exception {
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "", 0) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -65,30 +77,7 @@ public class LifecyclePolicyTests extends ESTestCase {
} }
public void testExecuteNewIndexAfterTrigger() throws Exception { public void testExecuteNewIndexAfterTrigger() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20); MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "", 0) {
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
List<Phase> phases = new ArrayList<>();
List<LifecycleAction> actions = new ArrayList<>();
MockAction firstAction = new MockAction();
actions.add(firstAction);
TimeValue after = TimeValue.timeValueSeconds(0);
Phase firstPhase = new Phase("first_phase", after, actions);
phases.add(firstPhase);
actions = new ArrayList<>();
MockAction secondAction = new MockAction();
actions.add(secondAction);
after = TimeValue.timeValueSeconds(10);
Phase secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
actions = new ArrayList<>();
MockAction thirdAction = new MockAction();
actions.add(thirdAction);
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "") {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -115,30 +104,7 @@ public class LifecyclePolicyTests extends ESTestCase {
} }
public void testExecuteNewIndexAfterTriggerFailure() throws Exception { public void testExecuteNewIndexAfterTriggerFailure() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20); MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "", 0) {
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
List<Phase> phases = new ArrayList<>();
List<LifecycleAction> actions = new ArrayList<>();
MockAction firstAction = new MockAction();
actions.add(firstAction);
TimeValue after = TimeValue.timeValueSeconds(0);
Phase firstPhase = new Phase("first_phase", after, actions);
phases.add(firstPhase);
actions = new ArrayList<>();
MockAction secondAction = new MockAction();
actions.add(secondAction);
after = TimeValue.timeValueSeconds(10);
Phase secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
actions = new ArrayList<>();
MockAction thirdAction = new MockAction();
actions.add(thirdAction);
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "") {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -169,30 +135,7 @@ public class LifecyclePolicyTests extends ESTestCase {
} }
public void testExecuteFirstPhase() throws Exception { public void testExecuteFirstPhase() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20); MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, firstPhase.getName(), "", 0) {
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
List<Phase> phases = new ArrayList<>();
List<LifecycleAction> actions = new ArrayList<>();
MockAction firstAction = new MockAction();
actions.add(firstAction);
TimeValue after = TimeValue.timeValueSeconds(0);
Phase firstPhase = new Phase("first_phase", after, actions);
phases.add(firstPhase);
actions = new ArrayList<>();
MockAction secondAction = new MockAction();
actions.add(secondAction);
after = TimeValue.timeValueSeconds(10);
Phase secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
actions = new ArrayList<>();
MockAction thirdAction = new MockAction();
actions.add(thirdAction);
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, firstPhase.getName(), "") {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -215,30 +158,7 @@ public class LifecyclePolicyTests extends ESTestCase {
} }
public void testExecuteSecondPhase() throws Exception { public void testExecuteSecondPhase() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20); MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, secondPhase.getName(), "", 0) {
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
List<Phase> phases = new ArrayList<>();
List<LifecycleAction> actions = new ArrayList<>();
MockAction firstAction = new MockAction();
actions.add(firstAction);
TimeValue after = TimeValue.timeValueSeconds(0);
Phase firstPhase = new Phase("first_phase", after, actions);
phases.add(firstPhase);
actions = new ArrayList<>();
MockAction secondAction = new MockAction();
actions.add(secondAction);
after = TimeValue.timeValueSeconds(10);
Phase secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
actions = new ArrayList<>();
MockAction thirdAction = new MockAction();
actions.add(thirdAction);
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, secondPhase.getName(), "") {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -261,30 +181,7 @@ public class LifecyclePolicyTests extends ESTestCase {
} }
public void testExecuteThirdPhase() throws Exception { public void testExecuteThirdPhase() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20); MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, thirdPhase.getName(), "", 0) {
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
List<Phase> phases = new ArrayList<>();
List<LifecycleAction> actions = new ArrayList<>();
MockAction firstAction = new MockAction();
actions.add(firstAction);
TimeValue after = TimeValue.timeValueSeconds(0);
Phase firstPhase = new Phase("first_phase", after, actions);
phases.add(firstPhase);
actions = new ArrayList<>();
MockAction secondAction = new MockAction();
actions.add(secondAction);
after = TimeValue.timeValueSeconds(10);
Phase secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
actions = new ArrayList<>();
MockAction thirdAction = new MockAction();
actions.add(thirdAction);
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, thirdPhase.getName(), "") {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -307,30 +204,7 @@ public class LifecyclePolicyTests extends ESTestCase {
} }
public void testExecuteMissingPhase() throws Exception { public void testExecuteMissingPhase() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20); MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "does_not_exist", "", 0) {
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
List<Phase> phases = new ArrayList<>();
List<LifecycleAction> actions = new ArrayList<>();
MockAction firstAction = new MockAction();
actions.add(firstAction);
TimeValue after = TimeValue.timeValueSeconds(0);
Phase firstPhase = new Phase("first_phase", after, actions);
phases.add(firstPhase);
actions = new ArrayList<>();
MockAction secondAction = new MockAction();
actions.add(secondAction);
after = TimeValue.timeValueSeconds(10);
Phase secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
actions = new ArrayList<>();
MockAction thirdAction = new MockAction();
actions.add(thirdAction);
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "does_not_exist", "") {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -356,31 +230,8 @@ public class LifecyclePolicyTests extends ESTestCase {
} }
public void testExecuteFirstPhaseCompletedBeforeTrigger() throws Exception { public void testExecuteFirstPhaseCompletedBeforeTrigger() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20); MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, firstPhase.getName(), Phase.PHASE_COMPLETED, 0) {
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
List<Phase> phases = new ArrayList<>();
List<LifecycleAction> actions = new ArrayList<>();
MockAction firstAction = new MockAction();
actions.add(firstAction);
TimeValue after = TimeValue.timeValueSeconds(0);
Phase firstPhase = new Phase("first_phase", after, actions);
phases.add(firstPhase);
actions = new ArrayList<>();
MockAction secondAction = new MockAction();
actions.add(secondAction);
after = TimeValue.timeValueSeconds(10);
Phase secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
actions = new ArrayList<>();
MockAction thirdAction = new MockAction();
actions.add(thirdAction);
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, firstPhase.getName(), Phase.PHASE_COMPLETED) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
if (phase == secondPhase) { if (phase == secondPhase) {
@ -406,30 +257,7 @@ public class LifecyclePolicyTests extends ESTestCase {
} }
public void testExecuteFirstPhaseCompletedAfterTrigger() throws Exception { public void testExecuteFirstPhaseCompletedAfterTrigger() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20); MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, firstPhase.getName(), Phase.PHASE_COMPLETED, 0) {
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
List<Phase> phases = new ArrayList<>();
List<LifecycleAction> actions = new ArrayList<>();
MockAction firstAction = new MockAction();
actions.add(firstAction);
TimeValue after = TimeValue.timeValueSeconds(0);
Phase firstPhase = new Phase("first_phase", after, actions);
phases.add(firstPhase);
actions = new ArrayList<>();
MockAction secondAction = new MockAction();
actions.add(secondAction);
after = TimeValue.timeValueSeconds(10);
Phase secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
actions = new ArrayList<>();
MockAction thirdAction = new MockAction();
actions.add(thirdAction);
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, firstPhase.getName(), Phase.PHASE_COMPLETED) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -456,30 +284,7 @@ public class LifecyclePolicyTests extends ESTestCase {
} }
public void testExecuteSecondPhaseCompletedBeforeTrigger() throws Exception { public void testExecuteSecondPhaseCompletedBeforeTrigger() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20); MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, secondPhase.getName(), Phase.PHASE_COMPLETED, 0) {
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
List<Phase> phases = new ArrayList<>();
List<LifecycleAction> actions = new ArrayList<>();
MockAction firstAction = new MockAction();
actions.add(firstAction);
TimeValue after = TimeValue.timeValueSeconds(0);
Phase firstPhase = new Phase("first_phase", after, actions);
phases.add(firstPhase);
actions = new ArrayList<>();
MockAction secondAction = new MockAction();
actions.add(secondAction);
after = TimeValue.timeValueSeconds(10);
Phase secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
actions = new ArrayList<>();
MockAction thirdAction = new MockAction();
actions.add(thirdAction);
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, secondPhase.getName(), Phase.PHASE_COMPLETED) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -506,30 +311,7 @@ public class LifecyclePolicyTests extends ESTestCase {
} }
public void testExecuteSecondPhaseCompletedAfterTrigger() throws Exception { public void testExecuteSecondPhaseCompletedAfterTrigger() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20); MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, secondPhase.getName(), Phase.PHASE_COMPLETED, 0) {
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
List<Phase> phases = new ArrayList<>();
List<LifecycleAction> actions = new ArrayList<>();
MockAction firstAction = new MockAction();
actions.add(firstAction);
TimeValue after = TimeValue.timeValueSeconds(0);
Phase firstPhase = new Phase("first_phase", after, actions);
phases.add(firstPhase);
actions = new ArrayList<>();
MockAction secondAction = new MockAction();
actions.add(secondAction);
after = TimeValue.timeValueSeconds(10);
Phase secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
actions = new ArrayList<>();
MockAction thirdAction = new MockAction();
actions.add(thirdAction);
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, secondPhase.getName(), Phase.PHASE_COMPLETED) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -556,35 +338,16 @@ public class LifecyclePolicyTests extends ESTestCase {
} }
public void testExecuteThirdPhaseCompleted() throws Exception { public void testExecuteThirdPhaseCompleted() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20); MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, thirdPhase.getName(), Phase.PHASE_COMPLETED, 0) {
String lifecycleName = randomAlphaOfLengthBetween(1, 20);
List<Phase> phases = new ArrayList<>();
List<LifecycleAction> actions = new ArrayList<>();
MockAction firstAction = new MockAction();
actions.add(firstAction);
TimeValue after = TimeValue.timeValueSeconds(0);
Phase firstPhase = new Phase("first_phase", after, actions);
phases.add(firstPhase);
actions = new ArrayList<>();
MockAction secondAction = new MockAction();
actions.add(secondAction);
after = TimeValue.timeValueSeconds(10);
Phase secondPhase = new Phase("second_phase", after, actions);
phases.add(secondPhase);
actions = new ArrayList<>();
MockAction thirdAction = new MockAction();
actions.add(thirdAction);
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, thirdPhase.getName(), Phase.PHASE_COMPLETED) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
throw new AssertionError("canExecute should not have been called"); throw new AssertionError("canExecute should not have been called");
} }
@Override
public int getNumberOfReplicas() {
return 0;
}
}; };
policy.execute(context); policy.execute(context);

View File

@ -11,11 +11,13 @@ public abstract class MockIndexLifecycleContext implements IndexLifecycleContext
private String phase; private String phase;
private String action; private String action;
private Exception exceptionToThrow; private Exception exceptionToThrow;
private int numberOfReplicas;
public MockIndexLifecycleContext(String targetName, String initialPhase, String initialAction) { public MockIndexLifecycleContext(String targetName, String initialPhase, String initialAction, int numberOfReplicas) {
this.targetName = targetName; this.targetName = targetName;
this.phase = initialPhase; this.phase = initialPhase;
this.action = initialAction; this.action = initialAction;
this.numberOfReplicas = numberOfReplicas;
} }
public void failOnSetters(Exception exceptionToThrow) { public void failOnSetters(Exception exceptionToThrow) {
@ -58,6 +60,16 @@ public abstract class MockIndexLifecycleContext implements IndexLifecycleContext
return targetName; return targetName;
} }
@Override
public int getNumberOfReplicas() {
return numberOfReplicas;
}
@Override
public boolean canExecute(Phase phase) {
return true;
}
@Override @Override
public void executeAction(LifecycleAction action, LifecycleAction.Listener listener) { public void executeAction(LifecycleAction action, LifecycleAction.Listener listener) {
action.execute(null, null, listener); action.execute(null, null, listener);

View File

@ -16,7 +16,7 @@ public class MockIndexLifecycleContextTests extends ESTestCase {
String newPhase = randomAlphaOfLengthBetween(1, 20); String newPhase = randomAlphaOfLengthBetween(1, 20);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(targetName, MockIndexLifecycleContext context = new MockIndexLifecycleContext(targetName,
randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)) { randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20), randomIntBetween(0, 10)) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -50,7 +50,7 @@ public class MockIndexLifecycleContextTests extends ESTestCase {
String phase = randomAlphaOfLengthBetween(1, 20); String phase = randomAlphaOfLengthBetween(1, 20);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(randomAlphaOfLengthBetween(1, 20), phase, MockIndexLifecycleContext context = new MockIndexLifecycleContext(randomAlphaOfLengthBetween(1, 20), phase,
randomAlphaOfLengthBetween(1, 20)) { randomAlphaOfLengthBetween(1, 20), randomIntBetween(0, 10)) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -67,7 +67,7 @@ public class MockIndexLifecycleContextTests extends ESTestCase {
String newAction = randomAlphaOfLengthBetween(1, 20); String newAction = randomAlphaOfLengthBetween(1, 20);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(targetName, phase, MockIndexLifecycleContext context = new MockIndexLifecycleContext(targetName, phase,
randomAlphaOfLengthBetween(1, 20)) { randomAlphaOfLengthBetween(1, 20), randomIntBetween(0, 10)) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -101,7 +101,7 @@ public class MockIndexLifecycleContextTests extends ESTestCase {
String action = randomAlphaOfLengthBetween(1, 20); String action = randomAlphaOfLengthBetween(1, 20);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(randomAlphaOfLengthBetween(1, 20), MockIndexLifecycleContext context = new MockIndexLifecycleContext(randomAlphaOfLengthBetween(1, 20),
randomAlphaOfLengthBetween(1, 20), action) { randomAlphaOfLengthBetween(1, 20), action, randomIntBetween(0, 10)) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -115,7 +115,7 @@ public class MockIndexLifecycleContextTests extends ESTestCase {
public void testGetLifecycleTarget() { public void testGetLifecycleTarget() {
String target = randomAlphaOfLengthBetween(1, 20); String target = randomAlphaOfLengthBetween(1, 20);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(target, randomAlphaOfLengthBetween(1, 20), MockIndexLifecycleContext context = new MockIndexLifecycleContext(target, randomAlphaOfLengthBetween(1, 20),
randomAlphaOfLengthBetween(1, 20)) { randomAlphaOfLengthBetween(1, 20), randomIntBetween(0, 10)) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -128,7 +128,7 @@ public class MockIndexLifecycleContextTests extends ESTestCase {
public void testExecuteAction() { public void testExecuteAction() {
MockIndexLifecycleContext context = new MockIndexLifecycleContext(randomAlphaOfLengthBetween(1, 20), MockIndexLifecycleContext context = new MockIndexLifecycleContext(randomAlphaOfLengthBetween(1, 20),
randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)) { randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20), randomIntBetween(0, 10)) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {

View File

@ -19,7 +19,9 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
public class PhaseTests extends AbstractSerializingTestCase<Phase> { public class PhaseTests extends AbstractSerializingTestCase<Phase> {
@ -38,9 +40,9 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
@Override @Override
protected Phase createTestInstance() { protected Phase createTestInstance() {
TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after"); TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
List<LifecycleAction> actions = new ArrayList<>(); Map<String, LifecycleAction> actions = Collections.emptyMap();
if (randomBoolean()) { if (randomBoolean()) {
actions.add(new DeleteAction()); actions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
} }
return new Phase(phaseName, after, actions); return new Phase(phaseName, after, actions);
} }
@ -65,7 +67,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
protected Phase mutateInstance(Phase instance) throws IOException { protected Phase mutateInstance(Phase instance) throws IOException {
String name = instance.getName(); String name = instance.getName();
TimeValue after = instance.getAfter(); TimeValue after = instance.getAfter();
List<LifecycleAction> actions = instance.getActions(); Map<String, LifecycleAction> actions = instance.getActions();
switch (between(0, 2)) { switch (between(0, 2)) {
case 0: case 0:
name = name + randomAlphaOfLengthBetween(1, 5); name = name + randomAlphaOfLengthBetween(1, 5);
@ -74,8 +76,8 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
after = TimeValue.timeValueSeconds(after.getSeconds() + randomIntBetween(1, 1000)); after = TimeValue.timeValueSeconds(after.getSeconds() + randomIntBetween(1, 1000));
break; break;
case 2: case 2:
actions = new ArrayList<>(actions); actions = new HashMap<>(actions);
actions.add(new DeleteAction()); actions.put(MockAction.NAME, new MockAction());
break; break;
default: default:
throw new AssertionError("Illegal randomisation branch"); throw new AssertionError("Illegal randomisation branch");
@ -87,7 +89,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
String indexName = randomAlphaOfLengthBetween(1, 20); String indexName = randomAlphaOfLengthBetween(1, 20);
String phaseName = randomAlphaOfLengthBetween(1, 20); String phaseName = randomAlphaOfLengthBetween(1, 20);
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100)); TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
List<LifecycleAction> actions = new ArrayList<>(); Map<String, LifecycleAction> actions = new HashMap<>();
MockAction firstAction = new MockAction() { MockAction firstAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -95,7 +97,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
firstAction.setCompleteOnExecute(true); firstAction.setCompleteOnExecute(true);
actions.add(firstAction); actions.put(firstAction.getWriteableName(), firstAction);
MockAction secondAction = new MockAction() { MockAction secondAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -103,7 +105,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
secondAction.setCompleteOnExecute(true); secondAction.setCompleteOnExecute(true);
actions.add(secondAction); actions.put(secondAction.getWriteableName(), secondAction);
MockAction thirdAction = new MockAction() { MockAction thirdAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -111,10 +113,10 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
thirdAction.setCompleteOnExecute(true); thirdAction.setCompleteOnExecute(true);
actions.add(thirdAction); actions.put(thirdAction.getWriteableName(), thirdAction);
Phase phase = new Phase(phaseName, after, actions); Phase phase = new Phase(phaseName, after, actions);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "") { MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "", 0) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -122,7 +124,16 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
phase.execute(context); phase.execute(context, current -> {
if (current == null) {
return firstAction;
} else if ("first_action".equals(current.getWriteableName())) {
return secondAction;
} else if ("second_action".equals(current.getWriteableName())) {
return thirdAction;
}
return null;
});
assertEquals(indexName, context.getLifecycleTarget()); assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
@ -140,7 +151,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
String indexName = randomAlphaOfLengthBetween(1, 20); String indexName = randomAlphaOfLengthBetween(1, 20);
String phaseName = randomAlphaOfLengthBetween(1, 20); String phaseName = randomAlphaOfLengthBetween(1, 20);
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100)); TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
List<LifecycleAction> actions = new ArrayList<>(); Map<String, LifecycleAction> actions = new HashMap<>();
MockAction firstAction = new MockAction() { MockAction firstAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -148,24 +159,24 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
firstAction.setCompleteOnExecute(false); firstAction.setCompleteOnExecute(false);
actions.add(firstAction); actions.put(firstAction.getWriteableName(), firstAction);
MockAction secondAction = new MockAction() { MockAction secondAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
return "second_action"; return "second_action";
} }
}; };
actions.add(secondAction); actions.put(secondAction.getWriteableName(), secondAction);
MockAction thirdAction = new MockAction() { MockAction thirdAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
return "third_action"; return "third_action";
} }
}; };
actions.add(thirdAction); actions.put(thirdAction.getWriteableName(), thirdAction);
Phase phase = new Phase(phaseName, after, actions); Phase phase = new Phase(phaseName, after, actions);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "") { MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "", 0) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -173,7 +184,16 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
phase.execute(context); phase.execute(context, current -> {
if (current == null) {
return firstAction;
} else if ("first_action".equals(current.getWriteableName())) {
return secondAction;
} else if ("second_action".equals(current.getWriteableName())) {
return thirdAction;
}
return null;
});
assertEquals(indexName, context.getLifecycleTarget()); assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
@ -191,31 +211,31 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
String indexName = randomAlphaOfLengthBetween(1, 20); String indexName = randomAlphaOfLengthBetween(1, 20);
String phaseName = randomAlphaOfLengthBetween(1, 20); String phaseName = randomAlphaOfLengthBetween(1, 20);
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100)); TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
List<LifecycleAction> actions = new ArrayList<>(); Map<String, LifecycleAction> actions = new HashMap<>();
MockAction firstAction = new MockAction() { MockAction firstAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
return "first_action"; return "first_action";
} }
}; };
actions.add(firstAction); actions.put(firstAction.getWriteableName(), firstAction);
MockAction secondAction = new MockAction() { MockAction secondAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
return "second_action"; return "second_action";
} }
}; };
actions.add(secondAction); actions.put(secondAction.getWriteableName(), secondAction);
MockAction thirdAction = new MockAction() { MockAction thirdAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
return "third_action"; return "third_action";
} }
}; };
actions.add(thirdAction); actions.put(thirdAction.getWriteableName(), thirdAction);
Phase phase = new Phase(phaseName, after, actions); Phase phase = new Phase(phaseName, after, actions);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "") { MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "", 0) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -227,7 +247,16 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
context.failOnSetters(exception); context.failOnSetters(exception);
phase.execute(context); phase.execute(context, current -> {
if (current == null) {
return firstAction;
} else if ("first_action".equals(current.getWriteableName())) {
return secondAction;
} else if ("second_action".equals(current.getWriteableName())) {
return thirdAction;
}
return null;
});
assertEquals(indexName, context.getLifecycleTarget()); assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
@ -245,9 +274,9 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
String indexName = randomAlphaOfLengthBetween(1, 20); String indexName = randomAlphaOfLengthBetween(1, 20);
String phaseName = randomAlphaOfLengthBetween(1, 20); String phaseName = randomAlphaOfLengthBetween(1, 20);
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100)); TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
Phase phase = new Phase(phaseName, after, Collections.emptyList()); Phase phase = new Phase(phaseName, after, Collections.emptyMap());
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "") { MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "", 0) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -255,7 +284,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
phase.execute(context); phase.execute(context, a -> null);
assertEquals(indexName, context.getLifecycleTarget()); assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
@ -287,9 +316,9 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
return "third_action"; return "third_action";
} }
}; };
Phase phase = new Phase(phaseName, after, Collections.emptyList()); Phase phase = new Phase(phaseName, after, Collections.emptyMap());
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, Phase.PHASE_COMPLETED) { MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, Phase.PHASE_COMPLETED, 0) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -297,7 +326,16 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
phase.execute(context); phase.execute(context, current -> {
if (current == null) {
return firstAction;
} else if ("first_action".equals(current.getWriteableName())) {
return secondAction;
} else if ("second_action".equals(current.getWriteableName())) {
return thirdAction;
}
return null;
});
assertEquals(indexName, context.getLifecycleTarget()); assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
@ -315,7 +353,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
String indexName = randomAlphaOfLengthBetween(1, 20); String indexName = randomAlphaOfLengthBetween(1, 20);
String phaseName = randomAlphaOfLengthBetween(1, 20); String phaseName = randomAlphaOfLengthBetween(1, 20);
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100)); TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
List<LifecycleAction> actions = new ArrayList<>(); Map<String, LifecycleAction> actions = new HashMap<>();
MockAction firstAction = new MockAction() { MockAction firstAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -323,7 +361,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
firstAction.setCompleteOnExecute(false); firstAction.setCompleteOnExecute(false);
actions.add(firstAction); actions.put(firstAction.getWriteableName(), firstAction);
MockAction secondAction = new MockAction() { MockAction secondAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -331,7 +369,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
secondAction.setCompleteOnExecute(false); secondAction.setCompleteOnExecute(false);
actions.add(secondAction); actions.put(secondAction.getWriteableName(), secondAction);
MockAction thirdAction = new MockAction() { MockAction thirdAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -339,10 +377,10 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
thirdAction.setCompleteOnExecute(false); thirdAction.setCompleteOnExecute(false);
actions.add(thirdAction); actions.put(thirdAction.getWriteableName(), thirdAction);
Phase phase = new Phase(phaseName, after, actions); Phase phase = new Phase(phaseName, after, actions);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, firstAction.getWriteableName()) { MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, firstAction.getWriteableName(), 0) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -350,7 +388,16 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
phase.execute(context); phase.execute(context, current -> {
if (current == null) {
return firstAction;
} else if ("first_action".equals(current.getWriteableName())) {
return secondAction;
} else if ("second_action".equals(current.getWriteableName())) {
return thirdAction;
}
return null;
});
assertEquals(indexName, context.getLifecycleTarget()); assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
@ -365,7 +412,16 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
firstAction.setCompleteOnExecute(true); firstAction.setCompleteOnExecute(true);
phase.execute(context); phase.execute(context, current -> {
if (current == null) {
return firstAction;
} else if ("first_action".equals(current.getWriteableName())) {
return secondAction;
} else if ("second_action".equals(current.getWriteableName())) {
return thirdAction;
}
return null;
});
assertEquals(indexName, context.getLifecycleTarget()); assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
@ -383,7 +439,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
String indexName = randomAlphaOfLengthBetween(1, 20); String indexName = randomAlphaOfLengthBetween(1, 20);
String phaseName = randomAlphaOfLengthBetween(1, 20); String phaseName = randomAlphaOfLengthBetween(1, 20);
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100)); TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
List<LifecycleAction> actions = new ArrayList<>(); Map<String, LifecycleAction> actions = new HashMap<>();
MockAction firstAction = new MockAction() { MockAction firstAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -391,7 +447,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
firstAction.setCompleteOnExecute(false); firstAction.setCompleteOnExecute(false);
actions.add(firstAction); actions.put(firstAction.getWriteableName(), firstAction);
MockAction secondAction = new MockAction() { MockAction secondAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -399,7 +455,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
secondAction.setCompleteOnExecute(false); secondAction.setCompleteOnExecute(false);
actions.add(secondAction); actions.put(secondAction.getWriteableName(), secondAction);
MockAction thirdAction = new MockAction() { MockAction thirdAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -407,10 +463,10 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
thirdAction.setCompleteOnExecute(false); thirdAction.setCompleteOnExecute(false);
actions.add(thirdAction); actions.put(thirdAction.getWriteableName(), thirdAction);
Phase phase = new Phase(phaseName, after, actions); Phase phase = new Phase(phaseName, after, actions);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, secondAction.getWriteableName()) { MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, secondAction.getWriteableName(), 0) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -418,7 +474,16 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
phase.execute(context); phase.execute(context, current -> {
if (current == null) {
return firstAction;
} else if ("first_action".equals(current.getWriteableName())) {
return secondAction;
} else if ("second_action".equals(current.getWriteableName())) {
return thirdAction;
}
return null;
});
assertEquals(indexName, context.getLifecycleTarget()); assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
@ -433,7 +498,16 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
secondAction.setCompleteOnExecute(true); secondAction.setCompleteOnExecute(true);
phase.execute(context); phase.execute(context, current -> {
if (current == null) {
return firstAction;
} else if ("first_action".equals(current.getWriteableName())) {
return secondAction;
} else if ("second_action".equals(current.getWriteableName())) {
return thirdAction;
}
return null;
});
assertEquals(indexName, context.getLifecycleTarget()); assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
@ -451,7 +525,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
String indexName = randomAlphaOfLengthBetween(1, 20); String indexName = randomAlphaOfLengthBetween(1, 20);
String phaseName = randomAlphaOfLengthBetween(1, 20); String phaseName = randomAlphaOfLengthBetween(1, 20);
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100)); TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
List<LifecycleAction> actions = new ArrayList<>(); Map<String, LifecycleAction> actions = new HashMap<>();
MockAction firstAction = new MockAction() { MockAction firstAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -459,7 +533,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
firstAction.setCompleteOnExecute(false); firstAction.setCompleteOnExecute(false);
actions.add(firstAction); actions.put(firstAction.getWriteableName(), firstAction);
MockAction secondAction = new MockAction() { MockAction secondAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -467,7 +541,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
secondAction.setCompleteOnExecute(false); secondAction.setCompleteOnExecute(false);
actions.add(secondAction); actions.put(secondAction.getWriteableName(), secondAction);
MockAction thirdAction = new MockAction() { MockAction thirdAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
@ -475,10 +549,10 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
thirdAction.setCompleteOnExecute(false); thirdAction.setCompleteOnExecute(false);
actions.add(thirdAction); actions.put(thirdAction.getWriteableName(), thirdAction);
Phase phase = new Phase(phaseName, after, actions); Phase phase = new Phase(phaseName, after, actions);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, thirdAction.getWriteableName()) { MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, thirdAction.getWriteableName(), 0) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -486,7 +560,16 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
phase.execute(context); phase.execute(context, current -> {
if (current == null) {
return firstAction;
} else if ("first_action".equals(current.getWriteableName())) {
return secondAction;
} else if ("second_action".equals(current.getWriteableName())) {
return thirdAction;
}
return null;
});
assertEquals(indexName, context.getLifecycleTarget()); assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
@ -501,7 +584,16 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
thirdAction.setCompleteOnExecute(true); thirdAction.setCompleteOnExecute(true);
phase.execute(context); phase.execute(context, current -> {
if (current == null) {
return firstAction;
} else if ("first_action".equals(current.getWriteableName())) {
return secondAction;
} else if ("second_action".equals(current.getWriteableName())) {
return thirdAction;
}
return null;
});
assertEquals(indexName, context.getLifecycleTarget()); assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
@ -519,31 +611,31 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
String indexName = randomAlphaOfLengthBetween(1, 20); String indexName = randomAlphaOfLengthBetween(1, 20);
String phaseName = randomAlphaOfLengthBetween(1, 20); String phaseName = randomAlphaOfLengthBetween(1, 20);
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100)); TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
List<LifecycleAction> actions = new ArrayList<>(); Map<String, LifecycleAction> actions = new HashMap<>();
MockAction firstAction = new MockAction() { MockAction firstAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
return "first_action"; return "first_action";
} }
}; };
actions.add(firstAction); actions.put(firstAction.getWriteableName(), firstAction);
MockAction secondAction = new MockAction() { MockAction secondAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
return "second_action"; return "second_action";
} }
}; };
actions.add(secondAction); actions.put(secondAction.getWriteableName(), secondAction);
MockAction thirdAction = new MockAction() { MockAction thirdAction = new MockAction() {
@Override @Override
public String getWriteableName() { public String getWriteableName() {
return "third_action"; return "third_action";
} }
}; };
actions.add(thirdAction); actions.put(thirdAction.getWriteableName(), thirdAction);
Phase phase = new Phase(phaseName, after, actions); Phase phase = new Phase(phaseName, after, actions);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "does_not_exist") { MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "does_not_exist", 0) {
@Override @Override
public boolean canExecute(Phase phase) { public boolean canExecute(Phase phase) {
@ -551,7 +643,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
} }
}; };
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> phase.execute(context)); IllegalStateException exception = expectThrows(IllegalStateException.class, () -> phase.execute(context, a -> firstAction));
assertEquals("Current action [" + "does_not_exist" + "] not found in phase [" + phaseName + "] for index [" + indexName + "]", assertEquals("Current action [" + "does_not_exist" + "] not found in phase [" + phaseName + "] for index [" + indexName + "]",
exception.getMessage()); exception.getMessage());

View File

@ -20,7 +20,7 @@ public class ReplicasActionTests extends AbstractSerializingTestCase<ReplicasAct
@Override @Override
protected ReplicasAction createTestInstance() { protected ReplicasAction createTestInstance() {
return new ReplicasAction(); return new ReplicasAction(randomIntBetween(0, 10));
} }
@Override @Override

View File

@ -13,6 +13,8 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -39,6 +41,12 @@ public class TestLifecyclePolicy extends LifecyclePolicy {
protected String getType() { protected String getType() {
return TYPE; return TYPE;
} }
@Override
protected NextActionProvider getActionProvider(IndexLifecycleContext context, Phase phase) {
return a -> Optional.ofNullable(phase.getActions().entrySet().iterator().next()).map(Map.Entry::getValue).orElse(null);
}
@Override @Override
protected Phase getFirstPhase() { protected Phase getFirstPhase() {
return phasesList.get(0); return phasesList.get(0);

View File

@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.xpack.indexlifecycle.TimeseriesLifecyclePolicy.VALID_COLD_ACTIONS; import static org.elasticsearch.xpack.indexlifecycle.TimeseriesLifecyclePolicy.VALID_COLD_ACTIONS;
@ -30,6 +31,7 @@ import static org.elasticsearch.xpack.indexlifecycle.TimeseriesLifecyclePolicy.V
import static org.elasticsearch.xpack.indexlifecycle.TimeseriesLifecyclePolicy.VALID_PHASES; import static org.elasticsearch.xpack.indexlifecycle.TimeseriesLifecyclePolicy.VALID_PHASES;
import static org.elasticsearch.xpack.indexlifecycle.TimeseriesLifecyclePolicy.VALID_WARM_ACTIONS; import static org.elasticsearch.xpack.indexlifecycle.TimeseriesLifecyclePolicy.VALID_WARM_ACTIONS;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<LifecyclePolicy> { public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<LifecyclePolicy> {
@ -38,7 +40,7 @@ public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<
private static final AllocateAction TEST_ALLOCATE_ACTION = new AllocateAction(); private static final AllocateAction TEST_ALLOCATE_ACTION = new AllocateAction();
private static final DeleteAction TEST_DELETE_ACTION = new DeleteAction(); private static final DeleteAction TEST_DELETE_ACTION = new DeleteAction();
private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction(); private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction();
private static final ReplicasAction TEST_REPLICAS_ACTION = new ReplicasAction(); private static final ReplicasAction TEST_REPLICAS_ACTION = new ReplicasAction(1);
private static final RolloverAction TEST_ROLLOVER_ACTION = new RolloverAction(); private static final RolloverAction TEST_ROLLOVER_ACTION = new RolloverAction();
private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction(); private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction();
@ -88,7 +90,7 @@ public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<
Phase expectedFirstPhase = null; Phase expectedFirstPhase = null;
for (String phaseName : Arrays.asList("hot", "warm", "cold", "delete")) { for (String phaseName : Arrays.asList("hot", "warm", "cold", "delete")) {
if (randomBoolean()) { if (randomBoolean()) {
Phase phase = new Phase(phaseName, TimeValue.MINUS_ONE, Collections.emptyList()); Phase phase = new Phase(phaseName, TimeValue.MINUS_ONE, Collections.emptyMap());
phases.put(phaseName, phase); phases.put(phaseName, phase);
if (expectedFirstPhase == null) { if (expectedFirstPhase == null) {
expectedFirstPhase = phase; expectedFirstPhase = phase;
@ -105,7 +107,7 @@ public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<
List<Phase> phasesInOrder = new ArrayList<>(); List<Phase> phasesInOrder = new ArrayList<>();
for (String phase : VALID_PHASES) { for (String phase : VALID_PHASES) {
if (randomBoolean()) { if (randomBoolean()) {
Phase phaseToAdd = new Phase(phase, TimeValue.MINUS_ONE, Collections.emptyList()); Phase phaseToAdd = new Phase(phase, TimeValue.MINUS_ONE, Collections.emptyMap());
phases.put(phase, phaseToAdd); phases.put(phase, phaseToAdd);
phasesInOrder.add(phaseToAdd); phasesInOrder.add(phaseToAdd);
} }
@ -128,7 +130,7 @@ public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<
phaseName += randomAlphaOfLength(5); phaseName += randomAlphaOfLength(5);
} }
Map<String, Phase> phases = Collections.singletonMap(phaseName, Map<String, Phase> phases = Collections.singletonMap(phaseName,
new Phase(phaseName, TimeValue.ZERO, Collections.emptyList())); new Phase(phaseName, TimeValue.ZERO, Collections.emptyMap()));
if (invalid) { if (invalid) {
Exception e = expectThrows(IllegalArgumentException.class, () -> new TimeseriesLifecyclePolicy(lifecycleName, phases)); Exception e = expectThrows(IllegalArgumentException.class, () -> new TimeseriesLifecyclePolicy(lifecycleName, phases));
assertThat(e.getMessage(), equalTo("Timeseries lifecycle does not support phase [" + phaseName + "]")); assertThat(e.getMessage(), equalTo("Timeseries lifecycle does not support phase [" + phaseName + "]"));
@ -139,11 +141,11 @@ public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<
public void testValidateHotPhase() { public void testValidateHotPhase() {
LifecycleAction invalidAction = null; LifecycleAction invalidAction = null;
List<LifecycleAction> actions = randomSubsetOf(VALID_HOT_ACTIONS) Map<String, LifecycleAction> actions = VALID_HOT_ACTIONS
.stream().map(this::getTestAction).collect(Collectors.toList()); .stream().map(this::getTestAction).collect(Collectors.toMap(LifecycleAction::getWriteableName, Function.identity()));
if (randomBoolean()) { if (randomBoolean()) {
invalidAction = getTestAction(randomFrom("allocate", "forcemerge", "delete", "replicas", "shrink")); invalidAction = getTestAction(randomFrom("allocate", "forcemerge", "delete", "replicas", "shrink"));
actions.add(invalidAction); actions.put(invalidAction.getWriteableName(), invalidAction);
} }
Map<String, Phase> hotPhase = Collections.singletonMap("hot", Map<String, Phase> hotPhase = Collections.singletonMap("hot",
new Phase("hot", TimeValue.ZERO, actions)); new Phase("hot", TimeValue.ZERO, actions));
@ -160,11 +162,11 @@ public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<
public void testValidateWarmPhase() { public void testValidateWarmPhase() {
LifecycleAction invalidAction = null; LifecycleAction invalidAction = null;
List<LifecycleAction> actions = randomSubsetOf(VALID_WARM_ACTIONS) Map<String, LifecycleAction> actions = randomSubsetOf(VALID_WARM_ACTIONS)
.stream().map(this::getTestAction).collect(Collectors.toList()); .stream().map(this::getTestAction).collect(Collectors.toMap(LifecycleAction::getWriteableName, Function.identity()));
if (randomBoolean()) { if (randomBoolean()) {
invalidAction = getTestAction(randomFrom("rollover", "delete")); invalidAction = getTestAction(randomFrom("rollover", "delete"));
actions.add(invalidAction); actions.put(invalidAction.getWriteableName(), invalidAction);
} }
Map<String, Phase> warmPhase = Collections.singletonMap("warm", Map<String, Phase> warmPhase = Collections.singletonMap("warm",
new Phase("warm", TimeValue.ZERO, actions)); new Phase("warm", TimeValue.ZERO, actions));
@ -181,11 +183,11 @@ public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<
public void testValidateColdPhase() { public void testValidateColdPhase() {
LifecycleAction invalidAction = null; LifecycleAction invalidAction = null;
List<LifecycleAction> actions = randomSubsetOf(VALID_COLD_ACTIONS) Map<String, LifecycleAction> actions = randomSubsetOf(VALID_COLD_ACTIONS)
.stream().map(this::getTestAction).collect(Collectors.toList()); .stream().map(this::getTestAction).collect(Collectors.toMap(LifecycleAction::getWriteableName, Function.identity()));
if (randomBoolean()) { if (randomBoolean()) {
invalidAction = getTestAction(randomFrom("rollover", "delete", "forcemerge", "shrink")); invalidAction = getTestAction(randomFrom("rollover", "delete", "forcemerge", "shrink"));
actions.add(invalidAction); actions.put(invalidAction.getWriteableName(), invalidAction);
} }
Map<String, Phase> coldPhase = Collections.singletonMap("cold", Map<String, Phase> coldPhase = Collections.singletonMap("cold",
new Phase("cold", TimeValue.ZERO, actions)); new Phase("cold", TimeValue.ZERO, actions));
@ -202,11 +204,11 @@ public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<
public void testValidateDeletePhase() { public void testValidateDeletePhase() {
LifecycleAction invalidAction = null; LifecycleAction invalidAction = null;
List<LifecycleAction> actions = randomSubsetOf(VALID_DELETE_ACTIONS) Map<String, LifecycleAction> actions = VALID_DELETE_ACTIONS
.stream().map(this::getTestAction).collect(Collectors.toList()); .stream().map(this::getTestAction).collect(Collectors.toMap(LifecycleAction::getWriteableName, Function.identity()));
if (randomBoolean()) { if (randomBoolean()) {
invalidAction = getTestAction(randomFrom("allocate", "rollover", "replicas", "forcemerge", "shrink")); invalidAction = getTestAction(randomFrom("allocate", "rollover", "replicas", "forcemerge", "shrink"));
actions.add(invalidAction); actions.put(invalidAction.getWriteableName(), invalidAction);
} }
Map<String, Phase> deletePhase = Collections.singletonMap("delete", Map<String, Phase> deletePhase = Collections.singletonMap("delete",
new Phase("delete", TimeValue.ZERO, actions)); new Phase("delete", TimeValue.ZERO, actions));
@ -221,6 +223,84 @@ public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<
} }
} }
public void testHotActionProvider() {
String indexName = randomAlphaOfLengthBetween(1, 10);
Map<String, LifecycleAction> actions = VALID_HOT_ACTIONS
.stream().map(this::getTestAction).collect(Collectors.toMap(LifecycleAction::getWriteableName, Function.identity()));
Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "", 0){};
TimeseriesLifecyclePolicy policy = new TimeseriesLifecyclePolicy(lifecycleName, Collections.singletonMap("hot", hotPhase));
LifecyclePolicy.NextActionProvider provider = policy.getActionProvider(context, hotPhase);
assertThat(provider.next(null), equalTo(TEST_ROLLOVER_ACTION));
assertNull(provider.next(TEST_ROLLOVER_ACTION));
}
public void testWarmActionProviderReplicasActionSortOrder() {
String indexName = randomAlphaOfLengthBetween(1, 10);
Map<String, LifecycleAction> actions = randomSubsetOf(VALID_WARM_ACTIONS)
.stream().map(this::getTestAction).collect(Collectors.toMap(LifecycleAction::getWriteableName, Function.identity()));
actions.put(ReplicasAction.NAME, TEST_REPLICAS_ACTION);
Phase warmPhase = new Phase("warm", TimeValue.ZERO, actions);
MockIndexLifecycleContext context =new MockIndexLifecycleContext(indexName, "", "",
TEST_REPLICAS_ACTION.getNumberOfReplicas() + 1){};
TimeseriesLifecyclePolicy policy = new TimeseriesLifecyclePolicy(lifecycleName, Collections.singletonMap("warm", warmPhase));
LifecyclePolicy.NextActionProvider provider = policy.getActionProvider(context, warmPhase);
assertThat(provider.next(null), equalTo(TEST_REPLICAS_ACTION));
context = new MockIndexLifecycleContext(indexName, "", "",
TEST_REPLICAS_ACTION.getNumberOfReplicas() - 1){};
provider = policy.getActionProvider(context, warmPhase);
if (actions.size() > 1) {
LifecycleAction current = provider.next(null);
assertThat(current, not(equalTo(TEST_REPLICAS_ACTION)));
while (true) {
if (provider.next(current) == null) {
assertThat(current, equalTo(TEST_REPLICAS_ACTION));
} else {
current = provider.next(current);
}
}
} else {
assertThat(provider.next(null), equalTo(TEST_REPLICAS_ACTION));
}
}
public void testColdActionProviderReplicasActionSortOrder() {
String indexName = randomAlphaOfLengthBetween(1, 10);
Map<String, LifecycleAction> actions = randomSubsetOf(VALID_COLD_ACTIONS)
.stream().map(this::getTestAction).collect(Collectors.toMap(LifecycleAction::getWriteableName, Function.identity()));
actions.put(ReplicasAction.NAME, TEST_REPLICAS_ACTION);
Phase coldPhase = new Phase("cold", TimeValue.ZERO, actions);
MockIndexLifecycleContext context =new MockIndexLifecycleContext(indexName, "", "",
TEST_REPLICAS_ACTION.getNumberOfReplicas() + 1){};
TimeseriesLifecyclePolicy policy = new TimeseriesLifecyclePolicy(lifecycleName, Collections.singletonMap("cold", coldPhase));
LifecyclePolicy.NextActionProvider provider = policy.getActionProvider(context, coldPhase);
assertThat(provider.next(null), equalTo(TEST_REPLICAS_ACTION));
context = new MockIndexLifecycleContext(indexName, "", "",
TEST_REPLICAS_ACTION.getNumberOfReplicas() - 1){};
provider = policy.getActionProvider(context, coldPhase);
if (actions.size() > 1) {
LifecycleAction current = provider.next(null);
assertThat(current, equalTo(TEST_ALLOCATE_ACTION));
assertThat(provider.next(current), equalTo(TEST_REPLICAS_ACTION));
} else {
assertThat(provider.next(null), equalTo(TEST_REPLICAS_ACTION));
}
}
public void testDeleteActionProvider() {
String indexName = randomAlphaOfLengthBetween(1, 10);
Map<String, LifecycleAction> actions = VALID_DELETE_ACTIONS
.stream().map(this::getTestAction).collect(Collectors.toMap(LifecycleAction::getWriteableName, Function.identity()));
Phase deletePhase = new Phase("delete", TimeValue.ZERO, actions);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "", 0){};
TimeseriesLifecyclePolicy policy = new TimeseriesLifecyclePolicy(lifecycleName, Collections.singletonMap("delete", deletePhase));
LifecyclePolicy.NextActionProvider provider = policy.getActionProvider(context, deletePhase);
assertThat(provider.next(null), equalTo(TEST_DELETE_ACTION));
assertNull(provider.next(TEST_DELETE_ACTION));
}
private LifecycleAction getTestAction(String actionName) { private LifecycleAction getTestAction(String actionName) {
switch (actionName) { switch (actionName) {
case AllocateAction.NAME: case AllocateAction.NAME:

View File

@ -63,7 +63,7 @@ public class GetLifecycleResponseTests extends AbstractStreamableTestCase<GetLif
phases = new HashMap<>(phases); phases = new HashMap<>(phases);
String newPhaseName = randomAlphaOfLengthBetween(1, 10); String newPhaseName = randomAlphaOfLengthBetween(1, 10);
phases.put(name, new Phase(newPhaseName, TimeValue.timeValueSeconds(randomIntBetween(1, 1000)), phases.put(name, new Phase(newPhaseName, TimeValue.timeValueSeconds(randomIntBetween(1, 1000)),
Collections.emptyList())); Collections.emptyMap()));
break; break;
default: default:
throw new AssertionError("Illegal randomisation branch"); throw new AssertionError("Illegal randomisation branch");

View File

@ -83,7 +83,7 @@ public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase
phases = new HashMap<>(phases); phases = new HashMap<>(phases);
String newPhaseName = randomAlphaOfLengthBetween(1, 10); String newPhaseName = randomAlphaOfLengthBetween(1, 10);
phases.put(name, new Phase(newPhaseName, TimeValue.timeValueSeconds(randomIntBetween(1, 1000)), phases.put(name, new Phase(newPhaseName, TimeValue.timeValueSeconds(randomIntBetween(1, 1000)),
Collections.emptyList())); Collections.emptyMap()));
break; break;
default: default:
throw new AssertionError("Illegal randomisation branch"); throw new AssertionError("Illegal randomisation branch");