From 9ab2121fcf35c4c1149edd47dc64d6bfae76f627 Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Mon, 27 Nov 2017 11:07:26 +0000 Subject: [PATCH] When actions complete the next action is now run --- .../xpack/indexlifecycle/Phase.java | 60 +++++++-- .../InternalIndexLifecycleContextTests.java | 1 + .../indexlifecycle/LifecyclePolicyTests.java | 12 +- .../xpack/indexlifecycle/MockAction.java | 2 +- .../MockIndexLifecycleContextTests.java | 1 + .../xpack/indexlifecycle/PhaseTests.java | 119 +++++++++++++++++- 6 files changed, 175 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/Phase.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/Phase.java index 00d27bb0f5c..3b429f69df9 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/Phase.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/Phase.java @@ -110,10 +110,11 @@ public class Phase implements ToXContentObject, Writeable { public void onSuccess(boolean completed) { if (completed) { logger.info("Action [" + firstActionName + "] for index [" + indexName - + "] executed sucessfully but is not yet complete"); + + "] complete, moving to next action"); + moveToAction(context, indexName, 1); } else { logger.info("Action [" + firstActionName + "] for index [" + indexName - + "] complete, moving to next action"); + + "] executed sucessfully but is not yet complete"); } } @@ -131,18 +132,29 @@ public class Phase implements ToXContentObject, Writeable { } }); } else if (currentActionName.equals(PHASE_COMPLETED) == false) { - LifecycleAction currentAction = actions.stream().filter(action -> action.getWriteableName().equals(currentActionName)).findAny() - .orElseThrow(() -> new IllegalStateException("Current action [" + currentActionName + "] not found in phase [" - + getName() + "] for index [" + indexName + "]")); + int currentActionIndex = -1; + for (int i = 0; i < actions.size(); i++) { + if (actions.get(i).getWriteableName().equals(currentActionName)) { + currentActionIndex = i; + break; + } + } + if (currentActionIndex == -1) { + throw new IllegalStateException("Current action [" + currentActionName + "] not found in phase [" + + getName() + "] for index [" + indexName + "]"); + } + LifecycleAction currentAction = actions.get(currentActionIndex); + final int nextActionIndex = currentActionIndex + 1; context.executeAction(currentAction, new LifecycleAction.Listener() { @Override public void onSuccess(boolean completed) { if (completed) { + logger.info("Action [" + currentActionName + "] for index [" + indexName + "] complete, moving to next action"); + moveToAction(context, indexName, nextActionIndex); + } else { logger.info("Action [" + currentActionName + "] for index [" + indexName + "] executed sucessfully but is not yet complete"); - } else { - logger.info("Action [" + currentActionName + "] for index [" + indexName + "] complete, moving to next action"); } } @@ -154,6 +166,40 @@ public class Phase implements ToXContentObject, Writeable { } } + private void moveToAction(IndexLifecycleContext context, String indexName, final int nextActionIndex) { + if (nextActionIndex < actions.size()) { + LifecycleAction nextAction = actions.get(nextActionIndex); + context.setAction(nextAction.getWriteableName(), new Listener() { + + @Override + public void onSuccess() { + logger.info("Successfully initialised action [" + nextAction.getWriteableName() + "] in phase [" + getName() + + "] for index [" + indexName + "]"); + execute(context); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to initialised action [" + nextAction.getWriteableName() + "] in phase [" + getName() + + "] for index [" + indexName + "]", e); + } + }); + } else { + context.setAction(Phase.PHASE_COMPLETED, new Listener() { + + @Override + public void onSuccess() { + logger.info("Successfully completed phase [" + getName() + "] for index [" + indexName + "]"); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to complete phase [" + getName() + "] for index [" + indexName + "]", e); + } + }); + } + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContextTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContextTests.java index eed2e6a3b14..b9d0e334440 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContextTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/InternalIndexLifecycleContextTests.java @@ -481,6 +481,7 @@ public class InternalIndexLifecycleContextTests extends ESTestCase { }); MockAction action = new MockAction(); + action.setCompleteOnExecute(true); assertFalse(action.wasCompleted()); assertEquals(0L, action.getExecutedCount()); diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java index 7fa1b4994fe..62543dadd5d 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/LifecyclePolicyTests.java @@ -181,7 +181,7 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase completed = new SetOnce<>(); private final AtomicLong executedCount; private Exception exceptionToThrow = null; - private boolean completeOnExecute = true; + private boolean completeOnExecute = false; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, a -> new MockAction((Boolean) a[0], (Long) a[1])); diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/MockIndexLifecycleContextTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/MockIndexLifecycleContextTests.java index 6a2068ba814..a385311ad6e 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/MockIndexLifecycleContextTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/MockIndexLifecycleContextTests.java @@ -137,6 +137,7 @@ public class MockIndexLifecycleContextTests extends ESTestCase { }; MockAction action = new MockAction(); + action.setCompleteOnExecute(true); assertFalse(action.wasCompleted()); assertEquals(0L, action.getExecutedCount()); diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/PhaseTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/PhaseTests.java index 535dbc77930..e9a0d077b8a 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/PhaseTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/PhaseTests.java @@ -83,7 +83,7 @@ public class PhaseTests extends AbstractSerializingTestCase { return new Phase(name, after, actions); } - public void testExecuteNewIndex() throws Exception { + public void testExecuteNewIndexCompleteActions() throws Exception { String indexName = randomAlphaOfLengthBetween(1, 20); String phaseName = randomAlphaOfLengthBetween(1, 20); TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100)); @@ -94,6 +94,60 @@ public class PhaseTests extends AbstractSerializingTestCase { return "first_action"; } }; + firstAction.setCompleteOnExecute(true); + actions.add(firstAction); + MockAction secondAction = new MockAction() { + @Override + public String getWriteableName() { + return "second_action"; + } + }; + secondAction.setCompleteOnExecute(true); + actions.add(secondAction); + MockAction thirdAction = new MockAction() { + @Override + public String getWriteableName() { + return "third_action"; + } + }; + thirdAction.setCompleteOnExecute(true); + actions.add(thirdAction); + Phase phase = new Phase(phaseName, after, actions); + + MockIndexLifecycleContext context = new MockIndexLifecycleContext(indexName, phaseName, "") { + + @Override + public boolean canExecute(Phase phase) { + throw new AssertionError("canExecute should not have been called"); + } + }; + + phase.execute(context); + + assertEquals(indexName, context.getLifecycleTarget()); + assertEquals(phaseName, context.getPhase()); + assertEquals(Phase.PHASE_COMPLETED, context.getAction()); + + assertTrue(firstAction.wasCompleted()); + assertEquals(1L, firstAction.getExecutedCount()); + assertTrue(secondAction.wasCompleted()); + assertEquals(1L, secondAction.getExecutedCount()); + assertTrue(thirdAction.wasCompleted()); + assertEquals(1L, thirdAction.getExecutedCount()); + } + + public void testExecuteNewIndexIncompleteFirstAction() throws Exception { + String indexName = randomAlphaOfLengthBetween(1, 20); + String phaseName = randomAlphaOfLengthBetween(1, 20); + TimeValue after = TimeValue.timeValueSeconds(randomIntBetween(10, 100)); + List actions = new ArrayList<>(); + MockAction firstAction = new MockAction() { + @Override + public String getWriteableName() { + return "first_action"; + } + }; + firstAction.setCompleteOnExecute(false); actions.add(firstAction); MockAction secondAction = new MockAction() { @Override @@ -125,7 +179,7 @@ public class PhaseTests extends AbstractSerializingTestCase { assertEquals(phaseName, context.getPhase()); assertEquals(firstAction.getWriteableName(), context.getAction()); - assertTrue(firstAction.wasCompleted()); + assertFalse(firstAction.wasCompleted()); assertEquals(1L, firstAction.getExecutedCount()); assertFalse(secondAction.wasCompleted()); assertEquals(0L, secondAction.getExecutedCount()); @@ -268,6 +322,7 @@ public class PhaseTests extends AbstractSerializingTestCase { return "first_action"; } }; + firstAction.setCompleteOnExecute(false); actions.add(firstAction); MockAction secondAction = new MockAction() { @Override @@ -275,6 +330,7 @@ public class PhaseTests extends AbstractSerializingTestCase { return "second_action"; } }; + secondAction.setCompleteOnExecute(false); actions.add(secondAction); MockAction thirdAction = new MockAction() { @Override @@ -282,6 +338,7 @@ public class PhaseTests extends AbstractSerializingTestCase { return "third_action"; } }; + thirdAction.setCompleteOnExecute(false); actions.add(thirdAction); Phase phase = new Phase(phaseName, after, actions); @@ -299,12 +356,27 @@ public class PhaseTests extends AbstractSerializingTestCase { assertEquals(phaseName, context.getPhase()); assertEquals(firstAction.getWriteableName(), context.getAction()); - assertTrue(firstAction.wasCompleted()); + assertFalse(firstAction.wasCompleted()); assertEquals(1L, firstAction.getExecutedCount()); assertFalse(secondAction.wasCompleted()); assertEquals(0L, secondAction.getExecutedCount()); assertFalse(thirdAction.wasCompleted()); assertEquals(0L, thirdAction.getExecutedCount()); + + firstAction.setCompleteOnExecute(true); + + phase.execute(context); + + assertEquals(indexName, context.getLifecycleTarget()); + assertEquals(phaseName, context.getPhase()); + assertEquals(secondAction.getWriteableName(), context.getAction()); + + assertTrue(firstAction.wasCompleted()); + assertEquals(2L, firstAction.getExecutedCount()); + assertFalse(secondAction.wasCompleted()); + assertEquals(1L, secondAction.getExecutedCount()); + assertFalse(thirdAction.wasCompleted()); + assertEquals(0L, thirdAction.getExecutedCount()); } public void testExecuteSecondAction() throws Exception { @@ -318,6 +390,7 @@ public class PhaseTests extends AbstractSerializingTestCase { return "first_action"; } }; + firstAction.setCompleteOnExecute(false); actions.add(firstAction); MockAction secondAction = new MockAction() { @Override @@ -325,6 +398,7 @@ public class PhaseTests extends AbstractSerializingTestCase { return "second_action"; } }; + secondAction.setCompleteOnExecute(false); actions.add(secondAction); MockAction thirdAction = new MockAction() { @Override @@ -332,6 +406,7 @@ public class PhaseTests extends AbstractSerializingTestCase { return "third_action"; } }; + thirdAction.setCompleteOnExecute(false); actions.add(thirdAction); Phase phase = new Phase(phaseName, after, actions); @@ -351,10 +426,25 @@ public class PhaseTests extends AbstractSerializingTestCase { assertFalse(firstAction.wasCompleted()); assertEquals(0L, firstAction.getExecutedCount()); - assertTrue(secondAction.wasCompleted()); + assertFalse(secondAction.wasCompleted()); assertEquals(1L, secondAction.getExecutedCount()); assertFalse(thirdAction.wasCompleted()); assertEquals(0L, thirdAction.getExecutedCount()); + + secondAction.setCompleteOnExecute(true); + + phase.execute(context); + + assertEquals(indexName, context.getLifecycleTarget()); + assertEquals(phaseName, context.getPhase()); + assertEquals(thirdAction.getWriteableName(), context.getAction()); + + assertFalse(firstAction.wasCompleted()); + assertEquals(0L, firstAction.getExecutedCount()); + assertTrue(secondAction.wasCompleted()); + assertEquals(2L, secondAction.getExecutedCount()); + assertFalse(thirdAction.wasCompleted()); + assertEquals(1L, thirdAction.getExecutedCount()); } public void testExecuteThirdAction() throws Exception { @@ -368,6 +458,7 @@ public class PhaseTests extends AbstractSerializingTestCase { return "first_action"; } }; + firstAction.setCompleteOnExecute(false); actions.add(firstAction); MockAction secondAction = new MockAction() { @Override @@ -375,6 +466,7 @@ public class PhaseTests extends AbstractSerializingTestCase { return "second_action"; } }; + secondAction.setCompleteOnExecute(false); actions.add(secondAction); MockAction thirdAction = new MockAction() { @Override @@ -382,6 +474,7 @@ public class PhaseTests extends AbstractSerializingTestCase { return "third_action"; } }; + thirdAction.setCompleteOnExecute(false); actions.add(thirdAction); Phase phase = new Phase(phaseName, after, actions); @@ -395,7 +488,6 @@ public class PhaseTests extends AbstractSerializingTestCase { phase.execute(context); - assertEquals(indexName, context.getLifecycleTarget()); assertEquals(phaseName, context.getPhase()); assertEquals(thirdAction.getWriteableName(), context.getAction()); @@ -404,8 +496,23 @@ public class PhaseTests extends AbstractSerializingTestCase { assertEquals(0L, firstAction.getExecutedCount()); assertFalse(secondAction.wasCompleted()); assertEquals(0L, secondAction.getExecutedCount()); - assertTrue(thirdAction.wasCompleted()); + assertFalse(thirdAction.wasCompleted()); assertEquals(1L, thirdAction.getExecutedCount()); + + thirdAction.setCompleteOnExecute(true); + + phase.execute(context); + + assertEquals(indexName, context.getLifecycleTarget()); + assertEquals(phaseName, context.getPhase()); + assertEquals(Phase.PHASE_COMPLETED, context.getAction()); + + assertFalse(firstAction.wasCompleted()); + assertEquals(0L, firstAction.getExecutedCount()); + assertFalse(secondAction.wasCompleted()); + assertEquals(0L, secondAction.getExecutedCount()); + assertTrue(thirdAction.wasCompleted()); + assertEquals(2L, thirdAction.getExecutedCount()); } public void testExecuteMissingAction() throws Exception {