diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java index 684e3ccfa0..1c32c00447 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java @@ -27,10 +27,12 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -50,6 +52,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; @@ -120,7 +123,7 @@ public class TailFile extends AbstractProcessor { .description("All FlowFiles are routed to this Relationship.") .build(); - private volatile TailFileState state = new TailFileState(null, null, null, 0L, 0L, null, new byte[65536]); + private volatile TailFileState state = new TailFileState(null, null, null, 0L, 0L, null, ByteBuffer.allocate(65536)); private volatile boolean recoveredRolledFiles = false; private volatile Long expectedRecoveryChecksum; private volatile boolean tailFileChanged = false; @@ -143,7 +146,7 @@ public class TailFile extends AbstractProcessor { @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { if (FILENAME.equals(descriptor)) { - state = new TailFileState(newValue, null, null, 0L, 0L, null, new byte[65536]); + state = new TailFileState(newValue, null, null, 0L, 0L, null, ByteBuffer.allocate(65536)); recoveredRolledFiles = false; tailFileChanged = true; } else if (ROLLING_FILENAME_PATTERN.equals(descriptor)) { @@ -173,7 +176,7 @@ public class TailFile extends AbstractProcessor { final long timestamp = dis.readLong(); final boolean checksumPresent = dis.readBoolean(); - RandomAccessFile reader = null; + FileChannel reader = null; File tailFile = null; if (checksumPresent && tailFilename.equals(filename)) { @@ -198,19 +201,21 @@ public class TailFile extends AbstractProcessor { // beginning of the file. getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off."); tailFile = existingTailFile; - reader = new RandomAccessFile(tailFile, "r"); - reader.seek(position); + reader = FileChannel.open(tailFile.toPath(), StandardOpenOption.READ); + getLogger().debug("Created RandomAccessFile {} for {} in recoverState", new Object[] {reader, tailFile}); + + reader.position(position); } else { getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning."); } } } - state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, checksum, new byte[65536]); + state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, checksum, ByteBuffer.allocate(65536)); } else { expectedRecoveryChecksum = null; // tailing a new file since the state file was written out. We will reset state. - state = new TailFileState(tailFilename, null, null, 0L, 0L, null, new byte[65536]); + state = new TailFileState(tailFilename, null, null, 0L, 0L, null, ByteBuffer.allocate(65536)); } getLogger().debug("Recovered state {}", new Object[] {state}); @@ -222,6 +227,30 @@ public class TailFile extends AbstractProcessor { } + @OnStopped + public void cleanup() { + final TailFileState state = this.state; + if (state == null) { + return; + } + + final FileChannel channel = state.getReader(); + if (channel == null) { + return; + } + + try { + channel.close(); + } catch (final IOException ioe) { + getLogger().warn("Failed to close file handle during cleanup"); + } + + getLogger().debug("Closed FileChannel {}", new Object[] {channel}); + + this.state = new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(), state.getTimestamp(), state.getChecksum(), state.getBuffer()); + } + + public void persistState(final TailFileState state, final String stateFilename) throws IOException { getLogger().debug("Persisting state {} to {}", new Object[] {state, stateFilename}); @@ -247,23 +276,26 @@ public class TailFile extends AbstractProcessor { } } - private RandomAccessFile createReader(final File file, final long position) { - final RandomAccessFile reader; + private FileChannel createReader(final File file, final long position) { + final FileChannel reader; try { - reader = new RandomAccessFile(file, "r"); - } catch (final FileNotFoundException fnfe) { - getLogger().warn("File {} does not exist; will attempt to access file again after the configured Yield Duration has elapsed", new Object[] {file}); + reader = FileChannel.open(file.toPath(), StandardOpenOption.READ); + } catch (final IOException ioe) { + getLogger().warn("Could not open {} due to {}; will attempt to access file again after the configured Yield Duration has elapsed", new Object[] {file, ioe}); return null; } + getLogger().debug("Created RandomAccessFile {} for {}", new Object[] {reader, file}); + try { - reader.seek(position); + reader.position(position); } catch (final IOException ioe) { getLogger().error("Failed to read from {} due to {}", new Object[] {file, ioe}); try { reader.close(); + getLogger().debug("Closed RandomAccessFile {}", new Object[] {reader}); } catch (final IOException ioe2) { } @@ -321,6 +353,7 @@ public class TailFile extends AbstractProcessor { if (flowFile.getSize() == 0L) { session.remove(flowFile); // use a timestamp of lastModified() + 1 so that we do not ingest this file again. + cleanup(); state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer()); } else { flowFile = session.putAttribute(flowFile, "filename", firstFile.getName()); @@ -330,6 +363,7 @@ public class TailFile extends AbstractProcessor { session.transfer(flowFile, REL_SUCCESS); // use a timestamp of lastModified() + 1 so that we do not ingest this file again. + cleanup(); state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer()); // must ensure that we do session.commit() before persisting state in order to avoid data loss. @@ -358,6 +392,7 @@ public class TailFile extends AbstractProcessor { session.transfer(flowFile, REL_SUCCESS); // use a timestamp of lastModified() + 1 so that we do not ingest this file again. + cleanup(); state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer()); // must ensure that we do session.commit() before persisting state in order to avoid data loss. @@ -382,13 +417,16 @@ public class TailFile extends AbstractProcessor { if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) { recoverRolledFiles(context, session); } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) { + cleanup(); state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 0L, null, state.getBuffer()); } else { final String filename = context.getProperty(FILENAME).getValue(); final File file = new File(filename); try { - final RandomAccessFile raf = new RandomAccessFile(file, "r"); + final FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ); + getLogger().debug("Created FileChannel {} for {}", new Object[] {channel, file}); + final Checksum checksum = new CRC32(); final long position = file.length(); final long timestamp = file.lastModified(); @@ -398,8 +436,9 @@ public class TailFile extends AbstractProcessor { StreamUtils.copy(in, new NullOutputStream(), position); } - raf.seek(position); - state = new TailFileState(filename, file, raf, position, timestamp, checksum, state.getBuffer()); + channel.position(position); + cleanup(); + state = new TailFileState(filename, file, channel, position, timestamp, checksum, state.getBuffer()); } catch (final IOException ioe) { getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[] {file, ioe.toString()}, ioe); context.yield(); @@ -419,7 +458,7 @@ public class TailFile extends AbstractProcessor { // the onTrigger method and then create a new state object after we finish processing the files. TailFileState state = this.state; File file = state.getFile(); - RandomAccessFile reader = state.getReader(); + FileChannel reader = state.getReader(); Checksum checksum = state.getChecksum(); if (checksum == null) { checksum = new CRC32(); @@ -441,6 +480,7 @@ public class TailFile extends AbstractProcessor { // Check if file has rotated long fileLength = file.length(); + final long lastModified = file.lastModified(); if (fileLength < position) { // File has rotated. It's possible that it rotated before we finished reading all of the data. As a result, we need // to check the last rolled-over file and see if it is longer than our position. If so, consume the data past our @@ -489,6 +529,7 @@ public class TailFile extends AbstractProcessor { // Since file has rotated, we close the reader, create a new one, and then reset our state. try { reader.close(); + getLogger().debug("Closed RandomAccessFile {}", new Object[] {reader, reader}); } catch (final IOException ioe) { getLogger().warn("Failed to close reader for {} due to {}", new Object[] {file, ioe}); } @@ -503,12 +544,12 @@ public class TailFile extends AbstractProcessor { boolean consumeData = false; if (fileLength > position) { consumeData = true; - } else if (file.lastModified() > timestamp) { + } else if (lastModified > timestamp) { // This can happen if file is truncated, or is replaced with the same amount of data as the old file. position = 0; try { - reader.seek(0L); + reader.position(0L); } catch (final IOException ioe) { getLogger().error("Failed to seek to beginning of file due to {}", new Object[] {ioe}); context.yield(); @@ -525,7 +566,7 @@ public class TailFile extends AbstractProcessor { // data has been written to file. Stream it to a new FlowFile. FlowFile flowFile = session.create(); - final RandomAccessFile fileReader = reader; + final FileChannel fileReader = reader; final LongHolder positionHolder = new LongHolder(position); flowFile = session.write(flowFile, new OutputStreamCallback() { @Override @@ -539,6 +580,7 @@ public class TailFile extends AbstractProcessor { // If there ended up being no data, just remove the FlowFile if (flowFile.getSize() == 0) { session.remove(flowFile); + getLogger().debug("No data to consume; removed created FlowFile"); } else { // determine filename for FlowFile by using .-. final String tailFilename = file.getName(); @@ -559,7 +601,8 @@ public class TailFile extends AbstractProcessor { TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)); session.transfer(flowFile, REL_SUCCESS); position = positionHolder.get(); - timestamp = System.currentTimeMillis(); + timestamp = lastModified; + getLogger().debug("Created {} and routed to success", new Object[] {flowFile}); } } @@ -571,6 +614,7 @@ public class TailFile extends AbstractProcessor { // no data to consume so rather than continually running, yield to allow other processors to use the thread. // In this case, the state should not have changed, and we will have created no FlowFiles, so we don't have to // persist the state or commit the session; instead, just return here. + getLogger().debug("No data to consume; created no FlowFiles"); context.yield(); return; } @@ -598,17 +642,20 @@ public class TailFile extends AbstractProcessor { * @return The new position after the lines have been read * @throws java.io.IOException if an I/O error occurs. */ - private long readLines(final RandomAccessFile reader, final byte[] buffer, final OutputStream out, final Checksum checksum) throws IOException { + private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum) throws IOException { try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - long pos = reader.getFilePointer(); + long pos = reader.position(); long rePos = pos; // position to re-read int num; int linesRead = 0; boolean seenCR = false; + buffer.clear(); while (((num = reader.read(buffer)) != -1)) { + buffer.flip(); + for (int i = 0; i < num; i++) { - byte ch = buffer[i]; + byte ch = buffer.get(); switch (ch) { case '\n': @@ -647,11 +694,12 @@ public class TailFile extends AbstractProcessor { } } - pos = reader.getFilePointer(); + pos = reader.position(); } getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[] {linesRead, pos, rePos}); - reader.seek(rePos); // Ensure we can re-read if necessary + reader.position(rePos); + buffer.clear(); return rePos; } } @@ -726,23 +774,26 @@ public class TailFile extends AbstractProcessor { /** - * A simple Java class to hold information about our state so that we can maintain this state across multiple invocations of the Processor + * A simple Java class to hold information about our state so that we can maintain this state across multiple invocations of the Processor. + * + * We use a FileChannel to read from the file, rather than a BufferedInputStream, etc. because we want to be able to read any amount of data + * and then reposition the reader if we need to, as a result of a line not being terminated (i.e., no new-line). */ static class TailFileState { private final String filename; // hold onto filename and not just File because we want to match that against the user-defined filename to recover from private final File file; - private final RandomAccessFile raf; + private final FileChannel fileChannel; private final long position; private final long timestamp; private final Checksum checksum; - private final byte[] buffer; + private final ByteBuffer buffer; - public TailFileState(final String filename, final File file, final RandomAccessFile raf, final long position, final long timestamp, final Checksum checksum, final byte[] buffer) { + public TailFileState(final String filename, final File file, final FileChannel fileChannel, final long position, final long timestamp, final Checksum checksum, final ByteBuffer buffer) { this.filename = filename; this.file = file; - this.raf = raf; + this.fileChannel = fileChannel; this.position = position; - this.timestamp = (timestamp / 1000) * 1000; // many operating systems will use only second-level precision for last-modified times so cut off milliseconds + this.timestamp = timestamp; // many operating systems will use only second-level precision for last-modified times so cut off milliseconds this.checksum = checksum; this.buffer = buffer; } @@ -755,8 +806,8 @@ public class TailFile extends AbstractProcessor { return file; } - public RandomAccessFile getReader() { - return raf; + public FileChannel getReader() { + return fileChannel; } public long getPosition() { @@ -771,7 +822,7 @@ public class TailFile extends AbstractProcessor { return checksum; } - public byte[] getBuffer() { + public ByteBuffer getBuffer() { return buffer; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java index 85638ad3af..1445d889ae 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java @@ -37,6 +37,7 @@ import org.junit.Test; public class TestTailFile { private File file; + private TailFile processor; private RandomAccessFile raf; private TestRunner runner; @@ -64,7 +65,8 @@ public class TestTailFile { stateFile.delete(); Assert.assertFalse(stateFile.exists()); - runner = TestRunners.newTestRunner(new TailFile()); + processor = new TailFile(); + runner = TestRunners.newTestRunner(processor); runner.setProperty(TailFile.FILENAME, "target/log.txt"); runner.setProperty(TailFile.STATE_FILE, "target/tail-file.state"); runner.assertValid(); @@ -77,6 +79,8 @@ public class TestTailFile { if (raf != null) { raf.close(); } + + processor.cleanup(); } @@ -153,6 +157,7 @@ public class TestTailFile { runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_TIME.getValue()); raf.write("hello world\n".getBytes()); + Thread.sleep(1000); runner.run(100); runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); } @@ -231,7 +236,9 @@ public class TestTailFile { raf.write("world".getBytes()); raf.close(); - file.renameTo(new File("target/log1.txt")); + + processor.cleanup(); // Need to do this for Windows because otherwise we cannot rename the file because we have the file open still in the same process. + assertTrue(file.renameTo(new File("target/log1.txt"))); raf = new RandomAccessFile(new File("target/log.txt"), "rw"); raf.write("1\n".getBytes()); @@ -255,7 +262,7 @@ public class TestTailFile { runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); raf.write("hello\n".getBytes()); - runner.run(1, false, false); + runner.run(1, true, false); runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1); runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n"); runner.clearTransferState(); @@ -288,7 +295,7 @@ public class TestTailFile { runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); raf.write("hello\n".getBytes()); - runner.run(1, false, false); + runner.run(1, true, false); runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1); runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n"); runner.clearTransferState(); @@ -305,6 +312,8 @@ public class TestTailFile { // write to a new file. file = new File("target/log.txt"); raf = new RandomAccessFile(file, "rw"); + + Thread.sleep(1000L); raf.write("abc\n".getBytes()); // rename file to log.1 @@ -331,9 +340,6 @@ public class TestTailFile { runner.run(); runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); - final long start = System.currentTimeMillis(); - Thread.sleep(1100L); - raf.write("Hello, World".getBytes()); runner.run(); runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); @@ -346,7 +352,6 @@ public class TestTailFile { assertNotNull(state); assertEquals("target/log.txt", state.getFilename()); assertTrue(state.getTimestamp() <= System.currentTimeMillis()); - assertTrue(state.getTimestamp() >= start); assertEquals(14, state.getPosition()); runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("Hello, World\r\n");