Introduce specific types of index lifecycle policies (#3157)

Tal Levy 2017-11-30 10:09:41 -08:00 committed by GitHub
parent 922060d766
commit 685231e786
14 changed files with 534 additions and 222 deletions

View File

@@ -114,6 +114,10 @@ public class IndexLifecycle extends Plugin {
new NamedWriteableRegistry.Entry(NamedDiff.class, IndexLifecycleMetadata.TYPE,
IndexLifecycleMetadata.IndexLifecycleMetadataDiff::new),
// Lifecycle policies
new NamedWriteableRegistry.Entry(LifecyclePolicy.class, TimeseriesLifecyclePolicy.TYPE,
TimeseriesLifecyclePolicy::new),
// Lifecycle actions
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new));
}
@@ -124,6 +128,8 @@ public class IndexLifecycle extends Plugin {
// Custom metadata
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(IndexLifecycleMetadata.TYPE),
parser -> IndexLifecycleMetadata.PARSER.parse(parser, null)),
// Lifecycle Policy
new NamedXContentRegistry.Entry(LifecyclePolicy.class, new ParseField(TimeseriesLifecyclePolicy.TYPE), (p, c) -> TimeseriesLifecyclePolicy.parse(p, c)),
// Lifecycle actions
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse));
}
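With LifecyclePolicy registered as both a named writeable (for the transport layer and cluster state) and a named XContent object (for REST parsing), a future policy type only needs two more entries following the same pattern. A hypothetical illustration of that extension point (SnapshotPolicy is a placeholder name invented for this sketch, not part of this commit):

// Hypothetical follow-up registration for a second policy type; "SnapshotPolicy" does not exist in this change.
// wire / cluster-state serialization:
new NamedWriteableRegistry.Entry(LifecyclePolicy.class, SnapshotPolicy.TYPE, SnapshotPolicy::new)
// REST / JSON parsing:
new NamedXContentRegistry.Entry(LifecyclePolicy.class, new ParseField(SnapshotPolicy.TYPE), SnapshotPolicy::parse)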

View File

@@ -59,7 +59,7 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
int size = in.readVInt();
TreeMap<String, LifecyclePolicy> policies = new TreeMap<>();
for (int i = 0; i < size; i++) {
policies.put(in.readString(), new LifecyclePolicy(in));
policies.put(in.readString(), in.readNamedWriteable(LifecyclePolicy.class));
}
this.policies = policies;
this.pollInterval = in.readVLong();
@@ -70,7 +70,7 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
out.writeVInt(policies.size());
for (Map.Entry<String, LifecyclePolicy> entry : policies.entrySet()) {
out.writeString(entry.getKey());
entry.getValue().writeTo(out);
out.writeNamedWriteable(entry.getValue());
}
out.writeVLong(pollInterval);
}
@@ -147,13 +147,44 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
final Long pollIntervalDiff;
IndexLifecycleMetadataDiff(IndexLifecycleMetadata before, IndexLifecycleMetadata after) {
this.policies = DiffableUtils.diff(before.policies, after.policies, DiffableUtils.getStringKeySerializer());
this.policies = DiffableUtils.diff(before.policies, after.policies, DiffableUtils.getStringKeySerializer(), new DiffableUtils.ValueSerializer<String, LifecyclePolicy>() {
@Override
public void write(LifecyclePolicy value, StreamOutput out) throws IOException {
out.writeNamedWriteable(value);
}
@Override
public LifecyclePolicy read(StreamInput in, String key) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public Diff<LifecyclePolicy> readDiff(StreamInput in, String key) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean supportsDiffableValues() {
return true;
}
@Override
public Diff<LifecyclePolicy> diff(LifecyclePolicy value, LifecyclePolicy beforePart) {
return value.diff(beforePart);
}
@Override
public void writeDiff(Diff<LifecyclePolicy> value, StreamOutput out) throws IOException {
value.writeTo(out);
}
});
this.pollIntervalDiff = after.pollInterval - before.pollInterval;
}
public IndexLifecycleMetadataDiff(StreamInput in) throws IOException {
this.policies = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), LifecyclePolicy::new,
IndexLifecycleMetadataDiff::readLifecyclePolicyDiffFrom);
this.policies = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(),
(i) -> i.readNamedWriteable(LifecyclePolicy.class), IndexLifecycleMetadataDiff::readLifecyclePolicyDiffFrom);
this.pollIntervalDiff = in.readZLong();
}
@@ -176,7 +207,7 @@ public class IndexLifecycleMetadata implements MetaData.Custom {
}
static Diff<LifecyclePolicy> readLifecyclePolicyDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(LifecyclePolicy::new, in);
return AbstractDiffable.readDiffFrom((i) -> i.readNamedWriteable(LifecyclePolicy.class), in);
}
}
}
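The practical consequence of switching to writeNamedWriteable/readNamedWriteable is that the reading side must have a NamedWriteableRegistry that knows the concrete policy types, otherwise the type name found on the stream cannot be resolved. A rough round-trip sketch, assuming the usual BytesStreamOutput and NamedWriteableAwareStreamInput helpers and their standard constructors:

NamedWriteableRegistry registry = new NamedWriteableRegistry(Arrays.asList(
        new NamedWriteableRegistry.Entry(LifecyclePolicy.class, TimeseriesLifecyclePolicy.TYPE, TimeseriesLifecyclePolicy::new),
        new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new)));
LifecyclePolicy policy = new TimeseriesLifecyclePolicy("logs", Collections.emptyMap()); // any concrete policy
try (BytesStreamOutput out = new BytesStreamOutput()) {
    out.writeNamedWriteable(policy); // writes the type name, then delegates to writeTo()
    try (StreamInput in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), registry)) {
        LifecyclePolicy roundTripped = in.readNamedWriteable(LifecyclePolicy.class); // looks the reader up by name
    }
}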

View File

@@ -7,12 +7,13 @@ package org.elasticsearch.xpack.indexlifecycle;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -22,8 +23,15 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleContext.Listener;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
/**
* Represents the lifecycle of an index from creation to deletion. A
@@ -33,40 +41,32 @@ import java.util.Objects;
* dictate the order in which the {@link Phase}s are executed and will define
* which {@link LifecycleAction}s are allowed in each phase.
*/
public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implements ToXContentObject, Writeable {
public abstract class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implements ToXContentObject, NamedWriteable {
private static final Logger logger = ESLoggerFactory.getLogger(LifecyclePolicy.class);
public static final ParseField PHASES_FIELD = new ParseField("phases");
public static final ParseField TYPE_FIELD = new ParseField("type");
@SuppressWarnings("unchecked")
private static ConstructingObjectParser<LifecyclePolicy, Tuple<String, NamedXContentRegistry>> PARSER = new ConstructingObjectParser<>(
"lifecycle_policy", false, (a, c) -> new LifecyclePolicy(c.v1(), (List<Phase>) a[0]));
// public static LifecyclePolicy parse(XContentParser parser, Tuple<String, NamedXContentRegistry> context) {
// parser.getXContentRegistry().parseNamedObject()
// Map<String, Object> map = PARSER.apply(parser, context);
// return context.v2().parseNamedObject(LifecyclePolicy.class, map.get("lifecycle_type"), parser, context.v2());
// }
static {
PARSER.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, n) -> Phase.parse(p, new Tuple<>(n, c.v2())),
v -> {
throw new IllegalArgumentException("ordered " + PHASES_FIELD.getPreferredName() + " are not supported");
}, PHASES_FIELD);
}
public static LifecyclePolicy parse(XContentParser parser, Tuple<String, NamedXContentRegistry> context) {
return PARSER.apply(parser, context);
}
private final String name;
private final List<Phase> phases;
protected final String name;
protected final Map<String, Phase> phases;
/**
* @param name
* the name of this {@link LifecyclePolicy}
* @param phases
* a {@link List} of {@link Phase}s which make up this
* {@link LifecyclePolicy}. These {@link Phase}s are executed in
* the order of the {@link List}.
* a {@link Map} of {@link Phase}s which make up this
* {@link LifecyclePolicy}.
*/
public LifecyclePolicy(String name, List<Phase> phases) {
public LifecyclePolicy(String name, Map<String, Phase> phases) {
this.name = name;
this.phases = phases;
validate(phases.values());
}
/**
@@ -74,13 +74,22 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
*/
public LifecyclePolicy(StreamInput in) throws IOException {
name = in.readString();
phases = in.readList(Phase::new);
phases = Collections.unmodifiableMap(in.readMap(StreamInput::readString, Phase::new));
}
public static LifecyclePolicy parse(XContentParser parser, Tuple<String, NamedXContentRegistry> context) {
return ToXContentContext.PARSER.apply(parser, context);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeList(phases);
out.writeMap(phases, StreamOutput::writeString, (o, val) -> val.writeTo(o));
}
@Override
public String getWriteableName() {
return getType();
}
/**
@@ -94,18 +103,19 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
* @return the {@link Phase}s for this {@link LifecyclePolicy} in the order
* in which they will be executed.
*/
public List<Phase> getPhases() {
public Map<String, Phase> getPhases() {
return phases;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startObject(PHASES_FIELD.getPreferredName());
for (Phase phase : phases) {
builder.field(phase.getName(), phase);
}
builder.endObject();
builder.field(TYPE_FIELD.getPreferredName(), getType());
builder.startObject(PHASES_FIELD.getPreferredName());
for (Phase phase : phases.values()) {
builder.field(phase.getName(), phase);
}
builder.endObject();
builder.endObject();
return builder;
}
@@ -121,45 +131,36 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
String currentPhaseName = context.getPhase();
boolean currentPhaseActionsComplete = context.getAction().equals(Phase.PHASE_COMPLETED);
String indexName = context.getLifecycleTarget();
Phase currentPhase = phases.get(currentPhaseName);
if (Strings.isNullOrEmpty(currentPhaseName) || currentPhaseActionsComplete) {
// Either this is the first time we have seen this index or the current phase is complete; in both cases we need to move to the next phase
int currentPhaseIndex = -1;
// First find the current phase (will not find it if this is the first time we've seen this index)
for (int i = 0; i < phases.size(); i++) {
if (phases.get(i).getName().equals(currentPhaseName)) {
currentPhaseIndex = i;
break;
}
}
// If we have reached the last phase then we don't need to do anything (maybe the last phase doesn't have a delete action?)
if (currentPhaseIndex < phases.size() - 1) {
Phase nextPhase = phases.get(currentPhaseIndex + 1);
// We only want to execute the phase if the conditions for executing are met (e.g. the index is old enough)
if (context.canExecute(nextPhase)) {
String nextPhaseName = nextPhase.getName();
// Set the phase on the context to this phase so we know where we are next time we execute
context.setPhase(nextPhaseName, new Listener() {
Phase nextPhase = nextPhase(currentPhase);
// We only want to execute the phase if the conditions for executing are met (e.g. the index is old enough)
if (nextPhase != null && context.canExecute(nextPhase)) {
String nextPhaseName = nextPhase.getName();
// Set the phase on the context to this phase so we know where we are next time we execute
context.setPhase(nextPhaseName, new Listener() {
@Override
public void onSuccess() {
logger.info("Successfully initialised phase [" + nextPhaseName + "] for index [" + indexName + "]");
// We might as well execute the phase now rather than waiting for execute to be called again
nextPhase.execute(context);
}
@Override
public void onSuccess() {
logger.info("Successfully initialised phase [" + nextPhaseName + "] for index [" + indexName + "]");
// We might as well execute the phase now rather than waiting for execute to be called again
nextPhase.execute(context);
}
@Override
public void onFailure(Exception e) {
logger.error("Failed to initialised phase [" + nextPhaseName + "] for index [" + indexName + "]", e);
}
});
}
@Override
public void onFailure(Exception e) {
logger.error("Failed to initialised phase [" + nextPhaseName + "] for index [" + indexName + "]", e);
}
});
}
} else {
// If we have already seen this index and the action is not PHASE_COMPLETED then we just need to execute the current phase again
Phase currentPhase = phases.stream().filter(phase -> phase.getName().equals(currentPhaseName)).findAny()
.orElseThrow(() -> new IllegalStateException("Current phase [" + currentPhaseName + "] not found in lifecycle ["
+ getName() + "] for index [" + indexName + "]"));
currentPhase.execute(context);
if (currentPhase == null) {
throw new IllegalStateException("Current phase [" + currentPhaseName + "] not found in lifecycle ["
+ getName() + "] for index [" + indexName + "]");
} else {
currentPhase.execute(context);
}
}
}
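Stitched back together from the added lines above, the reworked execute() delegates phase ordering to the subclass instead of walking a list by index. Roughly, as a sketch (the success/failure Listener shown in the hunk is elided here):

if (Strings.isNullOrEmpty(currentPhaseName) || currentPhaseActionsComplete) {
    // nextPhase(null) returns the first phase; nextPhase(lastPhase) returns null
    Phase next = nextPhase(currentPhase);
    if (next != null && context.canExecute(next)) {
        // record the new phase, then run it from the listener's onSuccess callback
        context.setPhase(next.getName(), listener);
    }
} else if (currentPhase == null) {
    throw new IllegalStateException("Current phase [" + currentPhaseName
            + "] not found in lifecycle [" + getName() + "] for index [" + indexName + "]");
} else {
    // the phase is still in progress, so simply execute it again
    currentPhase.execute(context);
}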
@@ -185,4 +186,75 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy> implement
public String toString() {
return Strings.toString(this, true, true);
}
/**
* @return the first phase of this policy to execute
*/
protected abstract Phase getFirstPhase();
/**
* @param currentPhase the current phase that is or was just executed
* @return the next phase after {@param currentPhase} to be executed. If it is `null`, the first
* phase to be executed is returned. If it is the last phase, then no next phase is to be
* executed and `null` is returned.
*/
protected abstract Phase nextPhase(@Nullable Phase currentPhase);
/**
* validates whether the specified {@param phases} are valid for this policy instance.
* @param phases the phases to verify validity against
* @throws IllegalArgumentException if a specific phase or lack of a specific phase is invalid.
*/
protected abstract void validate(Collection<Phase> phases);
/**
* Each {@link LifecyclePolicy} has a specific type to differentiate itself. Every implementation
* is responsible for providing its specific type.
* @return the {@link LifecyclePolicy} type.
*/
protected abstract String getType();
/**
* This class is here to assist in creating a context from which the specific LifecyclePolicy sub-classes can inherit
* all the previously parsed values
*/
public static class ToXContentContext {
private final Map<String, Phase> phases;
private final String name;
@SuppressWarnings("unchecked")
public static ConstructingObjectParser<LifecyclePolicy, Tuple<String, NamedXContentRegistry>> PARSER = new ConstructingObjectParser<>(
"lifecycle_policy", false, (a, c) -> {
String lifecycleType = (String) a[0];
List<Phase> phases = (List<Phase>) a[1];
Map<String, Phase> phaseMap = phases.stream().collect(Collectors.toMap(Phase::getName, Function.identity()));
NamedXContentRegistry registry = c.v2();
ToXContentContext factory = new ToXContentContext(c.v1(), phaseMap);
try {
return registry.parseNamedObject(LifecyclePolicy.class, lifecycleType, null, factory);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
});
static {
PARSER.declareString(constructorArg(), TYPE_FIELD);
PARSER.declareNamedObjects(constructorArg(), (p, c, n) -> Phase.parse(p, new Tuple<>(n, c.v2())),
v -> {
throw new IllegalArgumentException("ordered " + PHASES_FIELD.getPreferredName() + " are not supported");
}, PHASES_FIELD);
}
ToXContentContext(String name, Map<String, Phase> phases) {
this.name = name;
this.phases = phases;
}
public String getName() {
return name;
}
public Map<String, Phase> getPhases() {
return phases;
}
}
}
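The two-stage parse is the least obvious part of this change: ToXContentContext.PARSER consumes the type and phases fields itself, then calls registry.parseNamedObject, passing null for the parser and the half-built ToXContentContext as the parsing context, so a concrete policy's parse hook never reads any XContent of its own. A sketch of such a hook, mirroring the subclasses added in this commit (MyPolicy is a placeholder name):

public static MyPolicy parse(XContentParser parser, Object context) {
    // 'parser' is null here; the generic PARSER above has already consumed "type" and "phases"
    ToXContentContext factory = (ToXContentContext) context;
    return new MyPolicy(factory.getName(), factory.getPhases());
}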

View File

@@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Represents the lifecycle of an index from creation to deletion. A
* {@link TimeseriesLifecyclePolicy} is made up of a set of {@link Phase}s which it will
* move through. Soon we will constrain the phases using some kind of lifecycle
* type which will allow only particular {@link Phase}s to be defined, will
* dictate the order in which the {@link Phase}s are executed and will define
* which {@link LifecycleAction}s are allowed in each phase.
*/
public class TimeseriesLifecyclePolicy extends LifecyclePolicy {
public static final String TYPE = "timeseries";
static final List<String> VALID_PHASES = Arrays.asList("hot", "warm", "cold", "delete");
/**
* @param name
* the name of this {@link TimeseriesLifecyclePolicy}
* @param phases
* a {@link Map} of {@link Phase}s which make up this
* {@link TimeseriesLifecyclePolicy}.
*/
public TimeseriesLifecyclePolicy(String name, Map<String, Phase> phases) {
super(name, phases);
}
/**
* For Serialization
*/
public TimeseriesLifecyclePolicy(StreamInput in) throws IOException {
super(in);
}
public static TimeseriesLifecyclePolicy parse(XContentParser parser, Object context) {
ToXContentContext factory = (ToXContentContext) context;
return new TimeseriesLifecyclePolicy(factory.getName(), factory.getPhases());
}
@Override
protected String getType() {
return TYPE;
}
@Override
protected Phase getFirstPhase() {
Phase firstPhase = phases.get("hot");
if (firstPhase == null) {
firstPhase = phases.get("warm");
}
if (firstPhase == null) {
firstPhase = phases.get("cold");
}
if (firstPhase == null) {
firstPhase = phases.get("delete");
}
return firstPhase;
}
@Override
protected Phase nextPhase(@Nullable Phase currentPhase) {
if (currentPhase == null) {
return getFirstPhase();
}
// VALID_PHASES is in order of execution
boolean readyToSetNext = false;
for (String phaseName : VALID_PHASES) {
if (readyToSetNext && phases.containsKey(phaseName)) {
return phases.get(phaseName);
}
if (phaseName.equals(currentPhase.getName())) {
readyToSetNext = true;
}
}
return null;
}
@Override
public void validate(Collection<Phase> phases) {
Set<String> allowedPhases = new HashSet<>(VALID_PHASES);
phases.forEach(phase -> {
if (allowedPhases.contains(phase.getName()) == false) {
throw new IllegalArgumentException("Timeseries lifecycle does not support phase [" + phase.getName() + "]");
}
});
}
}
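A short usage sketch of the timeseries type, using only constructors that appear in this change (the phase ages are made up, and the protected phase accessors are called the same way the new tests do, from the same package): the map can be handed over in any order, getFirstPhase and nextPhase impose the hot, warm, cold, delete ordering, and toXContent writes the type next to the phases.

Map<String, Phase> phases = new HashMap<>();
phases.put("delete", new Phase("delete", TimeValue.timeValueDays(30), Collections.singletonList(new DeleteAction())));
phases.put("hot", new Phase("hot", TimeValue.timeValueSeconds(0), Collections.emptyList()));

TimeseriesLifecyclePolicy policy = new TimeseriesLifecyclePolicy("logs", phases);
assert policy.getFirstPhase() == phases.get("hot");            // "hot" wins even though "delete" was added first
assert policy.nextPhase(phases.get("hot")) == phases.get("delete");
assert policy.nextPhase(phases.get("delete")) == null;         // nothing follows the last configured phase

// Strings.toString(policy) renders roughly:
// { "type" : "timeseries", "phases" : { "delete" : { ... }, "hot" : { ... } } }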

View File

@@ -86,12 +86,12 @@ public class GetLifecycleAction
@Override
public void readFrom(StreamInput in) throws IOException {
policy = new LifecyclePolicy(in);
policy = in.readNamedWriteable(LifecyclePolicy.class);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
policy.writeTo(out);
out.writeNamedWriteable(policy);
}
@Override

View File

@@ -149,7 +149,7 @@ public class PutLifecycleAction extends Action<PutLifecycleAction.Request, PutLi
}
public static Request parseRequest(String name, XContentParser parser, NamedXContentRegistry namedXContentRegistry) {
return PARSER.apply(parser, new Tuple<String, NamedXContentRegistry>(name, namedXContentRegistry));
return PARSER.apply(parser, new Tuple<>(name, namedXContentRegistry));
}
@Override
@@ -163,13 +163,13 @@ public class PutLifecycleAction extends Action<PutLifecycleAction.Request, PutLi
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
policy = new LifecyclePolicy(in);
policy = in.readNamedWriteable(LifecyclePolicy.class);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
policy.writeTo(out);
out.writeNamedWriteable(policy);
}
@Override

View File

@@ -90,7 +90,7 @@ public class IndexLifecycleInitialisationIT extends ESIntegTestCase {
phases.add(new Phase("new", TimeValue.timeValueSeconds(0), Collections.emptyList()));
List<LifecycleAction> deletePhaseActions = Collections.singletonList(new DeleteAction());
phases.add(new Phase("delete", TimeValue.timeValueSeconds(3), deletePhaseActions));
lifecyclePolicy = new LifecyclePolicy("test_lifecycle", phases);
lifecyclePolicy = new TestLifecyclePolicy("test", phases);
}
public void testSingleNodeCluster() throws Exception {

View File

@@ -34,7 +34,8 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
@Before
public void setup() {
List<NamedXContentRegistry.Entry> entries = Arrays
.asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse));
.asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
new NamedXContentRegistry.Entry(LifecyclePolicy.class, new ParseField("test"), TestLifecyclePolicy::parse));
registry = new NamedXContentRegistry(entries);
}
@@ -54,17 +55,12 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
phases.add(new Phase(randomAlphaOfLength(10), after, actions));
}
String policyName = randomAlphaOfLength(10);
policies.put(policyName, new LifecyclePolicy(policyName, phases));
policies.put(policyName, new TestLifecyclePolicy(policyName, phases));
}
long pollInterval = randomNonNegativeLong();
return new IndexLifecycleMetadata(policies, pollInterval);
}
@Override
protected String[] getShuffleFieldsExceptions() {
return new String[] { "phases" }; // NOCOMMIT this needs to be temporary since we should not rely on the order of the JSON map
}
@Override
protected IndexLifecycleMetadata doParseInstance(XContentParser parser) throws IOException {
return IndexLifecycleMetadata.PARSER.apply(parser, registry);
@@ -75,9 +71,12 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
return IndexLifecycleMetadata::new;
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(
Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new)));
Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
new NamedWriteableRegistry.Entry(LifecyclePolicy.class, TestLifecyclePolicy.TYPE,
TestLifecyclePolicy::new)));
}
@Override
@@ -92,7 +91,7 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
case 1:
policies = new TreeMap<>(policies);
String policyName = randomAlphaOfLength(10);
policies.put(policyName, new LifecyclePolicy(policyName, Collections.emptyList()));
policies.put(policyName, new TestLifecyclePolicy(policyName, Collections.emptyList()));
break;
default:
throw new AssertionError("Illegal randomisation branch");

View File

@@ -5,89 +5,14 @@
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecyclePolicy> {
private NamedXContentRegistry registry;
private String lifecycleName;
@Before
public void setup() {
List<NamedXContentRegistry.Entry> entries = Arrays
.asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse));
registry = new NamedXContentRegistry(entries);
lifecycleName = randomAlphaOfLength(20); // NOCOMMIT we need to randomise the lifecycle name rather
// than use the same name for all instances
}
@Override
protected LifecyclePolicy createTestInstance() {
int numberPhases = randomInt(5);
List<Phase> phases = new ArrayList<>(numberPhases);
for (int i = 0; i < numberPhases; i++) {
TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
List<LifecycleAction> actions = new ArrayList<>();
if (randomBoolean()) {
actions.add(new DeleteAction());
}
phases.add(new Phase(randomAlphaOfLength(10), after, actions));
}
return new LifecyclePolicy(lifecycleName, phases);
}
@Override
protected String[] getShuffleFieldsExceptions() {
return new String[] { "phases" }; // NOCOMMIT this needs to be temporary since we should not rely on the order of the JSON map
}
@Override
protected LifecyclePolicy doParseInstance(XContentParser parser) throws IOException {
return LifecyclePolicy.parse(parser, new Tuple<>(lifecycleName, registry));
}
@Override
protected Reader<LifecyclePolicy> instanceReader() {
return LifecyclePolicy::new;
}
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(
Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new)));
}
@Override
protected LifecyclePolicy mutateInstance(LifecyclePolicy instance) throws IOException {
String name = instance.getName();
List<Phase> phases = instance.getPhases();
switch (between(0, 1)) {
case 0:
name = name + randomAlphaOfLengthBetween(1, 5);
break;
case 1:
phases = new ArrayList<>(phases);
phases.add(new Phase(randomAlphaOfLengthBetween(1, 10), TimeValue.timeValueSeconds(randomIntBetween(1, 1000)),
Collections.emptyList()));
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new LifecyclePolicy(name, phases);
}
public class LifecyclePolicyTests extends ESTestCase {
public void testExecuteNewIndexBeforeTrigger() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20);
@@ -111,7 +36,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "") {
@@ -161,7 +86,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "") {
@@ -211,7 +136,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "", "") {
@@ -265,7 +190,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, firstPhase.getName(), "") {
@@ -311,7 +236,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, secondPhase.getName(), "") {
@@ -357,7 +282,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, thirdPhase.getName(), "") {
@@ -403,7 +328,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, "does_not_exist", "") {
@@ -452,7 +377,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, firstPhase.getName(), Phase.PHASE_COMPLETED) {
@@ -502,7 +427,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, firstPhase.getName(), Phase.PHASE_COMPLETED) {
@@ -552,7 +477,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, secondPhase.getName(), Phase.PHASE_COMPLETED) {
@@ -602,7 +527,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, secondPhase.getName(), Phase.PHASE_COMPLETED) {
@@ -652,7 +577,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
after = TimeValue.timeValueSeconds(20);
Phase thirdPhase = new Phase("third_phase", after, actions);
phases.add(thirdPhase);
LifecyclePolicy policy = new LifecyclePolicy(lifecycleName, phases);
LifecyclePolicy policy = new TestLifecyclePolicy(lifecycleName, phases);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, thirdPhase.getName(), Phase.PHASE_COMPLETED) {

View File

@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
public class TestLifecyclePolicy extends LifecyclePolicy {
public static final String TYPE = "test";
private List<Phase> phasesList;
public TestLifecyclePolicy(String name, List<Phase> phasesList) {
super(name, phasesList.stream().collect(Collectors.toMap(Phase::getName, Function.identity())));
this.phasesList = phasesList;
}
public TestLifecyclePolicy(StreamInput in) throws IOException {
super(in);
}
public static TestLifecyclePolicy parse(XContentParser parser, Object context) {
ToXContentContext factory = (ToXContentContext) context;
return new TestLifecyclePolicy(factory.getName(), new ArrayList<>(factory.getPhases().values()));
}
@Override
protected String getType() {
return TYPE;
}
@Override
protected Phase getFirstPhase() {
return phasesList.get(0);
}
@Override
protected Phase nextPhase(@Nullable Phase currentPhase) {
if (currentPhase == null) {
return getFirstPhase();
}
for (int i = 0; i < phasesList.size() - 1; i++) {
if (phasesList.get(i).equals(currentPhase)) {
return phasesList.get(i + 1);
}
}
return null;
}
@Override
protected void validate(Collection<Phase> phases) {
// always valid
}
}
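TestLifecyclePolicy exists so the existing unit tests can keep driving phases positionally: it remembers the original List, and nextPhase walks that list rather than the fixed phase names of the timeseries type. A brief sketch (again calling the protected methods from the same package, as the tests do; the phase ages are arbitrary):

List<Phase> ordered = Arrays.asList(
        new Phase("first_phase", TimeValue.timeValueSeconds(0), Collections.emptyList()),
        new Phase("second_phase", TimeValue.timeValueSeconds(10), Collections.emptyList()));
TestLifecyclePolicy policy = new TestLifecyclePolicy("test", ordered);
assert policy.nextPhase(null) == ordered.get(0);               // first phase is simply index 0
assert policy.nextPhase(ordered.get(0)) == ordered.get(1);     // then straight down the list
assert policy.nextPhase(ordered.get(1)) == null;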

View File

@@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.indexlifecycle;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.indexlifecycle.TimeseriesLifecyclePolicy.VALID_PHASES;
import static org.hamcrest.Matchers.equalTo;
public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase<LifecyclePolicy> {
private NamedXContentRegistry registry;
private String lifecycleName;
@Before
public void setup() {
List<NamedXContentRegistry.Entry> entries = Arrays
.asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
new NamedXContentRegistry.Entry(LifecyclePolicy.class, new ParseField(TimeseriesLifecyclePolicy.TYPE),
TimeseriesLifecyclePolicy::parse));
registry = new NamedXContentRegistry(entries);
lifecycleName = randomAlphaOfLength(20); // NOCOMMIT we need to randomise the lifecycle name rather
// than use the same name for all instances
}
@Override
protected LifecyclePolicy createTestInstance() {
return new TimeseriesLifecyclePolicy(lifecycleName, Collections.emptyMap());
}
@Override
protected LifecyclePolicy doParseInstance(XContentParser parser) throws IOException {
return LifecyclePolicy.parse(parser, new Tuple<>(lifecycleName, registry));
}
@Override
protected Reader<LifecyclePolicy> instanceReader() {
return TimeseriesLifecyclePolicy::new;
}
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(
Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new)));
}
public void testGetFirstPhase() {
Map<String, Phase> phases = new HashMap<>();
Phase expectedFirstPhase = null;
for (String phaseName : Arrays.asList("hot", "warm", "cold", "delete")) {
if (randomBoolean()) {
Phase phase = new Phase(phaseName, TimeValue.MINUS_ONE, Collections.emptyList());
phases.put(phaseName, phase);
if (expectedFirstPhase == null) {
expectedFirstPhase = phase;
}
}
}
TimeseriesLifecyclePolicy policy = new TimeseriesLifecyclePolicy(lifecycleName, phases);
assertThat(policy.getFirstPhase(), equalTo(expectedFirstPhase));
}
public void testGetNextPhase() {
for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
Map<String, Phase> phases = new HashMap<>();
List<Phase> phasesInOrder = new ArrayList<>();
for (String phase : VALID_PHASES) {
if (randomBoolean()) {
Phase phaseToAdd = new Phase(phase, TimeValue.MINUS_ONE, Collections.emptyList());
phases.put(phase, phaseToAdd);
phasesInOrder.add(phaseToAdd);
}
}
TimeseriesLifecyclePolicy policy = new TimeseriesLifecyclePolicy(lifecycleName, phases);
assertThat(policy.nextPhase(null), equalTo(policy.getFirstPhase()));
for (int i = 0; i < phasesInOrder.size() - 1; i++) {
assertThat(policy.nextPhase(phasesInOrder.get(i)), equalTo(phasesInOrder.get(i + 1)));
}
if (phasesInOrder.isEmpty() == false) {
assertNull(policy.nextPhase(phasesInOrder.get(phasesInOrder.size() - 1)));
}
}
}
public void testValidate() {
boolean invalid = randomBoolean();
String phaseName = randomFrom("hot", "warm", "cold", "delete");
if (invalid) {
phaseName += randomAlphaOfLength(5);
}
Map<String, Phase> phases = Collections.singletonMap(phaseName,
new Phase(phaseName, TimeValue.ZERO, Collections.emptyList()));
if (invalid) {
Exception e = expectThrows(IllegalArgumentException.class, () -> new TimeseriesLifecyclePolicy(lifecycleName, phases));
assertThat(e.getMessage(), equalTo("Timeseries lifecycle does not support phase [" + phaseName + "]"));
} else {
new TimeseriesLifecyclePolicy(lifecycleName, phases);
}
}
}

View File

@@ -25,5 +25,4 @@ public class GetLifecycleRequestTests extends AbstractStreamableTestCase<GetLife
protected MutateFunction<Request> getMutateFunction() {
return resp -> new Request(resp.getPolicyName() + randomAlphaOfLengthBetween(1, 10));
}
}

View File

@@ -13,13 +13,17 @@ import org.elasticsearch.xpack.indexlifecycle.DeleteAction;
import org.elasticsearch.xpack.indexlifecycle.LifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.indexlifecycle.Phase;
import org.elasticsearch.xpack.indexlifecycle.TestLifecyclePolicy;
import org.elasticsearch.xpack.indexlifecycle.TimeseriesLifecyclePolicy;
import org.elasticsearch.xpack.indexlifecycle.action.GetLifecycleAction.Response;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GetLifecycleResponseTests extends AbstractStreamableTestCase<GetLifecycleAction.Response> {
@@ -33,17 +37,7 @@ public class GetLifecycleResponseTests extends AbstractStreamableTestCase<GetLif
@Override
protected Response createTestInstance() {
int numberPhases = randomInt(5);
List<Phase> phases = new ArrayList<>(numberPhases);
for (int i = 0; i < numberPhases; i++) {
TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
List<LifecycleAction> actions = new ArrayList<>();
if (randomBoolean()) {
actions.add(new DeleteAction());
}
phases.add(new Phase(randomAlphaOfLength(10), after, actions));
}
return new Response(new LifecyclePolicy(lifecycleName, phases));
return new Response(new TestLifecyclePolicy(lifecycleName, Collections.emptyList()));
}
@Override
@@ -53,7 +47,8 @@ public class GetLifecycleResponseTests extends AbstractStreamableTestCase<GetLif
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(
Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new)));
Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
new NamedWriteableRegistry.Entry(LifecyclePolicy.class, TestLifecyclePolicy.TYPE, TestLifecyclePolicy::new)));
}
@Override
@@ -61,21 +56,21 @@ public class GetLifecycleResponseTests extends AbstractStreamableTestCase<GetLif
return resp -> {
LifecyclePolicy policy = resp.getPolicy();
String name = policy.getName();
List<Phase> phases = policy.getPhases();
Map<String, Phase> phases = policy.getPhases();
switch (between(0, 1)) {
case 0:
name = name + randomAlphaOfLengthBetween(1, 5);
break;
case 1:
phases = new ArrayList<>(phases);
phases.add(new Phase(randomAlphaOfLengthBetween(1, 10), TimeValue.timeValueSeconds(randomIntBetween(1, 1000)),
case 0:
name = name + randomAlphaOfLengthBetween(1, 5);
break;
case 1:
phases = new HashMap<>(phases);
String newPhaseName = randomAlphaOfLengthBetween(1, 10);
phases.put(newPhaseName, new Phase(newPhaseName, TimeValue.timeValueSeconds(randomIntBetween(1, 1000)),
Collections.emptyList()));
break;
default:
throw new AssertionError("Illegal randomisation branch");
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new Response(new LifecyclePolicy(name, phases));
return new Response(new TestLifecyclePolicy(name, new ArrayList<>(phases.values())));
};
}
}

View File

@@ -16,13 +16,16 @@ import org.elasticsearch.xpack.indexlifecycle.DeleteAction;
import org.elasticsearch.xpack.indexlifecycle.LifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.indexlifecycle.Phase;
import org.elasticsearch.xpack.indexlifecycle.TestLifecyclePolicy;
import org.elasticsearch.xpack.indexlifecycle.action.PutLifecycleAction.Request;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase<PutLifecycleAction.Request> {
@@ -32,7 +35,8 @@ public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase
@Before
public void setup() {
List<NamedXContentRegistry.Entry> entries = Arrays
.asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse));
.asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
new NamedXContentRegistry.Entry(LifecyclePolicy.class, new ParseField(TestLifecyclePolicy.TYPE), TestLifecyclePolicy::parse));
registry = new NamedXContentRegistry(entries);
lifecycleName = randomAlphaOfLength(20); // NOCOMMIT we need to randomise the lifecycle name rather
// than use the same name for all instances
@@ -40,17 +44,7 @@ public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase
@Override
protected Request createTestInstance() {
int numberPhases = 1; // NOCOMMIT need to make this more than one when phase order doesn't rely on JSON map order
List<Phase> phases = new ArrayList<>(numberPhases);
for (int i = 0; i < numberPhases; i++) {
TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
List<LifecycleAction> actions = new ArrayList<>();
if (randomBoolean()) {
actions.add(new DeleteAction());
}
phases.add(new Phase(randomAlphaOfLength(10), after, actions));
}
return new Request(new LifecyclePolicy(lifecycleName, phases));
return new Request(new TestLifecyclePolicy(lifecycleName, Collections.emptyList()));
}
@Override
@@ -65,7 +59,8 @@ public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(
Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new)));
Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
new NamedWriteableRegistry.Entry(LifecyclePolicy.class, TestLifecyclePolicy.TYPE, TestLifecyclePolicy::new)));
}
protected boolean supportsUnknownFields() {
@@ -77,20 +72,21 @@ public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase
return resp -> {
LifecyclePolicy policy = resp.getPolicy();
String name = policy.getName();
List<Phase> phases = policy.getPhases();
Map<String, Phase> phases = policy.getPhases();
switch (between(0, 1)) {
case 0:
name = name + randomAlphaOfLengthBetween(1, 5);
break;
case 1:
phases = new ArrayList<>(phases);
phases.add(new Phase(randomAlphaOfLengthBetween(1, 10), TimeValue.timeValueSeconds(randomIntBetween(1, 1000)),
case 0:
name = name + randomAlphaOfLengthBetween(1, 5);
break;
case 1:
phases = new HashMap<>(phases);
String newPhaseName = randomAlphaOfLengthBetween(1, 10);
phases.put(newPhaseName, new Phase(newPhaseName, TimeValue.timeValueSeconds(randomIntBetween(1, 1000)),
Collections.emptyList()));
break;
default:
throw new AssertionError("Illegal randomisation branch");
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new Request(new LifecyclePolicy(name, phases));
return new Request(new TestLifecyclePolicy(name, new ArrayList<>(phases.values())));
};
}