diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index dfe489ea41..4d7e6f5360 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard;
 import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -60,6 +59,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.stream.io.NullOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.LongHolder;
@@ -179,27 +179,38 @@ public class TailFile extends AbstractProcessor {
                     // We have an expected checksum and the currently configured filename is the same as the state file.
                     // We need to check if the existing file is the same as the one referred to in the state file based on
                     // the checksum.
+                    final Checksum checksum = new CRC32();
                     final File existingTailFile = new File(filename);
                     if (existingTailFile.length() >= position) {
                         try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
-                            final CheckedInputStream in = new CheckedInputStream(tailFileIs, new CRC32())) {
+                            final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {
                             StreamUtils.copy(in, new NullOutputStream(), state.getPosition());

                             final long checksumResult = in.getChecksum().getValue();
                             if (checksumResult == expectedRecoveryChecksum) {
+                                // Checksums match. This means that we want to resume reading from where we left off.
+                                // So we will populate the reader object so that it will be used in onTrigger. If the
+                                // checksums do not match, then we will leave the reader object null, so that the next
+                                // call to onTrigger will result in a new Reader being created and starting at the
+                                // beginning of the file.
+                                getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off.");
                                 tailFile = existingTailFile;
                                 reader = new RandomAccessFile(tailFile, "r");
                                 reader.seek(position);
+                            } else {
+                                getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning.");
                             }
                         }
                     }

-                    state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, null, new byte[65536]);
+                    state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, checksum, new byte[65536]);
                 } else {
                     expectedRecoveryChecksum = null;
                     // tailing a new file since the state file was written out. We will reset state.
                     state = new TailFileState(tailFilename, null, null, 0L, 0L, null, new byte[65536]);
                 }
+
+                getLogger().debug("Recovered state {}", new Object[] {state});
             } else {
                 // encoding Version == -1... no data in file. Just move on.
             }
@@ -209,6 +220,8 @@ public class TailFile extends AbstractProcessor {

     public void persistState(final TailFileState state, final String stateFilename) throws IOException {
+        getLogger().debug("Persisting state {} to {}", new Object[] {state, stateFilename});
+
         final File stateFile = new File(stateFilename);
         File directory = stateFile.getParentFile();
         if (directory != null && !directory.exists() && !directory.mkdirs()) {
@@ -279,6 +292,7 @@ public class TailFile extends AbstractProcessor {
         // a file when we stopped running, then that file that we were reading from should be the first file in this list,
         // assuming that the file still exists on the file system.
         final List<File> rolledOffFiles = getRolledOffFiles(context, state.getTimestamp());
+        getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[] {rolledOffFiles.size()});

         // For first file that we find, it may or may not be the file that we were last reading from.
         // As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match,
@@ -295,6 +309,8 @@ public class TailFile extends AbstractProcessor {

                 final long checksumResult = in.getChecksum().getValue();
                 if (checksumResult == expectedRecoveryChecksum) {
+                    getLogger().debug("Checksum for {} matched expected checksum. Will skip first {} bytes", new Object[] {firstFile, state.getPosition()});
+
                     // This is the same file that we were reading when we shutdown. Start reading from this point on.
                     rolledOffFiles.remove(0);
                     FlowFile flowFile = session.create();
@@ -317,6 +333,9 @@ public class TailFile extends AbstractProcessor {
                         session.commit();
                         persistState(state, context.getProperty(STATE_FILE).getValue());
                     }
+                } else {
+                    getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file",
+                        new Object[] {firstFile, checksumResult, expectedRecoveryChecksum});
                 }
             }
         }
@@ -425,11 +444,15 @@ public class TailFile extends AbstractProcessor {
                 // marked position.
                 try {
                     final List<File> updatedRolledOverFiles = getRolledOffFiles(context, timestamp);
+                    getLogger().debug("Tailed file has rotated. Total number of rolled off files to check for un-consumed modifications: {}", new Object[] {updatedRolledOverFiles.size()});
+
                     if (!updatedRolledOverFiles.isEmpty()) {
                         final File lastRolledOver = updatedRolledOverFiles.get(updatedRolledOverFiles.size() - 1);

                         // there is more data in the file that has not yet been consumed.
                         if (lastRolledOver.length() > state.getPosition()) {
+                            getLogger().debug("Last rolled over file {} is larger than our last position; will consume data from it after offset {}", new Object[] {lastRolledOver, state.getPosition()});
+
                             try (final FileInputStream fis = new FileInputStream(lastRolledOver)) {
                                 StreamUtils.skip(fis, state.getPosition());

@@ -450,6 +473,8 @@ public class TailFile extends AbstractProcessor {
                                     persistState(state, context.getProperty(STATE_FILE).getValue());
                                 }
                             }
+                        } else {
+                            getLogger().debug("Last rolled over file {} is not larger than our last position; will not consume data from it", new Object[] {lastRolledOver});
                         }
                     }
                 } catch (final IOException ioe) {
@@ -531,11 +556,11 @@ public class TailFile extends AbstractProcessor {
                         TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                 session.transfer(flowFile, REL_SUCCESS);
                 position = positionHolder.get();
+                timestamp = System.currentTimeMillis();
             }
         }

         // Create a new state object to represent our current position, timestamp, etc.
-        timestamp = System.currentTimeMillis();
         final TailFileState updatedState = new TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, timestamp, checksum, state.getBuffer());
         this.state = updatedState;
@@ -571,47 +596,61 @@ public class TailFile extends AbstractProcessor {
      * @throws java.io.IOException if an I/O error occurs.
      */
     private long readLines(final RandomAccessFile reader, final byte[] buffer, final OutputStream out, final Checksum checksum) throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        long pos = reader.getFilePointer();
-        long rePos = pos; // position to re-read
-
-        int num;
-        boolean seenCR = false;
-        while (((num = reader.read(buffer)) != -1)) {
-            for (int i = 0; i < num; i++) {
-                byte ch = buffer[i];
-
-                switch (ch) {
-                    case '\n':
-                        baos.write(ch);
-                        seenCR = false;
-                        baos.writeTo(out);
-                        baos.reset();
-                        rePos = pos + i + 1;
-                        break;
-                    case '\r':
-                        baos.write(ch);
-                        seenCR = true;
-                        break;
-                    default:
-                        if (seenCR) {
-                            seenCR = false;
-                            baos.writeTo(out);
-                            baos.write(ch);
-                            rePos = pos + i;
-                        } else {
-                            baos.write(ch);
-                        }
-                }
-            }
-
-            checksum.update(buffer, 0, num);
-            pos = reader.getFilePointer();
-        }
-
-        reader.seek(rePos); // Ensure we can re-read if necessary
-        return rePos;
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            long pos = reader.getFilePointer();
+            long rePos = pos; // position to re-read
+
+            int num;
+            int linesRead = 0;
+            boolean seenCR = false;
+            while (((num = reader.read(buffer)) != -1)) {
+                for (int i = 0; i < num; i++) {
+                    byte ch = buffer[i];
+
+                    switch (ch) {
+                        case '\n':
+                            baos.write(ch);
+                            seenCR = false;
+                            baos.writeTo(out);
+                            checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
+                            if (getLogger().isTraceEnabled()) {
+                                getLogger().trace("Checksum updated to {}", new Object[] {checksum.getValue()});
+                            }
+                            baos.reset();
+                            rePos = pos + i + 1;
+                            linesRead++;
+                            break;
+                        case '\r':
+                            baos.write(ch);
+                            seenCR = true;
+                            break;
+                        default:
+                            if (seenCR) {
+                                seenCR = false;
+                                baos.writeTo(out);
+                                checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
+                                if (getLogger().isTraceEnabled()) {
+                                    getLogger().trace("Checksum updated to {}", new Object[] {checksum.getValue()});
+                                }
+
+                                linesRead++;
+                                baos.reset();
+                                baos.write(ch);
+                                rePos = pos + i;
+                            } else {
+                                baos.write(ch);
+                            }
+                    }
+                }
+
+                pos = reader.getFilePointer();
+            }
+
+            getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[] {linesRead, pos, rePos});
+            reader.seek(rePos); // Ensure we can re-read if necessary
+            return rePos;
+        }
     }
