Fix expected next step being incorrect when executing async action (#34313)
This fixes an issue where an incorrect expected next step is used when checking to execute `AsyncActionStep`s after a cluster state step. It fixes this scenario: - `ExecuteStepsUpdateTask` executes a `ClusterStateWaitStep` or `ClusterStateActionStep` successfully - The next step is also a `ClusterStateWaitStep`, so it loops - The `ClusterStateWaitStep` has a next stepkey (which gets set to the `nextStepKey` in the code) - The `ClusterStateWaitStep` fails the condition, meaning that it will have to wait longer - The `nextStepKey` is now incorrect though, because we did not advance the index's step, and it's not `null` (which is another safe value if there is no step after the `ClusterStateWaitStep`) This fixes the problem by resetting the nextStepKey to null if the condition is not met, since we are not going to advance the step metadata in this case (thereby skipping the `maybeRunAsyncAction` invocation). This commit also tightens up and enhances much of the ILM logging. A lot of logging was missing the index name (making it hard to debug in the presence of multiple indices) and a lot was using the wrong logging level (DEBUG is now actually readable without being a wall of text). Resolves #34297
This commit is contained in:
parent
13d89295c8
commit
9ad2a7fa77
|
@ -219,10 +219,6 @@ public class LifecyclePolicy extends AbstractDiffable<LifecyclePolicy>
|
||||||
steps.add(new InitializePolicyContextStep(InitializePolicyContextStep.KEY, lastStepKey));
|
steps.add(new InitializePolicyContextStep(InitializePolicyContextStep.KEY, lastStepKey));
|
||||||
|
|
||||||
Collections.reverse(steps);
|
Collections.reverse(steps);
|
||||||
logger.trace("STEP COUNT: " + steps.size());
|
|
||||||
for (Step step : steps) {
|
|
||||||
logger.trace(step.getKey() + " -> " + step.getNextStepKey());
|
|
||||||
}
|
|
||||||
|
|
||||||
return steps;
|
return steps;
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,9 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
||||||
return startStep;
|
return startStep;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Step.StepKey getNextStepKey() {
|
||||||
|
return nextStepKey;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link Step}s for the current index and policy are executed in succession until the next step to be
|
* {@link Step}s for the current index and policy are executed in succession until the next step to be
|
||||||
|
@ -107,6 +110,8 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
||||||
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey());
|
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey());
|
||||||
ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state);
|
ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state);
|
||||||
if (result.isComplete()) {
|
if (result.isComplete()) {
|
||||||
|
logger.trace("[{}] cluster state step condition met successfully ({}) [{}], moving to next step {}",
|
||||||
|
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey());
|
||||||
if (currentStep.getNextStepKey() == null) {
|
if (currentStep.getNextStepKey() == null) {
|
||||||
return state;
|
return state;
|
||||||
} else {
|
} else {
|
||||||
|
@ -114,7 +119,13 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
||||||
currentStep.getNextStepKey(), nowSupplier);
|
currentStep.getNextStepKey(), nowSupplier);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.debug("[{}] condition not met ({}), returning existing state", index.getName(), currentStep.getKey());
|
logger.trace("[{}] condition not met ({}) [{}], returning existing state",
|
||||||
|
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey());
|
||||||
|
// We may have executed a step and set "nextStepKey" to
|
||||||
|
// a value, but in this case, since the condition was
|
||||||
|
// not met, we can't advance any way, so don't attempt
|
||||||
|
// to run the current step
|
||||||
|
nextStepKey = null;
|
||||||
ToXContentObject stepInfo = result.getInfomationContext();
|
ToXContentObject stepInfo = result.getInfomationContext();
|
||||||
if (stepInfo == null) {
|
if (stepInfo == null) {
|
||||||
return state;
|
return state;
|
||||||
|
@ -146,6 +157,8 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
|
||||||
if (oldState.equals(newState) == false) {
|
if (oldState.equals(newState) == false) {
|
||||||
IndexMetaData indexMetaData = newState.metaData().index(index);
|
IndexMetaData indexMetaData = newState.metaData().index(index);
|
||||||
if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) {
|
if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) {
|
||||||
|
logger.trace("[{}] step sequence starting with {} has completed, running next step {} if it is an async action",
|
||||||
|
index.getName(), startStep.getKey(), nextStepKey);
|
||||||
// After the cluster state has been processed and we have moved
|
// After the cluster state has been processed and we have moved
|
||||||
// to a new step, we need to conditionally execute the step iff
|
// to a new step, we need to conditionally execute the step iff
|
||||||
// it is an `AsyncAction` so that it is executed exactly once.
|
// it is an `AsyncAction` so that it is executed exactly once.
|
||||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.indexlifecycle;
|
||||||
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.action.support.TransportAction;
|
import org.elasticsearch.action.support.TransportAction;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
@ -114,6 +115,8 @@ public class IndexLifecycleRunner {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.trace("[{}] maybe running periodic step ({}) with current step {}",
|
||||||
|
index, currentStep.getClass().getSimpleName(), currentStep.getKey());
|
||||||
// Only phase changing and async wait steps should be run through periodic polling
|
// Only phase changing and async wait steps should be run through periodic polling
|
||||||
if (currentStep instanceof PhaseCompleteStep) {
|
if (currentStep instanceof PhaseCompleteStep) {
|
||||||
// Only proceed to the next step if enough time has elapsed to go into the next phase
|
// Only proceed to the next step if enough time has elapsed to go into the next phase
|
||||||
|
@ -121,12 +124,12 @@ public class IndexLifecycleRunner {
|
||||||
moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey());
|
moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey());
|
||||||
}
|
}
|
||||||
} else if (currentStep instanceof AsyncWaitStep) {
|
} else if (currentStep instanceof AsyncWaitStep) {
|
||||||
logger.debug("running periodic policy with current-step [{}]", currentStep.getKey());
|
logger.debug("[{}] running periodic policy with current-step [{}]", index, currentStep.getKey());
|
||||||
((AsyncWaitStep) currentStep).evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {
|
((AsyncWaitStep) currentStep).evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean conditionMet, ToXContentObject stepInfo) {
|
public void onResponse(boolean conditionMet, ToXContentObject stepInfo) {
|
||||||
logger.debug("cs-change-async-wait-callback, current-step: " + currentStep.getKey());
|
logger.trace("cs-change-async-wait-callback, [{}] current-step: {}", index, currentStep.getKey());
|
||||||
if (conditionMet) {
|
if (conditionMet) {
|
||||||
moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey());
|
moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey());
|
||||||
} else if (stepInfo != null) {
|
} else if (stepInfo != null) {
|
||||||
|
@ -140,7 +143,7 @@ public class IndexLifecycleRunner {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
logger.trace("ignoring non periodic step execution from step transition [{}]", currentStep.getKey());
|
logger.trace("[{}] ignoring non periodic step execution from step transition [{}]", index, currentStep.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -162,17 +165,19 @@ public class IndexLifecycleRunner {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.trace("[{}] maybe running async action step ({}) with current step {}",
|
||||||
|
index, currentStep.getClass().getSimpleName(), currentStep.getKey());
|
||||||
if (currentStep.getKey().equals(expectedStepKey) == false) {
|
if (currentStep.getKey().equals(expectedStepKey) == false) {
|
||||||
throw new IllegalStateException("expected index [" + indexMetaData.getIndex().getName() + "] with policy [" + policy +
|
throw new IllegalStateException("expected index [" + indexMetaData.getIndex().getName() + "] with policy [" + policy +
|
||||||
"] to have current step consistent with provided step key (" + expectedStepKey + ") but it was " + currentStep.getKey());
|
"] to have current step consistent with provided step key (" + expectedStepKey + ") but it was " + currentStep.getKey());
|
||||||
}
|
}
|
||||||
if (currentStep instanceof AsyncActionStep) {
|
if (currentStep instanceof AsyncActionStep) {
|
||||||
logger.debug("running policy with async action step [{}]", currentStep.getKey());
|
logger.debug("[{}] running policy with async action step [{}]", index, currentStep.getKey());
|
||||||
((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, new AsyncActionStep.Listener() {
|
((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, new AsyncActionStep.Listener() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(boolean complete) {
|
public void onResponse(boolean complete) {
|
||||||
logger.debug("cs-change-async-action-callback, current-step: [{}]", currentStep.getKey());
|
logger.trace("cs-change-async-action-callback, [{}], current-step: {}", index, currentStep.getKey());
|
||||||
if (complete && ((AsyncActionStep) currentStep).indexSurvives()) {
|
if (complete && ((AsyncActionStep) currentStep).indexSurvives()) {
|
||||||
moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey());
|
moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey());
|
||||||
}
|
}
|
||||||
|
@ -184,7 +189,7 @@ public class IndexLifecycleRunner {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
logger.trace("ignoring non async action step execution from step transition [{}]", currentStep.getKey());
|
logger.trace("[{}] ignoring non async action step execution from step transition [{}]", index, currentStep.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,6 +225,8 @@ public class IndexLifecycleRunner {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.trace("[{}] maybe running step ({}) after state change: {}",
|
||||||
|
index, currentStep.getClass().getSimpleName(), currentStep.getKey());
|
||||||
if (currentStep instanceof PhaseCompleteStep) {
|
if (currentStep instanceof PhaseCompleteStep) {
|
||||||
// Only proceed to the next step if enough time has elapsed to go into the next phase
|
// Only proceed to the next step if enough time has elapsed to go into the next phase
|
||||||
if (isReadyToTransitionToThisPhase(policy, indexMetaData, currentStep.getNextStepKey().getPhase())) {
|
if (isReadyToTransitionToThisPhase(policy, indexMetaData, currentStep.getNextStepKey().getPhase())) {
|
||||||
|
@ -230,7 +237,7 @@ public class IndexLifecycleRunner {
|
||||||
clusterService.submitStateUpdateTask("ilm-execute-cluster-state-steps",
|
clusterService.submitStateUpdateTask("ilm-execute-cluster-state-steps",
|
||||||
new ExecuteStepsUpdateTask(policy, indexMetaData.getIndex(), currentStep, stepRegistry, this, nowSupplier));
|
new ExecuteStepsUpdateTask(policy, indexMetaData.getIndex(), currentStep, stepRegistry, this, nowSupplier));
|
||||||
} else {
|
} else {
|
||||||
logger.trace("ignoring step execution from cluster state change event [{}]", currentStep.getKey());
|
logger.trace("[{}] ignoring step execution from cluster state change event [{}]", index, currentStep.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,6 +267,7 @@ public class IndexLifecycleRunner {
|
||||||
static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, IndexMetaData indexMetaData,
|
static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, IndexMetaData indexMetaData,
|
||||||
LifecycleExecutionState lifecycleState) {
|
LifecycleExecutionState lifecycleState) {
|
||||||
StepKey currentStepKey = getCurrentStepKey(lifecycleState);
|
StepKey currentStepKey = getCurrentStepKey(lifecycleState);
|
||||||
|
logger.trace("[{}] retrieved current step key: {}", indexMetaData.getIndex().getName(), currentStepKey);
|
||||||
if (currentStepKey == null) {
|
if (currentStepKey == null) {
|
||||||
return stepRegistry.getFirstStep(policy);
|
return stepRegistry.getFirstStep(policy);
|
||||||
} else {
|
} else {
|
||||||
|
@ -438,8 +446,7 @@ public class IndexLifecycleRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey) {
|
private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey) {
|
||||||
logger.debug("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> "
|
logger.debug("[{}] moving to step [{}] {} -> {}", index.getName(), policy, currentStepKey, nextStepKey);
|
||||||
+ nextStepKey);
|
|
||||||
clusterService.submitStateUpdateTask("ilm-move-to-step",
|
clusterService.submitStateUpdateTask("ilm-move-to-step",
|
||||||
new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, nowSupplier, clusterState ->
|
new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, nowSupplier, clusterState ->
|
||||||
{
|
{
|
||||||
|
@ -451,8 +458,8 @@ public class IndexLifecycleRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void moveToErrorStep(Index index, String policy, StepKey currentStepKey, Exception e) {
|
private void moveToErrorStep(Index index, String policy, StepKey currentStepKey, Exception e) {
|
||||||
logger.error("policy [" + policy + "] for index [" + index.getName() + "] failed on step [" + currentStepKey
|
logger.error(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}]. Moving to ERROR step",
|
||||||
+ "]. Moving to ERROR step.", e);
|
policy, index.getName(), currentStepKey), e);
|
||||||
clusterService.submitStateUpdateTask("ilm-move-to-error-step",
|
clusterService.submitStateUpdateTask("ilm-move-to-error-step",
|
||||||
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier));
|
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier));
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,6 +148,24 @@ public class ExecuteStepsUpdateTaskTests extends ESTestCase {
|
||||||
assertThat(task.execute(clusterState), sameInstance(clusterState));
|
assertThat(task.execute(clusterState), sameInstance(clusterState));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSuccessThenFailureUnsetNextKey() throws IOException {
|
||||||
|
secondStep.setWillComplete(false);
|
||||||
|
setStateToKey(firstStepKey);
|
||||||
|
Step startStep = policyStepsRegistry.getStep(indexMetaData, firstStepKey);
|
||||||
|
long now = randomNonNegativeLong();
|
||||||
|
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now);
|
||||||
|
ClusterState newState = task.execute(clusterState);
|
||||||
|
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index));
|
||||||
|
StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);
|
||||||
|
assertThat(currentStepKey, equalTo(secondStepKey));
|
||||||
|
assertThat(firstStep.getExecuteCount(), equalTo(1L));
|
||||||
|
assertThat(secondStep.getExecuteCount(), equalTo(1L));
|
||||||
|
assertThat(task.getNextStepKey(), nullValue());
|
||||||
|
assertThat(lifecycleState.getPhaseTime(), nullValue());
|
||||||
|
assertThat(lifecycleState.getActionTime(), nullValue());
|
||||||
|
assertThat(lifecycleState.getStepInfo(), nullValue());
|
||||||
|
}
|
||||||
|
|
||||||
public void testExecuteUntilFirstNonClusterStateStep() throws IOException {
|
public void testExecuteUntilFirstNonClusterStateStep() throws IOException {
|
||||||
setStateToKey(secondStepKey);
|
setStateToKey(secondStepKey);
|
||||||
Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey);
|
Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey);
|
||||||
|
|
Loading…
Reference in New Issue