NIFI-1668 modified TestProcessorLifecycle to ensure FlowController is always shut down

This closes #324
This commit is contained in:
Oleg Zhurakousky 2016-04-04 16:21:25 -04:00 committed by jpercivall
parent cee2426df6
commit 91cdd78ebc

View File

@ -78,24 +78,26 @@ import org.slf4j.LoggerFactory;
public class TestProcessorLifecycle { public class TestProcessorLifecycle {
private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class); private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class);
private FlowController fc;
@Before @Before
public void before() { public void before() throws Exception {
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
NiFiProperties.getInstance().setProperty(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec"); NiFiProperties.getInstance().setProperty(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec");
NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml"); NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml");
NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider"); NiFiProperties.getInstance().setProperty(NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, "local-provider");
fc = this.buildFlowControllerForTest();
} }
@After @After
public void after() throws Exception { public void after() throws Exception {
FileUtils.deleteDirectory(new File("./target/test-repo")); FileUtils.deleteDirectory(new File("./target/test-repo"));
FileUtils.deleteDirectory(new File("./target/content_repository")); FileUtils.deleteDirectory(new File("./target/content_repository"));
fc.shutdown(true);
} }
@Test @Test
public void validateEnableOperation() throws Exception { public void validateEnableOperation() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
@ -112,13 +114,11 @@ public class TestProcessorLifecycle {
testProcNode.disable(); testProcNode.disable();
assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState());
assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState());
fc.shutdown(true);
} }
@Test @Test
public void validateDisableOperation() throws Exception { public void validateDisableOperation() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
@ -136,8 +136,6 @@ public class TestProcessorLifecycle {
ProcessScheduler ps = fc.getProcessScheduler(); ProcessScheduler ps = fc.getProcessScheduler();
ps.startProcessor(testProcNode); ps.startProcessor(testProcNode);
assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState());
fc.shutdown(true);
} }
/** /**
@ -146,7 +144,6 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateIdempotencyOfProcessorStartOperation() throws Exception { public void validateIdempotencyOfProcessorStartOperation() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@ -154,32 +151,16 @@ public class TestProcessorLifecycle {
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
// sets the scenario for the processor to run // sets the scenario for the processor to run
int randomDelayLimit = 3000; this.noop(testProcessor);
this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
final ProcessScheduler ps = fc.getProcessScheduler(); final ProcessScheduler ps = fc.getProcessScheduler();
ExecutorService executor = Executors.newCachedThreadPool();
int startCallsCount = 100; ps.startProcessor(testProcNode);
final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount); ps.startProcessor(testProcNode);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); ps.startProcessor(testProcNode);
for (int i = 0; i < startCallsCount; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
ps.startProcessor(testProcNode);
countDownCounter.countDown();
}
});
}
assertTrue(countDownCounter.await(2000, TimeUnit.MILLISECONDS)); Thread.sleep(500);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
// regardless of how many threads attempted to start Processor, it must
// only be started once, hence have only single entry for @OnScheduled
assertEquals(1, testProcessor.operationNames.size()); assertEquals(1, testProcessor.operationNames.size());
assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
fc.shutdown(true);
executor.shutdownNow();
} }
/** /**
@ -188,7 +169,6 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Exception { public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@ -202,7 +182,6 @@ public class TestProcessorLifecycle {
ps.stopProcessor(testProcNode); ps.stopProcessor(testProcNode);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
assertTrue(testProcessor.operationNames.size() == 0); assertTrue(testProcessor.operationNames.size() == 0);
fc.shutdown(true);
} }
/** /**
@ -211,7 +190,6 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateSuccessfullAndOrderlyShutdown() throws Exception { public void validateSuccessfullAndOrderlyShutdown() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@ -245,8 +223,6 @@ public class TestProcessorLifecycle {
assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
assertEquals("@OnStopped", testProcessor.operationNames.get(2)); assertEquals("@OnStopped", testProcessor.operationNames.get(2));
fc.shutdown(true);
} }
/** /**
@ -255,7 +231,6 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws Exception { public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@ -304,7 +279,6 @@ public class TestProcessorLifecycle {
previousOperation = operationName; previousOperation = operationName;
} }
executor.shutdownNow(); executor.shutdownNow();
fc.shutdown(true);
} }
/** /**
@ -312,7 +286,6 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted() throws Exception { public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@ -339,7 +312,6 @@ public class TestProcessorLifecycle {
assertEquals(2, testProcessor.operationNames.size()); assertEquals(2, testProcessor.operationNames.size());
assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
fc.shutdown(true);
} }
/** /**
@ -348,7 +320,6 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() throws Exception { public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@ -371,7 +342,6 @@ public class TestProcessorLifecycle {
ps.stopProcessor(testProcNode); ps.stopProcessor(testProcNode);
Thread.sleep(500); Thread.sleep(500);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
fc.shutdown(true);
} }
/** /**
@ -381,7 +351,6 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws Exception { public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@ -404,7 +373,6 @@ public class TestProcessorLifecycle {
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
Thread.sleep(500); Thread.sleep(500);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
fc.shutdown(true);
} }
/** /**
@ -414,7 +382,6 @@ public class TestProcessorLifecycle {
@Test @Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception { public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception {
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@ -434,7 +401,6 @@ public class TestProcessorLifecycle {
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
Thread.sleep(4000); Thread.sleep(4000);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
fc.shutdown(true);
} }
/** /**
@ -444,7 +410,6 @@ public class TestProcessorLifecycle {
@Test @Test
public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception { public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception {
NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); NiFiProperties.getInstance().setProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec");
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@ -469,7 +434,6 @@ public class TestProcessorLifecycle {
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
Thread.sleep(4000); Thread.sleep(4000);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
fc.shutdown(true);
} }
/** /**
@ -478,7 +442,6 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws Exception { public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
@ -499,7 +462,6 @@ public class TestProcessorLifecycle {
ps.stopProcessor(testProcNode); ps.stopProcessor(testProcNode);
Thread.sleep(500); Thread.sleep(500);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
fc.shutdown(true);
} }
/** /**
@ -508,17 +470,12 @@ public class TestProcessorLifecycle {
*/ */
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception { public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString());
ProcessScheduler ps = fc.getProcessScheduler(); ProcessScheduler ps = fc.getProcessScheduler();
try { ps.startProcessor(testProcNode);
ps.startProcessor(testProcNode); fail();
fail();
} finally {
fc.shutdown(true);
}
} }
/** /**
@ -527,7 +484,6 @@ public class TestProcessorLifecycle {
*/ */
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void validateStartFailsOnInvalidProcessorWithDisabledService() throws Exception { public void validateStartFailsOnInvalidProcessorWithDisabledService() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
@ -542,12 +498,8 @@ public class TestProcessorLifecycle {
testProcessor.withService = true; testProcessor.withService = true;
ProcessScheduler ps = fc.getProcessScheduler(); ProcessScheduler ps = fc.getProcessScheduler();
try { ps.startProcessor(testProcNode);
ps.startProcessor(testProcNode); fail();
fail();
} finally {
fc.shutdown(true);
}
} }
/** /**
@ -555,7 +507,6 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception { public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
@ -577,7 +528,6 @@ public class TestProcessorLifecycle {
Thread.sleep(500); Thread.sleep(500);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
fc.shutdown(true);
} }
/** /**
@ -586,7 +536,6 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
public void validateProcessorDeletion() throws Exception { public void validateProcessorDeletion() throws Exception {
FlowController fc = this.buildFlowControllerForTest();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
this.setControllerRootGroup(fc, testGroup); this.setControllerRootGroup(fc, testGroup);
@ -644,7 +593,6 @@ public class TestProcessorLifecycle {
testGroup.removeProcessor(testProcNodeA); testGroup.removeProcessor(testProcNodeA);
testGroup.removeProcessor(testProcNodeB); testGroup.removeProcessor(testProcNodeB);
testGroup.shutdown(); testGroup.shutdown();
fc.shutdown(true);
} }
/** /**