mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-20 03:45:02 +00:00
[ILM] fix retry so it picks up latest policy and executes async action (#35406)
Before, moving to a failed step would only change the step info to be that of the failed step. This means two things. 1. Async Steps would never be triggered to execute 2. If there are inherent problems with the action definition that can be fixed with a policy update, these changes were not being reflected by the new execution info. Changes now 1. Async steps are executed after the move to the failed step in cluster state 2. the lifecycle execution info's phase definition is updated from the current latest policy definition, even though the index isn't moving to a new phase. Closes #35397.
This commit is contained in:
parent
40ca62c298
commit
16cbbab7b7
@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.Phase;
|
|||||||
import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction;
|
import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction;
|
||||||
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
|
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
|
||||||
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
|
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction;
|
||||||
|
import org.elasticsearch.xpack.core.indexlifecycle.ShrinkStep;
|
||||||
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||||
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
|
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -178,6 +179,41 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
|||||||
assertBusy(() -> assertFalse(indexExists(shrunkenOriginalIndex)));
|
assertBusy(() -> assertFalse(indexExists(shrunkenOriginalIndex)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testRetryFailedShrinkAction() throws Exception {
|
||||||
|
int numShards = 6;
|
||||||
|
int divisor = randomFrom(2, 3, 6);
|
||||||
|
int expectedFinalShards = numShards / divisor;
|
||||||
|
String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index;
|
||||||
|
createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
||||||
|
createNewSingletonPolicy("warm", new ShrinkAction(numShards + randomIntBetween(1, numShards)));
|
||||||
|
updatePolicy(index, policy);
|
||||||
|
assertBusy(() -> {
|
||||||
|
String failedStep = getFailedStepForIndex(index);
|
||||||
|
assertThat(failedStep, equalTo(ShrinkStep.NAME));
|
||||||
|
});
|
||||||
|
|
||||||
|
// update policy to be correct
|
||||||
|
createNewSingletonPolicy("warm", new ShrinkAction(expectedFinalShards));
|
||||||
|
updatePolicy(index, policy);
|
||||||
|
|
||||||
|
// retry step
|
||||||
|
Request retryRequest = new Request("POST", index + "/_ilm/retry");
|
||||||
|
assertOK(client().performRequest(retryRequest));
|
||||||
|
|
||||||
|
// assert corrected policy is picked up and index is shrunken
|
||||||
|
assertBusy(() -> {
|
||||||
|
logger.error(explainIndex(index));
|
||||||
|
assertTrue(indexExists(shrunkenIndex));
|
||||||
|
assertTrue(aliasExists(shrunkenIndex, index));
|
||||||
|
Map<String, Object> settings = getOnlyIndexSettings(shrunkenIndex);
|
||||||
|
assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(TerminalPolicyStep.KEY));
|
||||||
|
assertThat(settings.get(IndexMetaData.SETTING_NUMBER_OF_SHARDS), equalTo(String.valueOf(expectedFinalShards)));
|
||||||
|
assertThat(settings.get(IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.getKey()), equalTo("true"));
|
||||||
|
});
|
||||||
|
expectThrows(ResponseException.class, this::indexDocument);
|
||||||
|
}
|
||||||
|
|
||||||
public void testRolloverAction() throws Exception {
|
public void testRolloverAction() throws Exception {
|
||||||
String originalIndex = index + "-000001";
|
String originalIndex = index + "-000001";
|
||||||
String secondIndex = index + "-000002";
|
String secondIndex = index + "-000002";
|
||||||
|
@ -101,7 +101,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
|||||||
return state;
|
return state;
|
||||||
} else {
|
} else {
|
||||||
state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(),
|
state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(),
|
||||||
currentStep.getNextStepKey(), nowSupplier);
|
currentStep.getNextStepKey(), nowSupplier, false);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// cluster state wait step so evaluate the
|
// cluster state wait step so evaluate the
|
||||||
@ -125,7 +125,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
|||||||
return state;
|
return state;
|
||||||
} else {
|
} else {
|
||||||
state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(),
|
state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(),
|
||||||
currentStep.getNextStepKey(), nowSupplier);
|
currentStep.getNextStepKey(), nowSupplier, false);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.trace("[{}] condition not met ({}) [{}], returning existing state",
|
logger.trace("[{}] condition not met ({}) [{}], returning existing state",
|
||||||
|
@ -271,11 +271,13 @@ public class IndexLifecycleRunner {
|
|||||||
* @param nextStepKey The next step to move the index into
|
* @param nextStepKey The next step to move the index into
|
||||||
* @param nowSupplier The current-time supplier for updating when steps changed
|
* @param nowSupplier The current-time supplier for updating when steps changed
|
||||||
* @param stepRegistry The steps registry to check a step-key's existence in the index's current policy
|
* @param stepRegistry The steps registry to check a step-key's existence in the index's current policy
|
||||||
|
* @param forcePhaseDefinitionRefresh When true, step information will be recompiled from the latest version of the
|
||||||
|
* policy. Otherwise, existing phase definition is used.
|
||||||
* @return The updated cluster state where the index moved to <code>nextStepKey</code>
|
* @return The updated cluster state where the index moved to <code>nextStepKey</code>
|
||||||
*/
|
*/
|
||||||
static ClusterState moveClusterStateToStep(String indexName, ClusterState currentState, StepKey currentStepKey,
|
static ClusterState moveClusterStateToStep(String indexName, ClusterState currentState, StepKey currentStepKey,
|
||||||
StepKey nextStepKey, LongSupplier nowSupplier,
|
StepKey nextStepKey, LongSupplier nowSupplier,
|
||||||
PolicyStepsRegistry stepRegistry) {
|
PolicyStepsRegistry stepRegistry, boolean forcePhaseDefinitionRefresh) {
|
||||||
IndexMetaData idxMeta = currentState.getMetaData().index(indexName);
|
IndexMetaData idxMeta = currentState.getMetaData().index(indexName);
|
||||||
Settings indexSettings = idxMeta.getSettings();
|
Settings indexSettings = idxMeta.getSettings();
|
||||||
String indexPolicySetting = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings);
|
String indexPolicySetting = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings);
|
||||||
@ -295,18 +297,19 @@ public class IndexLifecycleRunner {
|
|||||||
"] with policy [" + indexPolicySetting + "] does not exist");
|
"] with policy [" + indexPolicySetting + "] does not exist");
|
||||||
}
|
}
|
||||||
|
|
||||||
return IndexLifecycleRunner.moveClusterStateToNextStep(idxMeta.getIndex(), currentState, currentStepKey, nextStepKey, nowSupplier);
|
return IndexLifecycleRunner.moveClusterStateToNextStep(idxMeta.getIndex(), currentState, currentStepKey,
|
||||||
|
nextStepKey, nowSupplier, forcePhaseDefinitionRefresh);
|
||||||
}
|
}
|
||||||
|
|
||||||
static ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey currentStep, StepKey nextStep,
|
static ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey currentStep, StepKey nextStep,
|
||||||
LongSupplier nowSupplier) {
|
LongSupplier nowSupplier, boolean forcePhaseDefinitionRefresh) {
|
||||||
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
|
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
|
||||||
IndexLifecycleMetadata ilmMeta = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE);
|
IndexLifecycleMetadata ilmMeta = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE);
|
||||||
LifecyclePolicyMetadata policyMetadata = ilmMeta.getPolicyMetadatas()
|
LifecyclePolicyMetadata policyMetadata = ilmMeta.getPolicyMetadatas()
|
||||||
.get(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()));
|
.get(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()));
|
||||||
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
|
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
|
||||||
LifecycleExecutionState newLifecycleState = moveExecutionStateToNextStep(policyMetadata,
|
LifecycleExecutionState newLifecycleState = moveExecutionStateToNextStep(policyMetadata,
|
||||||
lifecycleState, currentStep, nextStep, nowSupplier);
|
lifecycleState, currentStep, nextStep, nowSupplier, forcePhaseDefinitionRefresh);
|
||||||
ClusterState.Builder newClusterStateBuilder = newClusterStateWithLifecycleState(index, clusterState, newLifecycleState);
|
ClusterState.Builder newClusterStateBuilder = newClusterStateWithLifecycleState(index, clusterState, newLifecycleState);
|
||||||
|
|
||||||
return newClusterStateBuilder.build();
|
return newClusterStateBuilder.build();
|
||||||
@ -324,7 +327,7 @@ public class IndexLifecycleRunner {
|
|||||||
causeXContentBuilder.endObject();
|
causeXContentBuilder.endObject();
|
||||||
LifecycleExecutionState nextStepState = moveExecutionStateToNextStep(policyMetadata,
|
LifecycleExecutionState nextStepState = moveExecutionStateToNextStep(policyMetadata,
|
||||||
LifecycleExecutionState.fromIndexMetadata(idxMeta), currentStep, new StepKey(currentStep.getPhase(),
|
LifecycleExecutionState.fromIndexMetadata(idxMeta), currentStep, new StepKey(currentStep.getPhase(),
|
||||||
currentStep.getAction(), ErrorStep.NAME), nowSupplier);
|
currentStep.getAction(), ErrorStep.NAME), nowSupplier, false);
|
||||||
LifecycleExecutionState.Builder failedState = LifecycleExecutionState.builder(nextStepState);
|
LifecycleExecutionState.Builder failedState = LifecycleExecutionState.builder(nextStepState);
|
||||||
failedState.setFailedStep(currentStep.getName());
|
failedState.setFailedStep(currentStep.getName());
|
||||||
failedState.setStepInfo(BytesReference.bytes(causeXContentBuilder).utf8ToString());
|
failedState.setStepInfo(BytesReference.bytes(causeXContentBuilder).utf8ToString());
|
||||||
@ -343,9 +346,9 @@ public class IndexLifecycleRunner {
|
|||||||
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);
|
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);
|
||||||
String failedStep = lifecycleState.getFailedStep();
|
String failedStep = lifecycleState.getFailedStep();
|
||||||
if (currentStepKey != null && ErrorStep.NAME.equals(currentStepKey.getName())
|
if (currentStepKey != null && ErrorStep.NAME.equals(currentStepKey.getName())
|
||||||
&& Strings.isNullOrEmpty(failedStep) == false) {
|
&& Strings.isNullOrEmpty(failedStep) == false) {
|
||||||
StepKey nextStepKey = new StepKey(currentStepKey.getPhase(), currentStepKey.getAction(), failedStep);
|
StepKey nextStepKey = new StepKey(currentStepKey.getPhase(), currentStepKey.getAction(), failedStep);
|
||||||
newState = moveClusterStateToStep(index, currentState, currentStepKey, nextStepKey, nowSupplier, stepRegistry);
|
newState = moveClusterStateToStep(index, currentState, currentStepKey, nextStepKey, nowSupplier, stepRegistry, true);
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("cannot retry an action for an index ["
|
throw new IllegalArgumentException("cannot retry an action for an index ["
|
||||||
+ index + "] that has not encountered an error when running a Lifecycle Policy");
|
+ index + "] that has not encountered an error when running a Lifecycle Policy");
|
||||||
@ -357,7 +360,8 @@ public class IndexLifecycleRunner {
|
|||||||
private static LifecycleExecutionState moveExecutionStateToNextStep(LifecyclePolicyMetadata policyMetadata,
|
private static LifecycleExecutionState moveExecutionStateToNextStep(LifecyclePolicyMetadata policyMetadata,
|
||||||
LifecycleExecutionState existingState,
|
LifecycleExecutionState existingState,
|
||||||
StepKey currentStep, StepKey nextStep,
|
StepKey currentStep, StepKey nextStep,
|
||||||
LongSupplier nowSupplier) {
|
LongSupplier nowSupplier,
|
||||||
|
boolean forcePhaseDefinitionRefresh) {
|
||||||
long nowAsMillis = nowSupplier.getAsLong();
|
long nowAsMillis = nowSupplier.getAsLong();
|
||||||
LifecycleExecutionState.Builder updatedState = LifecycleExecutionState.builder(existingState);
|
LifecycleExecutionState.Builder updatedState = LifecycleExecutionState.builder(existingState);
|
||||||
updatedState.setPhase(nextStep.getPhase());
|
updatedState.setPhase(nextStep.getPhase());
|
||||||
@ -369,7 +373,7 @@ public class IndexLifecycleRunner {
|
|||||||
updatedState.setFailedStep(null);
|
updatedState.setFailedStep(null);
|
||||||
updatedState.setStepInfo(null);
|
updatedState.setStepInfo(null);
|
||||||
|
|
||||||
if (currentStep.getPhase().equals(nextStep.getPhase()) == false) {
|
if (currentStep.getPhase().equals(nextStep.getPhase()) == false || forcePhaseDefinitionRefresh) {
|
||||||
final String newPhaseDefinition;
|
final String newPhaseDefinition;
|
||||||
final Phase nextPhase;
|
final Phase nextPhase;
|
||||||
if ("new".equals(nextStep.getPhase()) || TerminalPolicyStep.KEY.equals(nextStep)) {
|
if ("new".equals(nextStep.getPhase()) || TerminalPolicyStep.KEY.equals(nextStep)) {
|
||||||
|
@ -84,7 +84,7 @@ public class IndexLifecycleService
|
|||||||
|
|
||||||
public ClusterState moveClusterStateToStep(ClusterState currentState, String indexName, StepKey currentStepKey, StepKey nextStepKey) {
|
public ClusterState moveClusterStateToStep(ClusterState currentState, String indexName, StepKey currentStepKey, StepKey nextStepKey) {
|
||||||
return IndexLifecycleRunner.moveClusterStateToStep(indexName, currentState, currentStepKey, nextStepKey,
|
return IndexLifecycleRunner.moveClusterStateToStep(indexName, currentState, currentStepKey, nextStepKey,
|
||||||
nowSupplier, policyRegistry);
|
nowSupplier, policyRegistry, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] indices) {
|
public ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] indices) {
|
||||||
|
@ -68,7 +68,7 @@ public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
|
|||||||
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
|
if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings))
|
||||||
&& currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexILMData))) {
|
&& currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexILMData))) {
|
||||||
logger.trace("moving [{}] to next step ({})", index.getName(), nextStepKey);
|
logger.trace("moving [{}] to next step ({})", index.getName(), nextStepKey);
|
||||||
return IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStepKey, nextStepKey, nowSupplier);
|
return IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStepKey, nextStepKey, nowSupplier, false);
|
||||||
} else {
|
} else {
|
||||||
// either the policy has changed or the step is now
|
// either the policy has changed or the step is now
|
||||||
// not the same as when we submitted the update task. In
|
// not the same as when we submitted the update task. In
|
||||||
|
@ -13,11 +13,14 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
|||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState;
|
||||||
|
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
|
||||||
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
|
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction;
|
||||||
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Request;
|
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Request;
|
||||||
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Response;
|
import org.elasticsearch.xpack.core.indexlifecycle.action.RetryAction.Response;
|
||||||
@ -55,6 +58,22 @@ public class TransportRetryAction extends TransportMasterNodeAction<Request, Res
|
|||||||
return indexLifecycleService.moveClusterStateToFailedStep(currentState, request.indices());
|
return indexLifecycleService.moveClusterStateToFailedStep(currentState, request.indices());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
|
for (String index : request.indices()) {
|
||||||
|
IndexMetaData idxMeta = newState.metaData().index(index);
|
||||||
|
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
|
||||||
|
StepKey retryStep = new StepKey(lifecycleState.getPhase(), lifecycleState.getAction(), lifecycleState.getStep());
|
||||||
|
if (idxMeta == null) {
|
||||||
|
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
|
||||||
|
logger.debug("index [" + index + "] has been deleted after moving to step [" +
|
||||||
|
lifecycleState.getStep() + "], skipping async action check");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
indexLifecycleService.maybeRunAsyncAction(newState, idxMeta, retryStep);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Response newResponse(boolean acknowledged) {
|
protected Response newResponse(boolean acknowledged) {
|
||||||
return new Response(acknowledged);
|
return new Response(acknowledged);
|
||||||
|
@ -668,7 +668,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||||||
.put(LifecycleSettings.LIFECYCLE_NAME, policy.getName()), LifecycleExecutionState.builder().build(), policyMetadatas);
|
.put(LifecycleSettings.LIFECYCLE_NAME, policy.getName()), LifecycleExecutionState.builder().build(), policyMetadatas);
|
||||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep,
|
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep,
|
||||||
() -> now);
|
() -> now, false);
|
||||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||||
|
|
||||||
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
||||||
@ -684,7 +684,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||||||
|
|
||||||
clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), policyMetadatas);
|
clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), policyMetadatas);
|
||||||
index = clusterState.metaData().index(indexName).getIndex();
|
index = clusterState.metaData().index(indexName).getIndex();
|
||||||
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep, () -> now);
|
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep, () -> now, false);
|
||||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -698,7 +698,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||||||
Collections.emptyList());
|
Collections.emptyList());
|
||||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep,
|
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep,
|
||||||
() -> now);
|
() -> now, false);
|
||||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||||
|
|
||||||
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
||||||
@ -711,7 +711,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||||||
|
|
||||||
clusterState = buildClusterState(indexName, Settings.builder(), lifecycleState.build(), Collections.emptyList());
|
clusterState = buildClusterState(indexName, Settings.builder(), lifecycleState.build(), Collections.emptyList());
|
||||||
index = clusterState.metaData().index(indexName).getIndex();
|
index = clusterState.metaData().index(indexName).getIndex();
|
||||||
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep, () -> now);
|
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep, () -> now, false);
|
||||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -725,7 +725,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||||||
Collections.emptyList());
|
Collections.emptyList());
|
||||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep,
|
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep,
|
||||||
() -> now);
|
() -> now, false);
|
||||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||||
|
|
||||||
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
||||||
@ -737,7 +737,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||||||
}
|
}
|
||||||
clusterState = buildClusterState(indexName, Settings.builder(), lifecycleState.build(), Collections.emptyList());
|
clusterState = buildClusterState(indexName, Settings.builder(), lifecycleState.build(), Collections.emptyList());
|
||||||
index = clusterState.metaData().index(indexName).getIndex();
|
index = clusterState.metaData().index(indexName).getIndex();
|
||||||
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep, () -> now);
|
newClusterState = IndexLifecycleRunner.moveClusterStateToNextStep(index, clusterState, currentStep, nextStep, () -> now, false);
|
||||||
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
assertClusterStateOnNextStep(clusterState, index, currentStep, nextStep, newClusterState, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -764,7 +764,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), policyMetadatas);
|
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), policyMetadatas);
|
||||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||||
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToStep(indexName, clusterState, currentStepKey,
|
ClusterState newClusterState = IndexLifecycleRunner.moveClusterStateToStep(indexName, clusterState, currentStepKey,
|
||||||
nextStepKey, () -> now, stepRegistry);
|
nextStepKey, () -> now, stepRegistry, false);
|
||||||
assertClusterStateOnNextStep(clusterState, index, currentStepKey, nextStepKey, newClusterState, now);
|
assertClusterStateOnNextStep(clusterState, index, currentStepKey, nextStepKey, newClusterState, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -786,7 +786,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.emptyList());
|
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.emptyList());
|
||||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||||
() -> IndexLifecycleRunner.moveClusterStateToStep(indexName, clusterState, currentStepKey,
|
() -> IndexLifecycleRunner.moveClusterStateToStep(indexName, clusterState, currentStepKey,
|
||||||
nextStepKey, () -> now, stepRegistry));
|
nextStepKey, () -> now, stepRegistry, false));
|
||||||
assertThat(exception.getMessage(), equalTo("index [my_index] is not associated with an Index Lifecycle Policy"));
|
assertThat(exception.getMessage(), equalTo("index [my_index] is not associated with an Index Lifecycle Policy"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -809,7 +809,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.emptyList());
|
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.emptyList());
|
||||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||||
() -> IndexLifecycleRunner.moveClusterStateToStep(indexName, clusterState, notCurrentStepKey,
|
() -> IndexLifecycleRunner.moveClusterStateToStep(indexName, clusterState, notCurrentStepKey,
|
||||||
nextStepKey, () -> now, stepRegistry));
|
nextStepKey, () -> now, stepRegistry, false));
|
||||||
assertThat(exception.getMessage(), equalTo("index [my_index] is not on current step " +
|
assertThat(exception.getMessage(), equalTo("index [my_index] is not on current step " +
|
||||||
"[{\"phase\":\"not_current_phase\",\"action\":\"not_current_action\",\"name\":\"not_current_step\"}]"));
|
"[{\"phase\":\"not_current_phase\",\"action\":\"not_current_action\",\"name\":\"not_current_step\"}]"));
|
||||||
}
|
}
|
||||||
@ -832,7 +832,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.emptyList());
|
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.emptyList());
|
||||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||||
() -> IndexLifecycleRunner.moveClusterStateToStep(indexName, clusterState, currentStepKey,
|
() -> IndexLifecycleRunner.moveClusterStateToStep(indexName, clusterState, currentStepKey,
|
||||||
nextStepKey, () -> now, stepRegistry));
|
nextStepKey, () -> now, stepRegistry, false));
|
||||||
assertThat(exception.getMessage(),
|
assertThat(exception.getMessage(),
|
||||||
equalTo("step [{\"phase\":\"next_phase\",\"action\":\"next_action\",\"name\":\"next_step\"}] " +
|
equalTo("step [{\"phase\":\"next_phase\",\"action\":\"next_action\",\"name\":\"next_step\"}] " +
|
||||||
"for index [my_index] with policy [my_policy] does not exist"));
|
"for index [my_index] with policy [my_policy] does not exist"));
|
||||||
@ -866,18 +866,26 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||||||
String[] indices = new String[] { indexName };
|
String[] indices = new String[] { indexName };
|
||||||
String policyName = "my_policy";
|
String policyName = "my_policy";
|
||||||
long now = randomNonNegativeLong();
|
long now = randomNonNegativeLong();
|
||||||
StepKey failedStepKey = new StepKey("current_phase", "current_action", "current_step");
|
StepKey failedStepKey = new StepKey("current_phase", MockAction.NAME, "current_step");
|
||||||
StepKey errorStepKey = new StepKey(failedStepKey.getPhase(), failedStepKey.getAction(), ErrorStep.NAME);
|
StepKey errorStepKey = new StepKey(failedStepKey.getPhase(), failedStepKey.getAction(), ErrorStep.NAME);
|
||||||
Step step = new MockStep(failedStepKey, null);
|
Step step = new MockStep(failedStepKey, null);
|
||||||
|
LifecyclePolicy policy = createPolicy(policyName, failedStepKey, null);
|
||||||
|
LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(policy, Collections.emptyMap(),
|
||||||
|
randomNonNegativeLong(), randomNonNegativeLong());
|
||||||
|
|
||||||
PolicyStepsRegistry policyRegistry = createOneStepPolicyStepRegistry(policyName, step, indexName);
|
PolicyStepsRegistry policyRegistry = createOneStepPolicyStepRegistry(policyName, step, indexName);
|
||||||
Settings.Builder indexSettingsBuilder = Settings.builder()
|
Settings.Builder indexSettingsBuilder = Settings.builder()
|
||||||
.put(LifecycleSettings.LIFECYCLE_NAME, policyName);
|
.put(LifecycleSettings.LIFECYCLE_NAME, policyName);
|
||||||
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
||||||
lifecycleState.setPhase(errorStepKey.getPhase());
|
lifecycleState.setPhase(errorStepKey.getPhase());
|
||||||
|
lifecycleState.setPhaseTime(now);
|
||||||
lifecycleState.setAction(errorStepKey.getAction());
|
lifecycleState.setAction(errorStepKey.getAction());
|
||||||
|
lifecycleState.setActionTime(now);
|
||||||
lifecycleState.setStep(errorStepKey.getName());
|
lifecycleState.setStep(errorStepKey.getName());
|
||||||
|
lifecycleState.setStepTime(now);
|
||||||
lifecycleState.setFailedStep(failedStepKey.getName());
|
lifecycleState.setFailedStep(failedStepKey.getName());
|
||||||
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(), Collections.emptyList());
|
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(),
|
||||||
|
Collections.singletonList(policyMetadata));
|
||||||
Index index = clusterState.metaData().index(indexName).getIndex();
|
Index index = clusterState.metaData().index(indexName).getIndex();
|
||||||
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, () -> now);
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, () -> now);
|
||||||
ClusterState nextClusterState = runner.moveClusterStateToFailedStep(clusterState, indices);
|
ClusterState nextClusterState = runner.moveClusterStateToFailedStep(clusterState, indices);
|
||||||
@ -885,6 +893,41 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
|||||||
nextClusterState, now);
|
nextClusterState, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMoveClusterStateToFailedStepWithUnknownStep() {
|
||||||
|
String indexName = "my_index";
|
||||||
|
String[] indices = new String[] { indexName };
|
||||||
|
String policyName = "my_policy";
|
||||||
|
long now = randomNonNegativeLong();
|
||||||
|
StepKey failedStepKey = new StepKey("current_phase", MockAction.NAME, "current_step");
|
||||||
|
StepKey errorStepKey = new StepKey(failedStepKey.getPhase(), failedStepKey.getAction(), ErrorStep.NAME);
|
||||||
|
|
||||||
|
StepKey registeredStepKey = new StepKey(randomFrom(failedStepKey.getPhase(), "other"),
|
||||||
|
MockAction.NAME, "different_step");
|
||||||
|
Step step = new MockStep(registeredStepKey, null);
|
||||||
|
LifecyclePolicy policy = createPolicy(policyName, failedStepKey, null);
|
||||||
|
LifecyclePolicyMetadata policyMetadata = new LifecyclePolicyMetadata(policy, Collections.emptyMap(),
|
||||||
|
randomNonNegativeLong(), randomNonNegativeLong());
|
||||||
|
|
||||||
|
PolicyStepsRegistry policyRegistry = createOneStepPolicyStepRegistry(policyName, step, indexName);
|
||||||
|
Settings.Builder indexSettingsBuilder = Settings.builder()
|
||||||
|
.put(LifecycleSettings.LIFECYCLE_NAME, policyName);
|
||||||
|
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
|
||||||
|
lifecycleState.setPhase(errorStepKey.getPhase());
|
||||||
|
lifecycleState.setPhaseTime(now);
|
||||||
|
lifecycleState.setAction(errorStepKey.getAction());
|
||||||
|
lifecycleState.setActionTime(now);
|
||||||
|
lifecycleState.setStep(errorStepKey.getName());
|
||||||
|
lifecycleState.setStepTime(now);
|
||||||
|
lifecycleState.setFailedStep(failedStepKey.getName());
|
||||||
|
ClusterState clusterState = buildClusterState(indexName, indexSettingsBuilder, lifecycleState.build(),
|
||||||
|
Collections.singletonList(policyMetadata));
|
||||||
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(policyRegistry, null, () -> now);
|
||||||
|
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
|
||||||
|
() -> runner.moveClusterStateToFailedStep(clusterState, indices));
|
||||||
|
assertThat(exception.getMessage(), equalTo("step [" + failedStepKey
|
||||||
|
+ "] for index [my_index] with policy [my_policy] does not exist"));
|
||||||
|
}
|
||||||
|
|
||||||
public void testMoveClusterStateToFailedStepIndexNotFound() {
|
public void testMoveClusterStateToFailedStepIndexNotFound() {
|
||||||
String existingIndexName = "my_index";
|
String existingIndexName = "my_index";
|
||||||
String invalidIndexName = "does_not_exist";
|
String invalidIndexName = "does_not_exist";
|
||||||
|
Loading…
x
Reference in New Issue
Block a user