NIFI-7256: This closes #4142. Fixed thresholds in unit test. Instead of assuming that multiple runs of the processor will occur within 100 milliseconds, allowed the multiple runs to occur within 3 mins of one another.

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2020-03-13 13:11:27 -04:00 committed by Joe Witt
parent 561be89a21
commit 0ad58200af
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
@ -39,7 +40,7 @@ import static org.junit.Assert.assertTrue;
public class TestMonitorActivity { public class TestMonitorActivity {
@Test @Test
public void testFirstMessage() throws InterruptedException, IOException { public void testFirstMessage() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L));
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false"); runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "100 millis"); runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
@ -255,7 +256,7 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorInvalidReportingNode() throws Exception { public void testClusterMonitorInvalidReportingNode() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
@ -267,7 +268,7 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorActive() throws Exception { public void testClusterMonitorActive() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
@ -289,7 +290,7 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorActiveFallbackToNodeScope() throws Exception { public void testClusterMonitorActiveFallbackToNodeScope() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setClustered(false); runner.setClustered(false);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
@ -309,7 +310,7 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorActiveWithLatestTimestamp() throws Exception { public void testClusterMonitorActiveWithLatestTimestamp() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
@ -343,7 +344,7 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorActiveMoreRecentTimestampExisted() throws Exception { public void testClusterMonitorActiveMoreRecentTimestampExisted() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
@ -377,7 +378,7 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorActiveCopyAttribute() throws Exception { public void testClusterMonitorActiveCopyAttribute() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(100)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
@ -402,11 +403,11 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorInactivity() throws Exception { public void testClusterMonitorInactivity() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive // Becomes inactive
@ -425,11 +426,11 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorInactivityFallbackToNodeScope() throws Exception { public void testClusterMonitorInactivityFallbackToNodeScope() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setClustered(false); runner.setClustered(false);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive // Becomes inactive
@ -448,14 +449,14 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorInactivityOnPrimaryNode() throws Exception { public void testClusterMonitorInactivityOnPrimaryNode() throws Exception {
final TestableProcessor processor = new TestableProcessor(10000); final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120));
final TestRunner runner = TestRunners.newTestRunner(processor); final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(true); runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive // Becomes inactive
@ -474,12 +475,12 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorInactivityOnNode() throws Exception { public void testClusterMonitorInactivityOnNode() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive, but this not shouldn't send flow file // Becomes inactive, but this not shouldn't send flow file
@ -493,11 +494,11 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorActivityRestoredBySelf() throws Exception { public void testClusterMonitorActivityRestoredBySelf() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive // Becomes inactive
@ -529,12 +530,12 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorActivityRestoredBySelfOnNode() throws Exception { public void testClusterMonitorActivityRestoredBySelfOnNode() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive // Becomes inactive
@ -563,14 +564,14 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorActivityRestoredBySelfOnPrimaryNode() throws Exception { public void testClusterMonitorActivityRestoredBySelfOnPrimaryNode() throws Exception {
final TestableProcessor processor = new TestableProcessor(10000); final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120));
final TestRunner runner = TestRunners.newTestRunner(processor); final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(true); runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive // Becomes inactive
@ -602,14 +603,14 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorActivityRestoredBySelfOnPrimaryNodeFallbackToNodeScope() throws Exception { public void testClusterMonitorActivityRestoredBySelfOnPrimaryNodeFallbackToNodeScope() throws Exception {
final TestableProcessor processor = new TestableProcessor(10000); final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120));
final TestRunner runner = TestRunners.newTestRunner(processor); final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setClustered(false); runner.setClustered(false);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive // Becomes inactive
@ -640,11 +641,11 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorActivityRestoredByOtherNode() throws Exception { public void testClusterMonitorActivityRestoredByOtherNode() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000)); final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive // Becomes inactive
@ -674,14 +675,14 @@ public class TestMonitorActivity {
@Test @Test
public void testClusterMonitorActivityRestoredByOtherNodeOnPrimary() throws Exception { public void testClusterMonitorActivityRestoredByOtherNodeOnPrimary() throws Exception {
final TestableProcessor processor = new TestableProcessor(10000); final TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120));
final TestRunner runner = TestRunners.newTestRunner(processor); final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(true); runner.setPrimaryNode(true);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); runner.setProperty(MonitorActivity.THRESHOLD, "1 hour");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive // Becomes inactive
@ -705,18 +706,16 @@ public class TestMonitorActivity {
assertEquals("value1", activityRestoredFiles.get(0).getAttribute("key1")); assertEquals("value1", activityRestoredFiles.get(0).getAttribute("key1"));
assertEquals("value2", activityRestoredFiles.get(0).getAttribute("key2")); assertEquals("value2", activityRestoredFiles.get(0).getAttribute("key2"));
runner.clearTransferState(); runner.clearTransferState();
} }
@Test @Test
public void testClusterMonitorActivityRestoredByOtherNodeOnNode() throws Exception { public void testClusterMonitorActivityRestoredByOtherNodeOnNode() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120)));
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(10000));
runner.setClustered(true); runner.setClustered(true);
runner.setPrimaryNode(false); runner.setPrimaryNode(false);
runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER); runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY); runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
runner.setProperty(MonitorActivity.THRESHOLD, "100 ms"); runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
// Becomes inactive // Becomes inactive