NIFI-8344: Addressed corner case in which we didn't handle a case where we tailed a file after being rolled over and then processor encounters a newline followed by NUL characters

This closes #5009.

Signed-off-by: Tamas Palfy <tamas.bertalan.palfy@gmail.com>
This commit is contained in:
Mark Payne 2021-04-19 16:49:14 -04:00 committed by Tamas Palfy
parent be716c1621
commit 1ef1905461
2 changed files with 43 additions and 8 deletions

View File

@ -1306,12 +1306,21 @@ public class TailFile extends AbstractProcessor {
final FileChannel channel = fis.getChannel();
final long timestamp = fileToTail.lastModified();
try {
flowFile = session.write(flowFile, out -> readLines(channel, buffer, out, checksum, reReadOnNul, readFully));
} catch (NulCharacterEncounteredException ncee) {
session.remove(flowFile);
throw ncee;
}
final AtomicReference<NulCharacterEncounteredException> abort = new AtomicReference<>();
flowFile = session.write(flowFile, out -> {
try {
readLines(channel, buffer, out, checksum, reReadOnNul, readFully);
} catch (final NulCharacterEncounteredException ncee) {
abort.set(ncee);
// Log the fact that we encountered a NUL character and yield. But we don't re-throw the Exception because
// we want to continue on with the same logic of transferring non-zero flowfiles, removing 0-byte flowfiles,
// and maintaining our state.
getLogger().info("Encountered NUL character when tailing file {}; will yield", tailFile);
context.yield();
}
});
if (flowFile.getSize() == 0L) {
session.remove(flowFile);
@ -1346,6 +1355,11 @@ public class TailFile extends AbstractProcessor {
tfo.setState(updatedState);
} else {
final NulCharacterEncounteredException ncee = abort.get();
if (ncee != null) {
throw ncee;
}
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
getLogger().debug("Completed tailing of file {}; will cleanup state", tailFile);
cleanup();
@ -1357,7 +1371,6 @@ public class TailFile extends AbstractProcessor {
}
}
/**
* Creates a new FlowFile that contains the entire contents of the given
* file and transfers that FlowFile to success. This method will commit the

View File

@ -364,6 +364,28 @@ public class TestTailFile {
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("e\n");
runner.clearTransferState();
// Write out some more characters and then write NUL characters. This should result in the processor not consuming the data.
raf.write("\n".getBytes());
raf.write(0);
raf.write(0);
raf.write(0);
System.out.println("Wrote \\n\\0\\0\\0");
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("f\n");
runner.clearTransferState();
// Truncate the NUL bytes and replace with additional data, ending with a new line. This should ingest the entire line of text.
raf.setLength(raf.length() - 3);
raf.write("g\nh".getBytes());
System.out.println("Truncated the NUL bytes and replaced with g\\nh");
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("g\n");
runner.clearTransferState();
// Ensure that no data comes in for a bit, since the last modified date on the rolled over file isn't old enough.
for (int i=0; i < 100; i++) {
runner.run(1, false, false);
@ -378,7 +400,7 @@ public class TestTailFile {
// Verify results
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("f");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("h");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("new file\n");
runner.clearTransferState();