diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index c544ef4bbf..f8f0426b64 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -72,6 +72,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -86,7 +87,7 @@ public class TestProcessorLifecycle { private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class); private FlowController fc; - private Map properties = new HashMap<>(); + private Map properties = new HashMap<>(); private volatile String propsFile = TestProcessorLifecycle.class.getResource("/lifecycletest.nifi.properties").getFile(); @Before @@ -100,6 +101,23 @@ public class TestProcessorLifecycle { FileUtils.deleteDirectory(new File("./target/lifecycletest")); } + private void assertCondition(final Supplier supplier) { + assertCondition(supplier, 1000L); + } + + private void assertCondition(final Supplier supplier, final long delayToleranceMillis) { + final long startTime = System.currentTimeMillis(); + while (((System.currentTimeMillis() - startTime) < delayToleranceMillis) && !supplier.get()) { + try { + Thread.sleep(50); + } catch (InterruptedException ex) { + Thread.interrupted(); + break; + } + } + assertTrue(supplier.get()); + } + @Test public void validateEnableOperation() throws Exception { final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); @@ -109,17 +127,17 @@ public class TestProcessorLifecycle { final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); - assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); - assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState()); // validates idempotency for (int i = 0; i < 2; i++) { testProcNode.enable(); } - assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); - assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState()); testProcNode.disable(); - assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); - assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); + assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); } @Test @@ -132,18 +150,18 @@ public class TestProcessorLifecycle { final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); - assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); - assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState()); // validates idempotency for (int i = 0; i < 2; i++) { testProcNode.disable(); } - assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); - assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); + assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); + assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); } /** @@ -171,7 +189,7 @@ public class TestProcessorLifecycle { ps.startProcessor(testProcNode); Thread.sleep(500); - assertEquals(1, testProcessor.operationNames.size()); + assertCondition(() -> testProcessor.operationNames.size() == 1); assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); } @@ -189,14 +207,14 @@ public class TestProcessorLifecycle { fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); // sets the scenario for the processor to run int randomDelayLimit = 3000; this.randomOnTriggerDelay(testProcessor, randomDelayLimit); final ProcessScheduler ps = fc.getProcessScheduler(); ps.stopProcessor(testProcNode); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - assertTrue(testProcessor.operationNames.size() == 0); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); + assertTrue(testProcessor.operationNames.isEmpty()); } /** @@ -205,7 +223,7 @@ public class TestProcessorLifecycle { */ @Test @Ignore - public void validateSuccessfullAndOrderlyShutdown() throws Exception { + public void validateSuccessfulAndOrderlyShutdown() throws Exception { final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); fc = fcsb.getFlowController(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); @@ -226,8 +244,7 @@ public class TestProcessorLifecycle { testGroup.addProcessor(testProcNode); fc.startProcessGroup(testGroup.getIdentifier()); - Thread.sleep(2000); // let it run for a while - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); fc.stopAllProcessors(); @@ -235,9 +252,9 @@ public class TestProcessorLifecycle { // validates that regardless of how many running tasks, lifecycle // operation are invoked atomically (once each). - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 1000L); // . . . hence only 3 operations must be in the list - assertEquals(3, testProcessor.operationNames.size()); + assertCondition(() -> testProcessor.operationNames.size() == 3, 2000L); // . . . and ordered as @OnScheduled, @OnUnscheduled, @OnStopped assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); @@ -268,7 +285,7 @@ public class TestProcessorLifecycle { ExecutorService executor = Executors.newFixedThreadPool(100); int startCallsCount = 10000; final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); final Random random = new Random(); for (int i = 0; i < startCallsCount / 2; i++) { executor.execute(new Runnable() { @@ -326,18 +343,11 @@ public class TestProcessorLifecycle { ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 5000L); ps.stopProcessor(testProcNode); - Thread.sleep(100); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - Thread.sleep(1000); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - - assertEquals(2, testProcessor.operationNames.size()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L); + assertCondition(() -> testProcessor.operationNames.size() == 2, 8000L); assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); } @@ -366,15 +376,9 @@ public class TestProcessorLifecycle { ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - Thread.sleep(100); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 10000L); ps.stopProcessor(testProcNode); - Thread.sleep(500); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L); } /** @@ -401,15 +405,9 @@ public class TestProcessorLifecycle { ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); - Thread.sleep(100); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); - Thread.sleep(500); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L); } /** @@ -432,15 +430,9 @@ public class TestProcessorLifecycle { ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); - Thread.sleep(100); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); - Thread.sleep(4000); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L); } /** @@ -463,20 +455,9 @@ public class TestProcessorLifecycle { ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - ps.disableProcessor(testProcNode); // no effect - Thread.sleep(100); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 3000L); ps.stopProcessor(testProcNode); - Thread.sleep(100); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - Thread.sleep(4000); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 4000L); } /** @@ -501,14 +482,11 @@ public class TestProcessorLifecycle { ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - Thread.sleep(1000); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.disableProcessor(testProcNode); - Thread.sleep(100); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); - Thread.sleep(500); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L); } /**