NIFI-13829: Mitigate false-positive reports from MonitorActivity when FlowFiles are infrequent

This closes #9333.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
This commit is contained in:
Rajmund Takacs 2024-10-02 14:41:53 +02:00 committed by tpalfy
parent ae2277478a
commit 7da06aab62
2 changed files with 63 additions and 13 deletions

View File

@ -197,7 +197,7 @@ public class MonitorActivity extends AbstractProcessor {
private final AtomicBoolean connectedWhenLastTriggered = new AtomicBoolean(false); private final AtomicBoolean connectedWhenLastTriggered = new AtomicBoolean(false);
private final AtomicLong lastInactiveMessage = new AtomicLong(); private final AtomicLong lastInactiveMessage = new AtomicLong();
private final AtomicLong inactivityStartMillis = new AtomicLong(System.currentTimeMillis()); private final AtomicLong inactivityStartMillis = new AtomicLong(nowMillis());
private final AtomicBoolean wasActive = new AtomicBoolean(true); private final AtomicBoolean wasActive = new AtomicBoolean(true);
private volatile LocalFlowActivityInfo localFlowActivityInfo; private volatile LocalFlowActivityInfo localFlowActivityInfo;
@ -294,7 +294,7 @@ public class MonitorActivity extends AbstractProcessor {
final boolean isActive = localFlowActivityInfo.isActive() || !flowFiles.isEmpty(); final boolean isActive = localFlowActivityInfo.isActive() || !flowFiles.isEmpty();
final long lastActivity = localFlowActivityInfo.getLastActivity(); final long lastActivity = localFlowActivityInfo.getLastActivity();
final long inactivityStartMillis = this.inactivityStartMillis.get(); final long inactivityStartMillis = this.inactivityStartMillis.get();
final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get() + thresholdMillis) <= System.currentTimeMillis(); final boolean timeToRepeatInactiveMessage = (lastInactiveMessage.get() + thresholdMillis) <= nowMillis();
final boolean canReport = !isClusterScope || isConnectedToCluster || !flowFiles.isEmpty(); final boolean canReport = !isClusterScope || isConnectedToCluster || !flowFiles.isEmpty();
final boolean canChangeState = !waitForActivity || localFlowActivityInfo.hasSuccessfulTransfer(); final boolean canChangeState = !waitForActivity || localFlowActivityInfo.hasSuccessfulTransfer();
@ -314,10 +314,14 @@ public class MonitorActivity extends AbstractProcessor {
} }
} }
protected long getStartupTime() { protected long nowMillis() {
return System.currentTimeMillis(); return System.currentTimeMillis();
} }
/**
 * Returns the timestamp, in milliseconds, that is treated as the startup time.
 * Delegates to {@code nowMillis()}, so a subclass that overrides the clock
 * also controls the value returned here.
 * NOTE(review): the callers of this method are outside the visible hunks — confirm when it is sampled.
 */
protected long getStartupTime() {
return nowMillis();
}
protected final long getLastSuccessfulTransfer() { protected final long getLastSuccessfulTransfer() {
return localFlowActivityInfo.getLastSuccessfulTransfer(); return localFlowActivityInfo.getLastSuccessfulTransfer();
} }
@ -362,7 +366,7 @@ public class MonitorActivity extends AbstractProcessor {
if (shouldThisNodeReport) { if (shouldThisNodeReport) {
sendInactivityMarker(context, session, lastActivity, logger); sendInactivityMarker(context, session, lastActivity, logger);
} }
lastInactiveMessage.set(System.currentTimeMillis()); lastInactiveMessage.set(nowMillis());
setInactivityFlag(context.getStateManager()); setInactivityFlag(context.getStateManager());
} }
@ -450,7 +454,7 @@ public class MonitorActivity extends AbstractProcessor {
inactiveFlowFile = session.putAttribute( inactiveFlowFile = session.putAttribute(
inactiveFlowFile, inactiveFlowFile,
"inactivityDurationMillis", "inactivityDurationMillis",
String.valueOf(System.currentTimeMillis() - inactivityStartMillis) String.valueOf(nowMillis() - inactivityStartMillis)
); );
final byte[] outBytes = context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes( final byte[] outBytes = context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(
@ -471,7 +475,8 @@ public class MonitorActivity extends AbstractProcessor {
activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", String.valueOf( activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", String.valueOf(
inactivityStartMillis)); inactivityStartMillis));
activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", String.valueOf(System.currentTimeMillis() - inactivityStartMillis)); activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", String.valueOf(
nowMillis() - inactivityStartMillis));
final byte[] outBytes = context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes( final byte[] outBytes = context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(
StandardCharsets.UTF_8); StandardCharsets.UTF_8);
@ -482,12 +487,14 @@ public class MonitorActivity extends AbstractProcessor {
logger.info("Transferred {} to 'activity.restored'", activityRestoredFlowFile); logger.info("Transferred {} to 'activity.restored'", activityRestoredFlowFile);
} }
private static class LocalFlowActivityInfo { private class LocalFlowActivityInfo {
private static final long NO_VALUE = 0; private static final long NO_VALUE = 0;
private static final int TIMES_SYNC_WITHIN_THRESHOLD = 3;
private final long startupTimeMillis; private final long startupTimeMillis;
private final long thresholdMillis; private final long thresholdMillis;
private final boolean saveAttributes; private final boolean saveAttributes;
private final long syncPeriodMillis;
private long nextSyncMillis = NO_VALUE; private long nextSyncMillis = NO_VALUE;
private long lastSuccessfulTransfer = NO_VALUE; private long lastSuccessfulTransfer = NO_VALUE;
@ -497,6 +504,7 @@ public class MonitorActivity extends AbstractProcessor {
this.startupTimeMillis = startupTimeMillis; this.startupTimeMillis = startupTimeMillis;
this.thresholdMillis = thresholdMillis; this.thresholdMillis = thresholdMillis;
this.saveAttributes = saveAttributes; this.saveAttributes = saveAttributes;
this.syncPeriodMillis = thresholdMillis / TIMES_SYNC_WITHIN_THRESHOLD;
} }
public LocalFlowActivityInfo(long startupTimeMillis, long thresholdMillis, boolean saveAttributes, long initialLastSuccessfulTransfer) { public LocalFlowActivityInfo(long startupTimeMillis, long thresholdMillis, boolean saveAttributes, long initialLastSuccessfulTransfer) {
@ -505,22 +513,22 @@ public class MonitorActivity extends AbstractProcessor {
} }
public boolean syncNeeded() { public boolean syncNeeded() {
return nextSyncMillis <= System.currentTimeMillis(); return nextSyncMillis <= nowMillis();
} }
public void setNextSyncMillis() { public void setNextSyncMillis() {
nextSyncMillis = System.currentTimeMillis() + (thresholdMillis / 3); nextSyncMillis = nowMillis() + syncPeriodMillis;
} }
public void forceSync() { public void forceSync() {
nextSyncMillis = System.currentTimeMillis(); nextSyncMillis = nowMillis();
} }
public boolean isActive() { public boolean isActive() {
if (hasSuccessfulTransfer()) { if (hasSuccessfulTransfer()) {
return System.currentTimeMillis() < (lastSuccessfulTransfer + thresholdMillis); return nowMillis() < (lastSuccessfulTransfer + thresholdMillis);
} else { } else {
return System.currentTimeMillis() < (startupTimeMillis + thresholdMillis); return nowMillis() < (startupTimeMillis + thresholdMillis);
} }
} }
@ -545,7 +553,11 @@ public class MonitorActivity extends AbstractProcessor {
} }
public void update(FlowFile flowFile) { public void update(FlowFile flowFile) {
this.lastSuccessfulTransfer = System.currentTimeMillis(); final long now = nowMillis();
if ((now - this.getLastActivity()) > syncPeriodMillis) {
this.forceSync(); // Immediate synchronization if Flow Files are infrequent, to mitigate false reports
}
this.lastSuccessfulTransfer = now;
if (saveAttributes) { if (saveAttributes) {
lastSuccessfulTransferAttributes = new HashMap<>(flowFile.getAttributes()); lastSuccessfulTransferAttributes = new HashMap<>(flowFile.getAttributes());
lastSuccessfulTransferAttributes.remove(CoreAttributes.UUID.key()); lastSuccessfulTransferAttributes.remove(CoreAttributes.UUID.key());

View File

@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManager;
@ -1375,4 +1376,41 @@ public class TestMonitorActivity {
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 3); runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 3);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1); runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
} }
/**
 * Checks that a FlowFile arriving after a long quiet period causes the shared cluster state
 * to change, i.e. that the processor synchronizes on that trigger. Time is driven by a
 * controllable clock rather than the system clock.
 */
@Test
public void testInfrequentFlowFilesTriggerImmediateSynchronization() throws IOException, InterruptedException {
    final long thresholdSeconds = 30;
    final long startupSeconds = 1;

    // Controllable clock: the processor reads "now" from this value.
    final AtomicLong clockMillis = new AtomicLong(TimeUnit.SECONDS.toMillis(startupSeconds));
    final MonitorActivity processor = new MonitorActivity() {
        @Override
        protected long nowMillis() {
            return clockMillis.get();
        }
    };

    final TestRunner runner = TestRunners.newTestRunner(processor);
    runner.setIsConfiguredForClustering(true);
    runner.setConnected(true);
    runner.setPrimaryNode(false);
    runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
    runner.setProperty(MonitorActivity.THRESHOLD, thresholdSeconds + " seconds");

    // Initialization run: no common activity info is stored in the cluster state yet.
    runner.run(1, false);
    final String stateAfterInit = runner.getStateManager()
            .getState(Scope.CLUSTER)
            .get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
    assertNull(stateAfterInit);

    // The very first FlowFile results in the state being written.
    runner.enqueue("Incoming data 1");
    runNext(runner);
    final String stateAfterFirstFile = runner.getStateManager()
            .getState(Scope.CLUSTER)
            .get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
    assertNotNull(stateAfterFirstFile);

    // Advance the clock by more than two thirds of the threshold without any traffic.
    final long quietSeconds = ((2 * thresholdSeconds) / 3) + 1;
    clockMillis.set(TimeUnit.SECONDS.toMillis(startupSeconds + quietSeconds));
    runNext(runner);

    // A FlowFile after the quiet period must change the stored state.
    runner.enqueue("Incoming data 2");
    runNext(runner);
    final String stateAfterSecondFile = runner.getStateManager()
            .getState(Scope.CLUSTER)
            .get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
    assertNotEquals(stateAfterFirstFile, stateAfterSecondFile);
}
} }