NIFI-5892 Wait timestamp lingers, potentially messing up downstream wait-notify pairs

Clear the wait timestamp when transferring to failur or success

replace explicit attribute clear with function call, refactor and integrate into existing tests per review

This closes #3233.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Otto Fowler 2018-12-21 09:15:24 -05:00 committed by Koji Kawamura
parent 2e4f002945
commit 1330b92cfa
2 changed files with 45 additions and 5 deletions

View File

@ -84,7 +84,8 @@ import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJE
) )
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the " @WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
+ "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile."), + "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile. "
+ "This attribute is not written when the FlowFile is transferred to failure or success"),
@WritesAttribute(attribute = "wait.counter.<counterName>", description = "If a signal exists when the processor runs, " @WritesAttribute(attribute = "wait.counter.<counterName>", description = "If a signal exists when the processor runs, "
+ "each count value in the signal is copied.") + "each count value in the signal is copied.")
}) })
@ -314,6 +315,8 @@ public class Wait extends AbstractProcessor {
final Consumer<FlowFile> transferToFailure = flowFile -> { final Consumer<FlowFile> transferToFailure = flowFile -> {
flowFile = session.penalize(flowFile); flowFile = session.penalize(flowFile);
// This flowFile is now failed, our tracking is done, clear the timer
flowFile = clearWaitState(session, flowFile);
getFlowFilesFor.apply(REL_FAILURE).add(flowFile); getFlowFilesFor.apply(REL_FAILURE).add(flowFile);
}; };
@ -328,9 +331,19 @@ public class Wait extends AbstractProcessor {
relationship = Relationship.SELF; relationship = Relationship.SELF;
} }
} }
final Relationship finalRelationship = relationship;
final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream() final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
.map(f -> copySignalAttributes(session, f, signalRef.get(), originalSignalCounts, replaceOriginalAttributes)).collect(Collectors.toList()); .map(f -> {
if (REL_SUCCESS.equals(finalRelationship)) {
// These flowFiles will be exiting the wait, clear the timer
f = clearWaitState(session, f);
}
return copySignalAttributes(session, f, signalRef.get(),
originalSignalCounts,
replaceOriginalAttributes);
})
.collect(Collectors.toList());
session.transfer(flowFilesWithSignalAttributes, relationship); session.transfer(flowFilesWithSignalAttributes, relationship);
}; };
@ -470,6 +483,10 @@ public class Wait extends AbstractProcessor {
} }
private FlowFile clearWaitState(final ProcessSession session, final FlowFile flowFile) {
return session.removeAttribute(flowFile, WAIT_START_TIMESTAMP);
}
private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount, final boolean replaceOriginal) { private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount, final boolean replaceOriginal) {
if (signal == null) { if (signal == null) {
return flowFile; return flowFile;

View File

@ -69,6 +69,8 @@ public class TestWait {
// no cache key attribute // no cache key attribute
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// timestamp must be present
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0).assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState(); runner.clearTransferState();
} }
@ -101,6 +103,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState(); runner.clearTransferState();
runner.enqueue(ff); runner.enqueue(ff);
@ -126,7 +129,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState(); runner.clearTransferState();
runner.enqueue(ff); runner.enqueue(ff);
@ -164,6 +167,7 @@ public class TestWait {
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1); runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState(); runner.clearTransferState();
} }
@ -178,6 +182,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1); runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total"); runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState(); runner.clearTransferState();
} }
@ -231,6 +236,8 @@ public class TestWait {
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
// timer cleared
outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache // show a new attribute was copied from the cache
assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
// show that the original attributes are still there // show that the original attributes are still there
@ -272,6 +279,8 @@ public class TestWait {
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
// timer cleared
outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache // show a new attribute was copied from the cache
assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
// show that the original attributes are still there // show that the original attributes are still there
@ -310,7 +319,7 @@ public class TestWait {
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/* /*
* 2nd iteration. * 2nd iteration.
*/ */
@ -324,6 +333,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3. // Still waiting since total count doesn't reach to 3.
waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/* /*
* 3rd iteration. * 3rd iteration.
@ -335,6 +345,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3. // Still waiting since total count doesn't reach to 3.
waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/* /*
* 4th iteration. * 4th iteration.
@ -350,6 +361,9 @@ public class TestWait {
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
// wait timer cleared
outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache // show a new attribute was copied from the cache
assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
// show that the original attributes are still there // show that the original attributes are still there
@ -391,6 +405,7 @@ public class TestWait {
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/* /*
* 2nd iteration. * 2nd iteration.
@ -405,6 +420,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since counter-B doesn't reach to 2. // Still waiting since counter-B doesn't reach to 2.
waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/* /*
* 3rd iteration. * 3rd iteration.
@ -419,6 +435,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3. // Still waiting since total count doesn't reach to 3.
waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/* /*
* 4th iteration. * 4th iteration.
@ -434,6 +451,8 @@ public class TestWait {
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
// wait timer cleared
outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache // show a new attribute was copied from the cache
assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
// show that the original attributes are still there // show that the original attributes are still there
@ -486,6 +505,8 @@ public class TestWait {
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
// timer cleared
outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
outputFlowFile.assertAttributeEquals("wait.counter.counter", "2"); outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");
// expect counter to be decremented to 0 and releasable count remains 1. // expect counter to be decremented to 0 and releasable count remains 1.
@ -502,6 +523,8 @@ public class TestWait {
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
// timer cleared
outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// All counters are consumed. // All counters are consumed.
outputFlowFile.assertAttributeEquals("wait.counter.counter", "0"); outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");