When actions complete the next action is now run

This commit is contained in:
Colin Goodheart-Smithe 2017-11-27 11:07:26 +00:00
parent 152d5d45e5
commit 9ab2121fcf
6 changed files with 175 additions and 20 deletions

View File

@ -110,10 +110,11 @@ public class Phase implements ToXContentObject, Writeable {
public void onSuccess(boolean completed) { public void onSuccess(boolean completed) {
if (completed) { if (completed) {
logger.info("Action [" + firstActionName + "] for index [" + indexName logger.info("Action [" + firstActionName + "] for index [" + indexName
+ "] executed sucessfully but is not yet complete"); + "] complete, moving to next action");
moveToAction(context, indexName, 1);
} else { } else {
logger.info("Action [" + firstActionName + "] for index [" + indexName logger.info("Action [" + firstActionName + "] for index [" + indexName
+ "] complete, moving to next action"); + "] executed sucessfully but is not yet complete");
} }
} }
@ -131,18 +132,29 @@ public class Phase implements ToXContentObject, Writeable {
} }
}); });
} else if (currentActionName.equals(PHASE_COMPLETED) == false) { } else if (currentActionName.equals(PHASE_COMPLETED) == false) {
LifecycleAction currentAction = actions.stream().filter(action -> action.getWriteableName().equals(currentActionName)).findAny() int currentActionIndex = -1;
.orElseThrow(() -> new IllegalStateException("Current action [" + currentActionName + "] not found in phase [" for (int i = 0; i < actions.size(); i++) {
+ getName() + "] for index [" + indexName + "]")); if (actions.get(i).getWriteableName().equals(currentActionName)) {
currentActionIndex = i;
break;
}
}
if (currentActionIndex == -1) {
throw new IllegalStateException("Current action [" + currentActionName + "] not found in phase ["
+ getName() + "] for index [" + indexName + "]");
}
LifecycleAction currentAction = actions.get(currentActionIndex);
final int nextActionIndex = currentActionIndex + 1;
context.executeAction(currentAction, new LifecycleAction.Listener() { context.executeAction(currentAction, new LifecycleAction.Listener() {
@Override @Override
public void onSuccess(boolean completed) { public void onSuccess(boolean completed) {
if (completed) { if (completed) {
logger.info("Action [" + currentActionName + "] for index [" + indexName + "] complete, moving to next action");
moveToAction(context, indexName, nextActionIndex);
} else {
logger.info("Action [" + currentActionName + "] for index [" + indexName logger.info("Action [" + currentActionName + "] for index [" + indexName
+ "] executed sucessfully but is not yet complete"); + "] executed sucessfully but is not yet complete");
} else {
logger.info("Action [" + currentActionName + "] for index [" + indexName + "] complete, moving to next action");
} }
} }
@ -154,6 +166,40 @@ public class Phase implements ToXContentObject, Writeable {
} }
} }
private void moveToAction(IndexLifecycleContext context, String indexName, final int nextActionIndex) {
if (nextActionIndex < actions.size()) {
LifecycleAction nextAction = actions.get(nextActionIndex);
context.setAction(nextAction.getWriteableName(), new Listener() {
@Override
public void onSuccess() {
logger.info("Successfully initialised action [" + nextAction.getWriteableName() + "] in phase [" + getName()
+ "] for index [" + indexName + "]");
execute(context);
}
@Override
public void onFailure(Exception e) {
logger.error("Failed to initialised action [" + nextAction.getWriteableName() + "] in phase [" + getName()
+ "] for index [" + indexName + "]", e);
}
});
} else {
context.setAction(Phase.PHASE_COMPLETED, new Listener() {
@Override
public void onSuccess() {
logger.info("Successfully completed phase [" + getName() + "] for index [" + indexName + "]");
}
@Override
public void onFailure(Exception e) {
logger.error("Failed to complete phase [" + getName() + "] for index [" + indexName + "]", e);
}
});
}
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();

View File

@ -481,6 +481,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase {
}); });
MockAction action = new MockAction(); MockAction action = new MockAction();
action.setCompleteOnExecute(true);
assertFalse(action.wasCompleted()); assertFalse(action.wasCompleted());
assertEquals(0L, action.getExecutedCount()); assertEquals(0L, action.getExecutedCount());

View File

