This commits adds a timeout when moving ILM back on to a failed step. In case the master is struggling with processing the cluster update requests these ones will expire (as we'll send them again anyway on the next ILM loop run) ILM more descriptive source messages for cluster updates Use the configured ILM step master timeout setting (cherry picked from commit ff6c5ed16616eadfcddd9c95317d370f0d126583) Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
This commit is contained in:
parent
b8df265b42
commit
c0406f78b7
|
@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
|
||||||
import org.elasticsearch.xpack.ilm.history.ILMHistoryItem;
|
import org.elasticsearch.xpack.ilm.history.ILMHistoryItem;
|
||||||
import org.elasticsearch.xpack.ilm.history.ILMHistoryStore;
|
import org.elasticsearch.xpack.ilm.history.ILMHistoryStore;
|
||||||
|
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.function.LongSupplier;
|
import java.util.function.LongSupplier;
|
||||||
|
|
||||||
import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_ORIGINATION_DATE;
|
import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_ORIGINATION_DATE;
|
||||||
|
@ -204,33 +205,45 @@ class IndexLifecycleRunner {
|
||||||
int currentRetryAttempt = lifecycleState.getFailedStepRetryCount() == null ? 1 : 1 + lifecycleState.getFailedStepRetryCount();
|
int currentRetryAttempt = lifecycleState.getFailedStepRetryCount() == null ? 1 : 1 + lifecycleState.getFailedStepRetryCount();
|
||||||
logger.info("policy [{}] for index [{}] on an error step due to a transitive error, moving back to the failed " +
|
logger.info("policy [{}] for index [{}] on an error step due to a transitive error, moving back to the failed " +
|
||||||
"step [{}] for execution. retry attempt [{}]", policy, index, lifecycleState.getFailedStep(), currentRetryAttempt);
|
"step [{}] for execution. retry attempt [{}]", policy, index, lifecycleState.getFailedStep(), currentRetryAttempt);
|
||||||
clusterService.submitStateUpdateTask("ilm-retry-failed-step", new ClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask(
|
||||||
@Override
|
String.format(Locale.ROOT, "ilm-retry-failed-step {policy [%s], index [%s], failedStep [%s]}", policy, index,
|
||||||
public ClusterState execute(ClusterState currentState) {
|
failedStep.getKey()),
|
||||||
return IndexLifecycleTransition.moveClusterStateToPreviouslyFailedStep(currentState, index,
|
new ClusterStateUpdateTask() {
|
||||||
nowSupplier, stepRegistry, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(String source, Exception e) {
|
public TimeValue timeout() {
|
||||||
logger.error(new ParameterizedMessage("retry execution of step [{}] for index [{}] failed",
|
// we can afford to drop these requests if they timeout as on the next {@link
|
||||||
failedStep.getKey().getName(), index), e);
|
// IndexLifecycleRunner#runPeriodicStep} run the policy will still be in the ERROR step, as we haven't been able
|
||||||
}
|
// to move it back into the failed step, so we'll try again
|
||||||
|
return LifecycleSettings.LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING.get(clusterService.state().metadata().settings());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
if (oldState.equals(newState) == false) {
|
return IndexLifecycleTransition.moveClusterStateToPreviouslyFailedStep(currentState, index,
|
||||||
IndexMetadata newIndexMeta = newState.metadata().index(index);
|
nowSupplier, stepRegistry, true);
|
||||||
Step indexMetaCurrentStep = getCurrentStep(stepRegistry, policy, newIndexMeta);
|
}
|
||||||
StepKey stepKey = indexMetaCurrentStep.getKey();
|
|
||||||
if (stepKey != null && stepKey != TerminalPolicyStep.KEY && newIndexMeta != null) {
|
@Override
|
||||||
logger.trace("policy [{}] for index [{}] was moved back on the failed step for as part of an automatic " +
|
public void onFailure(String source, Exception e) {
|
||||||
"retry. Attempting to execute the failed step [{}] if it's an async action", policy, index, stepKey);
|
logger.error(new ParameterizedMessage("retry execution of step [{}] for index [{}] failed",
|
||||||
maybeRunAsyncAction(newState, newIndexMeta, policy, stepKey);
|
failedStep.getKey().getName(), index), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
|
if (oldState.equals(newState) == false) {
|
||||||
|
IndexMetadata newIndexMeta = newState.metadata().index(index);
|
||||||
|
Step indexMetaCurrentStep = getCurrentStep(stepRegistry, policy, newIndexMeta);
|
||||||
|
StepKey stepKey = indexMetaCurrentStep.getKey();
|
||||||
|
if (stepKey != null && stepKey != TerminalPolicyStep.KEY && newIndexMeta != null) {
|
||||||
|
logger.trace("policy [{}] for index [{}] was moved back on the failed step for as part of an automatic " +
|
||||||
|
"retry. Attempting to execute the failed step [{}] if it's an async action", policy, index, stepKey);
|
||||||
|
maybeRunAsyncAction(newState, newIndexMeta, policy, stepKey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
});
|
|
||||||
} else {
|
} else {
|
||||||
logger.debug("policy [{}] for index [{}] on an error step after a terminal error, skipping execution", policy, index);
|
logger.debug("policy [{}] for index [{}] on an error step after a terminal error, skipping execution", policy, index);
|
||||||
}
|
}
|
||||||
|
@ -338,7 +351,7 @@ class IndexLifecycleRunner {
|
||||||
}
|
}
|
||||||
} else if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
|
} else if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
|
||||||
logger.debug("[{}] running policy with current-step [{}]", indexMetadata.getIndex().getName(), currentStep.getKey());
|
logger.debug("[{}] running policy with current-step [{}]", indexMetadata.getIndex().getName(), currentStep.getKey());
|
||||||
clusterService.submitStateUpdateTask("ilm-execute-cluster-state-steps",
|
clusterService.submitStateUpdateTask(String.format(Locale.ROOT, "ilm-execute-cluster-state-steps [%s]", currentStep),
|
||||||
new ExecuteStepsUpdateTask(policy, indexMetadata.getIndex(), currentStep, stepRegistry, this, nowSupplier));
|
new ExecuteStepsUpdateTask(policy, indexMetadata.getIndex(), currentStep, stepRegistry, this, nowSupplier));
|
||||||
} else {
|
} else {
|
||||||
logger.trace("[{}] ignoring step execution from cluster state change event [{}]", index, currentStep.getKey());
|
logger.trace("[{}] ignoring step execution from cluster state change event [{}]", index, currentStep.getKey());
|
||||||
|
@ -351,7 +364,9 @@ class IndexLifecycleRunner {
|
||||||
*/
|
*/
|
||||||
private void moveToStep(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey newStepKey) {
|
private void moveToStep(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey newStepKey) {
|
||||||
logger.debug("[{}] moving to step [{}] {} -> {}", index.getName(), policy, currentStepKey, newStepKey);
|
logger.debug("[{}] moving to step [{}] {} -> {}", index.getName(), policy, currentStepKey, newStepKey);
|
||||||
clusterService.submitStateUpdateTask("ilm-move-to-step",
|
clusterService.submitStateUpdateTask(
|
||||||
|
String.format(Locale.ROOT, "ilm-move-to-step {policy [%s], index [%s], currentStep [%s], nextStep [%s]}", policy,
|
||||||
|
index.getName(), currentStepKey, newStepKey),
|
||||||
new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState ->
|
new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState ->
|
||||||
{
|
{
|
||||||
IndexMetadata indexMetadata = clusterState.metadata().index(index);
|
IndexMetadata indexMetadata = clusterState.metadata().index(index);
|
||||||
|
@ -368,7 +383,9 @@ class IndexLifecycleRunner {
|
||||||
private void moveToErrorStep(Index index, String policy, Step.StepKey currentStepKey, Exception e) {
|
private void moveToErrorStep(Index index, String policy, Step.StepKey currentStepKey, Exception e) {
|
||||||
logger.error(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}]. Moving to ERROR step",
|
logger.error(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}]. Moving to ERROR step",
|
||||||
policy, index.getName(), currentStepKey), e);
|
policy, index.getName(), currentStepKey), e);
|
||||||
clusterService.submitStateUpdateTask("ilm-move-to-error-step",
|
clusterService.submitStateUpdateTask(
|
||||||
|
String.format(Locale.ROOT, "ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(),
|
||||||
|
currentStepKey),
|
||||||
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, clusterState -> {
|
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, clusterState -> {
|
||||||
IndexMetadata indexMetadata = clusterState.metadata().index(index);
|
IndexMetadata indexMetadata = clusterState.metadata().index(index);
|
||||||
registerFailedOperation(indexMetadata, e);
|
registerFailedOperation(indexMetadata, e);
|
||||||
|
@ -379,8 +396,11 @@ class IndexLifecycleRunner {
|
||||||
* Set step info for the given index inside of its {@link LifecycleExecutionState} without
|
* Set step info for the given index inside of its {@link LifecycleExecutionState} without
|
||||||
* changing other execution state.
|
* changing other execution state.
|
||||||
*/
|
*/
|
||||||
private void setStepInfo(Index index, String policy, Step.StepKey currentStepKey, ToXContentObject stepInfo) {
|
private void setStepInfo(Index index, String policy, @Nullable Step.StepKey currentStepKey, ToXContentObject stepInfo) {
|
||||||
clusterService.submitStateUpdateTask("ilm-set-step-info", new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo));
|
clusterService.submitStateUpdateTask(
|
||||||
|
String.format(Locale.ROOT, "ilm-set-step-info {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(),
|
||||||
|
currentStepKey),
|
||||||
|
new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -622,8 +622,11 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
|
|
||||||
runner.runPolicyAfterStateChange(policyName, indexMetadata);
|
runner.runPolicyAfterStateChange(policyName, indexMetadata);
|
||||||
|
|
||||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-execute-cluster-state-steps"),
|
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(
|
||||||
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step)));
|
Mockito.eq("ilm-execute-cluster-state-steps [{\"phase\":\"phase\",\"action\":\"action\"," +
|
||||||
|
"\"name\":\"cluster_state_action_step\"} => null]"),
|
||||||
|
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step))
|
||||||
|
);
|
||||||
Mockito.verifyNoMoreInteractions(clusterService);
|
Mockito.verifyNoMoreInteractions(clusterService);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -640,8 +643,11 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
|
|
||||||
runner.runPolicyAfterStateChange(policyName, indexMetadata);
|
runner.runPolicyAfterStateChange(policyName, indexMetadata);
|
||||||
|
|
||||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-execute-cluster-state-steps"),
|
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(
|
||||||
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step)));
|
Mockito.eq("ilm-execute-cluster-state-steps [{\"phase\":\"phase\",\"action\":\"action\"," +
|
||||||
|
"\"name\":\"cluster_state_action_step\"} => null]"),
|
||||||
|
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step))
|
||||||
|
);
|
||||||
Mockito.verifyNoMoreInteractions(clusterService);
|
Mockito.verifyNoMoreInteractions(clusterService);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -690,7 +696,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
|
||||||
// verify that no exception is thrown
|
// verify that no exception is thrown
|
||||||
runner.runPolicyAfterStateChange(policyName, indexMetadata);
|
runner.runPolicyAfterStateChange(policyName, indexMetadata);
|
||||||
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-set-step-info"),
|
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(
|
||||||
|
Mockito.eq("ilm-set-step-info {policy [cluster_state_action_policy], index [my_index], currentStep [null]}"),
|
||||||
Mockito.argThat(new SetStepInfoUpdateTaskMatcher(indexMetadata.getIndex(), policyName, null,
|
Mockito.argThat(new SetStepInfoUpdateTaskMatcher(indexMetadata.getIndex(), policyName, null,
|
||||||
(builder, params) -> {
|
(builder, params) -> {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
|
@ -698,7 +705,8 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
builder.field("type", "illegal_argument_exception");
|
builder.field("type", "illegal_argument_exception");
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
return builder;
|
return builder;
|
||||||
})));
|
}))
|
||||||
|
);
|
||||||
Mockito.verifyNoMoreInteractions(clusterService);
|
Mockito.verifyNoMoreInteractions(clusterService);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue