NIFI-12402 Added Wait for Activity to MonitorActivity

This closes #8063

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Co-authored-by: Vuong Nguyen Van <vanvuong24072001@gmail.com>
Co-authored-by: nathluu <luuhoangtanbis@gmail.com>
This commit is contained in:
nathluu 2023-11-24 17:50:01 +07:00 committed by exceptionfactory
parent 9c871699c3
commit 6649115a34
No known key found for this signature in database
2 changed files with 71 additions and 0 deletions

View File

@ -105,6 +105,14 @@ public class MonitorActivity extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("Activity restored at time: ${now():format('yyyy/MM/dd HH:mm:ss')} after being inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes")
.build();
public static final PropertyDescriptor WAIT_FOR_ACTIVITY = new PropertyDescriptor.Builder()
.name("Wait for Activity")
.description("When the processor gets started or restarted, if set to true, only send an inactive indicator if there had been activity beforehand. "
+ "Otherwise send an inactive indicator even if there had not been activity beforehand.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor INACTIVITY_MESSAGE = new PropertyDescriptor.Builder()
.name("Inactivity Message")
.description("The message that will be the content of FlowFiles that are sent to the 'inactive' relationship")
@ -170,6 +178,7 @@ public class MonitorActivity extends AbstractProcessor {
private final AtomicLong latestSuccessTransfer = new AtomicLong(System.currentTimeMillis());
private final AtomicLong latestReportedNodeState = new AtomicLong(System.currentTimeMillis());
private final AtomicBoolean inactive = new AtomicBoolean(false);
private final AtomicBoolean hasSuccessTransfer = new AtomicBoolean(false);
private final AtomicBoolean connectedWhenLastTriggered = new AtomicBoolean(false);
private final AtomicLong lastInactiveMessage = new AtomicLong(System.currentTimeMillis());
public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = "MonitorActivity.latestSuccessTransfer";
@ -181,6 +190,7 @@ public class MonitorActivity extends AbstractProcessor {
properties.add(CONTINUALLY_SEND_MESSAGES);
properties.add(INACTIVITY_MESSAGE);
properties.add(ACTIVITY_RESTORED_MESSAGE);
properties.add(WAIT_FOR_ACTIVITY);
properties.add(COPY_ATTRIBUTES);
properties.add(MONITORING_SCOPE);
properties.add(REPORTING_NODE);
@ -209,6 +219,7 @@ public class MonitorActivity extends AbstractProcessor {
isClusterScope(context, true);
resetLastSuccessfulTransfer();
inactive.set(false);
hasSuccessTransfer.set(false);
}
@ -254,6 +265,7 @@ public class MonitorActivity extends AbstractProcessor {
final ComponentLog logger = getLogger();
final boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean();
final boolean waitForActivity = context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
final boolean isClusterScope = isClusterScope(context, false);
final boolean isConnectedToCluster = context.isConnectedToCluster();
final boolean shouldReportOnlyOnPrimary = shouldReportOnlyOnPrimary(isClusterScope, context);
@ -301,6 +313,9 @@ public class MonitorActivity extends AbstractProcessor {
if (isInactive) {
final boolean continual = context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
sendInactiveMarker = !inactive.getAndSet(true) || (continual && (now > lastInactiveMessage.get() + thresholdMillis));
if (waitForActivity) {
sendInactiveMarker = sendInactiveMarker && hasSuccessTransfer.get();
}
}
if (sendInactiveMarker && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary, context)) {
@ -327,6 +342,7 @@ public class MonitorActivity extends AbstractProcessor {
} else {
session.transfer(flowFiles, REL_SUCCESS);
hasSuccessTransfer.set(true);
updatedLatestSuccessTransfer = now;
logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});

View File

@ -100,6 +100,61 @@ public class TestMonitorActivity {
restoredFlowFile.assertAttributeNotExists("key1");
}
@Test
public void testFirstMessageWithWaitForActivityTrue() {
final TestableProcessor processor = new TestableProcessor(1000);
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
runner.setProperty(MonitorActivity.WAIT_FOR_ACTIVITY, "true");
runner.enqueue(new byte[0]);
runner.run();
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
runner.clearTransferState();
processor.resetLastSuccessfulTransfer();
runNext(runner);
runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
runner.clearTransferState();
Map<String, String> attributes = new HashMap<>();
attributes.put("key", "value");
attributes.put("key1", "value1");
runner.enqueue(new byte[0], attributes);
runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
MockFlowFile restoredFlowFile = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
restoredFlowFile.assertAttributeNotExists("key");
restoredFlowFile.assertAttributeNotExists("key1");
runner.clearTransferState();
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
processor.resetLastSuccessfulTransfer();
runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
runner.clearTransferState();
runner.enqueue(new byte[0], attributes);
runNext(runner);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
restoredFlowFile = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
restoredFlowFile.assertAttributeNotExists("key");
restoredFlowFile.assertAttributeNotExists("key1");
}
@Test
public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySet() throws Exception {
final String lastSuccessInCluster = String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5));