From 0ad58200af01c5c07e25770acd967e7321800547 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 13 Mar 2020 13:11:27 -0400 Subject: [PATCH] NIFI-7256: This closes #4142. Fixed thresholds in unit test. Instead of assuming that multiple runs of the processor will occur within 100 milliseconds, allowed the multiple runs to occur within 3 mins of one another. Signed-off-by: Joe Witt --- .../standard/TestMonitorActivity.java | 61 +++++++++---------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java index bd375e470d..aa2d289f11 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.nifi.components.state.Scope; @@ -39,7 +40,7 @@ import static org.junit.Assert.assertTrue; public class TestMonitorActivity { @Test - public void testFirstMessage() throws InterruptedException, IOException { + public void testFirstMessage() throws InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L)); runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false"); runner.setProperty(MonitorActivity.THRESHOLD, "100 millis"); @@ -255,7 +256,7 @@ public class TestMonitorActivity { @Test public void testClusterMonitorInvalidReportingNode() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(true); runner.setPrimaryNode(false); @@ -267,7 +268,7 @@ public class TestMonitorActivity { @Test public void testClusterMonitorActive() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(true); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); @@ -289,7 +290,7 @@ public class TestMonitorActivity { @Test public void testClusterMonitorActiveFallbackToNodeScope() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(false); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); @@ -309,7 +310,7 @@ public class TestMonitorActivity { @Test public void testClusterMonitorActiveWithLatestTimestamp() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(true); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); @@ -343,7 +344,7 @@ public class TestMonitorActivity { @Test public void testClusterMonitorActiveMoreRecentTimestampExisted() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(true); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); @@ -377,7 +378,7 @@ public class TestMonitorActivity { @Test public void testClusterMonitorActiveCopyAttribute() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(true); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); @@ -402,11 +403,11 @@ public class TestMonitorActivity { @Test public void testClusterMonitorInactivity() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(true); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); - runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); + runner.setProperty(MonitorActivity.THRESHOLD, "3 mins"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); // Becomes inactive @@ -425,11 +426,11 @@ public class TestMonitorActivity { @Test public void testClusterMonitorInactivityFallbackToNodeScope() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(false); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); - runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); + runner.setProperty(MonitorActivity.THRESHOLD, "3 mins"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); // Becomes inactive @@ -448,14 +449,14 @@ public class TestMonitorActivity { @Test public void testClusterMonitorInactivityOnPrimaryNode() throws Exception { - final TestableProcessor processor = new TestableProcessor(10000); + final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120)); final TestRunner runner = TestRunners.newTestRunner(processor); runner.setClustered(true); runner.setPrimaryNode(true); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); - runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); + runner.setProperty(MonitorActivity.THRESHOLD, "3 mins"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); // Becomes inactive @@ -474,12 +475,12 @@ public class TestMonitorActivity { @Test public void testClusterMonitorInactivityOnNode() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(true); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); - runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); + runner.setProperty(MonitorActivity.THRESHOLD, "3 mins"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); // Becomes inactive, but this not shouldn't send flow file @@ -493,11 +494,11 @@ public class TestMonitorActivity { @Test public void testClusterMonitorActivityRestoredBySelf() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(true); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); - runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); + runner.setProperty(MonitorActivity.THRESHOLD, "3 mins"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); // Becomes inactive @@ -529,12 +530,12 @@ public class TestMonitorActivity { @Test public void testClusterMonitorActivityRestoredBySelfOnNode() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(true); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); - runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); + runner.setProperty(MonitorActivity.THRESHOLD, "3 mins"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); // Becomes inactive @@ -563,14 +564,14 @@ public class TestMonitorActivity { @Test public void testClusterMonitorActivityRestoredBySelfOnPrimaryNode() throws Exception { - final TestableProcessor processor = new TestableProcessor(10000); + final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120)); final TestRunner runner = TestRunners.newTestRunner(processor); runner.setClustered(true); runner.setPrimaryNode(true); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); - runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); + runner.setProperty(MonitorActivity.THRESHOLD, "3 mins"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); // Becomes inactive @@ -602,14 +603,14 @@ public class TestMonitorActivity { @Test public void testClusterMonitorActivityRestoredBySelfOnPrimaryNodeFallbackToNodeScope() throws Exception { - final TestableProcessor processor = new TestableProcessor(10000); + final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120)); final TestRunner runner = TestRunners.newTestRunner(processor); runner.setClustered(false); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); - runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); + runner.setProperty(MonitorActivity.THRESHOLD, "3 mins"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); // Becomes inactive @@ -640,11 +641,11 @@ public class TestMonitorActivity { @Test public void testClusterMonitorActivityRestoredByOtherNode() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(true); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); - runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); + runner.setProperty(MonitorActivity.THRESHOLD, "3 mins"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); // Becomes inactive @@ -674,14 +675,14 @@ public class TestMonitorActivity { @Test public void testClusterMonitorActivityRestoredByOtherNodeOnPrimary() throws Exception { - final TestableProcessor processor = new TestableProcessor(10000); + final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120)); final TestRunner runner = TestRunners.newTestRunner(processor); runner.setClustered(true); runner.setPrimaryNode(true); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); - runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); + runner.setProperty(MonitorActivity.THRESHOLD, "1 hour"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); // Becomes inactive @@ -705,18 +706,16 @@ public class TestMonitorActivity { assertEquals("value1", activityRestoredFiles.get(0).getAttribute("key1")); assertEquals("value2", activityRestoredFiles.get(0).getAttribute("key2")); runner.clearTransferState(); - } @Test public void testClusterMonitorActivityRestoredByOtherNodeOnNode() throws Exception { - - final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120))); runner.setClustered(true); runner.setPrimaryNode(false); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); - runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); + runner.setProperty(MonitorActivity.THRESHOLD, "3 mins"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); // Becomes inactive