[7.x] Don't halt policy execution on policy trigger exception… (#49171)

When triggered either by becoming master, a new cluster state, or a
periodic schedule, an ILM policy execution through
`maybeRunAsyncAction`, `runPolicyAfterStateChange`, or
`runPeriodicStep` throwing an exception will cause the loop to
terminate. This means that any indices that would have been processed
after the index where the exception was thrown will not be processed by
ILM.

For most executions this is not a problem because the actual running of
steps is protected by a try/catch that moves the index to the ERROR step
in the event of a problem. If an exception occurs prior to step
execution (for example, in fetching and parsing the current
policy/step) however, it causes the loop termination previously
mentioned.

This commit wraps the invocation of the methods specified above in a
try/catch block that provides better logging and does not bubble the
exception up.
This commit is contained in:
Lee Hinman 2019-11-15 09:22:37 -07:00 committed by GitHub
parent 36dc544819
commit 680436dd0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 165 additions and 26 deletions

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ilm;
import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.SetOnce;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
@ -119,19 +120,35 @@ public class IndexLifecycleService
final LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta); final LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
StepKey stepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState); StepKey stepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);
if (OperationMode.STOPPING == currentMode) { try {
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.getName())) { if (OperationMode.STOPPING == currentMode) {
logger.info("waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]", if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.getName())) {
idxMeta.getIndex().getName(), policyName, stepKey.getName()); logger.info("waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey); idxMeta.getIndex().getName(), policyName, stepKey.getName());
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey);
safeToStop = false; // ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
} else {
logger.info("skipping policy execution of step [{}] for index [{}] with policy [{}]" +
" because ILM is stopping",
stepKey == null ? "n/a" : stepKey.getName(), idxMeta.getIndex().getName(), policyName);
}
} else { } else {
logger.info("skipping policy execution of step [{}] for index [{}] with policy [{}] because ILM is stopping", lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey);
stepKey == null ? "n/a" : stepKey.getName(), idxMeta.getIndex().getName(), policyName);
} }
} else { } catch (Exception e) {
lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey); if (logger.isTraceEnabled()) {
logger.warn(new ParameterizedMessage("async action execution failed during master election trigger" +
" for index [{}] with policy [{}] in step [{}], lifecycle state: [{}]",
idxMeta.getIndex().getName(), policyName, stepKey, lifecycleState.asMap()), e);
} else {
logger.warn(new ParameterizedMessage("async action execution failed during master election trigger" +
" for index [{}] with policy [{}] in step [{}]",
idxMeta.getIndex().getName(), policyName, stepKey), e);
}
// Don't rethrow the exception, we don't want a failure for one index to be
// called to cause actions not to be triggered for further indices
} }
} }
} }
@ -264,27 +281,42 @@ public class IndexLifecycleService
final LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta); final LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
StepKey stepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState); StepKey stepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);
if (OperationMode.STOPPING == currentMode) { try {
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.getName())) { if (OperationMode.STOPPING == currentMode) {
logger.info("waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]", if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.getName())) {
idxMeta.getIndex().getName(), policyName, stepKey.getName()); logger.info("waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
idxMeta.getIndex().getName(), policyName, stepKey.getName());
if (fromClusterStateChange) {
lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta);
} else {
lifecycleRunner.runPeriodicStep(policyName, idxMeta);
}
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
} else {
logger.info("skipping policy execution of step [{}] for index [{}] with policy [{}] because ILM is stopping",
stepKey == null ? "n/a" : stepKey.getName(), idxMeta.getIndex().getName(), policyName);
}
} else {
if (fromClusterStateChange) { if (fromClusterStateChange) {
lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta); lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta);
} else { } else {
lifecycleRunner.runPeriodicStep(policyName, idxMeta); lifecycleRunner.runPeriodicStep(policyName, idxMeta);
} }
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
} else {
logger.info("skipping policy execution of step [{}] for index [{}] with policy [{}] because ILM is stopping",
stepKey == null ? "n/a" : stepKey.getName(), idxMeta.getIndex().getName(), policyName);
} }
} else { } catch (Exception e) {
if (fromClusterStateChange) { if (logger.isTraceEnabled()) {
lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta); logger.warn(new ParameterizedMessage("async action execution failed during policy trigger" +
" for index [{}] with policy [{}] in step [{}], lifecycle state: [{}]",
idxMeta.getIndex().getName(), policyName, stepKey, lifecycleState.asMap()), e);
} else { } else {
lifecycleRunner.runPeriodicStep(policyName, idxMeta); logger.warn(new ParameterizedMessage("async action execution failed during policy trigger" +
" for index [{}] with policy [{}] in step [{}]",
idxMeta.getIndex().getName(), policyName, stepKey), e);
} }
// Don't rethrow the exception, we don't want a failure for one index to be
// called to cause actions not to be triggered for further indices
} }
} }
} }