@ -181,7 +181,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
assertEquals(firstPhase.getName(), context.getPhase()); assertEquals(firstPhase.getName(), context.getPhase());
assertEquals(MockAction.NAME, context.getAction()); assertEquals(MockAction.NAME, context.getAction());
assertTrue(firstAction.wasCompleted()); assertFalse(firstAction.wasCompleted());
assertEquals(1L, firstAction.getExecutedCount()); assertEquals(1L, firstAction.getExecutedCount());
assertFalse(secondAction.wasCompleted()); assertFalse(secondAction.wasCompleted());
assertEquals(0L, secondAction.getExecutedCount()); assertEquals(0L, secondAction.getExecutedCount());
@ -281,7 +281,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
assertEquals(firstPhase.getName(), context.getPhase()); assertEquals(firstPhase.getName(), context.getPhase());
assertEquals(MockAction.NAME, context.getAction()); assertEquals(MockAction.NAME, context.getAction());
assertTrue(firstAction.wasCompleted()); assertFalse(firstAction.wasCompleted());
assertEquals(1L, firstAction.getExecutedCount()); assertEquals(1L, firstAction.getExecutedCount());
assertFalse(secondAction.wasCompleted()); assertFalse(secondAction.wasCompleted());
assertEquals(0L, secondAction.getExecutedCount()); assertEquals(0L, secondAction.getExecutedCount());
@ -329,7 +329,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
assertFalse(firstAction.wasCompleted()); assertFalse(firstAction.wasCompleted());
assertEquals(0L, firstAction.getExecutedCount()); assertEquals(0L, firstAction.getExecutedCount());
assertTrue(secondAction.wasCompleted()); assertFalse(secondAction.wasCompleted());
assertEquals(1L, secondAction.getExecutedCount()); assertEquals(1L, secondAction.getExecutedCount());
assertFalse(thirdAction.wasCompleted()); assertFalse(thirdAction.wasCompleted());
assertEquals(0L, thirdAction.getExecutedCount()); assertEquals(0L, thirdAction.getExecutedCount());
@ -377,7 +377,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
assertEquals(0L, firstAction.getExecutedCount()); assertEquals(0L, firstAction.getExecutedCount());
assertFalse(secondAction.wasCompleted()); assertFalse(secondAction.wasCompleted());
assertEquals(0L, secondAction.getExecutedCount()); assertEquals(0L, secondAction.getExecutedCount());
assertTrue(thirdAction.wasCompleted()); assertFalse(thirdAction.wasCompleted());
assertEquals(1L, thirdAction.getExecutedCount()); assertEquals(1L, thirdAction.getExecutedCount());
} }
@ -524,7 +524,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
assertFalse(firstAction.wasCompleted()); assertFalse(firstAction.wasCompleted());
assertEquals(0L, firstAction.getExecutedCount()); assertEquals(0L, firstAction.getExecutedCount());
assertTrue(secondAction.wasCompleted()); assertFalse(secondAction.wasCompleted());
assertEquals(1L, secondAction.getExecutedCount()); assertEquals(1L, secondAction.getExecutedCount());
assertFalse(thirdAction.wasCompleted()); assertFalse(thirdAction.wasCompleted());
assertEquals(0L, thirdAction.getExecutedCount()); assertEquals(0L, thirdAction.getExecutedCount());
@ -626,7 +626,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
assertEquals(0L, firstAction.getExecutedCount()); assertEquals(0L, firstAction.getExecutedCount());
assertFalse(secondAction.wasCompleted()); assertFalse(secondAction.wasCompleted());
assertEquals(0L, secondAction.getExecutedCount()); assertEquals(0L, secondAction.getExecutedCount());
assertTrue(thirdAction.wasCompleted()); assertFalse(thirdAction.wasCompleted());
assertEquals(1L, thirdAction.getExecutedCount()); assertEquals(1L, thirdAction.getExecutedCount());
} }

View File

@ -28,7 +28,7 @@ public class MockAction implements LifecycleAction {
private final SetOnce<Boolean> completed = new SetOnce<>(); private final SetOnce<Boolean> completed = new SetOnce<>();
private final AtomicLong executedCount; private final AtomicLong executedCount;
private Exception exceptionToThrow = null; private Exception exceptionToThrow = null;
private boolean completeOnExecute = true; private boolean completeOnExecute = false;
private static final ConstructingObjectParser<MockAction, Void> PARSER = new ConstructingObjectParser<>(NAME, private static final ConstructingObjectParser<MockAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new MockAction((Boolean) a[0], (Long) a[1])); a -> new MockAction((Boolean) a[0], (Long) a[1]));

View File

@ -137,6 +137,7 @@ public class MockIndexLifecycleContextTests extends ESTestCase {
}; };
MockAction action = new MockAction(); MockAction action = new MockAction();
action.setCompleteOnExecute(true);
assertFalse(action.wasCompleted()); assertFalse(action.wasCompleted());
assertEquals(0L, action.getExecutedCount()); assertEquals(0L, action.getExecutedCount());

View File

