From 6649115a34e302c5ebd45fd5c12e236357fc307f Mon Sep 17 00:00:00 2001 From: nathluu Date: Fri, 24 Nov 2023 17:50:01 +0700 Subject: [PATCH] NIFI-12402 Added Wait for Activity to MonitorActivity This closes #8063 Signed-off-by: David Handermann Co-authored-by: Vuong Nguyen Van Co-authored-by: nathluu --- .../processors/standard/MonitorActivity.java | 16 ++++++ .../standard/TestMonitorActivity.java | 55 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java index fa4d6e8722..656c4ab11e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java @@ -105,6 +105,14 @@ public class MonitorActivity extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .defaultValue("Activity restored at time: ${now():format('yyyy/MM/dd HH:mm:ss')} after being inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes") .build(); + public static final PropertyDescriptor WAIT_FOR_ACTIVITY = new PropertyDescriptor.Builder() + .name("Wait for Activity") + .description("When the processor gets started or restarted, if set to true, only send an inactive indicator if there had been activity beforehand. " + + "Otherwise send an inactive indicator even if there had not been activity beforehand.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); public static final PropertyDescriptor INACTIVITY_MESSAGE = new PropertyDescriptor.Builder() .name("Inactivity Message") .description("The message that will be the content of FlowFiles that are sent to the 'inactive' relationship") @@ -170,6 +178,7 @@ public class MonitorActivity extends AbstractProcessor { private final AtomicLong latestSuccessTransfer = new AtomicLong(System.currentTimeMillis()); private final AtomicLong latestReportedNodeState = new AtomicLong(System.currentTimeMillis()); private final AtomicBoolean inactive = new AtomicBoolean(false); + private final AtomicBoolean hasSuccessTransfer = new AtomicBoolean(false); private final AtomicBoolean connectedWhenLastTriggered = new AtomicBoolean(false); private final AtomicLong lastInactiveMessage = new AtomicLong(System.currentTimeMillis()); public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = "MonitorActivity.latestSuccessTransfer"; @@ -181,6 +190,7 @@ public class MonitorActivity extends AbstractProcessor { properties.add(CONTINUALLY_SEND_MESSAGES); properties.add(INACTIVITY_MESSAGE); properties.add(ACTIVITY_RESTORED_MESSAGE); + properties.add(WAIT_FOR_ACTIVITY); properties.add(COPY_ATTRIBUTES); properties.add(MONITORING_SCOPE); properties.add(REPORTING_NODE); @@ -209,6 +219,7 @@ public class MonitorActivity extends AbstractProcessor { isClusterScope(context, true); resetLastSuccessfulTransfer(); inactive.set(false); + hasSuccessTransfer.set(false); } @@ -254,6 +265,7 @@ public class MonitorActivity extends AbstractProcessor { final ComponentLog logger = getLogger(); final boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean(); + final boolean waitForActivity = context.getProperty(WAIT_FOR_ACTIVITY).asBoolean(); final boolean isClusterScope = isClusterScope(context, false); final boolean isConnectedToCluster = context.isConnectedToCluster(); final boolean shouldReportOnlyOnPrimary = shouldReportOnlyOnPrimary(isClusterScope, context); @@ -301,6 +313,9 @@ public class MonitorActivity extends AbstractProcessor { if (isInactive) { final boolean continual = context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean(); sendInactiveMarker = !inactive.getAndSet(true) || (continual && (now > lastInactiveMessage.get() + thresholdMillis)); + if (waitForActivity) { + sendInactiveMarker = sendInactiveMarker && hasSuccessTransfer.get(); + } } if (sendInactiveMarker && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context)) { @@ -327,6 +342,7 @@ public class MonitorActivity extends AbstractProcessor { } else { session.transfer(flowFiles, REL_SUCCESS); + hasSuccessTransfer.set(true); updatedLatestSuccessTransfer = now; logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java index 7b71678091..a38ca868b0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java @@ -100,6 +100,61 @@ public class TestMonitorActivity { restoredFlowFile.assertAttributeNotExists("key1"); } + @Test + public void testFirstMessageWithWaitForActivityTrue() { + final TestableProcessor processor = new TestableProcessor(1000); + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false"); + runner.setProperty(MonitorActivity.THRESHOLD, "100 millis"); + runner.setProperty(MonitorActivity.WAIT_FOR_ACTIVITY, "true"); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1); + runner.clearTransferState(); + + processor.resetLastSuccessfulTransfer(); + + runNext(runner); + runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1); + runner.clearTransferState(); + + Map attributes = new HashMap<>(); + attributes.put("key", "value"); + attributes.put("key1", "value1"); + + runner.enqueue(new byte[0], attributes); + runNext(runner); + + runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1); + runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1); + + MockFlowFile restoredFlowFile = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0); + restoredFlowFile.assertAttributeNotExists("key"); + restoredFlowFile.assertAttributeNotExists("key1"); + + runner.clearTransferState(); + runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true"); + + processor.resetLastSuccessfulTransfer(); + runNext(runner); + + runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1); + runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0); + runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0); + runner.clearTransferState(); + + runner.enqueue(new byte[0], attributes); + runNext(runner); + + runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0); + runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1); + runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1); + + restoredFlowFile = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0); + restoredFlowFile.assertAttributeNotExists("key"); + restoredFlowFile.assertAttributeNotExists("key1"); + } @Test public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySet() throws Exception { final String lastSuccessInCluster = String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));