* ILM action to wait for SLM policy execution (#50454) This change add new ILM action to wait for SLM policy execution to ensure that index has snapshot before deletion. Closes #45067 * Fix flaky TimeSeriesLifecycleActionsIT#testWaitForSnapshot test This change adds some randomness and cleanup step to TimeSeriesLifecycleActionsIT#testWaitForSnapshot and testWaitForSnapshotSlmExecutedBefore tests in attempt to make them stable. Reletes to #50781 * Formatting changes * Longer timeout * Fix Map.of in Java8 * Unused import removed
This commit is contained in:
parent
4cb525d8d3
commit
a18736b46d
|
@ -113,6 +113,7 @@ policy definition.
|
|||
- <<ilm-allocate-action,Allocate>>
|
||||
- <<ilm-freeze-action,Freeze>>
|
||||
* Delete
|
||||
- <<ilm-delete-action,Wait For Snapshot>>
|
||||
- <<ilm-delete-action,Delete>>
|
||||
|
||||
[[ilm-allocate-action]]
|
||||
|
@ -224,6 +225,40 @@ PUT _ilm/policy/my_policy
|
|||
}
|
||||
--------------------------------------------------
|
||||
|
||||
[[ilm-wait-for-snapshot-action]]
|
||||
==== Wait For Snapshot
|
||||
|
||||
Phases allowed: delete.
|
||||
|
||||
The Wait For Snapshot Action waits for defined SLM policy to be executed to ensure that snapshot of index exists before
|
||||
deletion.
|
||||
|
||||
[[ilm-wait-for-snapshot-options]]
|
||||
.Wait For Snapshot
|
||||
[options="header"]
|
||||
|======
|
||||
| Name | Required | Default | Description
|
||||
| `policy` | yes | - | SLM policy name that this action should wait for
|
||||
|======
|
||||
|
||||
[source,console]
|
||||
--------------------------------------------------
|
||||
PUT _ilm/policy/my_policy
|
||||
{
|
||||
"policy": {
|
||||
"phases": {
|
||||
"delete": {
|
||||
"actions": {
|
||||
"wait_for_snapshot" : {
|
||||
"policy": "slm-policy-name"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
[[ilm-delete-action]]
|
||||
==== Delete
|
||||
|
||||
|
|
|
@ -65,6 +65,7 @@ import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
|
|||
import org.elasticsearch.xpack.core.ilm.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
|
||||
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
|
||||
import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction;
|
||||
import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType;
|
||||
import org.elasticsearch.xpack.core.ilm.UnfollowAction;
|
||||
import org.elasticsearch.xpack.core.ilm.action.DeleteLifecycleAction;
|
||||
|
@ -588,6 +589,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
|
||||
// Data Frame
|
||||
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.TRANSFORM, TransformFeatureSetUsage::new),
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TransformField.TASK_NAME, TransformTaskParams::new),
|
||||
|
|
|
@ -37,7 +37,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
|
|||
AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME);
|
||||
static final List<String> ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
|
||||
FreezeAction.NAME);
|
||||
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(DeleteAction.NAME);
|
||||
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(WaitForSnapshotAction.NAME, DeleteAction.NAME);
|
||||
static final Set<String> VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS);
|
||||
static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
|
||||
static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A {@link LifecycleAction} which waits for snapshot to be taken (by configured SLM policy).
|
||||
*/
|
||||
public class WaitForSnapshotAction implements LifecycleAction {
|
||||
|
||||
public static final String NAME = "wait_for_snapshot";
|
||||
public static final ParseField POLICY_FIELD = new ParseField("policy");
|
||||
|
||||
private static final ConstructingObjectParser<WaitForSnapshotAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
|
||||
a -> new WaitForSnapshotAction((String) a[0]));
|
||||
|
||||
static {
|
||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), POLICY_FIELD);
|
||||
}
|
||||
|
||||
private final String policy;
|
||||
|
||||
public static WaitForSnapshotAction parse(XContentParser parser) {
|
||||
return PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
public WaitForSnapshotAction(String policy) {
|
||||
if (Strings.hasText(policy) == false) {
|
||||
throw new IllegalArgumentException("policy name must be specified");
|
||||
}
|
||||
this.policy = policy;
|
||||
}
|
||||
|
||||
public WaitForSnapshotAction(StreamInput in) throws IOException {
|
||||
this(in.readString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
|
||||
StepKey waitForSnapshotKey = new StepKey(phase, NAME, WaitForSnapshotStep.NAME);
|
||||
return Collections.singletonList(new WaitForSnapshotStep(waitForSnapshotKey, nextStepKey, policy));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSafeAction() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(policy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(POLICY_FIELD.getPreferredName(), policy);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
WaitForSnapshotAction that = (WaitForSnapshotAction) o;
|
||||
return policy.equals(that.policy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(policy);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
|
||||
/***
|
||||
* A step that waits for snapshot to be taken by SLM to ensure we have backup before we delete the index.
|
||||
* It will signal error if it can't get data needed to do the check (phase time from ILM and SLM metadata)
|
||||
* and will only return success if execution of SLM policy took place after index entered deleted phase.
|
||||
*/
|
||||
public class WaitForSnapshotStep extends ClusterStateWaitStep {
|
||||
|
||||
static final String NAME = "wait-for-snapshot";
|
||||
|
||||
private static final String MESSAGE_FIELD = "message";
|
||||
private static final String POLICY_NOT_EXECUTED_MESSAGE = "waiting for policy '%s' to be executed since %s";
|
||||
private static final String POLICY_NOT_FOUND_MESSAGE = "configured policy '%s' not found";
|
||||
private static final String NO_INDEX_METADATA_MESSAGE = "no index metadata found for index '%s'";
|
||||
private static final String NO_PHASE_TIME_MESSAGE = "no information about ILM phase start in index metadata for index '%s'";
|
||||
|
||||
private final String policy;
|
||||
|
||||
WaitForSnapshotStep(StepKey key, StepKey nextStepKey, String policy) {
|
||||
super(key, nextStepKey);
|
||||
this.policy = policy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result isConditionMet(Index index, ClusterState clusterState) {
|
||||
IndexMetaData indexMetaData = clusterState.metaData().index(index);
|
||||
if (indexMetaData == null) {
|
||||
throw error(NO_INDEX_METADATA_MESSAGE, index.getName());
|
||||
}
|
||||
|
||||
Long phaseTime = LifecycleExecutionState.fromIndexMetadata(indexMetaData).getPhaseTime();
|
||||
|
||||
if (phaseTime == null) {
|
||||
throw error(NO_PHASE_TIME_MESSAGE, index.getName());
|
||||
}
|
||||
|
||||
SnapshotLifecycleMetadata snapMeta = clusterState.metaData().custom(SnapshotLifecycleMetadata.TYPE);
|
||||
if (snapMeta == null || snapMeta.getSnapshotConfigurations().containsKey(policy) == false) {
|
||||
throw error(POLICY_NOT_FOUND_MESSAGE, policy);
|
||||
}
|
||||
SnapshotLifecyclePolicyMetadata snapPolicyMeta = snapMeta.getSnapshotConfigurations().get(policy);
|
||||
if (snapPolicyMeta.getLastSuccess() == null || snapPolicyMeta.getLastSuccess().getTimestamp() < phaseTime) {
|
||||
return new Result(false, notExecutedMessage(phaseTime));
|
||||
}
|
||||
|
||||
return new Result(true, null);
|
||||
}
|
||||
|
||||
public String getPolicy() {
|
||||
return policy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRetryable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
private ToXContentObject notExecutedMessage(long time) {
|
||||
return (builder, params) -> {
|
||||
builder.startObject();
|
||||
builder.field(MESSAGE_FIELD, String.format(Locale.ROOT, POLICY_NOT_EXECUTED_MESSAGE, policy, new Date(time)));
|
||||
builder.endObject();
|
||||
return builder;
|
||||
};
|
||||
}
|
||||
|
||||
private IllegalStateException error(String message, Object... args) {
|
||||
return new IllegalStateException(String.format(Locale.ROOT, message, args));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
if (!super.equals(o)) return false;
|
||||
WaitForSnapshotStep that = (WaitForSnapshotStep) o;
|
||||
return policy.equals(that.policy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(super.hashCode(), policy);
|
||||
}
|
||||
}
|
|
@ -39,6 +39,7 @@ public class LifecyclePolicyMetadataTests extends AbstractSerializingTestCase<Li
|
|||
new NamedWriteableRegistry.Entry(LifecycleType.class, TimeseriesLifecycleType.TYPE,
|
||||
(in) -> TimeseriesLifecycleType.INSTANCE),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, AllocateAction.NAME, AllocateAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReadOnlyAction.NAME, ReadOnlyAction::new),
|
||||
|
@ -57,6 +58,8 @@ public class LifecyclePolicyMetadataTests extends AbstractSerializingTestCase<Li
|
|||
new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE),
|
||||
(p) -> TimeseriesLifecycleType.INSTANCE),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class,
|
||||
new ParseField(WaitForSnapshotAction.NAME), WaitForSnapshotAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse),
|
||||
|
|
|
@ -48,6 +48,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
new NamedWriteableRegistry.Entry(LifecycleType.class, TimeseriesLifecycleType.TYPE,
|
||||
(in) -> TimeseriesLifecycleType.INSTANCE),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, AllocateAction.NAME, AllocateAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReadOnlyAction.NAME, ReadOnlyAction::new),
|
||||
|
@ -66,6 +67,8 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE),
|
||||
(p) -> TimeseriesLifecycleType.INSTANCE),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class,
|
||||
new ParseField(WaitForSnapshotAction.NAME), WaitForSnapshotAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse),
|
||||
|
@ -110,6 +113,8 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
return AllocateActionTests.randomInstance();
|
||||
case DeleteAction.NAME:
|
||||
return new DeleteAction();
|
||||
case WaitForSnapshotAction.NAME:
|
||||
return WaitForSnapshotActionTests.randomInstance();
|
||||
case ForceMergeAction.NAME:
|
||||
return ForceMergeActionTests.randomInstance();
|
||||
case ReadOnlyAction.NAME:
|
||||
|
@ -160,6 +165,8 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
|
|||
switch (action) {
|
||||
case AllocateAction.NAME:
|
||||
return AllocateActionTests.randomInstance();
|
||||
case WaitForSnapshotAction.NAME:
|
||||
return WaitForSnapshotActionTests.randomInstance();
|
||||
case DeleteAction.NAME:
|
||||
return new DeleteAction();
|
||||
case ForceMergeAction.NAME:
|
||||
|
|
|
@ -34,6 +34,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
private static final AllocateAction TEST_ALLOCATE_ACTION =
|
||||
new AllocateAction(2, Collections.singletonMap("node", "node1"),null, null);
|
||||
private static final DeleteAction TEST_DELETE_ACTION = new DeleteAction();
|
||||
private static final WaitForSnapshotAction TEST_WAIT_FOR_SNAPSHOT_ACTION = new WaitForSnapshotAction("policy");
|
||||
private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction(1);
|
||||
private static final RolloverAction TEST_ROLLOVER_ACTION = new RolloverAction(new ByteSizeValue(1), null, null);
|
||||
private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction(1);
|
||||
|
@ -556,6 +557,8 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
|
|||
switch (actionName) {
|
||||
case AllocateAction.NAME:
|
||||
return TEST_ALLOCATE_ACTION;
|
||||
case WaitForSnapshotAction.NAME:
|
||||
return TEST_WAIT_FOR_SNAPSHOT_ACTION;
|
||||
case DeleteAction.NAME:
|
||||
return TEST_DELETE_ACTION;
|
||||
case ForceMergeAction.NAME:
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class WaitForSnapshotActionTests extends AbstractActionTestCase<WaitForSnapshotAction> {
|
||||
|
||||
@Override
|
||||
public void testToSteps() {
|
||||
WaitForSnapshotAction action = createTestInstance();
|
||||
Step.StepKey nextStep = new Step.StepKey("", "", "");
|
||||
List<Step> steps = action.toSteps(null, "delete", nextStep);
|
||||
assertEquals(1, steps.size());
|
||||
Step step = steps.get(0);
|
||||
assertTrue(step instanceof WaitForSnapshotStep);
|
||||
assertEquals(nextStep, step.getNextStepKey());
|
||||
|
||||
Step.StepKey key = step.getKey();
|
||||
assertEquals("delete", key.getPhase());
|
||||
assertEquals(WaitForSnapshotAction.NAME, key.getAction());
|
||||
assertEquals(WaitForSnapshotStep.NAME, key.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WaitForSnapshotAction doParseInstance(XContentParser parser) throws IOException {
|
||||
return WaitForSnapshotAction.parse(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WaitForSnapshotAction createTestInstance() {
|
||||
return randomInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<WaitForSnapshotAction> instanceReader() {
|
||||
return WaitForSnapshotAction::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WaitForSnapshotAction mutateInstance(WaitForSnapshotAction instance) throws IOException {
|
||||
return randomInstance();
|
||||
}
|
||||
|
||||
static WaitForSnapshotAction randomInstance() {
|
||||
return new WaitForSnapshotAction(randomAlphaOfLengthBetween(5, 10));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,149 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotInvocationRecord;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
public class WaitForSnapshotStepTests extends AbstractStepTestCase<WaitForSnapshotStep> {
|
||||
|
||||
@Override
|
||||
protected WaitForSnapshotStep createRandomInstance() {
|
||||
return new WaitForSnapshotStep(randomStepKey(), randomStepKey(), randomAlphaOfLengthBetween(1, 10));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WaitForSnapshotStep mutateInstance(WaitForSnapshotStep instance) {
|
||||
Step.StepKey key = instance.getKey();
|
||||
Step.StepKey nextKey = instance.getNextStepKey();
|
||||
String policy = instance.getPolicy();
|
||||
|
||||
switch (between(0, 2)) {
|
||||
case 0:
|
||||
key = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
break;
|
||||
case 1:
|
||||
nextKey = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
|
||||
break;
|
||||
case 2:
|
||||
policy = randomAlphaOfLengthBetween(1, 10);
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("Illegal randomisation branch");
|
||||
}
|
||||
|
||||
return new WaitForSnapshotStep(key, nextKey, policy);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WaitForSnapshotStep copyInstance(WaitForSnapshotStep instance) {
|
||||
return new WaitForSnapshotStep(instance.getKey(), instance.getNextStepKey(), instance.getPolicy());
|
||||
}
|
||||
|
||||
public void testNoSlmPolicies() {
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
|
||||
.putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, Collections.singletonMap("phase_time", Long.toString(randomLong())))
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
ImmutableOpenMap.Builder<String, IndexMetaData> indices =
|
||||
ImmutableOpenMap.<String, IndexMetaData>builder().fPut(indexMetaData.getIndex().getName(), indexMetaData);
|
||||
MetaData.Builder meta = MetaData.builder().indices(indices.build());
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(meta).build();
|
||||
WaitForSnapshotStep instance = createRandomInstance();
|
||||
IllegalStateException e = expectThrows(IllegalStateException.class, () -> instance.isConditionMet(indexMetaData.getIndex(),
|
||||
clusterState));
|
||||
assertTrue(e.getMessage().contains(instance.getPolicy()));
|
||||
}
|
||||
|
||||
public void testSlmPolicyNotExecuted() throws IOException {
|
||||
WaitForSnapshotStep instance = createRandomInstance();
|
||||
SnapshotLifecyclePolicyMetadata slmPolicy = SnapshotLifecyclePolicyMetadata.builder()
|
||||
.setModifiedDate(randomLong())
|
||||
.setPolicy(new SnapshotLifecyclePolicy("", "", "", "", null, null))
|
||||
.build();
|
||||
SnapshotLifecycleMetadata smlMetaData = new SnapshotLifecycleMetadata(Collections.singletonMap(instance.getPolicy(), slmPolicy),
|
||||
OperationMode.RUNNING, null);
|
||||
|
||||
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
|
||||
.putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, Collections.singletonMap("phase_time", Long.toString(randomLong())))
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
ImmutableOpenMap.Builder<String, IndexMetaData> indices =
|
||||
ImmutableOpenMap.<String, IndexMetaData>builder().fPut(indexMetaData.getIndex().getName(), indexMetaData);
|
||||
MetaData.Builder meta = MetaData.builder().indices(indices.build()).putCustom(SnapshotLifecycleMetadata.TYPE, smlMetaData);
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(meta).build();
|
||||
ClusterStateWaitStep.Result result = instance.isConditionMet(indexMetaData.getIndex(), clusterState);
|
||||
assertFalse(result.isComplete());
|
||||
assertTrue(getMessage(result).contains("to be executed"));
|
||||
}
|
||||
|
||||
public void testSlmPolicyExecutedBeforeStep() throws IOException {
|
||||
long phaseTime = randomLong();
|
||||
|
||||
WaitForSnapshotStep instance = createRandomInstance();
|
||||
SnapshotLifecyclePolicyMetadata slmPolicy = SnapshotLifecyclePolicyMetadata.builder()
|
||||
.setModifiedDate(randomLong())
|
||||
.setPolicy(new SnapshotLifecyclePolicy("", "", "", "", null, null))
|
||||
.setLastSuccess(new SnapshotInvocationRecord("", phaseTime - 10, ""))
|
||||
.build();
|
||||
SnapshotLifecycleMetadata smlMetaData = new SnapshotLifecycleMetadata(Collections.singletonMap(instance.getPolicy(), slmPolicy),
|
||||
OperationMode.RUNNING, null);
|
||||
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
|
||||
.putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, Collections.singletonMap("phase_time", Long.toString(phaseTime)))
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
ImmutableOpenMap.Builder<String, IndexMetaData> indices =
|
||||
ImmutableOpenMap.<String, IndexMetaData>builder().fPut(indexMetaData.getIndex().getName(), indexMetaData);
|
||||
MetaData.Builder meta = MetaData.builder().indices(indices.build()).putCustom(SnapshotLifecycleMetadata.TYPE, smlMetaData);
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(meta).build();
|
||||
ClusterStateWaitStep.Result result = instance.isConditionMet(indexMetaData.getIndex(), clusterState);
|
||||
assertFalse(result.isComplete());
|
||||
assertTrue(getMessage(result).contains("to be executed"));
|
||||
}
|
||||
|
||||
public void testSlmPolicyExecutedAfterStep() throws IOException {
|
||||
long phaseTime = randomLong();
|
||||
|
||||
WaitForSnapshotStep instance = createRandomInstance();
|
||||
SnapshotLifecyclePolicyMetadata slmPolicy = SnapshotLifecyclePolicyMetadata.builder()
|
||||
.setModifiedDate(randomLong())
|
||||
.setPolicy(new SnapshotLifecyclePolicy("", "", "", "", null, null))
|
||||
.setLastSuccess(new SnapshotInvocationRecord("", phaseTime + 10, ""))
|
||||
.build();
|
||||
SnapshotLifecycleMetadata smlMetaData = new SnapshotLifecycleMetadata(Collections.singletonMap(instance.getPolicy(), slmPolicy),
|
||||
OperationMode.RUNNING, null);
|
||||
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder(randomAlphaOfLength(10))
|
||||
.putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, Collections.singletonMap("phase_time", Long.toString(phaseTime)))
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
ImmutableOpenMap.Builder<String, IndexMetaData> indices =
|
||||
ImmutableOpenMap.<String, IndexMetaData>builder().fPut(indexMetaData.getIndex().getName(), indexMetaData);
|
||||
MetaData.Builder meta = MetaData.builder().indices(indices.build()).putCustom(SnapshotLifecycleMetadata.TYPE, smlMetaData);
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(meta).build();
|
||||
ClusterStateWaitStep.Result result = instance.isConditionMet(indexMetaData.getIndex(), clusterState);
|
||||
assertTrue(result.isComplete());
|
||||
assertNull(result.getInfomationContext());
|
||||
}
|
||||
|
||||
private String getMessage(ClusterStateWaitStep.Result result) throws IOException {
|
||||
return Strings.toString(result.getInfomationContext());
|
||||
}
|
||||
}
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
|
|||
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
|
||||
import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType;
|
||||
import org.elasticsearch.xpack.core.ilm.UnfollowAction;
|
||||
import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction;
|
||||
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction.Request;
|
||||
import org.junit.Before;
|
||||
|
||||
|
@ -64,6 +65,7 @@ public class PutLifecycleRequestTests extends AbstractSerializingTestCase<Reques
|
|||
new NamedWriteableRegistry.Entry(LifecycleType.class, TimeseriesLifecycleType.TYPE,
|
||||
(in) -> TimeseriesLifecycleType.INSTANCE),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, AllocateAction.NAME, AllocateAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReadOnlyAction.NAME, ReadOnlyAction::new),
|
||||
|
@ -82,6 +84,8 @@ public class PutLifecycleRequestTests extends AbstractSerializingTestCase<Reques
|
|||
new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE),
|
||||
(p) -> TimeseriesLifecycleType.INSTANCE),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class,
|
||||
new ParseField(WaitForSnapshotAction.NAME), WaitForSnapshotAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse),
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
|||
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
|
||||
import org.elasticsearch.xpack.core.ilm.UpdateRolloverLifecycleDateStep;
|
||||
import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;
|
||||
import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Before;
|
||||
|
||||
|
@ -323,6 +324,64 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
public void testWaitForSnapshot() throws Exception {
|
||||
createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
||||
String smlPolicy = randomAlphaOfLengthBetween(4, 10);
|
||||
createNewSingletonPolicy("delete", new WaitForSnapshotAction(smlPolicy));
|
||||
updatePolicy(index, policy);
|
||||
assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("wait_for_snapshot")));
|
||||
assertBusy(() -> assertThat(getFailedStepForIndex(index), equalTo("wait-for-snapshot")));
|
||||
|
||||
String repo = createSnapshotRepo();
|
||||
createSlmPolicy(smlPolicy, repo);
|
||||
|
||||
assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("wait_for_snapshot")));
|
||||
|
||||
Request request = new Request("PUT", "/_slm/policy/" + smlPolicy + "/_execute");
|
||||
assertOK(client().performRequest(request));
|
||||
|
||||
|
||||
assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("completed")), 2, TimeUnit.MINUTES);
|
||||
|
||||
request = new Request("DELETE", "/_slm/policy/" + smlPolicy);
|
||||
assertOK(client().performRequest(request));
|
||||
|
||||
request = new Request("DELETE", "/_snapshot/" + repo);
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
public void testWaitForSnapshotSlmExecutedBefore() throws Exception {
|
||||
createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
||||
String smlPolicy = randomAlphaOfLengthBetween(4, 10);
|
||||
createNewSingletonPolicy("delete", new WaitForSnapshotAction(smlPolicy));
|
||||
|
||||
String repo = createSnapshotRepo();
|
||||
createSlmPolicy(smlPolicy, repo);
|
||||
|
||||
Request request = new Request("PUT", "/_slm/policy/" + smlPolicy + "/_execute");
|
||||
assertOK(client().performRequest(request));
|
||||
|
||||
updatePolicy(index, policy);
|
||||
assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("wait_for_snapshot")));
|
||||
assertBusy(() -> assertThat(getStepKeyForIndex(index).getName(), equalTo("wait-for-snapshot")));
|
||||
|
||||
request = new Request("PUT", "/_slm/policy/" + smlPolicy + "/_execute");
|
||||
assertOK(client().performRequest(request));
|
||||
|
||||
request = new Request("PUT", "/_slm/policy/" + smlPolicy + "/_execute");
|
||||
assertOK(client().performRequest(request));
|
||||
|
||||
assertBusy(() -> assertThat(getStepKeyForIndex(index).getAction(), equalTo("completed")), 2, TimeUnit.MINUTES);
|
||||
|
||||
request = new Request("DELETE", "/_slm/policy/" + smlPolicy);
|
||||
assertOK(client().performRequest(request));
|
||||
|
||||
request = new Request("DELETE", "/_snapshot/" + repo);
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
public void testDelete() throws Exception {
|
||||
createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
||||
|
@ -1586,4 +1645,38 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||
assertThat(snapResponse.get("snapshot"), equalTo(snapshot));
|
||||
return (String) snapResponse.get("state");
|
||||
}
|
||||
|
||||
private void createSlmPolicy(String smlPolicy, String repo) throws IOException {
|
||||
Request request;
|
||||
request = new Request("PUT", "/_slm/policy/" + smlPolicy);
|
||||
request.setJsonEntity(Strings
|
||||
.toString(JsonXContent.contentBuilder()
|
||||
.startObject()
|
||||
.field("schedule", "59 59 23 31 12 ? 2099")
|
||||
.field("repository", repo)
|
||||
.field("name", "snap" + randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT))
|
||||
.startObject("config")
|
||||
.field("include_global_state", false)
|
||||
.endObject()
|
||||
.endObject()));
|
||||
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
private String createSnapshotRepo() throws IOException {
|
||||
String repo = randomAlphaOfLengthBetween(4, 10);
|
||||
Request request = new Request("PUT", "/_snapshot/" + repo);
|
||||
request.setJsonEntity(Strings
|
||||
.toString(JsonXContent.contentBuilder()
|
||||
.startObject()
|
||||
.field("type", "fs")
|
||||
.startObject("settings")
|
||||
.field("compress", randomBoolean())
|
||||
.field("location", System.getProperty("tests.path.repo"))
|
||||
.field("max_snapshot_bytes_per_sec", "256b")
|
||||
.endObject()
|
||||
.endObject()));
|
||||
assertOK(client().performRequest(request));
|
||||
return repo;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
|
|||
import org.elasticsearch.xpack.core.ilm.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
|
||||
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
|
||||
import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction;
|
||||
import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType;
|
||||
import org.elasticsearch.xpack.core.ilm.UnfollowAction;
|
||||
import org.elasticsearch.xpack.core.ilm.action.DeleteLifecycleAction;
|
||||
|
@ -244,7 +245,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
|||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(WaitForSnapshotAction.NAME), WaitForSnapshotAction::parse)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
|
|||
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
|
||||
import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType;
|
||||
import org.elasticsearch.xpack.core.ilm.UnfollowAction;
|
||||
import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -80,6 +81,7 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
|
|||
new NamedWriteableRegistry.Entry(LifecycleType.class, TimeseriesLifecycleType.TYPE,
|
||||
(in) -> TimeseriesLifecycleType.INSTANCE),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, AllocateAction.NAME, AllocateAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new),
|
||||
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReadOnlyAction.NAME, ReadOnlyAction::new),
|
||||
|
@ -99,6 +101,8 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
|
|||
(p) -> TimeseriesLifecycleType.INSTANCE),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class,
|
||||
new ParseField(WaitForSnapshotAction.NAME), WaitForSnapshotAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse),
|
||||
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
|
||||
|
|
Loading…
Reference in New Issue