View File

@ -1527,7 +1527,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
assertEquals(newLifecycleState.getStepTime(), newLifecycleState.getStepTime()); assertEquals(newLifecycleState.getStepTime(), newLifecycleState.getStepTime());
} }
private static class MockAsyncActionStep extends AsyncActionStep { static class MockAsyncActionStep extends AsyncActionStep {
private Exception exception; private Exception exception;
private boolean willComplete; private boolean willComplete;
@ -1576,7 +1576,7 @@ public class IndexLifecycleRunnerTests extends ESTestCase {
} }
private static class MockAsyncWaitStep extends AsyncWaitStep { static class MockAsyncWaitStep extends AsyncWaitStep {
private Exception exception; private Exception exception;
private boolean willComplete; private boolean willComplete;

View File

@ -51,7 +51,9 @@ import java.util.Collections;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.node.Node.NODE_MASTER_SETTING; import static org.elasticsearch.node.Node.NODE_MASTER_SETTING;
import static org.elasticsearch.xpack.core.ilm.AbstractStepTestCase.randomStepKey; import static org.elasticsearch.xpack.core.ilm.AbstractStepTestCase.randomStepKey;
@ -300,6 +302,111 @@ public class IndexLifecycleServiceTests extends ESTestCase {
assertTrue(moveToMaintenance.get()); assertTrue(moveToMaintenance.get());
} }
/**
 * Runs the two-index failure scenario with {@code useOnMaster == false}, i.e. through
 * {@code triggerPolicies} rather than {@code onMaster}.
 */
public void testExceptionStillProcessesOtherIndices() {
    final boolean useOnMaster = false;
    doTestExceptionStillProcessesOtherIndices(useOnMaster);
}
/**
 * Runs the two-index failure scenario with {@code useOnMaster == true}, i.e. through
 * {@code onMaster} rather than {@code triggerPolicies}.
 */
public void testExceptionStillProcessesOtherIndicesOnMaster() {
    final boolean useOnMaster = true;
    doTestExceptionStillProcessesOtherIndices(useOnMaster);
}
/**
 * Builds a cluster state containing two ILM-managed indices, each with its own policy and mock
 * step, configures exactly one of the two steps (chosen at random) with an exception, triggers
 * policy execution, and then waits up to five seconds on a latch shared by both steps.
 *
 * @param useOnMaster when {@code true}, both steps are {@code MockAsyncActionStep}s and execution
 *                    is triggered via {@code indexLifecycleService.onMaster()}; when {@code false},
 *                    both steps are {@code MockClusterStateActionStep}s and execution is triggered
 *                    via {@code indexLifecycleService.triggerPolicies(...)}
 */
@SuppressWarnings("unchecked")
public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
    // ---- index 1: policy, step and lifecycle state ----
    String policy1 = randomAlphaOfLengthBetween(1, 20);
    Step.StepKey i1currentStepKey = randomStepKey();
    final Step i1mockStep;
    if (useOnMaster) {
        i1mockStep = new IndexLifecycleRunnerTests.MockAsyncActionStep(i1currentStepKey, randomStepKey());
    } else {
        i1mockStep = new IndexLifecycleRunnerTests.MockClusterStateActionStep(i1currentStepKey, randomStepKey());
    }
    MockAction i1mockAction = new MockAction(Collections.singletonList(i1mockStep));
    Phase i1phase = new Phase("phase", TimeValue.ZERO, Collections.singletonMap("action", i1mockAction));
    LifecyclePolicy i1policy = newTestLifecyclePolicy(policy1, Collections.singletonMap(i1phase.getName(), i1phase));
    Index index1 = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
    LifecycleExecutionState.Builder i1lifecycleState = LifecycleExecutionState.builder();
    i1lifecycleState.setPhase(i1currentStepKey.getPhase());
    i1lifecycleState.setAction(i1currentStepKey.getAction());
    i1lifecycleState.setStep(i1currentStepKey.getName());

