diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java index 99a29e5bb6..29006239da 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java @@ -16,6 +16,26 @@ */ package org.apache.nifi.processors.standard; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; @@ -30,25 +50,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import org.apache.nifi.annotation.behavior.SideEffectFree; -import org.apache.nifi.annotation.behavior.TriggerSerially; -import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; - @SideEffectFree @TriggerSerially @TriggerWhenEmpty @@ -149,6 +150,15 @@ public class MonitorActivity extends AbstractProcessor { return properties; } + @OnScheduled + public void resetLastSuccessfulTransfer() { + setLastSuccessfulTransfer(System.currentTimeMillis()); + } + + protected final void setLastSuccessfulTransfer(final long timestamp) { + latestSuccessTransfer.set(timestamp); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { final long thresholdMillis = context.getProperty(THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java index 2e8744154e..f02e6da3fa 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java @@ -31,7 +31,7 @@ public class TestMonitorActivity { @Test public void testFirstMessage() throws InterruptedException, IOException { - final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity()); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L)); runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false"); runner.setProperty(MonitorActivity.THRESHOLD, "100 millis"); @@ -101,7 +101,7 @@ public class TestMonitorActivity { @Test public void testFirstMessageWithInherit() throws InterruptedException, IOException { - final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity()); + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L)); runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false"); runner.setProperty(MonitorActivity.THRESHOLD, "100 millis"); runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true"); @@ -188,4 +188,39 @@ public class TestMonitorActivity { String.format("lineage start dates match when they shouldn't original=%1$s restored=%2$s", originalFlowFile.getLineageStartDate(), restoredFlowFile.getLineageStartDate()), restoredFlowFile.getLineageStartDate() != originalFlowFile.getLineageStartDate()); } + + @Test + public void testFirstRunNoMessages() throws InterruptedException, IOException { + // don't use the TestableProcessor, we want the real timestamp from @OnScheduled + final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity()); + runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false"); + runner.setProperty(MonitorActivity.THRESHOLD, "100 millis"); + + Thread.sleep(1000L); + + // shouldn't generate inactivity b/c run() will reset the lastSuccessfulTransfer + runner.run(); + runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0); + runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0); + runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0); + runner.clearTransferState(); + } + + /** + * Since each call to run() will call @OnScheduled methods which will set the lastSuccessfulTransfer to the + * current time, we need a way to create an artificial time difference between calls to run. + */ + private class TestableProcessor extends MonitorActivity { + + private final long timestampDifference; + + public TestableProcessor(final long timestampDifference) { + this.timestampDifference = timestampDifference; + } + + @Override + public void resetLastSuccessfulTransfer() { + setLastSuccessfulTransfer(System.currentTimeMillis() - timestampDifference); + } + } }