This commit wraps the calls to retrieve the current step in a try/catch so that the exception does not bubble up. Instead, step info is added containing the exception to the existing step. Semi-related to #49128 (cherry picked from commit 72530f8a7f40ae1fca3704effb38cf92daf29057) Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
This commit is contained in:
parent
97cdfd2848
commit
19780e20ba
|
@ -104,7 +104,14 @@ public class IndexLifecycleRunner {
|
||||||
public void runPeriodicStep(String policy, IndexMetaData indexMetaData) {
|
public void runPeriodicStep(String policy, IndexMetaData indexMetaData) {
|
||||||
String index = indexMetaData.getIndex().getName();
|
String index = indexMetaData.getIndex().getName();
|
||||||
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData);
|
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData);
|
||||||
Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, lifecycleState);
|
final Step currentStep;
|
||||||
|
try {
|
||||||
|
currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, lifecycleState);
|
||||||
|
} catch (Exception e) {
|
||||||
|
markPolicyRetrievalError(policy, indexMetaData.getIndex(), lifecycleState, e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (currentStep == null) {
|
if (currentStep == null) {
|
||||||
if (stepRegistry.policyExists(policy) == false) {
|
if (stepRegistry.policyExists(policy) == false) {
|
||||||
markPolicyDoesNotExist(policy, indexMetaData.getIndex(), lifecycleState);
|
markPolicyDoesNotExist(policy, indexMetaData.getIndex(), lifecycleState);
|
||||||
|
@ -194,7 +201,13 @@ public class IndexLifecycleRunner {
|
||||||
public void maybeRunAsyncAction(ClusterState currentState, IndexMetaData indexMetaData, String policy, StepKey expectedStepKey) {
|
public void maybeRunAsyncAction(ClusterState currentState, IndexMetaData indexMetaData, String policy, StepKey expectedStepKey) {
|
||||||
String index = indexMetaData.getIndex().getName();
|
String index = indexMetaData.getIndex().getName();
|
||||||
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData);
|
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData);
|
||||||
Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, lifecycleState);
|
final Step currentStep;
|
||||||
|
try {
|
||||||
|
currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, lifecycleState);
|
||||||
|
} catch (Exception e) {
|
||||||
|
markPolicyRetrievalError(policy, indexMetaData.getIndex(), lifecycleState, e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (currentStep == null) {
|
if (currentStep == null) {
|
||||||
logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized",
|
logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized",
|
||||||
getCurrentStepKey(lifecycleState), index, policy);
|
getCurrentStepKey(lifecycleState), index, policy);
|
||||||
|
@ -237,7 +250,13 @@ public class IndexLifecycleRunner {
|
||||||
public void runPolicyAfterStateChange(String policy, IndexMetaData indexMetaData) {
|
public void runPolicyAfterStateChange(String policy, IndexMetaData indexMetaData) {
|
||||||
String index = indexMetaData.getIndex().getName();
|
String index = indexMetaData.getIndex().getName();
|
||||||
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData);
|
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData);
|
||||||
Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, lifecycleState);
|
final Step currentStep;
|
||||||
|
try {
|
||||||
|
currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, lifecycleState);
|
||||||
|
} catch (Exception e) {
|
||||||
|
markPolicyRetrievalError(policy, indexMetaData.getIndex(), lifecycleState, e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (currentStep == null) {
|
if (currentStep == null) {
|
||||||
if (stepRegistry.policyExists(policy) == false) {
|
if (stepRegistry.policyExists(policy) == false) {
|
||||||
markPolicyDoesNotExist(policy, indexMetaData.getIndex(), lifecycleState);
|
markPolicyDoesNotExist(policy, indexMetaData.getIndex(), lifecycleState);
|
||||||
|
@ -596,10 +615,14 @@ public class IndexLifecycleRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void markPolicyDoesNotExist(String policyName, Index index, LifecycleExecutionState executionState) {
|
private void markPolicyDoesNotExist(String policyName, Index index, LifecycleExecutionState executionState) {
|
||||||
logger.debug("policy [{}] for index [{}] does not exist, recording this in step_info for this index",
|
markPolicyRetrievalError(policyName, index, executionState,
|
||||||
policyName, index.getName());
|
new IllegalArgumentException("policy [" + policyName + "] does not exist"));
|
||||||
setStepInfo(index, policyName, getCurrentStepKey(executionState),
|
}
|
||||||
new SetStepInfoUpdateTask.ExceptionWrapper(
|
|
||||||
new IllegalArgumentException("policy [" + policyName + "] does not exist")));
|
private void markPolicyRetrievalError(String policyName, Index index, LifecycleExecutionState executionState, Exception e) {
|
||||||
|
logger.debug(
|
||||||
|
new ParameterizedMessage("unable to retrieve policy [{}] for index [{}], recording this in step_info for this index",
|
||||||
|
policyName, index.getName()), e);
|
||||||
|
setStepInfo(index, policyName, getCurrentStepKey(executionState), new SetStepInfoUpdateTask.ExceptionWrapper(e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,6 +72,7 @@ import java.util.SortedMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
@ -356,6 +357,84 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
|
||||||
threadPool.shutdownNow();
|
threadPool.shutdownNow();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testRunPeriodicPolicyWithFailureToReadPolicy() throws Exception {
|
||||||
|
doTestRunPolicyWithFailureToReadPolicy(false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRunStateChangePolicyWithFailureToReadPolicy() throws Exception {
|
||||||
|
doTestRunPolicyWithFailureToReadPolicy(false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRunAsyncActionPolicyWithFailureToReadPolicy() throws Exception {
|
||||||
|
doTestRunPolicyWithFailureToReadPolicy(true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void doTestRunPolicyWithFailureToReadPolicy(boolean asyncAction, boolean periodicAction) throws Exception {
|
||||||
|
String policyName = "foo";
|
||||||
|
StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step");
|
||||||
|
StepKey nextStepKey = new StepKey("phase", "action", "next_cluster_state_action_step");
|
||||||
|
MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, nextStepKey);
|
||||||
|
MockClusterStateActionStep nextStep = new MockClusterStateActionStep(nextStepKey, null);
|
||||||
|
MockPolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
|
||||||
|
AtomicBoolean resolved = new AtomicBoolean(false);
|
||||||
|
stepRegistry.setResolver((i, k) -> {
|
||||||
|
resolved.set(true);
|
||||||
|
throw new IllegalArgumentException("fake failure retrieving step");
|
||||||
|
});
|
||||||
|
ThreadPool threadPool = new TestThreadPool("name");
|
||||||
|
LifecycleExecutionState les = LifecycleExecutionState.builder()
|
||||||
|
.setPhase("phase")
|
||||||
|
.setAction("action")
|
||||||
|
.setStep("cluster_state_action_step")
|
||||||
|
.build();
|
||||||
|
IndexMetaData indexMetaData = IndexMetaData.builder("test")
|
||||||
|
.settings(Settings.builder()
|
||||||
|
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
||||||
|
.put(LifecycleSettings.LIFECYCLE_NAME, policyName))
|
||||||
|
.putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, les.asMap())
|
||||||
|
.build();
|
||||||
|
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||||
|
DiscoveryNode node = clusterService.localNode();
|
||||||
|
IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING);
|
||||||
|
ClusterState state = ClusterState.builder(new ClusterName("cluster"))
|
||||||
|
.metaData(MetaData.builder()
|
||||||
|
.put(indexMetaData, true)
|
||||||
|
.putCustom(IndexLifecycleMetadata.TYPE, ilm))
|
||||||
|
.nodes(DiscoveryNodes.builder()
|
||||||
|
.add(node)
|
||||||
|
.masterNodeId(node.getId())
|
||||||
|
.localNodeId(node.getId()))
|
||||||
|
.build();
|
||||||
|
ClusterServiceUtils.setState(clusterService, state);
|
||||||
|
long stepTime = randomLong();
|
||||||
|
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, threadPool, () -> stepTime);
|
||||||
|
|
||||||
|
ClusterState before = clusterService.state();
|
||||||
|
if (asyncAction) {
|
||||||
|
runner.maybeRunAsyncAction(before, indexMetaData, policyName, stepKey);
|
||||||
|
} else if (periodicAction) {
|
||||||
|
runner.runPeriodicStep(policyName, indexMetaData);
|
||||||
|
} else {
|
||||||
|
runner.runPolicyAfterStateChange(policyName, indexMetaData);
|
||||||
|
}
|
||||||
|
|
||||||
|
// The cluster state can take a few extra milliseconds to update after the steps are executed
|
||||||
|
assertBusy(() -> assertNotEquals(before, clusterService.state()));
|
||||||
|
LifecycleExecutionState newExecutionState = LifecycleExecutionState
|
||||||
|
.fromIndexMetadata(clusterService.state().metaData().index(indexMetaData.getIndex()));
|
||||||
|
assertThat(newExecutionState.getPhase(), equalTo("phase"));
|
||||||
|
assertThat(newExecutionState.getAction(), equalTo("action"));
|
||||||
|
assertThat(newExecutionState.getStep(), equalTo("cluster_state_action_step"));
|
||||||
|
assertThat(step.getExecuteCount(), equalTo(0L));
|
||||||
|
assertThat(nextStep.getExecuteCount(), equalTo(0L));
|
||||||
|
assertThat(newExecutionState.getStepInfo(),
|
||||||
|
containsString("{\"type\":\"illegal_argument_exception\",\"reason\":\"fake failure retrieving step\"}"));
|
||||||
|
clusterService.close();
|
||||||
|
threadPool.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
public void testRunAsyncActionDoesNotRun() {
|
public void testRunAsyncActionDoesNotRun() {
|
||||||
String policyName = "foo";
|
String policyName = "foo";
|
||||||
StepKey stepKey = new StepKey("phase", "action", "async_action_step");
|
StepKey stepKey = new StepKey("phase", "action", "async_action_step");
|
||||||
|
|
Loading…
Reference in New Issue