diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index eb6836f770..a42cfcab04 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -78,24 +78,26 @@ import org.slf4j.LoggerFactory; public class TestProcessorLifecycle { private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class); + private FlowController fc; @Before - public void before() { + public void before() throws Exception { System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); NiFiProperties.getInstance().setProperty(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec"); NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml"); NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider"); + fc = this.buildFlowControllerForTest(); } @After public void after() throws Exception { FileUtils.deleteDirectory(new File("./target/test-repo")); FileUtils.deleteDirectory(new File("./target/content_repository")); + fc.shutdown(true); } @Test public void validateEnableOperation() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), @@ -112,13 +114,11 @@ public class TestProcessorLifecycle { testProcNode.disable(); assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); - fc.shutdown(true); } @Test public void validateDisableOperation() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), @@ -136,8 +136,6 @@ public class TestProcessorLifecycle { ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); - - fc.shutdown(true); } /** @@ -146,7 +144,6 @@ public class TestProcessorLifecycle { */ @Test public void validateIdempotencyOfProcessorStartOperation() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -154,32 +151,16 @@ public class TestProcessorLifecycle { TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run - int randomDelayLimit = 3000; - this.randomOnTriggerDelay(testProcessor, randomDelayLimit); + this.noop(testProcessor); final ProcessScheduler ps = fc.getProcessScheduler(); - ExecutorService executor = Executors.newCachedThreadPool(); - int startCallsCount = 100; - final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - for (int i = 0; i < startCallsCount; i++) { - executor.execute(new Runnable() { - @Override - public void run() { - ps.startProcessor(testProcNode); - countDownCounter.countDown(); - } - }); - } + ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode); + ps.startProcessor(testProcNode); - assertTrue(countDownCounter.await(2000, TimeUnit.MILLISECONDS)); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); - // regardless of how many threads attempted to start Processor, it must - // only be started once, hence have only single entry for @OnScheduled + Thread.sleep(500); assertEquals(1, testProcessor.operationNames.size()); assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); - fc.shutdown(true); - executor.shutdownNow(); } /** @@ -188,7 +169,6 @@ public class TestProcessorLifecycle { */ @Test public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -202,7 +182,6 @@ public class TestProcessorLifecycle { ps.stopProcessor(testProcNode); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertTrue(testProcessor.operationNames.size() == 0); - fc.shutdown(true); } /** @@ -211,7 +190,6 @@ public class TestProcessorLifecycle { */ @Test public void validateSuccessfullAndOrderlyShutdown() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -245,8 +223,6 @@ public class TestProcessorLifecycle { assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); assertEquals("@OnStopped", testProcessor.operationNames.get(2)); - - fc.shutdown(true); } /** @@ -255,7 +231,6 @@ public class TestProcessorLifecycle { */ @Test public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -304,7 +279,6 @@ public class TestProcessorLifecycle { previousOperation = operationName; } executor.shutdownNow(); - fc.shutdown(true); } /** @@ -312,7 +286,6 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -339,7 +312,6 @@ public class TestProcessorLifecycle { assertEquals(2, testProcessor.operationNames.size()); assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); - fc.shutdown(true); } /** @@ -348,7 +320,6 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -371,7 +342,6 @@ public class TestProcessorLifecycle { ps.stopProcessor(testProcNode); Thread.sleep(500); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - fc.shutdown(true); } /** @@ -381,7 +351,6 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -404,7 +373,6 @@ public class TestProcessorLifecycle { assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); Thread.sleep(500); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - fc.shutdown(true); } /** @@ -414,7 +382,6 @@ public class TestProcessorLifecycle { @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception { NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -434,7 +401,6 @@ public class TestProcessorLifecycle { assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); Thread.sleep(4000); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - fc.shutdown(true); } /** @@ -444,7 +410,6 @@ public class TestProcessorLifecycle { @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception { NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -469,7 +434,6 @@ public class TestProcessorLifecycle { assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); Thread.sleep(4000); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - fc.shutdown(true); } /** @@ -478,7 +442,6 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); @@ -499,7 +462,6 @@ public class TestProcessorLifecycle { ps.stopProcessor(testProcNode); Thread.sleep(500); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - fc.shutdown(true); } /** @@ -508,17 +470,12 @@ public class TestProcessorLifecycle { */ @Test(expected = IllegalStateException.class) public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); ProcessScheduler ps = fc.getProcessScheduler(); - try { - ps.startProcessor(testProcNode); - fail(); - } finally { - fc.shutdown(true); - } + ps.startProcessor(testProcNode); + fail(); } /** @@ -527,7 +484,6 @@ public class TestProcessorLifecycle { */ @Test(expected = IllegalStateException.class) public void validateStartFailsOnInvalidProcessorWithDisabledService() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); @@ -542,12 +498,8 @@ public class TestProcessorLifecycle { testProcessor.withService = true; ProcessScheduler ps = fc.getProcessScheduler(); - try { - ps.startProcessor(testProcNode); - fail(); - } finally { - fc.shutdown(true); - } + ps.startProcessor(testProcNode); + fail(); } /** @@ -555,7 +507,6 @@ public class TestProcessorLifecycle { */ @Test public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); @@ -577,7 +528,6 @@ public class TestProcessorLifecycle { Thread.sleep(500); assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); - fc.shutdown(true); } /** @@ -586,7 +536,6 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorDeletion() throws Exception { - FlowController fc = this.buildFlowControllerForTest(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); @@ -644,7 +593,6 @@ public class TestProcessorLifecycle { testGroup.removeProcessor(testProcNodeA); testGroup.removeProcessor(testProcNodeB); testGroup.shutdown(); - fc.shutdown(true); } /**