NIFI-846 MonitorActivity not setting start of evaluation period correctly

This commit is contained in:
Bryan Bende 2015-08-21 11:06:20 -04:00
parent b17be66a10
commit d421e3c242
2 changed files with 66 additions and 21 deletions

View File

@ -16,6 +16,26 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
@ -30,25 +50,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@SideEffectFree
@TriggerSerially
@TriggerWhenEmpty
@ -149,6 +150,15 @@ public class MonitorActivity extends AbstractProcessor {
return properties;
}
@OnScheduled
public void resetLastSuccessfulTransfer() {
setLastSuccessfulTransfer(System.currentTimeMillis());
}
protected final void setLastSuccessfulTransfer(final long timestamp) {
latestSuccessTransfer.set(timestamp);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final long thresholdMillis = context.getProperty(THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);

View File

@ -31,7 +31,7 @@ public class TestMonitorActivity {
@Test
public void testFirstMessage() throws InterruptedException, IOException {
final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity());
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L));
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
@ -101,7 +101,7 @@ public class TestMonitorActivity {
@Test
public void testFirstMessageWithInherit() throws InterruptedException, IOException {
final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity());
final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L));
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
@ -188,4 +188,39 @@ public class TestMonitorActivity {
String.format("lineage start dates match when they shouldn't original=%1$s restored=%2$s",
originalFlowFile.getLineageStartDate(), restoredFlowFile.getLineageStartDate()), restoredFlowFile.getLineageStartDate() != originalFlowFile.getLineageStartDate());
}
@Test
public void testFirstRunNoMessages() throws InterruptedException, IOException {
// don't use the TestableProcessor, we want the real timestamp from @OnScheduled
final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity());
runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
Thread.sleep(1000L);
// shouldn't generate inactivity b/c run() will reset the lastSuccessfulTransfer
runner.run();
runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
runner.clearTransferState();
}
/**
* Since each call to run() will call @OnScheduled methods which will set the lastSuccessfulTransfer to the
* current time, we need a way to create an artificial time difference between calls to run.
*/
private class TestableProcessor extends MonitorActivity {
private final long timestampDifference;
public TestableProcessor(final long timestampDifference) {
this.timestampDifference = timestampDifference;
}
@Override
public void resetLastSuccessfulTransfer() {
setLastSuccessfulTransfer(System.currentTimeMillis() - timestampDifference);
}
}
}