@@ -650,7 +689,14 @@ public class TailFile extends AbstractProcessor {
         final DirectoryStream<Path> dirStream = Files.newDirectoryStream(directory.toPath(), rollingPattern);
         for (final Path path : dirStream) {
             final File file = path.toFile();
-            if (file.lastModified() < minTimestamp || file.equals(tailFile)) {
+            final long lastMod = file.lastModified();
+
+            if (lastMod < minTimestamp) {
+                getLogger().debug("Found rolled off file {} but its last modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will not consume it",
+                    new Object[] {file, lastMod, minTimestamp});
+
+                continue;
+            } else if (file.equals(tailFile)) {
                 continue;
             }
@@ -725,5 +771,10 @@ public class TailFile extends AbstractProcessor {
         public byte[] getBuffer() {
             return buffer;
         }
+
+        @Override
+        public String toString() {
+            return "TailFileState[filename=" + filename + ", position=" + position + ", timestamp=" + timestamp
+                + ", checksum=" + (checksum == null ? "null" : checksum.getValue()) + "]";
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 993f2b16c9..85638ad3af 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -42,6 +42,8 @@ public class TestTailFile {

     @Before
     public void setup() throws IOException {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "TRACE");
+
         final File targetDir = new File("target");
         final File[] files = targetDir.listFiles(new FilenameFilter() {
             @Override
@@ -240,6 +242,89 @@ public class TestTailFile {
         runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
     }

+    @Test
+    public void testRolloverAfterHavingReadAllData() throws IOException, InterruptedException {
+        // If we have read all data in a file, and that file does not end with a new-line, then the last line
+        // in the file will have been read, added to the checksum, and then we would re-seek to "unread" that
+        // last line since it didn't have a new-line. We need to ensure that if the data is then rolled over,
+        // our checksum does not take into account those bytes that have been "unread."
+
+        // this mimics the case when we are reading a log file that rolls over while the processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+
+        Thread.sleep(1000L);
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); // should not pull in data because no \n
+
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+
+        raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+        raf.write("1\n".getBytes());
+        runner.run(1, false, false);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
+    }
+
+
+    @Test
+    public void testMultipleRolloversAfterHavingReadAllData() throws IOException, InterruptedException {
+        // this mimics the case when we are reading a log file that rolls over while the processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+        runner.run(1); // ensure that we've read 'world' but not consumed it into a flowfile.
+
+        Thread.sleep(1000L);
+
+        // rename file to log.2
+        raf.close();
+        file.renameTo(new File("target/log.2"));
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("abc\n".getBytes());
+
+        // rename file to log.1
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("1\n".getBytes());
+        raf.close();
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 3);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("abc\n");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("1\n");
+    }
+
     @Test
     public void testConsumeWhenNewLineFound() throws IOException, InterruptedException {