diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index 0bc1dc6a24a..83f90a7c1e3 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -114,6 +114,10 @@ public class IndexLifecycle extends Plugin { new NamedWriteableRegistry.Entry(NamedDiff.class, IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.IndexLifecycleMetadataDiff::new), + // Lifecycle policies + new NamedWriteableRegistry.Entry(LifecyclePolicy.class, TimeseriesLifecyclePolicy.TYPE, + TimeseriesLifecyclePolicy::new), + // Lifecycle actions new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new)); } @@ -124,6 +128,8 @@ public class IndexLifecycle extends Plugin { // Custom metadata new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(IndexLifecycleMetadata.TYPE), parser -> IndexLifecycleMetadata.PARSER.parse(parser, null)), + // Lifecycle Policy + new NamedXContentRegistry.Entry(LifecyclePolicy.class, new ParseField(TimeseriesLifecyclePolicy.TYPE), (p, c) -> TimeseriesLifecyclePolicy.parse(p, c)), // Lifecycle actions new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse)); } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadata.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadata.java index 550768f86cb..f12187ca6bf 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadata.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadata.java @@ -59,7 +59,7 @@ public class IndexLifecycleMetadata implements MetaData.Custom { int size = in.readVInt(); TreeMap policies = new TreeMap<>(); for (int i = 0; i < size; i++) { - policies.put(in.readString(), new LifecyclePolicy(in)); + policies.put(in.readString(), in.readNamedWriteable(LifecyclePolicy.class)); } this.policies = policies; this.pollInterval = in.readVLong(); @@ -70,7 +70,7 @@ public class IndexLifecycleMetadata implements MetaData.Custom { out.writeVInt(policies.size()); for (Map.Entry entry : policies.entrySet()) { out.writeString(entry.getKey()); - entry.getValue().writeTo(out); + out.writeNamedWriteable(entry.getValue()); } out.writeVLong(pollInterval); } @@ -147,13 +147,44 @@ public class IndexLifecycleMetadata implements MetaData.Custom { final Long pollIntervalDiff; IndexLifecycleMetadataDiff(IndexLifecycleMetadata before, IndexLifecycleMetadata after) { - this.policies = DiffableUtils.diff(before.policies, after.policies, DiffableUtils.getStringKeySerializer()); + this.policies = DiffableUtils.diff(before.policies, after.policies, DiffableUtils.getStringKeySerializer(), new DiffableUtils.ValueSerializer() { + @Override + public void write(LifecyclePolicy value, StreamOutput out) throws IOException { + out.writeNamedWriteable(value); + } + + @Override + public LifecyclePolicy read(StreamInput in, String key) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Diff readDiff(StreamInput in, String key) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean supportsDiffableValues() { + return true; + } + + @Override + public Diff diff(LifecyclePolicy value, LifecyclePolicy beforePart) { + return value.diff(beforePart); + } + + @Override + public void writeDiff(Diff value, StreamOutput out) throws IOException { + value.writeTo(out); + } + + }); this.pollIntervalDiff = after.pollInterval - before.pollInterval; } public IndexLifecycleMetadataDiff(StreamInput in) throws IOException { - this.policies = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), LifecyclePolicy::new, - IndexLifecycleMetadataDiff::readLifecyclePolicyDiffFrom); + this.policies = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), + (i) -> i.readNamedWriteable(LifecyclePolicy.class), IndexLifecycleMetadataDiff::readLifecyclePolicyDiffFrom); this.pollIntervalDiff = in.readZLong(); } @@ -176,7 +207,7 @@ public class IndexLifecycleMetadata implements MetaData.Custom { } static Diff readLifecyclePolicyDiffFrom(StreamInput in) throws IOException { - return AbstractDiffable.readDiffFrom(LifecyclePolicy::new, in); + return AbstractDiffable.readDiffFrom((i) -> i.readNamedWriteable(LifecyclePolicy.class), in); } } } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicy.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicy.java index fc2584c450c..11873c660dc 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicy.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicy.java @@ -7,12 +7,13 @@ package org.elasticsearch.xpack.indexlifecycle; import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -22,8 +23,15 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.indexlifecycle.IndexLifecycleContext.Listener; import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; /** * Represents the lifecycle of an index from creation to deletion. A @@ -33,40 +41,32 @@ import java.util.Objects; * dictate the order in which the {@link Phase}s are executed and will define * which {@link LifecycleAction}s are allowed in each phase. */ -public class LifecyclePolicy extends AbstractDiffable implements ToXContentObject, Writeable { +public abstract class LifecyclePolicy extends AbstractDiffable implements ToXContentObject, NamedWriteable { private static final Logger logger = ESLoggerFactory.getLogger(LifecyclePolicy.class); public static final ParseField PHASES_FIELD = new ParseField("phases"); + public static final ParseField TYPE_FIELD = new ParseField("type"); - @SuppressWarnings("unchecked") - private static ConstructingObjectParser> PARSER = new ConstructingObjectParser<>( - "lifecycle_policy", false, (a, c) -> new LifecyclePolicy(c.v1(), (List) a[0])); +// public static LifecyclePolicy parse(XContentParser parser, Tuple context) { +// parser.getXContentRegistry().parseNamedObject() +// Map map = PARSER.apply(parser, context); +// return context.v2().parseNamedObject(LifecyclePolicy.class, map.get("lifecycle_type"), parser, context.v2()); +// } - static { - PARSER.declareNamedObjects(ConstructingObjectParser.constructorArg(), (p, c, n) -> Phase.parse(p, new Tuple<>(n, c.v2())), - v -> { - throw new IllegalArgumentException("ordered " + PHASES_FIELD.getPreferredName() + " are not supported"); - }, PHASES_FIELD); - } - - public static LifecyclePolicy parse(XContentParser parser, Tuple context) { - return PARSER.apply(parser, context); - } - - private final String name; - private final List phases; + protected final String name; + protected final Map phases; /** * @param name * the name of this {@link LifecyclePolicy} * @param phases - * a {@link List} of {@link Phase}s which make up this - * {@link LifecyclePolicy}. These {@link Phase}s are executed in - * the order of the {@link List}. + * a {@link Map} of {@link Phase}s which make up this + * {@link LifecyclePolicy}. */ - public LifecyclePolicy(String name, List phases) { + public LifecyclePolicy(String name, Map phases) { this.name = name; this.phases = phases; + validate(phases.values()); } /** @@ -74,13 +74,22 @@ public class LifecyclePolicy extends AbstractDiffable implement */ public LifecyclePolicy(StreamInput in) throws IOException { name = in.readString(); - phases = in.readList(Phase::new); + phases = Collections.unmodifiableMap(in.readMap(StreamInput::readString, Phase::new)); + } + + public static LifecyclePolicy parse(XContentParser parser, Tuple context) { + return ToXContentContext.PARSER.apply(parser, context); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(name); - out.writeList(phases); + out.writeMap(phases, StreamOutput::writeString, (o, val) -> val.writeTo(o)); + } + + @Override + public String getWriteableName() { + return getType(); } /** @@ -94,18 +103,19 @@ public class LifecyclePolicy extends AbstractDiffable implement * @return the {@link Phase}s for this {@link LifecyclePolicy} in the order * in which they will be executed. */ - public List getPhases() { + public Map getPhases() { return phases; } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.startObject(PHASES_FIELD.getPreferredName()); - for (Phase phase : phases) { - builder.field(phase.getName(), phase); - } - builder.endObject(); + builder.field(TYPE_FIELD.getPreferredName(), getType()); + builder.startObject(PHASES_FIELD.getPreferredName()); + for (Phase phase : phases.values()) { + builder.field(phase.getName(), phase); + } + builder.endObject(); builder.endObject(); return builder; } @@ -121,45 +131,36 @@ public class LifecyclePolicy extends AbstractDiffable implement String currentPhaseName = context.getPhase(); boolean currentPhaseActionsComplete = context.getAction().equals(Phase.PHASE_COMPLETED); String indexName = context.getLifecycleTarget(); + Phase currentPhase = phases.get(currentPhaseName); if (Strings.isNullOrEmpty(currentPhaseName) || currentPhaseActionsComplete) { - // Either this is the first time we have seen this index or the current phase is complete, in both cases we need to move to the next phase - int currentPhaseIndex = -1; - // First find the current phase (will not find it if this is the first time we've seen this index) - for (int i = 0; i < phases.size(); i++) { - if (phases.get(i).getName().equals(currentPhaseName)) { - currentPhaseIndex = i; - break; - } - } - // If we have reached the last phase then we don't need to do anything (maybe the last phase doesn't have a delete action?) - if (currentPhaseIndex < phases.size() - 1) { - Phase nextPhase = phases.get(currentPhaseIndex + 1); - // We only want to execute the phase if the conditions for executing are met (e.g. the index is old enough) - if (context.canExecute(nextPhase)) { - String nextPhaseName = nextPhase.getName(); - // Set the phase on the context to this phase so we know where we are next time we execute - context.setPhase(nextPhaseName, new Listener() { + Phase nextPhase = nextPhase(currentPhase); + // We only want to execute the phase if the conditions for executing are met (e.g. the index is old enough) + if (nextPhase != null && context.canExecute(nextPhase)) { + String nextPhaseName = nextPhase.getName(); + // Set the phase on the context to this phase so we know where we are next time we execute + context.setPhase(nextPhaseName, new Listener() { - @Override - public void onSuccess() { - logger.info("Successfully initialised phase [" + nextPhaseName + "] for index [" + indexName + "]"); - // We might as well execute the phase now rather than waiting for execute to be called again - nextPhase.execute(context); - } + @Override + public void onSuccess() { + logger.info("Successfully initialised phase [" + nextPhaseName + "] for index [" + indexName + "]"); + // We might as well execute the phase now rather than waiting for execute to be called again + nextPhase.execute(context); + } - @Override - public void onFailure(Exception e) { - logger.error("Failed to initialised phase [" + nextPhaseName + "] for index [" + indexName + "]", e); - } - }); - } + @Override + public void onFailure(Exception e) { + logger.error("Failed to initialised phase [" + nextPhaseName + "] for index [" + indexName + "]", e); + } + }); } } else { // If we have already seen this index and the action is not PHASE_COMPLETED then we just need to execute the current phase again - Phase currentPhase = phases.stream().filter(phase -> phase.getName().equals(currentPhaseName)).findAny() - .orElseThrow(() -> new IllegalStateException("Current phase [" + currentPhaseName + "] not found in lifecycle [" - + getName() + "] for index [" + indexName + "]")); - currentPhase.execute(context); + if (currentPhase == null) { + throw new IllegalStateException("Current phase [" + currentPhaseName + "] not found in lifecycle [" + + getName() + "] for index [" + indexName + "]"); + } else { + currentPhase.execute(context); + } } } @@ -185,4 +186,75 @@ public class LifecyclePolicy extends AbstractDiffable implement public String toString() { return Strings.toString(this, true, true); } + + /** + * @return the first phase of this policy to execute + */ + protected abstract Phase getFirstPhase(); + + /** + * @param currentPhase the current phase that is or was just executed + * @return the next phase after {@param currentPhase} to be execute. If it is `null`, the first + * phase to be executed is returned. If it is the last phase, then no next phase is to be + * executed and `null` is returned. + */ + protected abstract Phase nextPhase(@Nullable Phase currentPhase); + + /** + * validates whether the specified {@param phases} are valid for this policy instance. + * @param phases the phases to verify validity against + * @throws IllegalArgumentException if a specific phase or lack of a specific phase is invalid. + */ + protected abstract void validate(Collection phases); + + /** + * Each {@link LifecyclePolicy} has a specific type to differentiate themselves. Every implementation + * is responsible to providing its specific type. + * @return the {@link LifecyclePolicy} type. + */ + protected abstract String getType(); + + /** + * This class is here to assist in creating a context from which the specific LifecyclePolicy sub-classes can inherit + * all the previously parsed values + */ + public static class ToXContentContext { + private final Map phases; + private final String name; + + @SuppressWarnings("unchecked") + public static ConstructingObjectParser> PARSER = new ConstructingObjectParser<>( + "lifecycle_policy", false, (a, c) -> { + String lifecycleType = (String) a[0]; + List phases = (List) a[1]; + Map phaseMap = phases.stream().collect(Collectors.toMap(Phase::getName, Function.identity())); + NamedXContentRegistry registry = c.v2(); + ToXContentContext factory = new ToXContentContext(c.v1(), phaseMap); + try { + return registry.parseNamedObject(LifecyclePolicy.class, lifecycleType, null, factory); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + }); + static { + PARSER.declareString(constructorArg(), TYPE_FIELD); + PARSER.declareNamedObjects(constructorArg(), (p, c, n) -> Phase.parse(p, new Tuple<>(n, c.v2())), + v -> { + throw new IllegalArgumentException("ordered " + PHASES_FIELD.getPreferredName() + " are not supported"); + }, PHASES_FIELD); + } + + ToXContentContext(String name, Map phases) { + this.name = name; + this.phases = phases; + } + + public String getName() { + return name; + } + + public Map getPhases() { + return phases; + } + } } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecyclePolicy.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecyclePolicy.java new file mode 100644 index 00000000000..3862e979593 --- /dev/null +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecyclePolicy.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.indexlifecycle; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Represents the lifecycle of an index from creation to deletion. A + * {@link TimeseriesLifecyclePolicy} is made up of a set of {@link Phase}s which it will + * move through. Soon we will constrain the phases using some kinda of lifecycle + * type which will allow only particular {@link Phase}s to be defined, will + * dictate the order in which the {@link Phase}s are executed and will define + * which {@link LifecycleAction}s are allowed in each phase. + */ +public class TimeseriesLifecyclePolicy extends LifecyclePolicy { + public static final String TYPE = "timeseries"; + static final List VALID_PHASES = Arrays.asList("hot", "warm", "cold", "delete"); + + /** + * @param name + * the name of this {@link TimeseriesLifecyclePolicy} + * @param phases + * a {@link Map} of {@link Phase}s which make up this + * {@link TimeseriesLifecyclePolicy}. + */ + public TimeseriesLifecyclePolicy(String name, Map phases) { + super(name, phases); + } + + /** + * For Serialization + */ + public TimeseriesLifecyclePolicy(StreamInput in) throws IOException { + super(in); + } + + public static TimeseriesLifecyclePolicy parse(XContentParser parser, Object context) { + ToXContentContext factory = (ToXContentContext) context; + return new TimeseriesLifecyclePolicy(factory.getName(), factory.getPhases()); + } + + @Override + protected String getType() { + return TYPE; + } + + @Override + protected Phase getFirstPhase() { + Phase firstPhase = phases.get("hot"); + if (firstPhase == null) { + firstPhase = phases.get("warm"); + } + if (firstPhase == null) { + firstPhase = phases.get("cold"); + } + if (firstPhase == null) { + firstPhase = phases.get("delete"); + } + return firstPhase; + } + + @Override + protected Phase nextPhase(@Nullable Phase currentPhase) { + if (currentPhase == null) { + return getFirstPhase(); + } + + // VALID_PHASES is in order of execution + boolean readyToSetNext = false; + for (String phaseName : VALID_PHASES) { + if (readyToSetNext && phases.containsKey(phaseName)) { + return phases.get(phaseName); + } + if (phaseName.equals(currentPhase.getName())) { + readyToSetNext = true; + } + } + + return null; + } + + @Override + public void validate(Collection phases) { + Set allowedPhases = new HashSet<>(VALID_PHASES); + phases.forEach(phase -> { + if (allowedPhases.contains(phase.getName()) == false) { + throw new IllegalArgumentException("Timeseries lifecycle does not support phase [" + phase.getName() + "]"); + } + }); + } +} diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/GetLifecycleAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/GetLifecycleAction.java index b61a217b4bd..1332e769024 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/GetLifecycleAction.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/GetLifecycleAction.java @@ -86,12 +86,12 @@ public class GetLifecycleAction @Override public void readFrom(StreamInput in) throws IOException { - policy = new LifecyclePolicy(in); + policy = in.readNamedWriteable(LifecyclePolicy.class); } @Override public void writeTo(StreamOutput out) throws IOException { - policy.writeTo(out); + out.writeNamedWriteable(policy); } @Override diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/PutLifecycleAction.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/PutLifecycleAction.java index 412c58c7539..b73ba797cf8 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/PutLifecycleAction.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/PutLifecycleAction.java @@ -149,7 +149,7 @@ public class PutLifecycleAction extends Action(name, namedXContentRegistry)); + return PARSER.apply(parser, new Tuple<>(name, namedXContentRegistry)); } @Override @@ -163,13 +163,13 @@ public class PutLifecycleAction extends Action deletePhaseActions = Collections.singletonList(new DeleteAction()); phases.add(new Phase("delete", TimeValue.timeValueSeconds(3), deletePhaseActions)); - lifecyclePolicy = new LifecyclePolicy("test_lifecycle", phases); + lifecyclePolicy = new TestLifecyclePolicy("test", phases); } public void testSingleNodeCluster() throws Exception { diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadataTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadataTests.java index 0ddd78316a3..3c01febe3c3 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadataTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadataTests.java @@ -34,7 +34,8 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe @Before public void setup() { List entries = Arrays - .asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse)); + .asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse), + new NamedXContentRegistry.Entry(LifecyclePolicy.class, new ParseField("test"), TestLifecyclePolicy::parse)); registry = new NamedXContentRegistry(entries); } @@ -54,17 +55,12 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe phases.add(new Phase(randomAlphaOfLength(10), after, actions)); } String policyName = randomAlphaOfLength(10); - policies.put(policyName, new LifecyclePolicy(policyName, phases)); + policies.put(policyName, new TestLifecyclePolicy(policyName, phases)); } long pollInterval = randomNonNegativeLong(); return new IndexLifecycleMetadata(policies, pollInterval); } - @Override - protected String[] getShuffleFieldsExceptions() { - return new String[] { "phases" }; // NOCOMMIT this needs to be temporary since we should not rely on the order of the JSON map - } - @Override protected IndexLifecycleMetadata doParseInstance(XContentParser parser) throws IOException { return IndexLifecycleMetadata.PARSER.apply(parser, registry); @@ -75,9 +71,12 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe return IndexLifecycleMetadata::new; } + @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry( - Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new))); + Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new), + new NamedWriteableRegistry.Entry(LifecyclePolicy.class, TestLifecyclePolicy.TYPE, + TestLifecyclePolicy::new))); } @Override @@ -92,7 +91,7 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe case 1: policies = new TreeMap<>(policies); String policyName = randomAlphaOfLength(10); - policies.put(policyName, new LifecyclePolicy(policyName, Collections.emptyList())); + policies.put(policyName, new TestLifecyclePolicy(policyName, Collections.emptyList())); break; default: throw new AssertionError("Illegal randomisation branch"); diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java index 62543dadd5d..ad7c82d10ed 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java @@ -5,89 +5,14 @@ */ package org.elasticsearch.xpack.indexlifecycle; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.test.ESTestCase; import org.junit.Before; -import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -public class LifecyclePolicyTests extends AbstractSerializingTestCase { - - private NamedXContentRegistry registry; - private String lifecycleName; - - @Before - public void setup() { - List entries = Arrays - .asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse)); - registry = new NamedXContentRegistry(entries); - lifecycleName = randomAlphaOfLength(20); // NOCOMMIT we need to randomise the lifecycle name rather - // than use the same name for all instances - } - - @Override - protected LifecyclePolicy createTestInstance() { - int numberPhases = randomInt(5); - List phases = new ArrayList<>(numberPhases); - for (int i = 0; i < numberPhases; i++) { - TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after"); - List actions = new ArrayList<>(); - if (randomBoolean()) { - actions.add(new DeleteAction()); - } - phases.add(new Phase(randomAlphaOfLength(10), after, actions)); - } - return new LifecyclePolicy(lifecycleName, phases); - } - - @Override - protected String[] getShuffleFieldsExceptions() { - return new String[] { "phases" }; // NOCOMMIT this needs to be temporary since we should not rely on the order of the JSON map - } - - @Override - protected LifecyclePolicy doParseInstance(XContentParser parser) throws IOException { - return LifecyclePolicy.parse(parser, new Tuple<>(lifecycleName, registry)); - } - - @Override - protected Reader instanceReader() { - return LifecyclePolicy::new; - } - - protected NamedWriteableRegistry getNamedWriteableRegistry() { - return new NamedWriteableRegistry( - Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new))); - } - - @Override - protected LifecyclePolicy mutateInstance(LifecyclePolicy instance) throws IOException { - String name = instance.getName(); - List phases = instance.getPhases(); - switch (between(0, 1)) { - case 0: - name = name + randomAlphaOfLengthBetween(1, 5); - break; - case 1: - phases = new ArrayList<>(phases); - phases.add(new Phase(randomAlphaOfLengthBetween(1, 10), TimeValue.timeValueSeconds(randomIntBetween(1, 1000)), - Collections.emptyList())); - break; - default: - throw new AssertionError("Illegal randomisation branch"); - } - return new LifecyclePolicy(name, phases); - } +public class LifecyclePolicyTests extends ESTestCase { public void testExecuteNewIndexBeforeTrigger() throws Exception { String indexName = randomAlphaOfLengthBetween(1, 20); @@ -111,7 +36,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase phasesList; + + public TestLifecyclePolicy(String name, List phasesList) { + super(name, phasesList.stream().collect(Collectors.toMap(Phase::getName, Function.identity()))); + this.phasesList = phasesList; + } + + public TestLifecyclePolicy(StreamInput in) throws IOException { + super(in); + } + + public static TestLifecyclePolicy parse(XContentParser parser, Object context) { + ToXContentContext factory = (ToXContentContext) context; + return new TestLifecyclePolicy(factory.getName(), new ArrayList<>(factory.getPhases().values())); + } + + @Override + protected String getType() { + return TYPE; + } + @Override + protected Phase getFirstPhase() { + return phasesList.get(0); + } + + @Override + protected Phase nextPhase(@Nullable Phase currentPhase) { + if (currentPhase == null) { + return getFirstPhase(); + } + + for(int i=0; i < phasesList.size() - 1; i++) { + if (phasesList.get(i).equals(currentPhase)) { + return phasesList.get(i + 1); + } + } + + return null; + } + + @Override + protected void validate(Collection phases) { + // always valid + } +} diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecyclePolicyTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecyclePolicyTests.java new file mode 100644 index 00000000000..771a6f0ec08 --- /dev/null +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecyclePolicyTests.java @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.indexlifecycle; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.sql.Time; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.indexlifecycle.TimeseriesLifecyclePolicy.VALID_PHASES; +import static org.hamcrest.Matchers.equalTo; + +public class TimeseriesLifecyclePolicyTests extends AbstractSerializingTestCase { + + private NamedXContentRegistry registry; + private String lifecycleName; + + @Before + public void setup() { + List entries = Arrays + .asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse), + new NamedXContentRegistry.Entry(LifecyclePolicy.class, new ParseField(TimeseriesLifecyclePolicy.TYPE), + TimeseriesLifecyclePolicy::parse)); + registry = new NamedXContentRegistry(entries); + lifecycleName = randomAlphaOfLength(20); // NOCOMMIT we need to randomise the lifecycle name rather + // than use the same name for all instances + } + + @Override + protected LifecyclePolicy createTestInstance() { + return new TimeseriesLifecyclePolicy(lifecycleName, Collections.emptyMap()); + } + + @Override + protected LifecyclePolicy doParseInstance(XContentParser parser) throws IOException { + return LifecyclePolicy.parse(parser, new Tuple<>(lifecycleName, registry)); + } + + @Override + protected Reader instanceReader() { + return TimeseriesLifecyclePolicy::new; + } + + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry( + Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new))); + } + + public void testGetFirstPhase() { + Map phases = new HashMap<>(); + Phase expectedFirstPhase = null; + for (String phaseName : Arrays.asList("hot", "warm", "cold", "delete")) { + if (randomBoolean()) { + Phase phase = new Phase(phaseName, TimeValue.MINUS_ONE, Collections.emptyList()); + phases.put(phaseName, phase); + if (expectedFirstPhase == null) { + expectedFirstPhase = phase; + } + } + } + TimeseriesLifecyclePolicy policy = new TimeseriesLifecyclePolicy(lifecycleName, phases); + assertThat(policy.getFirstPhase(), equalTo(expectedFirstPhase)); + } + + public void testGetNextPhase() { + for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) { + Map phases = new HashMap<>(); + List phasesInOrder = new ArrayList<>(); + for (String phase : VALID_PHASES) { + if (randomBoolean()) { + Phase phaseToAdd = new Phase(phase, TimeValue.MINUS_ONE, Collections.emptyList()); + phases.put(phase, phaseToAdd); + phasesInOrder.add(phaseToAdd); + } + } + TimeseriesLifecyclePolicy policy = new TimeseriesLifecyclePolicy(lifecycleName, phases); + assertThat(policy.nextPhase(null), equalTo(policy.getFirstPhase())); + for (int i = 0; i < phasesInOrder.size() - 1; i++) { + assertThat(policy.nextPhase(phasesInOrder.get(i)), equalTo(phasesInOrder.get(i + 1))); + } + if (phasesInOrder.isEmpty() == false) { + assertNull(policy.nextPhase(phasesInOrder.get(phasesInOrder.size() - 1))); + } + } + } + + public void testValidate() { + boolean invalid = randomBoolean(); + String phaseName = randomFrom("hot", "warm", "cold", "delete"); + if (invalid) { + phaseName += randomAlphaOfLength(5); + } + Map phases = Collections.singletonMap(phaseName, + new Phase(phaseName, TimeValue.ZERO, Collections.emptyList())); + if (invalid) { + Exception e = expectThrows(IllegalArgumentException.class, () -> new TimeseriesLifecyclePolicy(lifecycleName, phases)); + assertThat(e.getMessage(), equalTo("Timeseries lifecycle does not support phase [" + phaseName + "]")); + } else { + new TimeseriesLifecyclePolicy(lifecycleName, phases); + } + } +} diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/action/GetLifecycleRequestTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/action/GetLifecycleRequestTests.java index 24f39852f63..3a17432ae7a 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/action/GetLifecycleRequestTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/action/GetLifecycleRequestTests.java @@ -25,5 +25,4 @@ public class GetLifecycleRequestTests extends AbstractStreamableTestCase getMutateFunction() { return resp -> new Request(resp.getPolicyName() + randomAlphaOfLengthBetween(1, 10)); } - } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/action/GetLifecycleResponseTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/action/GetLifecycleResponseTests.java index 0bd58b925fa..deb740414c0 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/action/GetLifecycleResponseTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/action/GetLifecycleResponseTests.java @@ -13,13 +13,17 @@ import org.elasticsearch.xpack.indexlifecycle.DeleteAction; import org.elasticsearch.xpack.indexlifecycle.LifecycleAction; import org.elasticsearch.xpack.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.indexlifecycle.Phase; +import org.elasticsearch.xpack.indexlifecycle.TestLifecyclePolicy; +import org.elasticsearch.xpack.indexlifecycle.TimeseriesLifecyclePolicy; import org.elasticsearch.xpack.indexlifecycle.action.GetLifecycleAction.Response; import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class GetLifecycleResponseTests extends AbstractStreamableTestCase { @@ -33,17 +37,7 @@ public class GetLifecycleResponseTests extends AbstractStreamableTestCase phases = new ArrayList<>(numberPhases); - for (int i = 0; i < numberPhases; i++) { - TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after"); - List actions = new ArrayList<>(); - if (randomBoolean()) { - actions.add(new DeleteAction()); - } - phases.add(new Phase(randomAlphaOfLength(10), after, actions)); - } - return new Response(new LifecyclePolicy(lifecycleName, phases)); + return new Response(new TestLifecyclePolicy(lifecycleName, Collections.emptyList())); } @Override @@ -53,7 +47,8 @@ public class GetLifecycleResponseTests extends AbstractStreamableTestCase { LifecyclePolicy policy = resp.getPolicy(); String name = policy.getName(); - List phases = policy.getPhases(); + Map phases = policy.getPhases(); switch (between(0, 1)) { - case 0: - name = name + randomAlphaOfLengthBetween(1, 5); - break; - case 1: - phases = new ArrayList<>(phases); - phases.add(new Phase(randomAlphaOfLengthBetween(1, 10), TimeValue.timeValueSeconds(randomIntBetween(1, 1000)), + case 0: + name = name + randomAlphaOfLengthBetween(1, 5); + break; + case 1: + phases = new HashMap<>(phases); + String newPhaseName = randomAlphaOfLengthBetween(1, 10); + phases.put(name, new Phase(newPhaseName, TimeValue.timeValueSeconds(randomIntBetween(1, 1000)), Collections.emptyList())); - break; - default: - throw new AssertionError("Illegal randomisation branch"); + break; + default: + throw new AssertionError("Illegal randomisation branch"); } - return new Response(new LifecyclePolicy(name, phases)); + return new Response(new TestLifecyclePolicy(name, new ArrayList<>(phases.values()))); }; } - } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/action/PutLifecycleRequestTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/action/PutLifecycleRequestTests.java index 0be58cd3d06..7867df89320 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/action/PutLifecycleRequestTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/action/PutLifecycleRequestTests.java @@ -16,13 +16,16 @@ import org.elasticsearch.xpack.indexlifecycle.DeleteAction; import org.elasticsearch.xpack.indexlifecycle.LifecycleAction; import org.elasticsearch.xpack.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.indexlifecycle.Phase; +import org.elasticsearch.xpack.indexlifecycle.TestLifecyclePolicy; import org.elasticsearch.xpack.indexlifecycle.action.PutLifecycleAction.Request; import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase { @@ -32,7 +35,8 @@ public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase @Before public void setup() { List entries = Arrays - .asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse)); + .asList(new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse), + new NamedXContentRegistry.Entry(LifecyclePolicy.class, new ParseField(TestLifecyclePolicy.TYPE), TestLifecyclePolicy::parse)); registry = new NamedXContentRegistry(entries); lifecycleName = randomAlphaOfLength(20); // NOCOMMIT we need to randomise the lifecycle name rather // than use the same name for all instances @@ -40,17 +44,7 @@ public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase @Override protected Request createTestInstance() { - int numberPhases = 1; // NOCOMMIT need to make this more than one when phase order doesn't rely on JSON map order - List phases = new ArrayList<>(numberPhases); - for (int i = 0; i < numberPhases; i++) { - TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after"); - List actions = new ArrayList<>(); - if (randomBoolean()) { - actions.add(new DeleteAction()); - } - phases.add(new Phase(randomAlphaOfLength(10), after, actions)); - } - return new Request(new LifecyclePolicy(lifecycleName, phases)); + return new Request(new TestLifecyclePolicy(lifecycleName, Collections.emptyList())); } @Override @@ -65,7 +59,8 @@ public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry( - Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new))); + Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new), + new NamedWriteableRegistry.Entry(LifecyclePolicy.class, TestLifecyclePolicy.TYPE, TestLifecyclePolicy::new))); } protected boolean supportsUnknownFields() { @@ -77,20 +72,21 @@ public class PutLifecycleRequestTests extends AbstractStreamableXContentTestCase return resp -> { LifecyclePolicy policy = resp.getPolicy(); String name = policy.getName(); - List phases = policy.getPhases(); + Map phases = policy.getPhases(); switch (between(0, 1)) { - case 0: - name = name + randomAlphaOfLengthBetween(1, 5); - break; - case 1: - phases = new ArrayList<>(phases); - phases.add(new Phase(randomAlphaOfLengthBetween(1, 10), TimeValue.timeValueSeconds(randomIntBetween(1, 1000)), + case 0: + name = name + randomAlphaOfLengthBetween(1, 5); + break; + case 1: + phases = new HashMap<>(phases); + String newPhaseName = randomAlphaOfLengthBetween(1, 10); + phases.put(name, new Phase(newPhaseName, TimeValue.timeValueSeconds(randomIntBetween(1, 1000)), Collections.emptyList())); - break; - default: - throw new AssertionError("Illegal randomisation branch"); + break; + default: + throw new AssertionError("Illegal randomisation branch"); } - return new Request(new LifecyclePolicy(name, phases)); + return new Request(new TestLifecyclePolicy(name, new ArrayList<>(phases.values()))); }; }