NIFI-4642 updated tests to be more tolerant/variable to different system speeds. Many of these should be integration tests and not unit tests. This closes #2303.

This commit is contained in:
joewitt 2017-11-28 12:51:47 -05:00 committed by Mark Payne
parent 45df23b1e0
commit dd981e87dd
1 changed files with 53 additions and 75 deletions

View File

@ -72,6 +72,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -86,7 +87,7 @@ public class TestProcessorLifecycle {
private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class); private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class);
private FlowController fc; private FlowController fc;
private Map<String,String> properties = new HashMap<>(); private Map<String, String> properties = new HashMap<>();
private volatile String propsFile = TestProcessorLifecycle.class.getResource("/lifecycletest.nifi.properties").getFile(); private volatile String propsFile = TestProcessorLifecycle.class.getResource("/lifecycletest.nifi.properties").getFile();
@Before @Before
@ -100,6 +101,23 @@ public class TestProcessorLifecycle {
FileUtils.deleteDirectory(new File("./target/lifecycletest")); FileUtils.deleteDirectory(new File("./target/lifecycletest"));
} }
private void assertCondition(final Supplier<Boolean> supplier) {
assertCondition(supplier, 1000L);
}
private void assertCondition(final Supplier<Boolean> supplier, final long delayToleranceMillis) {
final long startTime = System.currentTimeMillis();
while (((System.currentTimeMillis() - startTime) < delayToleranceMillis) && !supplier.get()) {
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
Thread.interrupted();
break;
}
}
assertTrue(supplier.get());
}
@Test @Test
public void validateEnableOperation() throws Exception { public void validateEnableOperation() throws Exception {
final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
@ -109,17 +127,17 @@ public class TestProcessorLifecycle {
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate());
assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState());
// validates idempotency // validates idempotency
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
testProcNode.enable(); testProcNode.enable();
} }
assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState());
testProcNode.disable(); testProcNode.disable();
assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState());
assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
} }
@Test @Test
@ -132,18 +150,18 @@ public class TestProcessorLifecycle {
final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(),
UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties); testProcNode.setProperties(properties);
assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState());
// validates idempotency // validates idempotency
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
testProcNode.disable(); testProcNode.disable();
} }
assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState());
assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
ProcessScheduler ps = fc.getProcessScheduler(); ProcessScheduler ps = fc.getProcessScheduler();
ps.startProcessor(testProcNode); ps.startProcessor(testProcNode);
assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState());
} }
/** /**
@ -171,7 +189,7 @@ public class TestProcessorLifecycle {
ps.startProcessor(testProcNode); ps.startProcessor(testProcNode);
Thread.sleep(500); Thread.sleep(500);
assertEquals(1, testProcessor.operationNames.size()); assertCondition(() -> testProcessor.operationNames.size() == 1);
assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
} }
@ -189,14 +207,14 @@ public class TestProcessorLifecycle {
fcsb.getSystemBundle().getBundleDetails().getCoordinate()); fcsb.getSystemBundle().getBundleDetails().getCoordinate());
testProcNode.setProperties(properties); testProcNode.setProperties(properties);
TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
// sets the scenario for the processor to run // sets the scenario for the processor to run
int randomDelayLimit = 3000; int randomDelayLimit = 3000;
this.randomOnTriggerDelay(testProcessor, randomDelayLimit); this.randomOnTriggerDelay(testProcessor, randomDelayLimit);
final ProcessScheduler ps = fc.getProcessScheduler(); final ProcessScheduler ps = fc.getProcessScheduler();
ps.stopProcessor(testProcNode); ps.stopProcessor(testProcNode);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
assertTrue(testProcessor.operationNames.size() == 0); assertTrue(testProcessor.operationNames.isEmpty());
} }
/** /**
@ -205,7 +223,7 @@ public class TestProcessorLifecycle {
*/ */
@Test @Test
@Ignore @Ignore
public void validateSuccessfullAndOrderlyShutdown() throws Exception { public void validateSuccessfulAndOrderlyShutdown() throws Exception {
final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest();
fc = fcsb.getFlowController(); fc = fcsb.getFlowController();
ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString());
@ -226,8 +244,7 @@ public class TestProcessorLifecycle {
testGroup.addProcessor(testProcNode); testGroup.addProcessor(testProcNode);
fc.startProcessGroup(testGroup.getIdentifier()); fc.startProcessGroup(testGroup.getIdentifier());
Thread.sleep(2000); // let it run for a while assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
fc.stopAllProcessors(); fc.stopAllProcessors();
@ -235,9 +252,9 @@ public class TestProcessorLifecycle {
// validates that regardless of how many running tasks, lifecycle // validates that regardless of how many running tasks, lifecycle
// operation are invoked atomically (once each). // operation are invoked atomically (once each).
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 1000L);
// . . . hence only 3 operations must be in the list // . . . hence only 3 operations must be in the list
assertEquals(3, testProcessor.operationNames.size()); assertCondition(() -> testProcessor.operationNames.size() == 3, 2000L);
// . . . and ordered as @OnScheduled, @OnUnscheduled, @OnStopped // . . . and ordered as @OnScheduled, @OnUnscheduled, @OnStopped
assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
@ -268,7 +285,7 @@ public class TestProcessorLifecycle {
ExecutorService executor = Executors.newFixedThreadPool(100); ExecutorService executor = Executors.newFixedThreadPool(100);
int startCallsCount = 10000; int startCallsCount = 10000;
final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount); final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState());
final Random random = new Random(); final Random random = new Random();
for (int i = 0; i < startCallsCount / 2; i++) { for (int i = 0; i < startCallsCount / 2; i++) {
executor.execute(new Runnable() { executor.execute(new Runnable() {
@ -326,18 +343,11 @@ public class TestProcessorLifecycle {
ProcessScheduler ps = fc.getProcessScheduler(); ProcessScheduler ps = fc.getProcessScheduler();
ps.startProcessor(testProcNode); ps.startProcessor(testProcNode);
Thread.sleep(1000); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 5000L);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
ps.stopProcessor(testProcNode); ps.stopProcessor(testProcNode);
Thread.sleep(100); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); assertCondition(() -> testProcessor.operationNames.size() == 2, 8000L);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
Thread.sleep(1000);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
assertEquals(2, testProcessor.operationNames.size());
assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnScheduled", testProcessor.operationNames.get(0));
assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1));
} }
@ -366,15 +376,9 @@ public class TestProcessorLifecycle {
ProcessScheduler ps = fc.getProcessScheduler(); ProcessScheduler ps = fc.getProcessScheduler();
ps.startProcessor(testProcNode); ps.startProcessor(testProcNode);
Thread.sleep(1000); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 10000L);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
Thread.sleep(1000);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
Thread.sleep(100);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
ps.stopProcessor(testProcNode); ps.stopProcessor(testProcNode);
Thread.sleep(500); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
} }
/** /**
@ -401,15 +405,9 @@ public class TestProcessorLifecycle {
ProcessScheduler ps = fc.getProcessScheduler(); ProcessScheduler ps = fc.getProcessScheduler();
ps.startProcessor(testProcNode); ps.startProcessor(testProcNode);
Thread.sleep(1000); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
Thread.sleep(1000);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
ps.stopProcessor(testProcNode); ps.stopProcessor(testProcNode);
Thread.sleep(100); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
Thread.sleep(500);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
} }
/** /**
@ -432,15 +430,9 @@ public class TestProcessorLifecycle {
ProcessScheduler ps = fc.getProcessScheduler(); ProcessScheduler ps = fc.getProcessScheduler();
ps.startProcessor(testProcNode); ps.startProcessor(testProcNode);
Thread.sleep(1000); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
Thread.sleep(1000);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
ps.stopProcessor(testProcNode); ps.stopProcessor(testProcNode);
Thread.sleep(100); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
Thread.sleep(4000);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
} }
/** /**
@ -463,20 +455,9 @@ public class TestProcessorLifecycle {
ProcessScheduler ps = fc.getProcessScheduler(); ProcessScheduler ps = fc.getProcessScheduler();
ps.startProcessor(testProcNode); ps.startProcessor(testProcNode);
Thread.sleep(1000); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 3000L);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
Thread.sleep(1000);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
ps.disableProcessor(testProcNode); // no effect
Thread.sleep(100);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
ps.stopProcessor(testProcNode); ps.stopProcessor(testProcNode);
Thread.sleep(100); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 4000L);
assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
Thread.sleep(4000);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
} }
/** /**
@ -501,14 +482,11 @@ public class TestProcessorLifecycle {
ProcessScheduler ps = fc.getProcessScheduler(); ProcessScheduler ps = fc.getProcessScheduler();
ps.startProcessor(testProcNode); ps.startProcessor(testProcNode);
Thread.sleep(1000); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
ps.disableProcessor(testProcNode); ps.disableProcessor(testProcNode);
Thread.sleep(100); assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L);
assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING);
ps.stopProcessor(testProcNode); ps.stopProcessor(testProcNode);
Thread.sleep(500); assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L);
assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED);
} }
/** /**