mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-20 03:45:02 +00:00
Improves handling of exceptions in Index Lifecycle (#3511)
* Improves handling of exceptions in Index Lifecycle This change improves a few different aspects: * If an exception occurs executing the lifecycle of one index it is caught, logged and other indexes are still processed * If the lifecycle policy specified in the settings does not exist an error is logged * Fixes the exception when the delete action is run which occurs because Phase attempts to update the phase and action settings for the deleted index. A `LifecycleAction.indexSurvives()` method is introduced which defaults to `true` but can be overridden to indicate whether the index survives following completion of the action. * Adds test
This commit is contained in:
parent
e3a6e738ea
commit
6a370251fc
@ -57,6 +57,11 @@ public class DeleteAction implements LifecycleAction {
|
|||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean indexSurvives() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(Index index, Client client, ClusterService clusterService, Listener listener) {
|
public void execute(Index index, Client client, ClusterService clusterService, Listener listener) {
|
||||||
client.admin().indices().delete(new DeleteIndexRequest(index.getName()), new ActionListener<DeleteIndexResponse>() {
|
client.admin().indices().delete(new DeleteIndexRequest(index.getName()), new ActionListener<DeleteIndexResponse>() {
|
||||||
|
@ -32,6 +32,10 @@ public interface LifecycleAction extends ToXContentObject, NamedWriteable {
|
|||||||
*/
|
*/
|
||||||
void execute(Index index, Client client, ClusterService clusterService, Listener listener);
|
void execute(Index index, Client client, ClusterService clusterService, Listener listener);
|
||||||
|
|
||||||
|
default boolean indexSurvives() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A callback for when a {@link LifecycleAction} finishes executing
|
* A callback for when a {@link LifecycleAction} finishes executing
|
||||||
*/
|
*/
|
||||||
|
@ -192,9 +192,14 @@ public class Phase implements ToXContentObject, Writeable {
|
|||||||
@Override
|
@Override
|
||||||
public void onSuccess(boolean completed) {
|
public void onSuccess(boolean completed) {
|
||||||
if (completed) {
|
if (completed) {
|
||||||
logger.info("Action [" + actionName + "] for index [" + indexName + "] complete, moving to next action");
|
// Since we completed the current action move to the next
|
||||||
// Since we completed the current action move to the next action
|
// action if the index survives this action
|
||||||
moveToAction(context, indexName, action, nextActionProvider);
|
if (action.indexSurvives()) {
|
||||||
|
logger.info("Action [" + actionName + "] for index [" + indexName + "] complete, moving to next action");
|
||||||
|
moveToNextAction(context, indexName, action, nextActionProvider);
|
||||||
|
} else {
|
||||||
|
logger.info("Action [" + actionName + "] for index [" + indexName + "] complete");
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.info("Action [" + actionName + "] for index [" + indexName + "] executed sucessfully but is not yet complete");
|
logger.info("Action [" + actionName + "] for index [" + indexName + "] executed sucessfully but is not yet complete");
|
||||||
}
|
}
|
||||||
@ -207,7 +212,7 @@ public class Phase implements ToXContentObject, Writeable {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void moveToAction(IndexLifecycleContext context, String indexName, LifecycleAction currentAction,
|
private void moveToNextAction(IndexLifecycleContext context, String indexName, LifecycleAction currentAction,
|
||||||
LifecyclePolicy.NextActionProvider nextActionProvider) {
|
LifecyclePolicy.NextActionProvider nextActionProvider) {
|
||||||
LifecycleAction nextAction = nextActionProvider.next(currentAction);
|
LifecycleAction nextAction = nextActionProvider.next(currentAction);
|
||||||
if (nextAction != null) {
|
if (nextAction != null) {
|
||||||
|
@ -9,6 +9,7 @@ import com.google.common.base.Strings;
|
|||||||
|
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.lucene.util.SetOnce;
|
import org.apache.lucene.util.SetOnce;
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
@ -108,7 +109,16 @@ public class IndexLifecycleService extends AbstractComponent
|
|||||||
if (Strings.isNullOrEmpty(policyName) == false) {
|
if (Strings.isNullOrEmpty(policyName) == false) {
|
||||||
logger.info("Checking index for next action: " + idxMeta.getIndex().getName() + " (" + policyName + ")");
|
logger.info("Checking index for next action: " + idxMeta.getIndex().getName() + " (" + policyName + ")");
|
||||||
LifecyclePolicy policy = policies.get(policyName);
|
LifecyclePolicy policy = policies.get(policyName);
|
||||||
policy.execute(new InternalIndexLifecycleContext(idxMeta.getIndex(), client, clusterService, nowSupplier));
|
if (policy == null) {
|
||||||
|
logger.error("Unknown lifecycle policy [{}] for index [{}]", policyName, idxMeta.getIndex().getName());
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
policy.execute(new InternalIndexLifecycleContext(idxMeta.getIndex(), client, clusterService, nowSupplier));
|
||||||
|
} catch (ElasticsearchException e) {
|
||||||
|
logger.error("Failed to execute lifecycle policy [{}] for index [{}]", policyName, idxMeta.getIndex().getName(),
|
||||||
|
policyName);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -436,6 +436,97 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
|
|||||||
assertEquals(0L, thirdAction.getExecutedCount());
|
assertEquals(0L, thirdAction.getExecutedCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testExecuteFirstActionIndexDoesNotSurvive() throws Exception {
|
||||||
|
String indexName = randomAlphaOfLengthBetween(1, 20);
|
||||||
|
String phaseName = randomAlphaOfLengthBetween(1, 20);
|
||||||
|
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
|
||||||
|
Map<String, LifecycleAction> actions = new HashMap<>();
|
||||||
|
MockAction firstAction = new MockAction() {
|
||||||
|
@Override
|
||||||
|
public String getWriteableName() {
|
||||||
|
return "first_action";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean indexSurvives() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
firstAction.setCompleteOnExecute(false);
|
||||||
|
actions.put(firstAction.getWriteableName(), firstAction);
|
||||||
|
MockAction secondAction = new MockAction() {
|
||||||
|
@Override
|
||||||
|
public String getWriteableName() {
|
||||||
|
return "second_action";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
secondAction.setCompleteOnExecute(false);
|
||||||
|
actions.put(secondAction.getWriteableName(), secondAction);
|
||||||
|
MockAction thirdAction = new MockAction() {
|
||||||
|
@Override
|
||||||
|
public String getWriteableName() {
|
||||||
|
return "third_action";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
thirdAction.setCompleteOnExecute(false);
|
||||||
|
actions.put(thirdAction.getWriteableName(), thirdAction);
|
||||||
|
Phase phase = new Phase(phaseName, after, actions);
|
||||||
|
|
||||||
|
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, firstAction.getWriteableName(), 0) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean canExecute(Phase phase) {
|
||||||
|
throw new AssertionError("canExecute should not have been called");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
phase.execute(context, current -> {
|
||||||
|
if (current == null) {
|
||||||
|
return firstAction;
|
||||||
|
} else if ("first_action".equals(current.getWriteableName())) {
|
||||||
|
return secondAction;
|
||||||
|
} else if ("second_action".equals(current.getWriteableName())) {
|
||||||
|
return thirdAction;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals(indexName, context.getLifecycleTarget());
|
||||||
|
assertEquals(phaseName, context.getPhase());
|
||||||
|
assertEquals(firstAction.getWriteableName(), context.getAction());
|
||||||
|
|
||||||
|
assertFalse(firstAction.wasCompleted());
|
||||||
|
assertEquals(1L, firstAction.getExecutedCount());
|
||||||
|
assertFalse(secondAction.wasCompleted());
|
||||||
|
assertEquals(0L, secondAction.getExecutedCount());
|
||||||
|
assertFalse(thirdAction.wasCompleted());
|
||||||
|
assertEquals(0L, thirdAction.getExecutedCount());
|
||||||
|
|
||||||
|
firstAction.setCompleteOnExecute(true);
|
||||||
|
|
||||||
|
phase.execute(context, current -> {
|
||||||
|
if (current == null) {
|
||||||
|
return firstAction;
|
||||||
|
} else if ("first_action".equals(current.getWriteableName())) {
|
||||||
|
return secondAction;
|
||||||
|
} else if ("second_action".equals(current.getWriteableName())) {
|
||||||
|
return thirdAction;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals(indexName, context.getLifecycleTarget());
|
||||||
|
assertEquals(phaseName, context.getPhase());
|
||||||
|
assertEquals(firstAction.getWriteableName(), context.getAction());
|
||||||
|
|
||||||
|
assertTrue(firstAction.wasCompleted());
|
||||||
|
assertEquals(2L, firstAction.getExecutedCount());
|
||||||
|
assertFalse(secondAction.wasCompleted());
|
||||||
|
assertEquals(0L, secondAction.getExecutedCount());
|
||||||
|
assertFalse(thirdAction.wasCompleted());
|
||||||
|
assertEquals(0L, thirdAction.getExecutedCount());
|
||||||
|
}
|
||||||
|
|
||||||
public void testExecuteSecondAction() throws Exception {
|
public void testExecuteSecondAction() throws Exception {
|
||||||
String indexName = randomAlphaOfLengthBetween(1, 20);
|
String indexName = randomAlphaOfLengthBetween(1, 20);
|
||||||
String phaseName = randomAlphaOfLengthBetween(1, 20);
|
String phaseName = randomAlphaOfLengthBetween(1, 20);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user