Adds `index.lifecycle.step_info` setting and uses it on ERROR and incomplete steps (#30465)
* Adds `index.lifecycle.step_info` setting and uses it on ERROR This change make a new `index.lifecycle.step_info` setting which can be used to store a JSON blob of containing context about the current step. It then adds code so that when we move to the error step we serialise the exception to JSON and store it in this setting so the user can get information on why the step failed. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifec ycle/LifecycleSettings.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycle.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunner.java x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/MoveToErrorStepUpdateTask.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunnerTests.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/MoveToErrorStepUpdateTaskTests.java * Adds step information for AsyncWaitSteps The Listener for AsyncWaitStep now takes a ToXContentObject which represents information about the status of the condition if it has not completed. This object is then serialised to a JSON string and stored in the `index.lifecycle.step_info` index setting. This information is only stored if the step is not complete. If the step is complete the step info is ignored sice we will move straight to the next step where the info is no longer relevant. Changes for the `ClusterStateWaitStep` will be very similar but will be made in a following commit after this approach has been agreed. I do not intend to have information for `AsyncActionStep` to have the ability to set step info since actions should either be done or not done and if they error they should transition to the ERROR step. * Clear step info when transitioning to next step x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunner.java x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde xlifecycle/IndexLifecycleRunnerTests.java * Addresses review comments
This commit is contained in:
parent
b08d7c872b
commit
7d9d9feb22
|
@ -6,6 +6,7 @@
|
|||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
public abstract class AsyncWaitStep extends Step {
|
||||
|
@ -25,7 +26,7 @@ public abstract class AsyncWaitStep extends Step {
|
|||
|
||||
public interface Listener {
|
||||
|
||||
void onResponse(boolean conditionMet);
|
||||
void onResponse(boolean conditionMet, ToXContentObject infomationContext);
|
||||
|
||||
void onFailure(Exception e);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ public class LifecycleSettings {
|
|||
public static final String LIFECYCLE_ACTION_TIME = "index.lifecycle.action_time";
|
||||
public static final String LIFECYCLE_STEP_TIME = "index.lifecycle.step_time";
|
||||
public static final String LIFECYCLE_FAILED_STEP = "index.lifecycle.failed_step";
|
||||
public static final String LIFECYCLE_STEP_INFO = "index.lifecycle.step_info";
|
||||
|
||||
// NORELEASE: we should probably change the default to something other than three seconds for initial release
|
||||
public static final Setting<TimeValue> LIFECYCLE_POLL_INTERVAL_SETTING = Setting.positiveTimeSetting(LIFECYCLE_POLL_INTERVAL,
|
||||
|
@ -44,4 +45,6 @@ public class LifecycleSettings {
|
|||
-1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<Long> LIFECYCLE_STEP_TIME_SETTING = Setting.longSetting(LIFECYCLE_STEP_TIME,
|
||||
-1L, -1L, Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
public static final Setting<String> LIFECYCLE_STEP_INFO_SETTING = Setting.simpleString(LIFECYCLE_STEP_INFO, Setting.Property.Dynamic,
|
||||
Setting.Property.IndexScope, Setting.Property.NotCopyableOnResize);
|
||||
}
|
||||
|
|
|
@ -8,8 +8,14 @@ package org.elasticsearch.xpack.core.indexlifecycle;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
@ -40,8 +46,8 @@ public class SegmentCountStep extends AsyncWaitStep {
|
|||
@Override
|
||||
public void evaluateCondition(Index index, Listener listener) {
|
||||
getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> {
|
||||
listener.onResponse(StreamSupport.stream(response.getIndices().get(index.getName()).spliterator(), false)
|
||||
.anyMatch(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> {
|
||||
long numberShardsLeftToMerge = StreamSupport.stream(response.getIndices().get(index.getName()).spliterator(), false)
|
||||
.filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> {
|
||||
boolean hasRightAmountOfSegments = p.getSegments().size() <= maxNumSegments;
|
||||
if (bestCompression) {
|
||||
// // TODO(talevy): discuss
|
||||
|
@ -51,11 +57,12 @@ public class SegmentCountStep extends AsyncWaitStep {
|
|||
// s.getAttributes().get(Lucene50StoredFieldsFormat.MODE_KEY)))
|
||||
// );
|
||||
boolean allUsingCorrectCompression = true;
|
||||
return hasRightAmountOfSegments && allUsingCorrectCompression;
|
||||
return (hasRightAmountOfSegments && allUsingCorrectCompression) == false;
|
||||
} else {
|
||||
return hasRightAmountOfSegments;
|
||||
return hasRightAmountOfSegments == false;
|
||||
}
|
||||
})));
|
||||
})).count();
|
||||
listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge));
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
||||
|
@ -77,4 +84,54 @@ public class SegmentCountStep extends AsyncWaitStep {
|
|||
&& Objects.equals(maxNumSegments, other.maxNumSegments)
|
||||
&& Objects.equals(bestCompression, other.bestCompression);
|
||||
}
|
||||
|
||||
public static class Info implements ToXContentObject {
|
||||
|
||||
private final long numberShardsLeftToMerge;
|
||||
|
||||
static final ParseField SHARDS_TO_MERGE = new ParseField("shards_left_to_merge");
|
||||
static final ConstructingObjectParser<Info, Void> PARSER = new ConstructingObjectParser<>("segment_count_step_info",
|
||||
a -> new Info((long) a[0]));
|
||||
static {
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), SHARDS_TO_MERGE);
|
||||
}
|
||||
|
||||
public Info(long numberShardsLeftToMerge) {
|
||||
this.numberShardsLeftToMerge = numberShardsLeftToMerge;
|
||||
}
|
||||
|
||||
public long getNumberShardsLeftToMerge() {
|
||||
return numberShardsLeftToMerge;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(SHARDS_TO_MERGE.getPreferredName(), numberShardsLeftToMerge);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(numberShardsLeftToMerge);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Info other = (Info) obj;
|
||||
return Objects.equals(numberShardsLeftToMerge, other.numberShardsLeftToMerge);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
import org.elasticsearch.test.EqualsHashCodeTestUtils;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.SegmentCountStep.Info;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class SegmentCountStepInfoTests extends AbstractXContentTestCase<SegmentCountStep.Info> {
|
||||
|
||||
@Override
|
||||
protected Info createTestInstance() {
|
||||
return new Info(randomNonNegativeLong());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Info doParseInstance(XContentParser parser) throws IOException {
|
||||
return Info.PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public final void testEqualsAndHashcode() {
|
||||
for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
|
||||
EqualsHashCodeTestUtils.checkEqualsAndHashCode(createTestInstance(), this::copyInstance, this::mutateInstance);
|
||||
}
|
||||
}
|
||||
|
||||
protected final Info copyInstance(Info instance) throws IOException {
|
||||
return new Info(instance.getNumberShardsLeftToMerge());
|
||||
}
|
||||
|
||||
protected Info mutateInstance(Info instance) throws IOException {
|
||||
return createTestInstance();
|
||||
}
|
||||
|
||||
}
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.indices.segments.ShardSegments;
|
|||
import org.elasticsearch.client.AdminClient;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.engine.Segment;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
@ -112,12 +113,14 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
|
|||
}).when(indicesClient).segments(any(), any());
|
||||
|
||||
SetOnce<Boolean> conditionMetResult = new SetOnce<>();
|
||||
SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();
|
||||
|
||||
SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments, bestCompression);
|
||||
step.evaluateCondition(index, new AsyncWaitStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean conditionMet) {
|
||||
public void onResponse(boolean conditionMet, ToXContentObject info) {
|
||||
conditionMetResult.set(conditionMet);
|
||||
conditionInfo.set(info);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -127,6 +130,7 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
|
|||
});
|
||||
|
||||
assertTrue(conditionMetResult.get());
|
||||
assertEquals(new SegmentCountStep.Info(0L), conditionInfo.get());
|
||||
}
|
||||
|
||||
public void testIsConditionFails() {
|
||||
|
@ -167,12 +171,14 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
|
|||
}).when(indicesClient).segments(any(), any());
|
||||
|
||||
SetOnce<Boolean> conditionMetResult = new SetOnce<>();
|
||||
SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();
|
||||
|
||||
SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments, bestCompression);
|
||||
step.evaluateCondition(index, new AsyncWaitStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean conditionMet) {
|
||||
public void onResponse(boolean conditionMet, ToXContentObject info) {
|
||||
conditionMetResult.set(conditionMet);
|
||||
conditionInfo.set(info);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -182,6 +188,7 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
|
|||
});
|
||||
|
||||
assertFalse(conditionMetResult.get());
|
||||
assertEquals(new SegmentCountStep.Info(1L), conditionInfo.get());
|
||||
}
|
||||
|
||||
public void testThrowsException() {
|
||||
|
@ -210,7 +217,7 @@ public class SegmentCountStepTests extends AbstractStepTestCase<SegmentCountStep
|
|||
SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments, bestCompression);
|
||||
step.evaluateCondition(index, new AsyncWaitStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean conditionMet) {
|
||||
public void onResponse(boolean conditionMet, ToXContentObject info) {
|
||||
throw new AssertionError("unexpected method call");
|
||||
}
|
||||
|
||||
|
|
|
@ -97,6 +97,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
|
|||
LifecycleSettings.LIFECYCLE_ACTION_SETTING,
|
||||
LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING,
|
||||
LifecycleSettings.LIFECYCLE_STEP_SETTING,
|
||||
LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING,
|
||||
LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING,
|
||||
RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING);
|
||||
}
|
||||
|
||||
|
|
|
@ -6,13 +6,19 @@
|
|||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.AsyncWaitStep;
|
||||
|
@ -24,6 +30,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class IndexLifecycleRunner {
|
||||
|
@ -55,10 +62,12 @@ public class IndexLifecycleRunner {
|
|||
((AsyncWaitStep) currentStep).evaluateCondition(indexMetaData.getIndex(), new AsyncWaitStep.Listener() {
|
||||
|
||||
@Override
|
||||
public void onResponse(boolean conditionMet) {
|
||||
public void onResponse(boolean conditionMet, ToXContentObject stepInfo) {
|
||||
logger.debug("cs-change-async-wait-callback. current-step:" + currentStep.getKey());
|
||||
if (conditionMet) {
|
||||
moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey());
|
||||
} else if (stepInfo != null) {
|
||||
setStepInfo(indexMetaData.getIndex(), policy, currentStep.getKey(), stepInfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -145,11 +154,17 @@ public class IndexLifecycleRunner {
|
|||
return newClusterStateBuilder.build();
|
||||
}
|
||||
|
||||
static ClusterState moveClusterStateToErrorStep(Index index, ClusterState clusterState, StepKey currentStep, LongSupplier nowSupplier) {
|
||||
static ClusterState moveClusterStateToErrorStep(Index index, ClusterState clusterState, StepKey currentStep, Exception cause,
|
||||
LongSupplier nowSupplier) throws IOException {
|
||||
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
|
||||
XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder();
|
||||
causeXContentBuilder.startObject();
|
||||
ElasticsearchException.generateFailureXContent(causeXContentBuilder, ToXContent.EMPTY_PARAMS, cause, false);
|
||||
causeXContentBuilder.endObject();
|
||||
Settings.Builder indexSettings = moveIndexSettingsToNextStep(idxMeta.getSettings(), currentStep,
|
||||
new StepKey(currentStep.getPhase(), currentStep.getAction(), ErrorStep.NAME), nowSupplier)
|
||||
.put(LifecycleSettings.LIFECYCLE_FAILED_STEP, currentStep.getName());
|
||||
.put(LifecycleSettings.LIFECYCLE_FAILED_STEP, currentStep.getName())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP_INFO, BytesReference.bytes(causeXContentBuilder).utf8ToString());
|
||||
ClusterState.Builder newClusterStateBuilder = newClusterStateWithIndexSettings(index, clusterState, indexSettings);
|
||||
return newClusterStateBuilder.build();
|
||||
}
|
||||
|
@ -159,7 +174,9 @@ public class IndexLifecycleRunner {
|
|||
long nowAsMillis = nowSupplier.getAsLong();
|
||||
Settings.Builder newSettings = Settings.builder().put(existingSettings).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP_TIME, nowAsMillis);
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP_TIME, nowAsMillis)
|
||||
// clear any step info from the current step
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP_INFO, (String) null);
|
||||
if (currentStep.getPhase().equals(nextStep.getPhase()) == false) {
|
||||
newSettings.put(LifecycleSettings.LIFECYCLE_PHASE_TIME, nowAsMillis);
|
||||
}
|
||||
|
@ -169,7 +186,7 @@ public class IndexLifecycleRunner {
|
|||
return newSettings;
|
||||
}
|
||||
|
||||
private static ClusterState.Builder newClusterStateWithIndexSettings(Index index, ClusterState clusterState,
|
||||
static ClusterState.Builder newClusterStateWithIndexSettings(Index index, ClusterState clusterState,
|
||||
Settings.Builder newSettings) {
|
||||
ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState);
|
||||
newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData())
|
||||
|
@ -187,6 +204,10 @@ public class IndexLifecycleRunner {
|
|||
private void moveToErrorStep(Index index, String policy, StepKey currentStepKey, Exception e) {
|
||||
logger.debug("policy [" + policy + "] for index [" + index.getName() + "] failed on step [" + currentStepKey
|
||||
+ "]. Moving to ERROR step.", e);
|
||||
clusterService.submitStateUpdateTask("ILM", new MoveToErrorStepUpdateTask(index, policy, currentStepKey, nowSupplier));
|
||||
clusterService.submitStateUpdateTask("ILM", new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier));
|
||||
}
|
||||
|
||||
private void setStepInfo(Index index, String policy, StepKey currentStepKey, ToXContentObject stepInfo) {
|
||||
clusterService.submitStateUpdateTask("ILM", new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask {
|
||||
|
@ -20,12 +21,13 @@ public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask {
|
|||
private final String policy;
|
||||
private final Step.StepKey currentStepKey;
|
||||
private LongSupplier nowSupplier;
|
||||
private Exception cause;
|
||||
|
||||
public MoveToErrorStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey,
|
||||
LongSupplier nowSupplier) {
|
||||
public MoveToErrorStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Exception cause, LongSupplier nowSupplier) {
|
||||
this.index = index;
|
||||
this.policy = policy;
|
||||
this.currentStepKey = currentStepKey;
|
||||
this.cause = cause;
|
||||
this.nowSupplier = nowSupplier;
|
||||
}
|
||||
|
||||
|
@ -41,12 +43,16 @@ public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask {
|
|||
return currentStepKey;
|
||||
}
|
||||
|
||||
Exception getCause() {
|
||||
return cause;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
public ClusterState execute(ClusterState currentState) throws IOException {
|
||||
Settings indexSettings = currentState.getMetaData().index(index).getSettings();
|
||||
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
|
||||
&& currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexSettings))) {
|
||||
return IndexLifecycleRunner.moveClusterStateToErrorStep(index, currentState, currentStepKey, nowSupplier);
|
||||
return IndexLifecycleRunner.moveClusterStateToErrorStep(index, currentState, currentStepKey, cause, nowSupplier);
|
||||
} else {
|
||||
// either the policy has changed or the step is now
|
||||
// not the same as when we submitted the update task. In
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class SetStepInfoUpdateTask extends ClusterStateUpdateTask {
|
||||
private final Index index;
|
||||
private final String policy;
|
||||
private final Step.StepKey currentStepKey;
|
||||
private ToXContentObject stepInfo;
|
||||
|
||||
public SetStepInfoUpdateTask(Index index, String policy, Step.StepKey currentStepKey, ToXContentObject stepInfo) {
|
||||
this.index = index;
|
||||
this.policy = policy;
|
||||
this.currentStepKey = currentStepKey;
|
||||
this.stepInfo = stepInfo;
|
||||
}
|
||||
|
||||
Index getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
String getPolicy() {
|
||||
return policy;
|
||||
}
|
||||
|
||||
Step.StepKey getCurrentStepKey() {
|
||||
return currentStepKey;
|
||||
}
|
||||
|
||||
ToXContentObject getStepInfo() {
|
||||
return stepInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws IOException {
|
||||
Settings indexSettings = currentState.getMetaData().index(index).getSettings();
|
||||
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
|
||||
&& currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexSettings))) {
|
||||
XContentBuilder infoXContentBuilder = JsonXContent.contentBuilder();
|
||||
stepInfo.toXContent(infoXContentBuilder, ToXContent.EMPTY_PARAMS);
|
||||
String stepInfoString = BytesReference.bytes(infoXContentBuilder).utf8ToString();
|
||||
Settings.Builder newSettings = Settings.builder().put(indexSettings).put(LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.getKey(),
|
||||
stepInfoString);
|
||||
return IndexLifecycleRunner.newClusterStateWithIndexSettings(index, currentState, newSettings).build();
|
||||
} else {
|
||||
// either the policy has changed or the step is now
|
||||
// not the same as when we submitted the update task. In
|
||||
// either case we don't want to do anything now
|
||||
return currentState;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName()
|
||||
+ "] failed trying to set step info for step [" + currentStepKey + "].", e);
|
||||
}
|
||||
}
|
|
@ -5,13 +5,20 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.Settings.Builder;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep;
|
||||
|
@ -28,6 +35,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
|
|||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
@ -183,7 +191,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||
Mockito.argThat(new MoveToErrorStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey)));
|
||||
Mockito.argThat(new MoveToErrorStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, expectedException)));
|
||||
Mockito.verifyNoMoreInteractions(clusterService);
|
||||
}
|
||||
|
||||
|
@ -228,6 +236,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
String policyName = "async_wait_policy";
|
||||
StepKey stepKey = new StepKey("phase", "action", "async_wait_step");
|
||||
MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null);
|
||||
RandomStepInfo stepInfo = new RandomStepInfo();
|
||||
step.expectedInfo(stepInfo);
|
||||
step.setWillComplete(false);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
|
@ -237,6 +247,27 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
|
||||
runner.runPolicy(policyName, indexMetaData, null, false);
|
||||
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||
Mockito.argThat(new SetStepInfoUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, stepInfo)));
|
||||
Mockito.verifyNoMoreInteractions(clusterService);
|
||||
}
|
||||
|
||||
public void testRunPolicyAsyncWaitStepNotCompleteNoStepInfo() {
|
||||
String policyName = "async_wait_policy";
|
||||
StepKey stepKey = new StepKey("phase", "action", "async_wait_step");
|
||||
MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null);
|
||||
RandomStepInfo stepInfo = null;
|
||||
step.expectedInfo(stepInfo);
|
||||
step.setWillComplete(false);
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = Mockito.mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
|
||||
runner.runPolicy(policyName, indexMetaData, null, false);
|
||||
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
}
|
||||
|
@ -257,7 +288,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
|
||||
assertEquals(1, step.getExecuteCount());
|
||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"),
|
||||
Mockito.argThat(new MoveToErrorStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey)));
|
||||
Mockito.argThat(new MoveToErrorStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, expectedException)));
|
||||
Mockito.verifyNoMoreInteractions(clusterService);
|
||||
}
|
||||
|
||||
|
@ -478,10 +509,14 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
() -> now);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
|
||||
Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName());
|
||||
if (randomBoolean()) {
|
||||
indexSettingsBuilder.put(LifecycleSettings.LIFECYCLE_STEP_INFO, randomAlphaOfLength(20));
|
||||
}
|
||||
clusterState = buildClusterState(indexName,
|
||||
Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName()));
|
||||
indexSettingsBuilder);
|
||||
index = clusterState.metaData().index(indexName).getIndex();
|
||||
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep, () -> now);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
|
@ -499,10 +534,14 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
() -> now);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
|
||||
Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName());
|
||||
if (randomBoolean()) {
|
||||
indexSettingsBuilder.put(LifecycleSettings.LIFECYCLE_STEP_INFO, randomAlphaOfLength(20));
|
||||
}
|
||||
clusterState = buildClusterState(indexName,
|
||||
Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName()));
|
||||
indexSettingsBuilder);
|
||||
index = clusterState.metaData().index(indexName).getIndex();
|
||||
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep, () -> now);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
|
@ -520,27 +559,32 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
() -> now);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
|
||||
Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName());
|
||||
if (randomBoolean()) {
|
||||
indexSettingsBuilder.put(LifecycleSettings.LIFECYCLE_STEP_INFO, randomAlphaOfLength(20));
|
||||
}
|
||||
clusterState = buildClusterState(indexName,
|
||||
Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName()));
|
||||
indexSettingsBuilder);
|
||||
index = clusterState.metaData().index(indexName).getIndex();
|
||||
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep, () -> now);
|
||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||
}
|
||||
|
||||
public void testMoveClusterStateToErrorStep() {
|
||||
public void testMoveClusterStateToErrorStep() throws IOException {
|
||||
String indexName = "my_index";
|
||||
StepKey currentStep = new StepKey("current_phase", "current_action", "current_step");
|
||||
long now = randomNonNegativeLong();
|
||||
Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE");
|
||||
|
||||
ClusterState clusterState = buildClusterState(indexName,
|
||||
Settings.builder().put(LifecycleSettings.LIFECYCLE_PHASE, currentStep.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, currentStep.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, currentStep.getName()));
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToErrorStep(index, clusterState, currentStep, () -> now);
|
||||
assertClusterStateOnErrorStep(clusterState, index, currentStep, newClusterState, now);
|
||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToErrorStep(index, clusterState, currentStep, cause, () -> now);
|
||||
assertClusterStateOnErrorStep(clusterState, index, currentStep, newClusterState, cause, now);
|
||||
}
|
||||
|
||||
private ClusterState buildClusterState(String indexName, Settings.Builder indexSettingsBuilder) {
|
||||
|
@ -578,10 +622,16 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
}
|
||||
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newIndexSettings));
|
||||
assertFalse(LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING.exists(newIndexSettings));
|
||||
assertEquals("", LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newIndexSettings));
|
||||
}
|
||||
|
||||
private void assertClusterStateOnErrorStep(ClusterState oldClusterState, Index index, StepKey currentStep, ClusterState newClusterState,
|
||||
long now) {
|
||||
Exception cause, long now) throws IOException {
|
||||
XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder();
|
||||
causeXContentBuilder.startObject();
|
||||
ElasticsearchException.generateFailureXContent(causeXContentBuilder, ToXContent.EMPTY_PARAMS, cause, false);
|
||||
causeXContentBuilder.endObject();
|
||||
String expectedCauseValue = BytesReference.bytes(causeXContentBuilder).utf8ToString();
|
||||
assertNotSame(oldClusterState, newClusterState);
|
||||
MetaData newMetadata = newClusterState.metaData();
|
||||
assertNotSame(oldClusterState.metaData(), newMetadata);
|
||||
|
@ -593,6 +643,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
assertEquals(currentStep.getAction(), LifecycleSettings.LIFECYCLE_ACTION_SETTING.get(newIndexSettings));
|
||||
assertEquals(ErrorStep.NAME, LifecycleSettings.LIFECYCLE_STEP_SETTING.get(newIndexSettings));
|
||||
assertEquals(currentStep.getName(), LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING.get(newIndexSettings));
|
||||
assertEquals(expectedCauseValue, LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newIndexSettings));
|
||||
assertEquals(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(oldClusterState.metaData().index(index).getSettings()),
|
||||
LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newIndexSettings));
|
||||
assertEquals(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(oldClusterState.metaData().index(index).getSettings()),
|
||||
|
@ -600,6 +651,42 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
assertEquals(now, (long) LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newIndexSettings));
|
||||
}
|
||||
|
||||
private class RandomStepInfo implements ToXContentObject {
|
||||
|
||||
private final String key;
|
||||
private final String value;
|
||||
|
||||
RandomStepInfo() {
|
||||
this.key = randomAlphaOfLength(20);
|
||||
this.value = randomAlphaOfLength(20);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(key, value);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
RandomStepInfo other = (RandomStepInfo) obj;
|
||||
return Objects.equals(key, other.key) && Objects.equals(value, other.value);
|
||||
}
|
||||
}
|
||||
|
||||
private static class MockAsyncActionStep extends AsyncActionStep {
|
||||
|
||||
private Exception exception;
|
||||
|
@ -649,6 +736,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
private Exception exception;
|
||||
private boolean willComplete;
|
||||
private long executeCount = 0;
|
||||
private ToXContentObject expectedInfo = null;
|
||||
|
||||
MockAsyncWaitStep(StepKey key, StepKey nextStepKey) {
|
||||
super(key, nextStepKey, null);
|
||||
|
@ -662,6 +750,10 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
this.willComplete = willComplete;
|
||||
}
|
||||
|
||||
void expectedInfo(ToXContentObject expectedInfo) {
|
||||
this.expectedInfo = expectedInfo;
|
||||
}
|
||||
|
||||
long getExecuteCount() {
|
||||
return executeCount;
|
||||
}
|
||||
|
@ -670,7 +762,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
public void evaluateCondition(Index index, Listener listener) {
|
||||
executeCount++;
|
||||
if (exception == null) {
|
||||
listener.onResponse(willComplete);
|
||||
listener.onResponse(willComplete, expectedInfo);
|
||||
} else {
|
||||
listener.onFailure(exception);
|
||||
}
|
||||
|
@ -771,11 +863,13 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
private Index index;
|
||||
private String policy;
|
||||
private StepKey currentStepKey;
|
||||
private Exception cause;
|
||||
|
||||
MoveToErrorStepUpdateTaskMatcher(Index index, String policy, StepKey currentStepKey) {
|
||||
MoveToErrorStepUpdateTaskMatcher(Index index, String policy, StepKey currentStepKey, Exception cause) {
|
||||
this.index = index;
|
||||
this.policy = policy;
|
||||
this.currentStepKey = currentStepKey;
|
||||
this.cause = cause;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -784,8 +878,39 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||
return false;
|
||||
}
|
||||
MoveToErrorStepUpdateTask task = (MoveToErrorStepUpdateTask) argument;
|
||||
return Objects.equals(index, task.getIndex()) && Objects.equals(policy, task.getPolicy())
|
||||
&& Objects.equals(currentStepKey, task.getCurrentStepKey());
|
||||
return Objects.equals(index, task.getIndex()) &&
|
||||
Objects.equals(policy, task.getPolicy())&&
|
||||
Objects.equals(currentStepKey, task.getCurrentStepKey()) &&
|
||||
Objects.equals(cause.getClass(), task.getCause().getClass()) &&
|
||||
Objects.equals(cause.getMessage(), task.getCause().getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class SetStepInfoUpdateTaskMatcher extends ArgumentMatcher<SetStepInfoUpdateTask> {
|
||||
|
||||
private Index index;
|
||||
private String policy;
|
||||
private StepKey currentStepKey;
|
||||
private ToXContentObject stepInfo;
|
||||
|
||||
SetStepInfoUpdateTaskMatcher(Index index, String policy, StepKey currentStepKey, ToXContentObject stepInfo) {
|
||||
this.index = index;
|
||||
this.policy = policy;
|
||||
this.currentStepKey = currentStepKey;
|
||||
this.stepInfo = stepInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(Object argument) {
|
||||
if (argument == null || argument instanceof SetStepInfoUpdateTask == false) {
|
||||
return false;
|
||||
}
|
||||
SetStepInfoUpdateTask task = (SetStepInfoUpdateTask) argument;
|
||||
return Objects.equals(index, task.getIndex()) &&
|
||||
Objects.equals(policy, task.getPolicy())&&
|
||||
Objects.equals(currentStepKey, task.getCurrentStepKey()) &&
|
||||
Objects.equals(stepInfo, task.getStepInfo());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,7 +11,11 @@ import org.elasticsearch.cluster.ClusterName;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
|
||||
|
@ -19,6 +23,8 @@ import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
|||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
|
@ -43,13 +49,14 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
|
|||
clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
|
||||
}
|
||||
|
||||
public void testExecuteSuccessfullyMoved() {
|
||||
public void testExecuteSuccessfullyMoved() throws IOException {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
long now = randomNonNegativeLong();
|
||||
Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE");
|
||||
|
||||
setStateToKey(currentStepKey);
|
||||
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, () -> now);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(actualKey, equalTo(new StepKey(currentStepKey.getPhase(), currentStepKey.getAction(), ErrorStep.NAME)));
|
||||
|
@ -58,24 +65,34 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
|
|||
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(now));
|
||||
|
||||
XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder();
|
||||
causeXContentBuilder.startObject();
|
||||
ElasticsearchException.generateFailureXContent(causeXContentBuilder, ToXContent.EMPTY_PARAMS, cause, false);
|
||||
causeXContentBuilder.endObject();
|
||||
String expectedCauseValue = BytesReference.bytes(causeXContentBuilder).utf8ToString();
|
||||
assertThat(LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newState.metaData().index(index).getSettings()),
|
||||
equalTo(expectedCauseValue));
|
||||
}
|
||||
|
||||
public void testExecuteNoopDifferentStep() {
|
||||
public void testExecuteNoopDifferentStep() throws IOException {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
StepKey notCurrentStepKey = new StepKey("not-current", "not-current", "not-current");
|
||||
long now = randomNonNegativeLong();
|
||||
Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE");
|
||||
setStateToKey(notCurrentStepKey);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, () -> now);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(newState, sameInstance(clusterState));
|
||||
}
|
||||
|
||||
public void testExecuteNoopDifferentPolicy() {
|
||||
public void testExecuteNoopDifferentPolicy() throws IOException {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
long now = randomNonNegativeLong();
|
||||
Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE");
|
||||
setStateToKey(currentStepKey);
|
||||
setStatePolicy("not-" + policy);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, () -> now);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(newState, sameInstance(clusterState));
|
||||
}
|
||||
|
@ -83,10 +100,11 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
|
|||
public void testOnFailure() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
long now = randomNonNegativeLong();
|
||||
Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE");
|
||||
|
||||
setStateToKey(currentStepKey);
|
||||
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, () -> now);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now);
|
||||
Exception expectedException = new RuntimeException();
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
|
||||
() -> task.onFailure(randomAlphaOfLength(10), expectedException));
|
||||
|
|
|
@ -0,0 +1,133 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class SetStepInfoUpdateTaskTests extends ESTestCase {
|
||||
|
||||
String policy;
|
||||
ClusterState clusterState;
|
||||
Index index;
|
||||
|
||||
@Before
|
||||
public void setupClusterState() {
|
||||
policy = randomAlphaOfLength(10);
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder(randomAlphaOfLength(5))
|
||||
.settings(settings(Version.CURRENT)
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policy))
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
index = indexMetadata.getIndex();
|
||||
MetaData metaData = MetaData.builder()
|
||||
.persistentSettings(settings(Version.CURRENT).build())
|
||||
.put(IndexMetaData.builder(indexMetadata))
|
||||
.build();
|
||||
clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
|
||||
}
|
||||
|
||||
public void testExecuteSuccessfullySet() throws IOException {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
ToXContentObject stepInfo = getRandomStepInfo();
|
||||
setStateToKey(currentStepKey);
|
||||
|
||||
SetStepInfoUpdateTask task = new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(newState.metaData().index(index).getSettings());
|
||||
assertThat(actualKey, equalTo(currentStepKey));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_PHASE_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
|
||||
assertThat(LifecycleSettings.LIFECYCLE_STEP_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
|
||||
|
||||
XContentBuilder infoXContentBuilder = JsonXContent.contentBuilder();
|
||||
stepInfo.toXContent(infoXContentBuilder, ToXContent.EMPTY_PARAMS);
|
||||
String expectedCauseValue = BytesReference.bytes(infoXContentBuilder).utf8ToString();
|
||||
assertThat(LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING.get(newState.metaData().index(index).getSettings()),
|
||||
equalTo(expectedCauseValue));
|
||||
}
|
||||
|
||||
private ToXContentObject getRandomStepInfo() {
|
||||
String key = randomAlphaOfLength(20);
|
||||
String value = randomAlphaOfLength(20);
|
||||
return (b, p) -> {
|
||||
b.startObject();
|
||||
b.field(key, value);
|
||||
b.endObject();
|
||||
return b;
|
||||
};
|
||||
}
|
||||
|
||||
public void testExecuteNoopDifferentStep() throws IOException {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
StepKey notCurrentStepKey = new StepKey("not-current", "not-current", "not-current");
|
||||
ToXContentObject stepInfo = getRandomStepInfo();
|
||||
setStateToKey(notCurrentStepKey);
|
||||
SetStepInfoUpdateTask task = new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(newState, sameInstance(clusterState));
|
||||
}
|
||||
|
||||
public void testExecuteNoopDifferentPolicy() throws IOException {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
ToXContentObject stepInfo = getRandomStepInfo();
|
||||
setStateToKey(currentStepKey);
|
||||
setStatePolicy("not-" + policy);
|
||||
SetStepInfoUpdateTask task = new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo);
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(newState, sameInstance(clusterState));
|
||||
}
|
||||
|
||||
public void testOnFailure() {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
ToXContentObject stepInfo = getRandomStepInfo();
|
||||
|
||||
setStateToKey(currentStepKey);
|
||||
|
||||
SetStepInfoUpdateTask task = new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo);
|
||||
Exception expectedException = new RuntimeException();
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
|
||||
() -> task.onFailure(randomAlphaOfLength(10), expectedException));
|
||||
assertEquals("policy [" + policy + "] for index [" + index.getName() + "] failed trying to set step info for step ["
|
||||
+ currentStepKey + "].", exception.getMessage());
|
||||
assertSame(expectedException, exception.getCause());
|
||||
}
|
||||
|
||||
private void setStatePolicy(String policy) {
|
||||
clusterState = ClusterState.builder(clusterState)
|
||||
.metaData(MetaData.builder(clusterState.metaData())
|
||||
.updateSettings(Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policy).build(), index.getName())).build();
|
||||
|
||||
}
|
||||
private void setStateToKey(StepKey stepKey) {
|
||||
clusterState = ClusterState.builder(clusterState)
|
||||
.metaData(MetaData.builder(clusterState.metaData())
|
||||
.updateSettings(Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_PHASE, stepKey.getPhase())
|
||||
.put(LifecycleSettings.LIFECYCLE_ACTION, stepKey.getAction())
|
||||
.put(LifecycleSettings.LIFECYCLE_STEP, stepKey.getName()).build(), index.getName())).build();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue