diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java index ae663f0ad4f..576ca78c571 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java @@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep; import org.elasticsearch.xpack.ilm.history.ILMHistoryItem; import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; +import java.util.Locale; import java.util.function.LongSupplier; import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_ORIGINATION_DATE; @@ -204,33 +205,45 @@ class IndexLifecycleRunner { int currentRetryAttempt = lifecycleState.getFailedStepRetryCount() == null ? 1 : 1 + lifecycleState.getFailedStepRetryCount(); logger.info("policy [{}] for index [{}] on an error step due to a transitive error, moving back to the failed " + "step [{}] for execution. retry attempt [{}]", policy, index, lifecycleState.getFailedStep(), currentRetryAttempt); - clusterService.submitStateUpdateTask("ilm-retry-failed-step", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return IndexLifecycleTransition.moveClusterStateToPreviouslyFailedStep(currentState, index, - nowSupplier, stepRegistry, true); - } + clusterService.submitStateUpdateTask( + String.format(Locale.ROOT, "ilm-retry-failed-step {policy [%s], index [%s], failedStep [%s]}", policy, index, + failedStep.getKey()), + new ClusterStateUpdateTask() { - @Override - public void onFailure(String source, Exception e) { - logger.error(new ParameterizedMessage("retry execution of step [{}] for index [{}] failed", - failedStep.getKey().getName(), index), e); - } + @Override + public TimeValue timeout() { + // we can afford to drop these requests if they timeout as on the next {@link + // IndexLifecycleRunner#runPeriodicStep} run the policy will still be in the ERROR step, as we haven't been able + // to move it back into the failed step, so we'll try again + return LifecycleSettings.LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING.get(clusterService.state().metadata().settings()); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (oldState.equals(newState) == false) { - IndexMetadata newIndexMeta = newState.metadata().index(index); - Step indexMetaCurrentStep = getCurrentStep(stepRegistry, policy, newIndexMeta); - StepKey stepKey = indexMetaCurrentStep.getKey(); - if (stepKey != null && stepKey != TerminalPolicyStep.KEY && newIndexMeta != null) { - logger.trace("policy [{}] for index [{}] was moved back on the failed step for as part of an automatic " + - "retry. Attempting to execute the failed step [{}] if it's an async action", policy, index, stepKey); - maybeRunAsyncAction(newState, newIndexMeta, policy, stepKey); + @Override + public ClusterState execute(ClusterState currentState) { + return IndexLifecycleTransition.moveClusterStateToPreviouslyFailedStep(currentState, index, + nowSupplier, stepRegistry, true); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error(new ParameterizedMessage("retry execution of step [{}] for index [{}] failed", + failedStep.getKey().getName(), index), e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (oldState.equals(newState) == false) { + IndexMetadata newIndexMeta = newState.metadata().index(index); + Step indexMetaCurrentStep = getCurrentStep(stepRegistry, policy, newIndexMeta); + StepKey stepKey = indexMetaCurrentStep.getKey(); + if (stepKey != null && stepKey != TerminalPolicyStep.KEY && newIndexMeta != null) { + logger.trace("policy [{}] for index [{}] was moved back on the failed step for as part of an automatic " + + "retry. Attempting to execute the failed step [{}] if it's an async action", policy, index, stepKey); + maybeRunAsyncAction(newState, newIndexMeta, policy, stepKey); + } } } - } - }); + }); } else { logger.debug("policy [{}] for index [{}] on an error step after a terminal error, skipping execution", policy, index); } @@ -338,7 +351,7 @@ class IndexLifecycleRunner { } } else if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { logger.debug("[{}] running policy with current-step [{}]", indexMetadata.getIndex().getName(), currentStep.getKey()); - clusterService.submitStateUpdateTask("ilm-execute-cluster-state-steps", + clusterService.submitStateUpdateTask(String.format(Locale.ROOT, "ilm-execute-cluster-state-steps [%s]", currentStep), new ExecuteStepsUpdateTask(policy, indexMetadata.getIndex(), currentStep, stepRegistry, this, nowSupplier)); } else { logger.trace("[{}] ignoring step execution from cluster state change event [{}]", index, currentStep.getKey()); @@ -351,7 +364,9 @@ class IndexLifecycleRunner { */ private void moveToStep(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey newStepKey) { logger.debug("[{}] moving to step [{}] {} -> {}", index.getName(), policy, currentStepKey, newStepKey); - clusterService.submitStateUpdateTask("ilm-move-to-step", + clusterService.submitStateUpdateTask( + String.format(Locale.ROOT, "ilm-move-to-step {policy [%s], index [%s], currentStep [%s], nextStep [%s]}", policy, + index.getName(), currentStepKey, newStepKey), new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState -> { IndexMetadata indexMetadata = clusterState.metadata().index(index); @@ -368,7 +383,9 @@ class IndexLifecycleRunner { private void moveToErrorStep(Index index, String policy, Step.StepKey currentStepKey, Exception e) { logger.error(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}]. Moving to ERROR step", policy, index.getName(), currentStepKey), e); - clusterService.submitStateUpdateTask("ilm-move-to-error-step", + clusterService.submitStateUpdateTask( + String.format(Locale.ROOT, "ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), + currentStepKey), new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, clusterState -> { IndexMetadata indexMetadata = clusterState.metadata().index(index); registerFailedOperation(indexMetadata, e); @@ -379,8 +396,11 @@ class IndexLifecycleRunner { * Set step info for the given index inside of its {@link LifecycleExecutionState} without * changing other execution state. */ - private void setStepInfo(Index index, String policy, Step.StepKey currentStepKey, ToXContentObject stepInfo) { - clusterService.submitStateUpdateTask("ilm-set-step-info", new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo)); + private void setStepInfo(Index index, String policy, @Nullable Step.StepKey currentStepKey, ToXContentObject stepInfo) { + clusterService.submitStateUpdateTask( + String.format(Locale.ROOT, "ilm-set-step-info {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), + currentStepKey), + new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo)); } /** diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java index f86a74bb768..f52bc0200bd 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java @@ -622,8 +622,11 @@ public class IndexLifecycleRunnerTests extends ESTestCase { runner.runPolicyAfterStateChange(policyName, indexMetadata); - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-execute-cluster-state-steps"), - Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step))); + Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask( + Mockito.eq("ilm-execute-cluster-state-steps [{\"phase\":\"phase\",\"action\":\"action\"," + + "\"name\":\"cluster_state_action_step\"} => null]"), + Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step)) + ); Mockito.verifyNoMoreInteractions(clusterService); } @@ -640,8 +643,11 @@ public class IndexLifecycleRunnerTests extends ESTestCase { runner.runPolicyAfterStateChange(policyName, indexMetadata); - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-execute-cluster-state-steps"), - Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step))); + Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask( + Mockito.eq("ilm-execute-cluster-state-steps [{\"phase\":\"phase\",\"action\":\"action\"," + + "\"name\":\"cluster_state_action_step\"} => null]"), + Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step)) + ); Mockito.verifyNoMoreInteractions(clusterService); } @@ -690,7 +696,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase { .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); // verify that no exception is thrown runner.runPolicyAfterStateChange(policyName, indexMetadata); - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-set-step-info"), + Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask( + Mockito.eq("ilm-set-step-info {policy [cluster_state_action_policy], index [my_index], currentStep [null]}"), Mockito.argThat(new SetStepInfoUpdateTaskMatcher(indexMetadata.getIndex(), policyName, null, (builder, params) -> { builder.startObject(); @@ -698,7 +705,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase { builder.field("type", "illegal_argument_exception"); builder.endObject(); return builder; - }))); + })) + ); Mockito.verifyNoMoreInteractions(clusterService); }