NIFI-8183 TailFile intermittently creates records with ascii NULL after rollover

This closes #4792.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Denes Arvay 2021-01-28 23:08:11 +01:00 committed by Mark Payne
parent 6e1f737c53
commit c1f88ec740
2 changed files with 85 additions and 28 deletions

View File

@ -782,7 +782,7 @@ public class TailFile extends AbstractProcessor {
final FileChannel fileReader = reader;
final AtomicLong positionHolder = new AtomicLong(position);
final Boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();
final boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();
AtomicReference<NulCharacterEncounteredException> abort = new AtomicReference<>();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@ -857,6 +857,10 @@ public class TailFile extends AbstractProcessor {
persistState(tfo, session, context);
}
private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum, Boolean reReadOnNul) throws IOException {
return readLines(reader, buffer, out, checksum, reReadOnNul, false);
}
/**
* Read new lines from the given FileChannel, copying it to the given Output
* Stream. The Checksum is used in order to later determine whether or not
@ -871,11 +875,14 @@ public class TailFile extends AbstractProcessor {
* temporary values and a NulCharacterEncounteredException is thrown.
* This allows the caller to re-attempt a read from the same position.
* If set to 'false' these characters will be treated as regular content.
* @param readFully If set to 'true' the last chunk of bytes after the last whole line
* will be also written to the OutputStream
*
* @return The new position after the lines have been read
* @throws java.io.IOException if an I/O error occurs.
*/
private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum, Boolean reReadOnNul) throws IOException {
private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum,
Boolean reReadOnNul, final boolean readFully) throws IOException {
getLogger().debug("Reading lines starting at position {}", new Object[]{reader.position()});
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
@ -887,7 +894,7 @@ public class TailFile extends AbstractProcessor {
boolean seenCR = false;
buffer.clear();
while (((num = reader.read(buffer)) != -1)) {
while ((num = reader.read(buffer)) != -1) {
buffer.flip();
for (int i = 0; i < num; i++) {
@ -897,14 +904,7 @@ public class TailFile extends AbstractProcessor {
case '\n': {
baos.write(ch);
seenCR = false;
baos.writeTo(out);
final byte[] baosBuffer = baos.toByteArray();
checksum.update(baosBuffer, 0, baos.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
}
baos.reset();
flushByteArrayOutputStream(baos, out, checksum);
rePos = pos + i + 1;
linesRead++;
break;
@ -922,15 +922,8 @@ public class TailFile extends AbstractProcessor {
default: {
if (seenCR) {
seenCR = false;
baos.writeTo(out);
final byte[] baosBuffer = baos.toByteArray();
checksum.update(baosBuffer, 0, baos.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
}
flushByteArrayOutputStream(baos, out, checksum);
linesRead++;
baos.reset();
baos.write(ch);
rePos = pos + i;
} else {
@ -943,6 +936,11 @@ public class TailFile extends AbstractProcessor {
pos = reader.position();
}
if (readFully) {
flushByteArrayOutputStream(baos, out, checksum);
rePos = reader.position();
}
if (rePos < reader.position()) {
getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[]{linesRead, pos, rePos});
reader.position(rePos); // Ensure we can re-read if necessary
@ -952,6 +950,16 @@ public class TailFile extends AbstractProcessor {
}
}
private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum) throws IOException {
baos.writeTo(out);
final byte[] baosBuffer = baos.toByteArray();
checksum.update(baosBuffer, 0, baos.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", checksum.getValue());
}
baos.reset();
}
/**
* Returns a list of all Files that match the following criteria:
*
@ -1127,7 +1135,7 @@ public class TailFile extends AbstractProcessor {
// a file when we stopped running, then that file that we were reading from should be the first file in this list,
// assuming that the file still exists on the file system.
final List<File> rolledOffFiles = getRolledOffFiles(context, timestamp, tailFile);
return recoverRolledFiles(context, session, tailFile, rolledOffFiles, expectedChecksum, timestamp, position);
return recoverRolledFiles(context, session, tailFile, rolledOffFiles, expectedChecksum, position);
} catch (final IOException e) {
getLogger().error("Failed to recover files that have rolled over due to {}", new Object[]{e});
return false;
@ -1147,9 +1155,6 @@ public class TailFile extends AbstractProcessor {
* FlowFile creation and content.
* @param expectedChecksum the checksum value that is expected for the
* oldest file from offset 0 through &lt;position&gt;.
* @param timestamp the latest Last Modfiied Timestamp that has been
* consumed. Any data that was written before this data will not be
* ingested.
* @param position the byte offset in the file being tailed, where tailing
* last left off.
*
@ -1157,7 +1162,7 @@ public class TailFile extends AbstractProcessor {
* otherwise
*/
private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final String tailFile, final List<File> rolledOffFiles, final Long expectedChecksum,
final long timestamp, final long position) {
final long position) {
try {
getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[]{rolledOffFiles.size()});
TailFileObject tfo = states.get(tailFile);
@ -1172,8 +1177,9 @@ public class TailFile extends AbstractProcessor {
final File firstFile = rolledOffFiles.get(0);
final long startNanos = System.nanoTime();
final Boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();
if (position > 0) {
try (final InputStream fis = new FileInputStream(firstFile);
try (final FileInputStream fis = new FileInputStream(firstFile);
final CheckedInputStream in = new CheckedInputStream(fis, new CRC32())) {
StreamUtils.copy(in, new NullOutputStream(), position);
@ -1184,7 +1190,15 @@ public class TailFile extends AbstractProcessor {
// This is the same file that we were reading when we shutdown. Start reading from this point on.
rolledOffFiles.remove(0);
FlowFile flowFile = session.create();
flowFile = session.importFrom(in, flowFile);
try {
flowFile = session.write(flowFile,
out -> readLines(fis.getChannel(), ByteBuffer.allocate(65536), out, new CRC32(), reReadOnNul, true));
} catch (NulCharacterEncounteredException ncee) {
rolledOffFiles.add(0, firstFile);
session.remove(flowFile);
throw ncee;
}
if (flowFile.getSize() == 0L) {
session.remove(flowFile);
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.

View File

@ -205,6 +205,47 @@ public class TestTailFile {
assertEquals(expected, lines);
}
@Test
public void testNULContentWhenRolledOver() throws IOException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE.getValue());
runner.setProperty(TailFile.REREAD_ON_NUL, "true");
// first line fully written, second partially
raf.write("a\nb".getBytes());
// read the first line
runner.run(1, false, true);
// zero bytes and rollover occurs between two runs
raf.write(new byte[] { 0, 0 });
final long originalLastMod = file.lastModified();
final File rolledOverFile = rollover(0);
// this should not pick up the zeros, still one file in the success relationship
runner.run(1, false, false);
runner.assertTransferCount(TailFile.REL_SUCCESS, 1);
// nuls replaced
try (final RandomAccessFile rolledOverRAF = new RandomAccessFile(rolledOverFile, "rw")) {
rolledOverRAF.seek(3);
rolledOverRAF.write("c\n".getBytes());
}
// lastmod reset to the TailFile not to consider this as an updated file (as NFS "nul-replacement" doesn't touch the lastmod timestamp)
rolledOverFile.setLastModified(originalLastMod);
runner.run(1, false, false);
raf.write("d\n".getBytes());
runner.run(1, true, false);
runner.assertTransferCount(TailFile.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS);
List<String> lines = flowFiles.stream().map(MockFlowFile::toByteArray).map(String::new).collect(Collectors.toList());
assertEquals(Arrays.asList("a\n", "bc\n", "d\n"), lines);
}
@Test
public void testRotateMultipleBeforeConsuming() throws IOException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
@ -261,10 +302,12 @@ public class TestTailFile {
out.assertContentEquals("6\n");
}
private void rollover(final int index) throws IOException {
private File rollover(final int index) throws IOException {
raf.close();
file.renameTo(new File(file.getParentFile(), file.getName() + "." + index + ".log"));
final File rolledOverFile = new File(file.getParentFile(), file.getName() + "." + index + ".log");
file.renameTo(rolledOverFile);
raf = new RandomAccessFile(file, "rw");
return rolledOverFile;
}