    // ---- index 2: a distinct policy, step and lifecycle state ----
    String policy2 = randomValueOtherThan(policy1, () -> randomAlphaOfLengthBetween(1, 20));
    Step.StepKey i2currentStepKey = randomStepKey();
    final Step i2mockStep;
    if (useOnMaster) {
        i2mockStep = new IndexLifecycleRunnerTests.MockAsyncActionStep(i2currentStepKey, randomStepKey());
    } else {
        i2mockStep = new IndexLifecycleRunnerTests.MockClusterStateActionStep(i2currentStepKey, randomStepKey());
    }
    MockAction i2mockAction = new MockAction(Collections.singletonList(i2mockStep));
    Phase i2phase = new Phase("phase", TimeValue.ZERO, Collections.singletonMap("action", i2mockAction));
    // The second policy must carry its own name and its own phase; previously it reused
    // policy1/i1phase, so i2mockStep was never part of any policy.
    LifecyclePolicy i2policy = newTestLifecyclePolicy(policy2, Collections.singletonMap(i2phase.getName(), i2phase));
    Index index2 = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
    LifecycleExecutionState.Builder i2lifecycleState = LifecycleExecutionState.builder();
    i2lifecycleState.setPhase(i2currentStepKey.getPhase());
    i2lifecycleState.setAction(i2currentStepKey.getAction());
    i2lifecycleState.setStep(i2currentStepKey.getName());

    // Exactly one of the two steps is given an exception; which one is random.
    CountDownLatch stepLatch = new CountDownLatch(2);
    boolean failStep1 = randomBoolean();
    final IllegalArgumentException i1failure =
        failStep1 ? new IllegalArgumentException("forcing a failure for index 1") : null;
    final IllegalArgumentException i2failure =
        failStep1 ? null : new IllegalArgumentException("forcing a failure for index 2");
    if (useOnMaster) {
        ((IndexLifecycleRunnerTests.MockAsyncActionStep) i1mockStep).setLatch(stepLatch);
        ((IndexLifecycleRunnerTests.MockAsyncActionStep) i1mockStep).setException(i1failure);
        ((IndexLifecycleRunnerTests.MockAsyncActionStep) i2mockStep).setLatch(stepLatch);
        ((IndexLifecycleRunnerTests.MockAsyncActionStep) i2mockStep).setException(i2failure);
    } else {
        ((IndexLifecycleRunnerTests.MockClusterStateActionStep) i1mockStep).setLatch(stepLatch);
        ((IndexLifecycleRunnerTests.MockClusterStateActionStep) i1mockStep).setException(i1failure);
        // Configure the *second* step here; previously i1mockStep was configured twice, which
        // left i2mockStep without a latch and overwrote index 1's exception setting.
        ((IndexLifecycleRunnerTests.MockClusterStateActionStep) i2mockStep).setLatch(stepLatch);
        ((IndexLifecycleRunnerTests.MockClusterStateActionStep) i2mockStep).setException(i2failure);
    }

    SortedMap<String, LifecyclePolicyMetadata> policyMap = new TreeMap<>();
    policyMap.put(policy1, new LifecyclePolicyMetadata(i1policy, Collections.emptyMap(),
        randomNonNegativeLong(), randomNonNegativeLong()));
    policyMap.put(policy2, new LifecyclePolicyMetadata(i2policy, Collections.emptyMap(),
        randomNonNegativeLong(), randomNonNegativeLong()));

    IndexMetaData i1indexMetadata = IndexMetaData.builder(index1.getName())
        .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey(), policy1))
        .putCustom(ILM_CUSTOM_METADATA_KEY, i1lifecycleState.build().asMap())
        .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
    // Index 2 is managed by policy2 (it was previously attached to policy1 by mistake).
    IndexMetaData i2indexMetadata = IndexMetaData.builder(index2.getName())
        .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey(), policy2))
        .putCustom(ILM_CUSTOM_METADATA_KEY, i2lifecycleState.build().asMap())
        .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
    ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder()
        .fPut(index1.getName(), i1indexMetadata)
        .fPut(index2.getName(), i2indexMetadata);

    MetaData metaData = MetaData.builder()
        .putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING))
        .indices(indices.build())
        .persistentSettings(settings(Version.CURRENT).build())
        .build();
    ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
        .metaData(metaData)
        .nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
        .build();

    if (useOnMaster) {
        when(clusterService.state()).thenReturn(currentState);
        indexLifecycleService.onMaster();
    } else {
        indexLifecycleService.triggerPolicies(currentState, randomBoolean());
    }

    try {
        // NOTE(review): the boolean result of await(...) is not checked, so a timeout does not
        // fail the test; only an interrupt does. Confirm whether asserting the result is intended.
        stepLatch.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        logger.error("failure while waiting for step execution", e);
        fail("both steps should have been executed, even with an exception");
    }
}
public void testTriggeredDifferentJob() { public void testTriggeredDifferentJob() {
Mockito.reset(clusterService); Mockito.reset(clusterService);
SchedulerEngine.Event schedulerEvent = new SchedulerEngine.Event("foo", randomLong(), randomLong()); SchedulerEngine.Event schedulerEvent = new SchedulerEngine.Event("foo", randomLong(), randomLong());