mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 02:14:54 +00:00
This adds the infrastructure to be able to retry the execution of retryable steps and makes the `check-rollover-ready` retryable as an initial step to make the rollover action more resilient to transient errors. (cherry picked from commit 454020ac8acb147eae97acb4ccd6fb470d1e5f48) Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
This commit is contained in:
parent
0366c4d4a9
commit
ffe5d5417f
@ -239,8 +239,11 @@ information for the step that's being performed on the index.
|
||||
|
||||
If the index is in the ERROR step, something went wrong while executing a
|
||||
step in the policy and you will need to take action for the index to proceed
|
||||
to the next step. To help you diagnose the problem, the explain response shows
|
||||
the step that failed and the step info provides information about the error.
|
||||
to the next step. Some steps are safe to automatically be retried in certain
|
||||
circumstances. To help you diagnose the problem, the explain response shows
|
||||
the step that failed, the step info which provides information about the error,
|
||||
and information about the retry attempts executed for the failed step if it's
|
||||
the case.
|
||||
|
||||
[source,console-result]
|
||||
--------------------------------------------------
|
||||
@ -262,10 +265,12 @@ the step that failed and the step info provides information about the error.
|
||||
"step": "ERROR",
|
||||
"step_time_millis": 1538475653317,
|
||||
"step_time": "2018-10-15T13:45:22.577Z",
|
||||
"failed_step": "attempt-rollover", <1>
|
||||
"step_info": { <2>
|
||||
"type": "resource_already_exists_exception",
|
||||
"reason": "index [test-000057/H7lF9n36Rzqa-KfKcnGQMg] already exists",
|
||||
"failed_step": "check-rollover-ready", <1>
|
||||
"is_auto_retryable_error": true, <2>
|
||||
"failed_step_retry_count": 1, <3>
|
||||
"step_info": { <4>
|
||||
"type": "cluster_block_exception",
|
||||
"reason": "index [test-000057/H7lF9n36Rzqa-KfKcnGQMg] blocked by: [FORBIDDEN/5/index read-only (api)",
|
||||
"index_uuid": "H7lF9n36Rzqa-KfKcnGQMg",
|
||||
"index": "test-000057"
|
||||
},
|
||||
@ -290,4 +295,8 @@ the step that failed and the step info provides information about the error.
|
||||
// TESTRESPONSE[skip:not possible to get the cluster into this state in a docs test]
|
||||
|
||||
<1> The step that caused the error
|
||||
<2> What went wrong
|
||||
<2> Indicates if retrying the failed step can overcome the error. If this
|
||||
is true, ILM will retry the failed step automatically.
|
||||
<3> Shows the number of attempted automatic retries to execute the failed
|
||||
step.
|
||||
<4> What went wrong
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
package org.elasticsearch.xpack.core.ilm;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
@ -34,6 +35,8 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
|
||||
private static final ParseField ACTION_FIELD = new ParseField("action");
|
||||
private static final ParseField STEP_FIELD = new ParseField("step");
|
||||
private static final ParseField FAILED_STEP_FIELD = new ParseField("failed_step");
|
||||
private static final ParseField IS_AUTO_RETRYABLE_ERROR_FIELD = new ParseField("is_auto_retryable_error");
|
||||
private static final ParseField FAILED_STEP_RETRY_COUNT_FIELD = new ParseField("failed_step_retry_count");
|
||||
private static final ParseField PHASE_TIME_MILLIS_FIELD = new ParseField("phase_time_millis");
|
||||
private static final ParseField PHASE_TIME_FIELD = new ParseField("phase_time");
|
||||
private static final ParseField ACTION_TIME_MILLIS_FIELD = new ParseField("action_time_millis");
|
||||
@ -55,6 +58,8 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
|
||||
(String) a[5],
|
||||
(String) a[6],
|
||||
(String) a[7],
|
||||
(Boolean) a[14],
|
||||
(Integer) a[15],
|
||||
(Long) (a[8]),
|
||||
(Long) (a[9]),
|
||||
(Long) (a[10]),
|
||||
@ -82,6 +87,8 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
|
||||
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> PhaseExecutionInfo.parse(p, ""),
|
||||
PHASE_EXECUTION_INFO);
|
||||
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), AGE_FIELD);
|
||||
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), IS_AUTO_RETRYABLE_ERROR_FIELD);
|
||||
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), FAILED_STEP_RETRY_COUNT_FIELD);
|
||||
}
|
||||
|
||||
private final String index;
|
||||
@ -97,21 +104,25 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
|
||||
private final boolean managedByILM;
|
||||
private final BytesReference stepInfo;
|
||||
private final PhaseExecutionInfo phaseExecutionInfo;
|
||||
private final Boolean isAutoRetryableError;
|
||||
private final Integer failedStepRetryCount;
|
||||
|
||||
public static IndexLifecycleExplainResponse newManagedIndexResponse(String index, String policyName, Long lifecycleDate,
|
||||
String phase, String action, String step, String failedStep, Long phaseTime, Long actionTime, Long stepTime,
|
||||
BytesReference stepInfo, PhaseExecutionInfo phaseExecutionInfo) {
|
||||
return new IndexLifecycleExplainResponse(index, true, policyName, lifecycleDate, phase, action, step, failedStep, phaseTime,
|
||||
actionTime, stepTime, stepInfo, phaseExecutionInfo);
|
||||
String phase, String action, String step, String failedStep, Boolean isAutoRetryableError, Integer failedStepRetryCount,
|
||||
Long phaseTime, Long actionTime, Long stepTime, BytesReference stepInfo, PhaseExecutionInfo phaseExecutionInfo) {
|
||||
return new IndexLifecycleExplainResponse(index, true, policyName, lifecycleDate, phase, action, step, failedStep,
|
||||
isAutoRetryableError, failedStepRetryCount, phaseTime, actionTime, stepTime, stepInfo, phaseExecutionInfo);
|
||||
}
|
||||
|
||||
public static IndexLifecycleExplainResponse newUnmanagedIndexResponse(String index) {
|
||||
return new IndexLifecycleExplainResponse(index, false, null, null, null, null, null, null, null, null, null, null, null);
|
||||
return new IndexLifecycleExplainResponse(index, false, null, null, null, null, null, null, null, null, null, null, null, null,
|
||||
null);
|
||||
}
|
||||
|
||||
private IndexLifecycleExplainResponse(String index, boolean managedByILM, String policyName, Long lifecycleDate,
|
||||
String phase, String action, String step, String failedStep, Long phaseTime, Long actionTime,
|
||||
Long stepTime, BytesReference stepInfo, PhaseExecutionInfo phaseExecutionInfo) {
|
||||
String phase, String action, String step, String failedStep, Boolean isAutoRetryableError,
|
||||
Integer failedStepRetryCount, Long phaseTime, Long actionTime, Long stepTime,
|
||||
BytesReference stepInfo, PhaseExecutionInfo phaseExecutionInfo) {
|
||||
if (managedByILM) {
|
||||
if (policyName == null) {
|
||||
throw new IllegalArgumentException("[" + POLICY_NAME_FIELD.getPreferredName() + "] cannot be null for managed index");
|
||||
@ -142,6 +153,8 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
|
||||
this.actionTime = actionTime;
|
||||
this.stepTime = stepTime;
|
||||
this.failedStep = failedStep;
|
||||
this.isAutoRetryableError = isAutoRetryableError;
|
||||
this.failedStepRetryCount = failedStepRetryCount;
|
||||
this.stepInfo = stepInfo;
|
||||
this.phaseExecutionInfo = phaseExecutionInfo;
|
||||
}
|
||||
@ -161,6 +174,13 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
|
||||
stepTime = in.readOptionalLong();
|
||||
stepInfo = in.readOptionalBytesReference();
|
||||
phaseExecutionInfo = in.readOptionalWriteable(PhaseExecutionInfo::new);
|
||||
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
|
||||
isAutoRetryableError = in.readOptionalBoolean();
|
||||
failedStepRetryCount = in.readOptionalVInt();
|
||||
} else {
|
||||
isAutoRetryableError = null;
|
||||
failedStepRetryCount = null;
|
||||
}
|
||||
} else {
|
||||
policyName = null;
|
||||
lifecycleDate = null;
|
||||
@ -168,6 +188,8 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
|
||||
action = null;
|
||||
step = null;
|
||||
failedStep = null;
|
||||
isAutoRetryableError = null;
|
||||
failedStepRetryCount = null;
|
||||
phaseTime = null;
|
||||
actionTime = null;
|
||||
stepTime = null;
|
||||
@ -192,6 +214,10 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
|
||||
out.writeOptionalLong(stepTime);
|
||||
out.writeOptionalBytesReference(stepInfo);
|
||||
out.writeOptionalWriteable(phaseExecutionInfo);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
|
||||
out.writeOptionalBoolean(isAutoRetryableError);
|
||||
out.writeOptionalVInt(failedStepRetryCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -247,6 +273,14 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
|
||||
return phaseExecutionInfo;
|
||||
}
|
||||
|
||||
public Boolean isAutoRetryableError() {
|
||||
return isAutoRetryableError;
|
||||
}
|
||||
|
||||
public Integer getFailedStepRetryCount() {
|
||||
return failedStepRetryCount;
|
||||
}
|
||||
|
||||
public TimeValue getAge() {
|
||||
if (lifecycleDate == null) {
|
||||
return TimeValue.MINUS_ONE;
|
||||
@ -287,6 +321,12 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
|
||||
if (Strings.hasLength(failedStep)) {
|
||||
builder.field(FAILED_STEP_FIELD.getPreferredName(), failedStep);
|
||||
}
|
||||
if (isAutoRetryableError != null) {
|
||||
builder.field(IS_AUTO_RETRYABLE_ERROR_FIELD.getPreferredName(), isAutoRetryableError);
|
||||
}
|
||||
if (failedStepRetryCount != null) {
|
||||
builder.field(FAILED_STEP_RETRY_COUNT_FIELD.getPreferredName(), failedStepRetryCount);
|
||||
}
|
||||
if (stepInfo != null && stepInfo.length() > 0) {
|
||||
builder.rawField(STEP_INFO_FIELD.getPreferredName(), stepInfo.streamInput(), XContentType.JSON);
|
||||
}
|
||||
@ -300,8 +340,8 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(index, managedByILM, policyName, lifecycleDate, phase, action, step, failedStep, phaseTime, actionTime,
|
||||
stepTime, stepInfo, phaseExecutionInfo);
|
||||
return Objects.hash(index, managedByILM, policyName, lifecycleDate, phase, action, step, failedStep, isAutoRetryableError,
|
||||
failedStepRetryCount, phaseTime, actionTime, stepTime, stepInfo, phaseExecutionInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -321,6 +361,8 @@ public class IndexLifecycleExplainResponse implements ToXContentObject, Writeabl
|
||||
Objects.equals(action, other.action) &&
|
||||
Objects.equals(step, other.step) &&
|
||||
Objects.equals(failedStep, other.failedStep) &&
|
||||
Objects.equals(isAutoRetryableError, other.isAutoRetryableError) &&
|
||||
Objects.equals(failedStepRetryCount, other.failedStepRetryCount) &&
|
||||
Objects.equals(phaseTime, other.phaseTime) &&
|
||||
Objects.equals(actionTime, other.actionTime) &&
|
||||
Objects.equals(stepTime, other.stepTime) &&
|
||||
|
@ -30,6 +30,8 @@ public class LifecycleExecutionState {
|
||||
private static final String ACTION_TIME = "action_time";
|
||||
private static final String STEP_TIME = "step_time";
|
||||
private static final String FAILED_STEP = "failed_step";
|
||||
private static final String IS_AUTO_RETRYABLE_ERROR = "is_auto_retryable_error";
|
||||
private static final String FAILED_STEP_RETRY_COUNT = "failed_step_retry_count";
|
||||
private static final String STEP_INFO = "step_info";
|
||||
private static final String PHASE_DEFINITION = "phase_definition";
|
||||
|
||||
@ -37,6 +39,8 @@ public class LifecycleExecutionState {
|
||||
private final String action;
|
||||
private final String step;
|
||||
private final String failedStep;
|
||||
private final Boolean isAutoRetryableError;
|
||||
private final Integer failedStepRetryCount;
|
||||
private final String stepInfo;
|
||||
private final String phaseDefinition;
|
||||
private final Long lifecycleDate;
|
||||
@ -44,13 +48,15 @@ public class LifecycleExecutionState {
|
||||
private final Long actionTime;
|
||||
private final Long stepTime;
|
||||
|
||||
private LifecycleExecutionState(String phase, String action, String step, String failedStep,
|
||||
String stepInfo, String phaseDefinition, Long lifecycleDate,
|
||||
private LifecycleExecutionState(String phase, String action, String step, String failedStep, Boolean isAutoRetryableError,
|
||||
Integer failedStepRetryCount, String stepInfo, String phaseDefinition, Long lifecycleDate,
|
||||
Long phaseTime, Long actionTime, Long stepTime) {
|
||||
this.phase = phase;
|
||||
this.action = action;
|
||||
this.step = step;
|
||||
this.failedStep = failedStep;
|
||||
this.isAutoRetryableError = isAutoRetryableError;
|
||||
this.failedStepRetryCount = failedStepRetryCount;
|
||||
this.stepInfo = stepInfo;
|
||||
this.phaseDefinition = phaseDefinition;
|
||||
this.lifecycleDate = lifecycleDate;
|
||||
@ -82,6 +88,8 @@ public class LifecycleExecutionState {
|
||||
.setAction(state.action)
|
||||
.setStep(state.step)
|
||||
.setFailedStep(state.failedStep)
|
||||
.setIsAutoRetryableError(state.isAutoRetryableError)
|
||||
.setFailedStepRetryCount(state.failedStepRetryCount)
|
||||
.setStepInfo(state.stepInfo)
|
||||
.setPhaseDefinition(state.phaseDefinition)
|
||||
.setIndexCreationDate(state.lifecycleDate)
|
||||
@ -104,6 +112,12 @@ public class LifecycleExecutionState {
|
||||
if (customData.containsKey(FAILED_STEP)) {
|
||||
builder.setFailedStep(customData.get(FAILED_STEP));
|
||||
}
|
||||
if (customData.containsKey(IS_AUTO_RETRYABLE_ERROR)) {
|
||||
builder.setIsAutoRetryableError(Boolean.parseBoolean(customData.get(IS_AUTO_RETRYABLE_ERROR)));
|
||||
}
|
||||
if (customData.containsKey(FAILED_STEP_RETRY_COUNT)) {
|
||||
builder.setFailedStepRetryCount(Integer.parseInt(customData.get(FAILED_STEP_RETRY_COUNT)));
|
||||
}
|
||||
if (customData.containsKey(STEP_INFO)) {
|
||||
builder.setStepInfo(customData.get(STEP_INFO));
|
||||
}
|
||||
@ -164,6 +178,12 @@ public class LifecycleExecutionState {
|
||||
if (failedStep != null) {
|
||||
result.put(FAILED_STEP, failedStep);
|
||||
}
|
||||
if (isAutoRetryableError != null) {
|
||||
result.put(IS_AUTO_RETRYABLE_ERROR, String.valueOf(isAutoRetryableError));
|
||||
}
|
||||
if (failedStepRetryCount != null) {
|
||||
result.put(FAILED_STEP_RETRY_COUNT, String.valueOf(failedStepRetryCount));
|
||||
}
|
||||
if (stepInfo != null) {
|
||||
result.put(STEP_INFO, stepInfo);
|
||||
}
|
||||
@ -201,6 +221,14 @@ public class LifecycleExecutionState {
|
||||
return failedStep;
|
||||
}
|
||||
|
||||
public Boolean isAutoRetryableError() {
|
||||
return isAutoRetryableError;
|
||||
}
|
||||
|
||||
public Integer getFailedStepRetryCount() {
|
||||
return failedStepRetryCount;
|
||||
}
|
||||
|
||||
public String getStepInfo() {
|
||||
return stepInfo;
|
||||
}
|
||||
@ -230,7 +258,7 @@ public class LifecycleExecutionState {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
LifecycleExecutionState that = (LifecycleExecutionState) o;
|
||||
return Objects.equals(getLifecycleDate(),that.getLifecycleDate()) &&
|
||||
return Objects.equals(getLifecycleDate(), that.getLifecycleDate()) &&
|
||||
Objects.equals(getPhaseTime(), that.getPhaseTime()) &&
|
||||
Objects.equals(getActionTime(), that.getActionTime()) &&
|
||||
Objects.equals(getStepTime(), that.getStepTime()) &&
|
||||
@ -238,14 +266,16 @@ public class LifecycleExecutionState {
|
||||
Objects.equals(getAction(), that.getAction()) &&
|
||||
Objects.equals(getStep(), that.getStep()) &&
|
||||
Objects.equals(getFailedStep(), that.getFailedStep()) &&
|
||||
Objects.equals(isAutoRetryableError(), that.isAutoRetryableError()) &&
|
||||
Objects.equals(getFailedStepRetryCount(), that.getFailedStepRetryCount()) &&
|
||||
Objects.equals(getStepInfo(), that.getStepInfo()) &&
|
||||
Objects.equals(getPhaseDefinition(), that.getPhaseDefinition());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(getPhase(), getAction(), getStep(), getFailedStep(), getStepInfo(), getPhaseDefinition(),
|
||||
getLifecycleDate(), getPhaseTime(), getActionTime(), getStepTime());
|
||||
return Objects.hash(getPhase(), getAction(), getStep(), getFailedStep(), isAutoRetryableError(), getFailedStepRetryCount(),
|
||||
getStepInfo(), getPhaseDefinition(), getLifecycleDate(), getPhaseTime(), getActionTime(), getStepTime());
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
@ -259,6 +289,8 @@ public class LifecycleExecutionState {
|
||||
private Long phaseTime;
|
||||
private Long actionTime;
|
||||
private Long stepTime;
|
||||
private Boolean isAutoRetryableError;
|
||||
private Integer failedStepRetryCount;
|
||||
|
||||
public Builder setPhase(String phase) {
|
||||
this.phase = phase;
|
||||
@ -310,9 +342,19 @@ public class LifecycleExecutionState {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setIsAutoRetryableError(Boolean isAutoRetryableError) {
|
||||
this.isAutoRetryableError = isAutoRetryableError;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setFailedStepRetryCount(Integer failedStepRetryCount) {
|
||||
this.failedStepRetryCount = failedStepRetryCount;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LifecycleExecutionState build() {
|
||||
return new LifecycleExecutionState(phase, action, step, failedStep, stepInfo, phaseDefinition, indexCreationDate,
|
||||
phaseTime, actionTime, stepTime);
|
||||
return new LifecycleExecutionState(phase, action, step, failedStep, isAutoRetryableError, failedStepRetryCount, stepInfo,
|
||||
phaseDefinition, indexCreationDate, phaseTime, actionTime, stepTime);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -36,6 +36,7 @@ public class LifecycleSettings {
|
||||
public static final Setting<Boolean> LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING = Setting.boolSetting(LIFECYCLE_PARSE_ORIGINATION_DATE,
|
||||
false, Setting.Property.Dynamic, Setting.Property.IndexScope);
|
||||
|
||||
|
||||
public static final Setting<Boolean> SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true,
|
||||
Setting.Property.NodeScope);
|
||||
public static final Setting<String> SLM_RETENTION_SCHEDULE_SETTING = Setting.simpleString(SLM_RETENTION_SCHEDULE,
|
||||
|
@ -38,6 +38,13 @@ public abstract class Step {
|
||||
return nextStepKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates if the step can be automatically retried when it encounters an execution error.
|
||||
*/
|
||||
public boolean isRetryable() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(key, nextStepKey);
|
||||
|
@ -42,6 +42,11 @@ public class WaitForRolloverReadyStep extends AsyncWaitStep {
|
||||
this.maxDocs = maxDocs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRetryable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
|
||||
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetaData.getSettings());
|
||||
|
@ -52,6 +52,8 @@ public class IndexLifecycleExplainResponseTests extends AbstractSerializingTestC
|
||||
stepNull ? null : randomAlphaOfLength(10),
|
||||
stepNull ? null : randomAlphaOfLength(10),
|
||||
randomBoolean() ? null : randomAlphaOfLength(10),
|
||||
stepNull ? null : randomBoolean(),
|
||||
stepNull ? null : randomInt(10),
|
||||
stepNull ? null : randomNonNegativeLong(),
|
||||
stepNull ? null : randomNonNegativeLong(),
|
||||
stepNull ? null : randomNonNegativeLong(),
|
||||
@ -69,6 +71,8 @@ public class IndexLifecycleExplainResponseTests extends AbstractSerializingTestC
|
||||
(numNull == 2) ? null : randomAlphaOfLength(10),
|
||||
(numNull == 3) ? null : randomAlphaOfLength(10),
|
||||
randomBoolean() ? null : randomAlphaOfLength(10),
|
||||
randomBoolean() ? null : randomBoolean(),
|
||||
randomBoolean() ? null : randomInt(10),
|
||||
randomBoolean() ? null : randomNonNegativeLong(),
|
||||
randomBoolean() ? null : randomNonNegativeLong(),
|
||||
randomBoolean() ? null : randomNonNegativeLong(),
|
||||
@ -106,6 +110,8 @@ public class IndexLifecycleExplainResponseTests extends AbstractSerializingTestC
|
||||
String action = instance.getAction();
|
||||
String step = instance.getStep();
|
||||
String failedStep = instance.getFailedStep();
|
||||
Boolean isAutoRetryableError = instance.isAutoRetryableError();
|
||||
Integer failedStepRetryCount = instance.getFailedStepRetryCount();
|
||||
Long policyTime = instance.getLifecycleDate();
|
||||
Long phaseTime = instance.getPhaseTime();
|
||||
Long actionTime = instance.getActionTime();
|
||||
@ -114,7 +120,7 @@ public class IndexLifecycleExplainResponseTests extends AbstractSerializingTestC
|
||||
BytesReference stepInfo = instance.getStepInfo();
|
||||
PhaseExecutionInfo phaseExecutionInfo = instance.getPhaseExecutionInfo();
|
||||
if (managed) {
|
||||
switch (between(0, 10)) {
|
||||
switch (between(0, 11)) {
|
||||
case 0:
|
||||
index = index + randomAlphaOfLengthBetween(1, 5);
|
||||
break;
|
||||
@ -162,11 +168,15 @@ public class IndexLifecycleExplainResponseTests extends AbstractSerializingTestC
|
||||
break;
|
||||
case 10:
|
||||
return IndexLifecycleExplainResponse.newUnmanagedIndexResponse(index);
|
||||
case 11:
|
||||
isAutoRetryableError = true;
|
||||
failedStepRetryCount = randomValueOtherThan(failedStepRetryCount, () -> randomInt(10));
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("Illegal randomisation branch");
|
||||
}
|
||||
return IndexLifecycleExplainResponse.newManagedIndexResponse(index, policy, policyTime, phase, action, step, failedStep,
|
||||
phaseTime, actionTime, stepTime, stepInfo, phaseExecutionInfo);
|
||||
isAutoRetryableError, failedStepRetryCount, phaseTime, actionTime, stepTime, stepInfo, phaseExecutionInfo);
|
||||
} else {
|
||||
switch (between(0, 1)) {
|
||||
case 0:
|
||||
|
@ -27,7 +27,7 @@ public class RolloverActionTests extends AbstractActionTestCase<RolloverAction>
|
||||
return randomInstance();
|
||||
}
|
||||
|
||||
static RolloverAction randomInstance() {
|
||||
public static RolloverAction randomInstance() {
|
||||
ByteSizeUnit maxSizeUnit = randomFrom(ByteSizeUnit.values());
|
||||
ByteSizeValue maxSize = randomBoolean() ? null : new ByteSizeValue(randomNonNegativeLong() / maxSizeUnit.toBytes(1), maxSizeUnit);
|
||||
Long maxDocs = randomBoolean() ? null : randomNonNegativeLong();
|
||||
|
@ -60,16 +60,20 @@ import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||
private static final Logger logger = LogManager.getLogger(TimeSeriesLifecycleActionsIT.class);
|
||||
private static final String FAILED_STEP_RETRY_COUNT_FIELD = "failed_step_retry_count";
|
||||
private static final String IS_AUTO_RETRYABLE_ERROR_FIELD = "is_auto_retryable_error";
|
||||
|
||||
private String index;
|
||||
private String policy;
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(TimeSeriesLifecycleActionsIT.class);
|
||||
|
||||
@Before
|
||||
public void refreshIndex() {
|
||||
index = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
||||
@ -77,7 +81,6 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||
}
|
||||
|
||||
public static void updatePolicy(String indexName, String policy) throws IOException {
|
||||
|
||||
Request changePolicyRequest = new Request("PUT", "/" + indexName + "/_settings");
|
||||
final StringEntity changePolicyEntity = new StringEntity("{ \"index.lifecycle.name\": \"" + policy + "\" }",
|
||||
ContentType.APPLICATION_JSON);
|
||||
@ -292,7 +295,9 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||
String secondIndex = index + "-000002";
|
||||
createIndexWithSettings(originalIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"));
|
||||
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias")
|
||||
);
|
||||
|
||||
// create policy
|
||||
createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L));
|
||||
// update policy on index
|
||||
@ -842,10 +847,12 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||
createIndexWithSettings(goodIndex, Settings.builder()
|
||||
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policy));
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policy)
|
||||
);
|
||||
createIndexWithSettingsNoAlias(errorIndex, Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policy));
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policy)
|
||||
);
|
||||
createIndexWithSettingsNoAlias(nonexistantPolicyIndex, Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, randomValueOtherThan(policy, () -> randomAlphaOfLengthBetween(3,10))));
|
||||
@ -863,27 +870,66 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||
assertThat(onlyManagedResponse, allOf(hasKey(goodIndex), hasKey(errorIndex), hasKey(nonexistantPolicyIndex)));
|
||||
assertThat(onlyManagedResponse, not(hasKey(unmanagedIndex)));
|
||||
|
||||
Map<String, Map<String, Object>> onlyErrorsResponse = explain(index + "*", true, randomBoolean());
|
||||
Map<String, Map<String, Object>> onlyErrorsResponse = explain(index + "*", true, true);
|
||||
assertNotNull(onlyErrorsResponse);
|
||||
assertThat(onlyErrorsResponse, allOf(hasKey(errorIndex), hasKey(nonexistantPolicyIndex)));
|
||||
assertThat(onlyErrorsResponse, allOf(not(hasKey(goodIndex)), not(hasKey(unmanagedIndex))));
|
||||
});
|
||||
}
|
||||
|
||||
public void testExplainIndexContainsAutomaticRetriesInformation() throws Exception {
|
||||
createFullPolicy(TimeValue.ZERO);
|
||||
|
||||
// create index without alias so the rollover action fails and is retried
|
||||
createIndexWithSettingsNoAlias(index, Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policy)
|
||||
);
|
||||
|
||||
assertBusy(() -> {
|
||||
Map<String, Object> explainIndex = explainIndex(index);
|
||||
assertThat((Integer) explainIndex.get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1));
|
||||
assertThat(explainIndex.get(IS_AUTO_RETRYABLE_ERROR_FIELD), is(true));
|
||||
});
|
||||
}
|
||||
|
||||
public void testILMRolloverRetriesOnReadOnlyBlock() throws Exception {
|
||||
String firstIndex = index + "-000001";
|
||||
|
||||
createNewSingletonPolicy("hot", new RolloverAction(null, TimeValue.timeValueSeconds(1), null));
|
||||
|
||||
// create the index as readonly and associate the ILM policy to it
|
||||
createIndexWithSettings(
|
||||
firstIndex,
|
||||
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policy)
|
||||
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias")
|
||||
.put("index.blocks.read_only", true),
|
||||
true
|
||||
);
|
||||
|
||||
// wait for ILM to start retrying the step
|
||||
assertBusy(() -> assertThat((Integer) explainIndex(firstIndex).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)));
|
||||
|
||||
// remove the read only block
|
||||
Request allowWritesOnIndexSettingUpdate = new Request("PUT", firstIndex + "/_settings");
|
||||
allowWritesOnIndexSettingUpdate.setJsonEntity("{" +
|
||||
" \"index\": {\n" +
|
||||
" \"blocks.read_only\" : \"false\" \n" +
|
||||
" }\n" +
|
||||
"}");
|
||||
client().performRequest(allowWritesOnIndexSettingUpdate);
|
||||
|
||||
// index is not readonly so the ILM should complete successfully
|
||||
assertBusy(() -> assertThat(getStepKeyForIndex(firstIndex), equalTo(TerminalPolicyStep.KEY)));
|
||||
}
|
||||
|
||||
public void testILMRolloverOnManuallyRolledIndex() throws Exception {
|
||||
String originalIndex = index + "-000001";
|
||||
String secondIndex = index + "-000002";
|
||||
String thirdIndex = index + "-000003";
|
||||
|
||||
// Configure ILM to run every second
|
||||
Request updateLifecylePollSetting = new Request("PUT", "_cluster/settings");
|
||||
updateLifecylePollSetting.setJsonEntity("{" +
|
||||
" \"transient\": {\n" +
|
||||
"\"indices.lifecycle.poll_interval\" : \"1s\" \n" +
|
||||
" }\n" +
|
||||
"}");
|
||||
client().performRequest(updateLifecylePollSetting);
|
||||
|
||||
// Set up a policy with rollover
|
||||
createNewSingletonPolicy("hot", new RolloverAction(null, null, 2L));
|
||||
Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes");
|
||||
|
@ -192,7 +192,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
||||
logger.error("policy [{}] for index [{}] failed on cluster state step [{}]. Moving to ERROR step", policy, index.getName(),
|
||||
currentStepKey);
|
||||
MoveToErrorStepUpdateTask moveToErrorStepUpdateTask = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause,
|
||||
nowSupplier);
|
||||
nowSupplier, policyStepsRegistry::getStep);
|
||||
return moveToErrorStepUpdateTask.execute(state);
|
||||
}
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
@ -46,6 +47,7 @@ import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE;
|
||||
@ -118,7 +120,7 @@ public class IndexLifecycleRunner {
|
||||
logger.debug("policy [{}] for index [{}] complete, skipping execution", policy, index);
|
||||
return;
|
||||
} else if (currentStep instanceof ErrorStep) {
|
||||
logger.debug("policy [{}] for index [{}] on an error step, skipping execution", policy, index);
|
||||
onErrorMaybeRetryFailedStep(policy, indexMetaData);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -154,6 +156,38 @@ public class IndexLifecycleRunner {
|
||||
}
|
||||
}
|
||||
|
||||
private void onErrorMaybeRetryFailedStep(String policy, IndexMetaData indexMetaData) {
|
||||
String index = indexMetaData.getIndex().getName();
|
||||
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData);
|
||||
Step failedStep = stepRegistry.getStep(indexMetaData, new StepKey(lifecycleState.getPhase(), lifecycleState.getAction(),
|
||||
lifecycleState.getFailedStep()));
|
||||
if (failedStep == null) {
|
||||
logger.warn("failed step [{}] for index [{}] is not part of policy [{}] anymore, or it is invalid. skipping execution",
|
||||
lifecycleState.getFailedStep(), index, policy);
|
||||
return;
|
||||
}
|
||||
|
||||
if (lifecycleState.isAutoRetryableError() != null && lifecycleState.isAutoRetryableError()) {
|
||||
int currentRetryAttempt = lifecycleState.getFailedStepRetryCount() == null ? 1 : 1 + lifecycleState.getFailedStepRetryCount();
|
||||
logger.info("policy [{}] for index [{}] on an error step due to a transitive error, moving back to the failed " +
|
||||
"step [{}] for execution. retry attempt [{}]", policy, index, lifecycleState.getFailedStep(), currentRetryAttempt);
|
||||
clusterService.submitStateUpdateTask("ilm-retry-failed-step", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return moveClusterStateToPreviouslyFailedStep(currentState, index, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error(new ParameterizedMessage("retry execution of step [{}] for index [{}] failed",
|
||||
failedStep.getKey().getName(), index), e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
logger.debug("policy [{}] for index [{}] on an error step after a terminal error, skipping execution", policy, index);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the current step (matching the expected step key) is an asynchronous action step, run it
|
||||
*/
|
||||
@ -291,6 +325,19 @@ public class IndexLifecycleRunner {
|
||||
StepKey nextStepKey, LongSupplier nowSupplier,
|
||||
PolicyStepsRegistry stepRegistry, boolean forcePhaseDefinitionRefresh) {
|
||||
IndexMetaData idxMeta = currentState.getMetaData().index(indexName);
|
||||
validateTransition(idxMeta, currentStepKey, nextStepKey, stepRegistry);
|
||||
|
||||
Settings indexSettings = idxMeta.getSettings();
|
||||
String policy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings);
|
||||
logger.info("moving index [{}] from [{}] to [{}] in policy [{}]",
|
||||
indexName, currentStepKey, nextStepKey, policy);
|
||||
|
||||
return IndexLifecycleRunner.moveClusterStateToNextStep(idxMeta.getIndex(), currentState, currentStepKey,
|
||||
nextStepKey, nowSupplier, forcePhaseDefinitionRefresh);
|
||||
}
|
||||
|
||||
static void validateTransition(IndexMetaData idxMeta, StepKey currentStepKey, StepKey nextStepKey, PolicyStepsRegistry stepRegistry) {
|
||||
String indexName = idxMeta.getIndex().getName();
|
||||
Settings indexSettings = idxMeta.getSettings();
|
||||
String indexPolicySetting = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings);
|
||||
|
||||
@ -308,12 +355,6 @@ public class IndexLifecycleRunner {
|
||||
throw new IllegalArgumentException("step [" + nextStepKey + "] for index [" + idxMeta.getIndex().getName() +
|
||||
"] with policy [" + indexPolicySetting + "] does not exist");
|
||||
}
|
||||
|
||||
logger.info("moving index [{}] from [{}] to [{}] in policy [{}]",
|
||||
indexName, currentStepKey, nextStepKey, indexPolicySetting);
|
||||
|
||||
return IndexLifecycleRunner.moveClusterStateToNextStep(idxMeta.getIndex(), currentState, currentStepKey,
|
||||
nextStepKey, nowSupplier, forcePhaseDefinitionRefresh);
|
||||
}
|
||||
|
||||
static ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey currentStep, StepKey nextStep,
|
||||
@ -331,7 +372,8 @@ public class IndexLifecycleRunner {
|
||||
}
|
||||
|
||||
static ClusterState moveClusterStateToErrorStep(Index index, ClusterState clusterState, StepKey currentStep, Exception cause,
|
||||
LongSupplier nowSupplier) throws IOException {
|
||||
LongSupplier nowSupplier,
|
||||
BiFunction<IndexMetaData, StepKey, Step> stepLookupFunction) throws IOException {
|
||||
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
|
||||
IndexLifecycleMetadata ilmMeta = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
LifecyclePolicyMetadata policyMetadata = ilmMeta.getPolicyMetadatas()
|
||||
@ -340,34 +382,67 @@ public class IndexLifecycleRunner {
|
||||
causeXContentBuilder.startObject();
|
||||
ElasticsearchException.generateThrowableXContent(causeXContentBuilder, STACKTRACE_PARAMS, cause);
|
||||
causeXContentBuilder.endObject();
|
||||
LifecycleExecutionState nextStepState = moveExecutionStateToNextStep(policyMetadata,
|
||||
LifecycleExecutionState.fromIndexMetadata(idxMeta), currentStep, new StepKey(currentStep.getPhase(),
|
||||
currentStep.getAction(), ErrorStep.NAME), nowSupplier, false);
|
||||
LifecycleExecutionState currentState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
|
||||
LifecycleExecutionState nextStepState = moveExecutionStateToNextStep(policyMetadata, currentState, currentStep,
|
||||
new StepKey(currentStep.getPhase(), currentStep.getAction(), ErrorStep.NAME), nowSupplier, false);
|
||||
LifecycleExecutionState.Builder failedState = LifecycleExecutionState.builder(nextStepState);
|
||||
failedState.setFailedStep(currentStep.getName());
|
||||
failedState.setStepInfo(BytesReference.bytes(causeXContentBuilder).utf8ToString());
|
||||
Step failedStep = stepLookupFunction.apply(idxMeta, currentStep);
|
||||
if (failedStep != null) {
|
||||
// as an initial step we'll mark the failed step as auto retryable without actually looking at the cause to determine
|
||||
// if the error is transient/recoverable from
|
||||
failedState.setIsAutoRetryableError(failedStep.isRetryable());
|
||||
// maintain the retry count of the failed step as it will be cleared after a successful execution
|
||||
failedState.setFailedStepRetryCount(currentState.getFailedStepRetryCount());
|
||||
} else {
|
||||
logger.warn("failed step [{}] for index [{}] is not part of policy [{}] anymore, or it is invalid",
|
||||
currentStep.getName(), index, policyMetadata.getName());
|
||||
}
|
||||
|
||||
ClusterState.Builder newClusterStateBuilder = newClusterStateWithLifecycleState(index, clusterState, failedState.build());
|
||||
return newClusterStateBuilder.build();
|
||||
}
|
||||
|
||||
ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] indices) {
|
||||
ClusterState moveClusterStateToPreviouslyFailedStep(ClusterState currentState, String index, boolean isAutomaticRetry) {
|
||||
ClusterState newState;
|
||||
IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
if (indexMetaData == null) {
|
||||
throw new IllegalArgumentException("index [" + index + "] does not exist");
|
||||
}
|
||||
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData);
|
||||
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);
|
||||
String failedStep = lifecycleState.getFailedStep();
|
||||
if (currentStepKey != null && ErrorStep.NAME.equals(currentStepKey.getName()) && Strings.isNullOrEmpty(failedStep) == false) {
|
||||
StepKey nextStepKey = new StepKey(currentStepKey.getPhase(), currentStepKey.getAction(), failedStep);
|
||||
validateTransition(indexMetaData, currentStepKey, nextStepKey, stepRegistry);
|
||||
IndexLifecycleMetadata ilmMeta = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||
|
||||
LifecyclePolicyMetadata policyMetadata = ilmMeta.getPolicyMetadatas()
|
||||
.get(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetaData.getSettings()));
|
||||
LifecycleExecutionState nextStepState = moveExecutionStateToNextStep(policyMetadata,
|
||||
lifecycleState, currentStepKey, nextStepKey, nowSupplier, true);
|
||||
LifecycleExecutionState.Builder retryStepState = LifecycleExecutionState.builder(nextStepState);
|
||||
retryStepState.setIsAutoRetryableError(lifecycleState.isAutoRetryableError());
|
||||
Integer currentRetryCount = lifecycleState.getFailedStepRetryCount();
|
||||
if (isAutomaticRetry) {
|
||||
retryStepState.setFailedStepRetryCount(currentRetryCount == null ? 1 : ++currentRetryCount);
|
||||
} else {
|
||||
// manual retries don't update the retry count
|
||||
retryStepState.setFailedStepRetryCount(lifecycleState.getFailedStepRetryCount());
|
||||
}
|
||||
newState = newClusterStateWithLifecycleState(indexMetaData.getIndex(), currentState, retryStepState.build()).build();
|
||||
} else {
|
||||
throw new IllegalArgumentException("cannot retry an action for an index ["
|
||||
+ index + "] that has not encountered an error when running a Lifecycle Policy");
|
||||
}
|
||||
return newState;
|
||||
}
|
||||
|
||||
ClusterState moveClusterStateToPreviouslyFailedStep(ClusterState currentState, String[] indices) {
|
||||
ClusterState newState = currentState;
|
||||
for (String index : indices) {
|
||||
IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
if (indexMetaData == null) {
|
||||
throw new IllegalArgumentException("index [" + index + "] does not exist");
|
||||
}
|
||||
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData);
|
||||
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);
|
||||
String failedStep = lifecycleState.getFailedStep();
|
||||
if (currentStepKey != null && ErrorStep.NAME.equals(currentStepKey.getName())
|
||||
&& Strings.isNullOrEmpty(failedStep) == false) {
|
||||
StepKey nextStepKey = new StepKey(currentStepKey.getPhase(), currentStepKey.getAction(), failedStep);
|
||||
newState = moveClusterStateToStep(index, currentState, currentStepKey, nextStepKey, nowSupplier, stepRegistry, true);
|
||||
} else {
|
||||
throw new IllegalArgumentException("cannot retry an action for an index ["
|
||||
+ index + "] that has not encountered an error when running a Lifecycle Policy");
|
||||
}
|
||||
newState = moveClusterStateToPreviouslyFailedStep(newState, index, false);
|
||||
}
|
||||
return newState;
|
||||
}
|
||||
@ -387,6 +462,8 @@ public class IndexLifecycleRunner {
|
||||
// clear any step info or error-related settings from the current step
|
||||
updatedState.setFailedStep(null);
|
||||
updatedState.setStepInfo(null);
|
||||
updatedState.setIsAutoRetryableError(null);
|
||||
updatedState.setFailedStepRetryCount(null);
|
||||
|
||||
if (currentStep.getPhase().equals(nextStep.getPhase()) == false || forcePhaseDefinitionRefresh) {
|
||||
final String newPhaseDefinition;
|
||||
@ -473,7 +550,7 @@ public class IndexLifecycleRunner {
|
||||
logger.error(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}]. Moving to ERROR step",
|
||||
policy, index.getName(), currentStepKey), e);
|
||||
clusterService.submitStateUpdateTask("ilm-move-to-error-step",
|
||||
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier));
|
||||
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep));
|
||||
}
|
||||
|
||||
private void setStepInfo(Index index, String policy, StepKey currentStepKey, ToXContentObject stepInfo) {
|
||||
|
@ -91,8 +91,8 @@ public class IndexLifecycleService
|
||||
nowSupplier, policyRegistry, false);
|
||||
}
|
||||
|
||||
public ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] indices) {
|
||||
return lifecycleRunner.moveClusterStateToFailedStep(currentState, indices);
|
||||
public ClusterState moveClusterStateToPreviouslyFailedStep(ClusterState currentState, String[] indices) {
|
||||
return lifecycleRunner.moveClusterStateToPreviouslyFailedStep(currentState, indices);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -16,21 +16,25 @@ import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.ilm.Step;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask {
|
||||
private final Index index;
|
||||
private final String policy;
|
||||
private final Step.StepKey currentStepKey;
|
||||
private final BiFunction<IndexMetaData, Step.StepKey, Step> stepLookupFunction;
|
||||
private LongSupplier nowSupplier;
|
||||
private Exception cause;
|
||||
|
||||
public MoveToErrorStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Exception cause, LongSupplier nowSupplier) {
|
||||
public MoveToErrorStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Exception cause, LongSupplier nowSupplier,
|
||||
BiFunction<IndexMetaData, Step.StepKey, Step> stepLookupFunction) {
|
||||
this.index = index;
|
||||
this.policy = policy;
|
||||
this.currentStepKey = currentStepKey;
|
||||
this.cause = cause;
|
||||
this.nowSupplier = nowSupplier;
|
||||
this.stepLookupFunction = stepLookupFunction;
|
||||
}
|
||||
|
||||
Index getIndex() {
|
||||
@ -60,7 +64,8 @@ public class MoveToErrorStepUpdateTask extends ClusterStateUpdateTask {
|
||||
LifecycleExecutionState indexILMData = LifecycleExecutionState.fromIndexMetadata(idxMeta);
|
||||
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
|
||||
&& currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexILMData))) {
|
||||
return IndexLifecycleRunner.moveClusterStateToErrorStep(index, currentState, currentStepKey, cause, nowSupplier);
|
||||
return IndexLifecycleRunner.moveClusterStateToErrorStep(index, currentState, currentStepKey, cause, nowSupplier,
|
||||
stepLookupFunction);
|
||||
} else {
|
||||
// either the policy has changed or the step is now
|
||||
// not the same as when we submitted the update task. In
|
||||
|
@ -116,6 +116,8 @@ public class TransportExplainLifecycleAction
|
||||
lifecycleState.getAction(),
|
||||
lifecycleState.getStep(),
|
||||
lifecycleState.getFailedStep(),
|
||||
lifecycleState.isAutoRetryableError(),
|
||||
lifecycleState.getFailedStepRetryCount(),
|
||||
lifecycleState.getPhaseTime(),
|
||||
lifecycleState.getActionTime(),
|
||||
lifecycleState.getStepTime(),
|
||||
|
@ -57,7 +57,7 @@ public class TransportRetryAction extends TransportMasterNodeAction<Request, Res
|
||||
new AckedClusterStateUpdateTask<Response>(request, listener) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return indexLifecycleService.moveClusterStateToFailedStep(currentState, request.indices());
|
||||
return indexLifecycleService.moveClusterStateToPreviouslyFailedStep(currentState, request.indices());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.ClusterServiceUtils;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.client.NoOpClient;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.ilm.AbstractStepTestCase;
|
||||
@ -50,9 +51,11 @@ import org.elasticsearch.xpack.core.ilm.OperationMode;
|
||||
import org.elasticsearch.xpack.core.ilm.Phase;
|
||||
import org.elasticsearch.xpack.core.ilm.PhaseExecutionInfo;
|
||||
import org.elasticsearch.xpack.core.ilm.RolloverAction;
|
||||
import org.elasticsearch.xpack.core.ilm.RolloverActionTests;
|
||||
import org.elasticsearch.xpack.core.ilm.Step;
|
||||
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
||||
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
|
||||
import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
@ -74,12 +77,18 @@ import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch;
|
||||
import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;
|
||||
import static org.elasticsearch.xpack.core.ilm.LifecyclePolicyTestsUtils.newTestLifecyclePolicy;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
private static final NamedXContentRegistry REGISTRY;
|
||||
@ -147,7 +156,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
Index index = new Index(indexName, indexName + "uuid");
|
||||
indexSteps.put(index, steps);
|
||||
Client client = mock(Client.class);
|
||||
Mockito.when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
return new MockPolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, REGISTRY, client);
|
||||
}
|
||||
|
||||
@ -167,24 +176,76 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
|
||||
public void testRunPolicyErrorStep() {
|
||||
String policyName = "async_action_policy";
|
||||
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
|
||||
MockClusterStateWaitStep step = new MockClusterStateWaitStep(stepKey, null);
|
||||
LifecyclePolicy policy = LifecyclePolicyTests.randomTimeseriesLifecyclePolicyWithAllPhases(policyName);
|
||||
String phaseName = randomFrom(policy.getPhases().keySet());
|
||||
Phase phase = policy.getPhases().get(phaseName);
|
||||
PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong());
|
||||
String phaseJson = Strings.toString(phaseExecutionInfo);
|
||||
LifecycleAction action = randomFrom(phase.getActions().values());
|
||||
Step step = randomFrom(action.toSteps(new NoOpClient(threadPool), phaseName, null));
|
||||
StepKey stepKey = step.getKey();
|
||||
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L);
|
||||
LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder();
|
||||
newState.setFailedStep(stepKey.getName());
|
||||
newState.setIsAutoRetryableError(false);
|
||||
newState.setPhase(stepKey.getPhase());
|
||||
newState.setAction(stepKey.getAction());
|
||||
newState.setStep(ErrorStep.NAME);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT))
|
||||
newState.setPhaseDefinition(phaseJson);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("test")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
|
||||
.putCustom(ILM_CUSTOM_METADATA_KEY, newState.build().asMap())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5))
|
||||
.build();
|
||||
|
||||
runner.runPolicyAfterStateChange(policyName, indexMetaData);
|
||||
|
||||
Mockito.verifyZeroInteractions(clusterService);
|
||||
}
|
||||
|
||||
public void testRunPolicyErrorStepOnRetryableFailedStep() {
|
||||
String policyName = "rollover_policy";
|
||||
String phaseName = "hot";
|
||||
TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
|
||||
Map<String, LifecycleAction> actions = new HashMap<>();
|
||||
RolloverAction action = RolloverActionTests.randomInstance();
|
||||
actions.put(RolloverAction.NAME, action);
|
||||
Phase phase = new Phase(phaseName, after, actions);
|
||||
PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo(policyName, phase, 1, randomNonNegativeLong());
|
||||
String phaseJson = Strings.toString(phaseExecutionInfo);
|
||||
NoOpClient client = new NoOpClient(threadPool);
|
||||
List<Step> waitForRolloverStepList =
|
||||
action.toSteps(client, phaseName, null).stream()
|
||||
.filter(s -> s.getKey().getName().equals(WaitForRolloverReadyStep.NAME))
|
||||
.collect(toList());
|
||||
assertThat(waitForRolloverStepList.size(), is(1));
|
||||
Step waitForRolloverStep = waitForRolloverStepList.get(0);
|
||||
StepKey stepKey = waitForRolloverStep.getKey();
|
||||
|
||||
PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, waitForRolloverStep);
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> 0L);
|
||||
LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder();
|
||||
newState.setFailedStep(stepKey.getName());
|
||||
newState.setIsAutoRetryableError(true);
|
||||
newState.setPhase(stepKey.getPhase());
|
||||
newState.setAction(stepKey.getAction());
|
||||
newState.setStep(ErrorStep.NAME);
|
||||
newState.setPhaseDefinition(phaseJson);
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("test")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName))
|
||||
.putCustom(ILM_CUSTOM_METADATA_KEY, newState.build().asMap())
|
||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5))
|
||||
.build();
|
||||
|
||||
runner.runPeriodicStep(policyName, indexMetaData);
|
||||
|
||||
Mockito.verify(clusterService, times(1)).submitStateUpdateTask(any(), any());
|
||||
}
|
||||
|
||||
public void testRunStateChangePolicyWithNoNextStep() throws Exception {
|
||||
String policyName = "foo";
|
||||
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
|
||||
@ -627,7 +688,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
|
||||
Step.StepKey MOCK_STEP_KEY = new Step.StepKey("mock", "mock", "mock");
|
||||
Client client = mock(Client.class);
|
||||
Mockito.when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
LifecyclePolicy policy = LifecyclePolicyTests.randomTimeseriesLifecyclePolicyWithAllPhases(policyName);
|
||||
LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(policy, Collections.emptyMap(), 1, randomNonNegativeLong());
|
||||
String phaseName = randomFrom(policy.getPhases().keySet());
|
||||
@ -855,6 +916,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
public void testMoveClusterStateToErrorStep() throws IOException {
|
||||
String indexName = "my_index";
|
||||
StepKey currentStep = new StepKey("current_phase", "current_action", "current_step");
|
||||
StepKey nextStepKey = new StepKey("next_phase", "next_action", "next_step");
|
||||
long now = randomNonNegativeLong();
|
||||
Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE");
|
||||
|
||||
@ -865,12 +927,14 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
ClusterState clusterState = buildClusterState(indexName, Settings.builder(), lifecycleState.build(), Collections.emptyList());
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
|
||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToErrorStep(index, clusterState, currentStep, cause, () -> now);
|
||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToErrorStep(index, clusterState, currentStep, cause, () -> now,
|
||||
(idxMeta, stepKey) -> new MockStep(stepKey, nextStepKey));
|
||||
assertClusterStateOnErrorStep(clusterState, index, currentStep, newClusterState, now,
|
||||
"{\"type\":\"exception\",\"reason\":\"THIS IS AN EXPECTED CAUSE\"");
|
||||
|
||||
cause = new IllegalArgumentException("non elasticsearch-exception");
|
||||
newClusterState = IndexLifecycleRunner.moveClusterStateToErrorStep(index, clusterState, currentStep, cause, () -> now);
|
||||
newClusterState = IndexLifecycleRunner.moveClusterStateToErrorStep(index, clusterState, currentStep, cause, () -> now,
|
||||
(idxMeta, stepKey) -> new MockStep(stepKey, nextStepKey));
|
||||
assertClusterStateOnErrorStep(clusterState, index, currentStep, newClusterState, now,
|
||||
"{\"type\":\"illegal_argument_exception\",\"reason\":\"non elasticsearch-exception\",\"stack_trace\":\"");
|
||||
}
|
||||
@ -902,9 +966,11 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
Collections.singletonList(policyMetadata));
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, threadPool, () -> now);
|
||||
ClusterState nextClusterState = runner.moveClusterStateToFailedStep(clusterState, indices);
|
||||
ClusterState nextClusterState = runner.moveClusterStateToPreviouslyFailedStep(clusterState, indices);
|
||||
IndexLifecycleRunnerTests.assertClusterStateOnNextStep(clusterState, index, errorStepKey, failedStepKey,
|
||||
nextClusterState, now);
|
||||
LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(nextClusterState.metaData().index(indexName));
|
||||
assertThat("manual move to failed step should not count as a retry", executionState.getFailedStepRetryCount(), is(nullValue()));
|
||||
}
|
||||
|
||||
public void testMoveClusterStateToFailedStepWithUnknownStep() {
|
||||
@ -937,7 +1003,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
Collections.singletonList(policyMetadata));
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, threadPool, () -> now);
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> runner.moveClusterStateToFailedStep(clusterState, indices));
|
||||
() -> runner.moveClusterStateToPreviouslyFailedStep(clusterState, indices));
|
||||
assertThat(exception.getMessage(), equalTo("step [" + failedStepKey
|
||||
+ "] for index [my_index] with policy [my_policy] does not exist"));
|
||||
}
|
||||
@ -949,7 +1015,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
Collections.emptyList());
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(null, null, threadPool, () -> 0L);
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> runner.moveClusterStateToFailedStep(clusterState, new String[] { invalidIndexName }));
|
||||
() -> runner.moveClusterStateToPreviouslyFailedStep(clusterState, new String[] { invalidIndexName }));
|
||||
assertThat(exception.getMessage(), equalTo("index [" + invalidIndexName + "] does not exist"));
|
||||
}
|
||||
|
||||
@ -972,7 +1038,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.emptyList());
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, threadPool, () -> now);
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> runner.moveClusterStateToFailedStep(clusterState, indices));
|
||||
() -> runner.moveClusterStateToPreviouslyFailedStep(clusterState, indices));
|
||||
assertThat(exception.getMessage(), equalTo("index [" + indexName + "] is not associated with an Index Lifecycle Policy"));
|
||||
}
|
||||
|
||||
@ -993,11 +1059,44 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.emptyList());
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, threadPool, () -> now);
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||
() -> runner.moveClusterStateToFailedStep(clusterState, indices));
|
||||
() -> runner.moveClusterStateToPreviouslyFailedStep(clusterState, indices));
|
||||
assertThat(exception.getMessage(), equalTo("cannot retry an action for an index [" + indices[0]
|
||||
+ "] that has not encountered an error when running a Lifecycle Policy"));
|
||||
}
|
||||
|
||||
public void testMoveClusterStateToPreviouslyFailedStepAsAutomaticRetry() {
|
||||
String indexName = "my_index";
|
||||
String policyName = "my_policy";
|
||||
long now = randomNonNegativeLong();
|
||||
StepKey failedStepKey = new StepKey("current_phase", MockAction.NAME, "current_step");
|
||||
StepKey errorStepKey = new StepKey(failedStepKey.getPhase(), failedStepKey.getAction(), ErrorStep.NAME);
|
||||
Step retryableStep = new RetryableMockStep(failedStepKey, null);
|
||||
LifecyclePolicy policy = createPolicy(policyName, failedStepKey, null);
|
||||
LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(policy, Collections.emptyMap(),
|
||||
randomNonNegativeLong(), randomNonNegativeLong());
|
||||
|
||||
PolicyStepsRegistry policyRegistry = createOneStepPolicyStepRegistry(policyName, retryableStep, indexName);
|
||||
Settings.Builder indexSettingsBuilder = Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policyName);
|
||||
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
||||
lifecycleState.setPhase(errorStepKey.getPhase());
|
||||
lifecycleState.setPhaseTime(now);
|
||||
lifecycleState.setAction(errorStepKey.getAction());
|
||||
lifecycleState.setActionTime(now);
|
||||
lifecycleState.setStep(errorStepKey.getName());
|
||||
lifecycleState.setStepTime(now);
|
||||
lifecycleState.setFailedStep(failedStepKey.getName());
|
||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(),
|
||||
Collections.singletonList(policyMetadata));
|
||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, threadPool, () -> now);
|
||||
ClusterState nextClusterState = runner.moveClusterStateToPreviouslyFailedStep(clusterState, indexName, true);
|
||||
IndexLifecycleRunnerTests.assertClusterStateOnNextStep(clusterState, index, errorStepKey, failedStepKey,
|
||||
nextClusterState, now);
|
||||
LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(nextClusterState.metaData().index(indexName));
|
||||
assertThat(executionState.getFailedStepRetryCount(), is(1));
|
||||
}
|
||||
|
||||
public void testAddStepInfoToClusterState() throws IOException {
|
||||
String indexName = "my_index";
|
||||
StepKey currentStep = new StepKey("current_phase", "current_action", "current_step");
|
||||
@ -1227,6 +1326,75 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
runner.isReadyToTransitionToThisPhase(policyName, indexMetaData, "phase"));
|
||||
}
|
||||
|
||||
public void testValidateTransitionThrowsExceptionForMissingIndexPolicy() {
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder("index").settings(settings(Version.CURRENT))
|
||||
.numberOfShards(randomIntBetween(1, 5))
|
||||
.numberOfReplicas(randomIntBetween(0, 5))
|
||||
.build();
|
||||
|
||||
StepKey currentStepKey = new StepKey("hot", "action", "firstStep");
|
||||
StepKey nextStepKey = new StepKey("hot", "action", "secondStep");
|
||||
Step currentStep = new MockStep(currentStepKey, nextStepKey);
|
||||
MockPolicyStepsRegistry policyRegistry = createOneStepPolicyStepRegistry("policy", currentStep);
|
||||
|
||||
expectThrows(IllegalArgumentException.class,
|
||||
() -> IndexLifecycleRunner.validateTransition(indexMetaData, currentStepKey, nextStepKey, policyRegistry));
|
||||
}
|
||||
|
||||
public void testValidateTransitionThrowsExceptionIfTheCurrentStepIsIncorrect() {
|
||||
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
||||
lifecycleState.setPhase("hot");
|
||||
lifecycleState.setAction("action");
|
||||
lifecycleState.setStep("another_step");
|
||||
String policy = "policy";
|
||||
IndexMetaData indexMetaData = buildIndexMetadata(policy, lifecycleState);
|
||||
|
||||
StepKey currentStepKey = new StepKey("hot", "action", "firstStep");
|
||||
StepKey nextStepKey = new StepKey("hot", "action", "secondStep");
|
||||
Step currentStep = new MockStep(currentStepKey, nextStepKey);
|
||||
MockPolicyStepsRegistry policyRegistry = createOneStepPolicyStepRegistry(policy, currentStep);
|
||||
|
||||
expectThrows(IllegalArgumentException.class,
|
||||
() -> IndexLifecycleRunner.validateTransition(indexMetaData, currentStepKey, nextStepKey, policyRegistry));
|
||||
}
|
||||
|
||||
public void testValidateTransitionThrowsExceptionIfNextStepDoesNotExist() {
|
||||
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
||||
lifecycleState.setPhase("hot");
|
||||
lifecycleState.setAction("action");
|
||||
lifecycleState.setStep("firstStep");
|
||||
String policy = "policy";
|
||||
IndexMetaData indexMetaData = buildIndexMetadata(policy, lifecycleState);
|
||||
|
||||
StepKey currentStepKey = new StepKey("hot", "action", "firstStep");
|
||||
StepKey nextStepKey = new StepKey("hot", "action", "secondStep");
|
||||
Step currentStep = new MockStep(currentStepKey, nextStepKey);
|
||||
MockPolicyStepsRegistry policyRegistry = createOneStepPolicyStepRegistry(policy, currentStep);
|
||||
|
||||
expectThrows(IllegalArgumentException.class,
|
||||
() -> IndexLifecycleRunner.validateTransition(indexMetaData, currentStepKey, nextStepKey, policyRegistry));
|
||||
}
|
||||
|
||||
public void testValidateValidTransition() {
|
||||
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
||||
lifecycleState.setPhase("hot");
|
||||
lifecycleState.setAction("action");
|
||||
lifecycleState.setStep("firstStep");
|
||||
String policy = "policy";
|
||||
IndexMetaData indexMetaData = buildIndexMetadata(policy, lifecycleState);
|
||||
|
||||
StepKey currentStepKey = new StepKey("hot", "action", "firstStep");
|
||||
StepKey nextStepKey = new StepKey("hot", "action", "secondStep");
|
||||
Step finalStep = new MockStep(nextStepKey, new StepKey("hot", "action", "completed"));
|
||||
MockPolicyStepsRegistry policyRegistry = createOneStepPolicyStepRegistry(policy, finalStep);
|
||||
|
||||
try {
|
||||
IndexLifecycleRunner.validateTransition(indexMetaData, currentStepKey, nextStepKey, policyRegistry);
|
||||
} catch (Exception e) {
|
||||
logger.error(e);
|
||||
fail("validateTransition should not throw exception on valid transitions");
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertIndexNotManagedByILM(ClusterState clusterState, Index index) {
|
||||
MetaData metadata = clusterState.metaData();
|
||||
@ -1304,6 +1472,15 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
assertEquals(null, newLifecycleState.getStepInfo());
|
||||
}
|
||||
|
||||
private IndexMetaData buildIndexMetadata(String policy, LifecycleExecutionState.Builder lifecycleState) {
|
||||
return IndexMetaData.builder("index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policy))
|
||||
.numberOfShards(randomIntBetween(1, 5))
|
||||
.numberOfReplicas(randomIntBetween(0, 5))
|
||||
.putCustom(ILM_CUSTOM_METADATA_KEY, lifecycleState.build().asMap())
|
||||
.build();
|
||||
}
|
||||
|
||||
private void assertClusterStateOnErrorStep(ClusterState oldClusterState, Index index, StepKey currentStep,
|
||||
ClusterState newClusterState, long now, String expectedCauseValue) throws IOException {
|
||||
assertNotSame(oldClusterState, newClusterState);
|
||||
@ -1573,4 +1750,16 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static final class RetryableMockStep extends MockStep {
|
||||
|
||||
RetryableMockStep(StepKey stepKey, StepKey nextStepKey) {
|
||||
super(stepKey, nextStepKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRetryable() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,12 +19,13 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.core.ilm.ErrorStep;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
|
||||
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyTests;
|
||||
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
|
||||
import org.elasticsearch.xpack.core.ilm.MockStep;
|
||||
import org.elasticsearch.xpack.core.ilm.OperationMode;
|
||||
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
||||
import org.junit.Before;
|
||||
@ -67,12 +68,14 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
|
||||
|
||||
public void testExecuteSuccessfullyMoved() throws IOException {
|
||||
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
|
||||
StepKey nextStepKey = new StepKey("next-phase", "next-action", "next-step-name");
|
||||
long now = randomNonNegativeLong();
|
||||
Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE");
|
||||
|
||||
setStateToKey(currentStepKey);
|
||||
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now,
|
||||
(idxMeta, stepKey) -> new MockStep(stepKey, nextStepKey));
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index));
|
||||
StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);
|
||||
@ -97,7 +100,8 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
|
||||
long now = randomNonNegativeLong();
|
||||
Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE");
|
||||
setStateToKey(notCurrentStepKey);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now,
|
||||
(idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")));
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(newState, sameInstance(clusterState));
|
||||
}
|
||||
@ -108,7 +112,8 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
|
||||
Exception cause = new ElasticsearchException("THIS IS AN EXPECTED CAUSE");
|
||||
setStateToKey(currentStepKey);
|
||||
setStatePolicy("not-" + policy);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now,
|
||||
(idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")));
|
||||
ClusterState newState = task.execute(clusterState);
|
||||
assertThat(newState, sameInstance(clusterState));
|
||||
}
|
||||
@ -120,7 +125,8 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
|
||||
|
||||
setStateToKey(currentStepKey);
|
||||
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now);
|
||||
MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, cause, () -> now,
|
||||
(idxMeta, stepKey) -> new MockStep(stepKey, new StepKey("next-phase", "action", "step")));
|
||||
Exception expectedException = new RuntimeException();
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
|
||||
() -> task.onFailure(randomAlphaOfLength(10), expectedException));
|
||||
@ -134,7 +140,6 @@ public class MoveToErrorStepUpdateTaskTests extends ESTestCase {
|
||||
.metaData(MetaData.builder(clusterState.metaData())
|
||||
.updateSettings(Settings.builder()
|
||||
.put(LifecycleSettings.LIFECYCLE_NAME, policy).build(), index.getName())).build();
|
||||
|
||||
}
|
||||
private void setStateToKey(StepKey stepKey) {
|
||||
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder(
|
||||
|
Loading…
x
Reference in New Issue
Block a user