@ -83,7 +83,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
return new Phase(name, after, actions); return new Phase(name, after, actions);
} }
public void testExecuteNewIndex() throws Exception { public void testExecuteNewIndexCompleteActions() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20); String indexName = randomAlphaOfLengthBetween(1, 20);
String phaseName = randomAlphaOfLengthBetween(1, 20); String phaseName = randomAlphaOfLengthBetween(1, 20);
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100)); TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
@ -94,6 +94,60 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
return "first_action"; return "first_action";
} }
}; };
firstAction.setCompleteOnExecute(true);
actions.add(firstAction);
MockAction secondAction = new MockAction() {
@Override
public String getWriteableName() {
return "second_action";
}
};
secondAction.setCompleteOnExecute(true);
actions.add(secondAction);
MockAction thirdAction = new MockAction() {
@Override
public String getWriteableName() {
return "third_action";
}
};
thirdAction.setCompleteOnExecute(true);
actions.add(thirdAction);
Phase phase = new Phase(phaseName, after, actions);
MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "") {
@Override
public boolean canExecute(Phase phase) {
throw new AssertionError("canExecute should not have been called");
}
};
phase.execute(context);
assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase());
assertEquals(Phase.PHASE_COMPLETED, context.getAction());
assertTrue(firstAction.wasCompleted());
assertEquals(1L, firstAction.getExecutedCount());
assertTrue(secondAction.wasCompleted());
assertEquals(1L, secondAction.getExecutedCount());
assertTrue(thirdAction.wasCompleted());
assertEquals(1L, thirdAction.getExecutedCount());
}
public void testExecuteNewIndexIncompleteFirstAction() throws Exception {
String indexName = randomAlphaOfLengthBetween(1, 20);
String phaseName = randomAlphaOfLengthBetween(1, 20);
TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100));
List<LifecycleAction> actions = new ArrayList<>();
MockAction firstAction = new MockAction() {
@Override
public String getWriteableName() {
return "first_action";
}
};
firstAction.setCompleteOnExecute(false);
actions.add(firstAction); actions.add(firstAction);
MockAction secondAction = new MockAction() { MockAction secondAction = new MockAction() {
@Override @Override
@ -125,7 +179,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
assertEquals(firstAction.getWriteableName(), context.getAction()); assertEquals(firstAction.getWriteableName(), context.getAction());
assertTrue(firstAction.wasCompleted()); assertFalse(firstAction.wasCompleted());
assertEquals(1L, firstAction.getExecutedCount()); assertEquals(1L, firstAction.getExecutedCount());
assertFalse(secondAction.wasCompleted()); assertFalse(secondAction.wasCompleted());
assertEquals(0L, secondAction.getExecutedCount()); assertEquals(0L, secondAction.getExecutedCount());
@ -268,6 +322,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
return "first_action"; return "first_action";
} }
}; };
firstAction.setCompleteOnExecute(false);
actions.add(firstAction); actions.add(firstAction);
MockAction secondAction = new MockAction() { MockAction secondAction = new MockAction() {
@Override @Override
@ -275,6 +330,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
return "second_action"; return "second_action";
} }
}; };
secondAction.setCompleteOnExecute(false);
actions.add(secondAction); actions.add(secondAction);
MockAction thirdAction = new MockAction() { MockAction thirdAction = new MockAction() {
@Override @Override
@ -282,6 +338,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
return "third_action"; return "third_action";
} }
}; };
thirdAction.setCompleteOnExecute(false);
actions.add(thirdAction); actions.add(thirdAction);
Phase phase = new Phase(phaseName, after, actions); Phase phase = new Phase(phaseName, after, actions);
@ -299,12 +356,27 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
assertEquals(firstAction.getWriteableName(), context.getAction()); assertEquals(firstAction.getWriteableName(), context.getAction());
assertTrue(firstAction.wasCompleted()); assertFalse(firstAction.wasCompleted());
assertEquals(1L, firstAction.getExecutedCount()); assertEquals(1L, firstAction.getExecutedCount());
assertFalse(secondAction.wasCompleted()); assertFalse(secondAction.wasCompleted());
assertEquals(0L, secondAction.getExecutedCount()); assertEquals(0L, secondAction.getExecutedCount());
assertFalse(thirdAction.wasCompleted()); assertFalse(thirdAction.wasCompleted());
assertEquals(0L, thirdAction.getExecutedCount()); assertEquals(0L, thirdAction.getExecutedCount());
firstAction.setCompleteOnExecute(true);
phase.execute(context);
assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase());
assertEquals(secondAction.getWriteableName(), context.getAction());
assertTrue(firstAction.wasCompleted());
assertEquals(2L, firstAction.getExecutedCount());
assertFalse(secondAction.wasCompleted());
assertEquals(1L, secondAction.getExecutedCount());
assertFalse(thirdAction.wasCompleted());
assertEquals(0L, thirdAction.getExecutedCount());
} }
public void testExecuteSecondAction() throws Exception { public void testExecuteSecondAction() throws Exception {
@ -318,6 +390,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
return "first_action"; return "first_action";
} }
}; };
firstAction.setCompleteOnExecute(false);
actions.add(firstAction); actions.add(firstAction);
MockAction secondAction = new MockAction() { MockAction secondAction = new MockAction() {
@Override @Override
@ -325,6 +398,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
return "second_action"; return "second_action";
} }
}; };
secondAction.setCompleteOnExecute(false);
actions.add(secondAction); actions.add(secondAction);
MockAction thirdAction = new MockAction() { MockAction thirdAction = new MockAction() {
@Override @Override
@ -332,6 +406,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
return "third_action"; return "third_action";
} }
}; };
thirdAction.setCompleteOnExecute(false);
actions.add(thirdAction); actions.add(thirdAction);
Phase phase = new Phase(phaseName, after, actions); Phase phase = new Phase(phaseName, after, actions);
@ -351,10 +426,25 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
assertFalse(firstAction.wasCompleted()); assertFalse(firstAction.wasCompleted());
assertEquals(0L, firstAction.getExecutedCount()); assertEquals(0L, firstAction.getExecutedCount());
assertTrue(secondAction.wasCompleted()); assertFalse(secondAction.wasCompleted());
assertEquals(1L, secondAction.getExecutedCount()); assertEquals(1L, secondAction.getExecutedCount());
assertFalse(thirdAction.wasCompleted()); assertFalse(thirdAction.wasCompleted());
assertEquals(0L, thirdAction.getExecutedCount()); assertEquals(0L, thirdAction.getExecutedCount());
secondAction.setCompleteOnExecute(true);
phase.execute(context);
assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase());
assertEquals(thirdAction.getWriteableName(), context.getAction());
assertFalse(firstAction.wasCompleted());
assertEquals(0L, firstAction.getExecutedCount());
assertTrue(secondAction.wasCompleted());
assertEquals(2L, secondAction.getExecutedCount());
assertFalse(thirdAction.wasCompleted());
assertEquals(1L, thirdAction.getExecutedCount());
} }
public void testExecuteThirdAction() throws Exception { public void testExecuteThirdAction() throws Exception {
@ -368,6 +458,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
return "first_action"; return "first_action";
} }
}; };
firstAction.setCompleteOnExecute(false);
actions.add(firstAction); actions.add(firstAction);
MockAction secondAction = new MockAction() { MockAction secondAction = new MockAction() {
@Override @Override
@ -375,6 +466,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
return "second_action"; return "second_action";
} }
}; };
secondAction.setCompleteOnExecute(false);
actions.add(secondAction); actions.add(secondAction);
MockAction thirdAction = new MockAction() { MockAction thirdAction = new MockAction() {
@Override @Override
@ -382,6 +474,7 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
return "third_action"; return "third_action";
} }
}; };
thirdAction.setCompleteOnExecute(false);
actions.add(thirdAction); actions.add(thirdAction);
Phase phase = new Phase(phaseName, after, actions); Phase phase = new Phase(phaseName, after, actions);
@ -395,7 +488,6 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
phase.execute(context); phase.execute(context);
assertEquals(indexName, context.getLifecycleTarget()); assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase()); assertEquals(phaseName, context.getPhase());
assertEquals(thirdAction.getWriteableName(), context.getAction()); assertEquals(thirdAction.getWriteableName(), context.getAction());
@ -404,8 +496,23 @@ public class PhaseTests extends AbstractSerializingTestCase<Phase> {
assertEquals(0L, firstAction.getExecutedCount()); assertEquals(0L, firstAction.getExecutedCount());
assertFalse(secondAction.wasCompleted()); assertFalse(secondAction.wasCompleted());
assertEquals(0L, secondAction.getExecutedCount()); assertEquals(0L, secondAction.getExecutedCount());
assertTrue(thirdAction.wasCompleted()); assertFalse(thirdAction.wasCompleted());
assertEquals(1L, thirdAction.getExecutedCount()); assertEquals(1L, thirdAction.getExecutedCount());
thirdAction.setCompleteOnExecute(true);
phase.execute(context);
assertEquals(indexName, context.getLifecycleTarget());
assertEquals(phaseName, context.getPhase());
assertEquals(Phase.PHASE_COMPLETED, context.getAction());
assertFalse(firstAction.wasCompleted());
assertEquals(0L, firstAction.getExecutedCount());
assertFalse(secondAction.wasCompleted());
assertEquals(0L, secondAction.getExecutedCount());
assertTrue(thirdAction.wasCompleted());
assertEquals(2L, thirdAction.getExecutedCount());
} }
public void testExecuteMissingAction() throws Exception { public void testExecuteMissingAction() throws Exception {