NIFI-994: Fixed issue that could result in data duplication if more than 1 rollover of tailed file has occurred on restart of Processor

This commit is contained in:
Mark Payne 2015-11-10 16:45:46 -05:00
parent bfa9e45079
commit 7b9c8df6c5
2 changed files with 172 additions and 36 deletions

View File

@ -18,7 +18,6 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.File; import java.io.File;
@ -60,6 +59,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.NullOutputStream; import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.LongHolder; import org.apache.nifi.util.LongHolder;
@ -179,27 +179,38 @@ public class TailFile extends AbstractProcessor {
// We have an expected checksum and the currently configured filename is the same as the state file. // We have an expected checksum and the currently configured filename is the same as the state file.
// We need to check if the existing file is the same as the one referred to in the state file based on // We need to check if the existing file is the same as the one referred to in the state file based on
// the checksum. // the checksum.
final Checksum checksum = new CRC32();
final File existingTailFile = new File(filename); final File existingTailFile = new File(filename);
if (existingTailFile.length() >= position) { if (existingTailFile.length() >= position) {
try (final InputStream tailFileIs = new FileInputStream(existingTailFile); try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
final CheckedInputStream in = new CheckedInputStream(tailFileIs, new CRC32())) { final CheckedInputStream in = new CheckedInputStream(tailFileIs, checksum)) {
StreamUtils.copy(in, new NullOutputStream(), state.getPosition()); StreamUtils.copy(in, new NullOutputStream(), state.getPosition());
final long checksumResult = in.getChecksum().getValue(); final long checksumResult = in.getChecksum().getValue();
if (checksumResult == expectedRecoveryChecksum) { if (checksumResult == expectedRecoveryChecksum) {
// Checksums match. This means that we want to resume reading from where we left off.
// So we will populate the reader object so that it will be used in onTrigger. If the
// checksums do not match, then we will leave the reader object null, so that the next
// call to onTrigger will result in a new Reader being created and starting at the
// beginning of the file.
getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off.");
tailFile = existingTailFile; tailFile = existingTailFile;
reader = new RandomAccessFile(tailFile, "r"); reader = new RandomAccessFile(tailFile, "r");
reader.seek(position); reader.seek(position);
} else {
getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning.");
} }
} }
} }
state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, null, new byte[65536]); state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, checksum, new byte[65536]);
} else { } else {
expectedRecoveryChecksum = null; expectedRecoveryChecksum = null;
// tailing a new file since the state file was written out. We will reset state. // tailing a new file since the state file was written out. We will reset state.
state = new TailFileState(tailFilename, null, null, 0L, 0L, null, new byte[65536]); state = new TailFileState(tailFilename, null, null, 0L, 0L, null, new byte[65536]);
} }
getLogger().debug("Recovered state {}", new Object[] {state});
} else { } else {
// encoding Version == -1... no data in file. Just move on. // encoding Version == -1... no data in file. Just move on.
} }
@ -209,6 +220,8 @@ public class TailFile extends AbstractProcessor {
public void persistState(final TailFileState state, final String stateFilename) throws IOException { public void persistState(final TailFileState state, final String stateFilename) throws IOException {
getLogger().debug("Persisting state {} to {}", new Object[] {state, stateFilename});
final File stateFile = new File(stateFilename); final File stateFile = new File(stateFilename);
File directory = stateFile.getParentFile(); File directory = stateFile.getParentFile();
if (directory != null && !directory.exists() && !directory.mkdirs()) { if (directory != null && !directory.exists() && !directory.mkdirs()) {
@ -279,6 +292,7 @@ public class TailFile extends AbstractProcessor {
// a file when we stopped running, then that file that we were reading from should be the first file in this list, // a file when we stopped running, then that file that we were reading from should be the first file in this list,
// assuming that the file still exists on the file system. // assuming that the file still exists on the file system.
final List<File> rolledOffFiles = getRolledOffFiles(context, state.getTimestamp()); final List<File> rolledOffFiles = getRolledOffFiles(context, state.getTimestamp());
getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[] {rolledOffFiles.size()});
// For first file that we find, it may or may not be the file that we were last reading from. // For first file that we find, it may or may not be the file that we were last reading from.
// As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match, // As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match,
@ -295,6 +309,8 @@ public class TailFile extends AbstractProcessor {
final long checksumResult = in.getChecksum().getValue(); final long checksumResult = in.getChecksum().getValue();
if (checksumResult == expectedRecoveryChecksum) { if (checksumResult == expectedRecoveryChecksum) {
getLogger().debug("Checksum for {} matched expected checksum. Will skip first {} bytes", new Object[] {firstFile, state.getPosition()});
// This is the same file that we were reading when we shutdown. Start reading from this point on. // This is the same file that we were reading when we shutdown. Start reading from this point on.
rolledOffFiles.remove(0); rolledOffFiles.remove(0);
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
@ -317,6 +333,9 @@ public class TailFile extends AbstractProcessor {
session.commit(); session.commit();
persistState(state, context.getProperty(STATE_FILE).getValue()); persistState(state, context.getProperty(STATE_FILE).getValue());
} }
} else {
getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file",
new Object[] {firstFile, checksumResult, expectedRecoveryChecksum});
} }
} }
} }
@ -425,11 +444,15 @@ public class TailFile extends AbstractProcessor {
// marked position. // marked position.
try { try {
final List<File> updatedRolledOverFiles = getRolledOffFiles(context, timestamp); final List<File> updatedRolledOverFiles = getRolledOffFiles(context, timestamp);
getLogger().debug("Tailed file has rotated. Total number of rolled off files to check for un-consumed modifications: {}", new Object[] {updatedRolledOverFiles.size()});
if (!updatedRolledOverFiles.isEmpty()) { if (!updatedRolledOverFiles.isEmpty()) {
final File lastRolledOver = updatedRolledOverFiles.get(updatedRolledOverFiles.size() - 1); final File lastRolledOver = updatedRolledOverFiles.get(updatedRolledOverFiles.size() - 1);
// there is more data in the file that has not yet been consumed. // there is more data in the file that has not yet been consumed.
if (lastRolledOver.length() > state.getPosition()) { if (lastRolledOver.length() > state.getPosition()) {
getLogger().debug("Last rolled over file {} is larger than our last position; will consume data from it after offset {}", new Object[] {lastRolledOver, state.getPosition()});
try (final FileInputStream fis = new FileInputStream(lastRolledOver)) { try (final FileInputStream fis = new FileInputStream(lastRolledOver)) {
StreamUtils.skip(fis, state.getPosition()); StreamUtils.skip(fis, state.getPosition());
@ -450,6 +473,8 @@ public class TailFile extends AbstractProcessor {
persistState(state, context.getProperty(STATE_FILE).getValue()); persistState(state, context.getProperty(STATE_FILE).getValue());
} }
} }
} else {
getLogger().debug("Last rolled over file {} is not larger than our last position; will not consume data from it", new Object[] {lastRolledOver});
} }
} }
} catch (final IOException ioe) { } catch (final IOException ioe) {
@ -531,11 +556,11 @@ public class TailFile extends AbstractProcessor {
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)); TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
position = positionHolder.get(); position = positionHolder.get();
timestamp = System.currentTimeMillis();
} }
} }
// Create a new state object to represent our current position, timestamp, etc. // Create a new state object to represent our current position, timestamp, etc.
timestamp = System.currentTimeMillis();
final TailFileState updatedState = new TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, timestamp, checksum, state.getBuffer()); final TailFileState updatedState = new TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, timestamp, checksum, state.getBuffer());
this.state = updatedState; this.state = updatedState;
@ -571,47 +596,61 @@ public class TailFile extends AbstractProcessor {
* @throws java.io.IOException if an I/O error occurs. * @throws java.io.IOException if an I/O error occurs.
*/ */
private long readLines(final RandomAccessFile reader, final byte[] buffer, final OutputStream out, final Checksum checksum) throws IOException { private long readLines(final RandomAccessFile reader, final byte[] buffer, final OutputStream out, final Checksum checksum) throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
long pos = reader.getFilePointer(); long pos = reader.getFilePointer();
long rePos = pos; // position to re-read long rePos = pos; // position to re-read
int num; int num;
boolean seenCR = false; int linesRead = 0;
while (((num = reader.read(buffer)) != -1)) { boolean seenCR = false;
for (int i = 0; i < num; i++) { while (((num = reader.read(buffer)) != -1)) {
byte ch = buffer[i]; for (int i = 0; i < num; i++) {
byte ch = buffer[i];
switch (ch) { switch (ch) {
case '\n': case '\n':
baos.write(ch); baos.write(ch);
seenCR = false;
baos.writeTo(out);
baos.reset();
rePos = pos + i + 1;
break;
case '\r':
baos.write(ch);
seenCR = true;
break;
default:
if (seenCR) {
seenCR = false; seenCR = false;
baos.writeTo(out); baos.writeTo(out);
checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", new Object[] {checksum.getValue()});
}
baos.reset(); baos.reset();
rePos = pos + i + 1;
linesRead++;
break;
case '\r':
baos.write(ch); baos.write(ch);
rePos = pos + i; seenCR = true;
} else { break;
baos.write(ch); default:
} if (seenCR) {
seenCR = false;
baos.writeTo(out);
checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", new Object[] {checksum.getValue()});
}
linesRead++;
baos.reset();
baos.write(ch);
rePos = pos + i;
} else {
baos.write(ch);
}
}
} }
pos = reader.getFilePointer();
} }
checksum.update(buffer, 0, num); getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[] {linesRead, pos, rePos});
pos = reader.getFilePointer(); reader.seek(rePos); // Ensure we can re-read if necessary
return rePos;
} }
reader.seek(rePos); // Ensure we can re-read if necessary
return rePos;
} }
@ -650,7 +689,14 @@ public class TailFile extends AbstractProcessor {
final DirectoryStream<Path> dirStream = Files.newDirectoryStream(directory.toPath(), rollingPattern); final DirectoryStream<Path> dirStream = Files.newDirectoryStream(directory.toPath(), rollingPattern);
for (final Path path : dirStream) { for (final Path path : dirStream) {
final File file = path.toFile(); final File file = path.toFile();
if (file.lastModified() < minTimestamp || file.equals(tailFile)) { final long lastMod = file.lastModified();
if (file.lastModified() < minTimestamp) {
getLogger().debug("Found rolled off file {} but its last modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will not consume it",
new Object[] {file, lastMod, minTimestamp});
continue;
} else if (file.equals(tailFile)) {
continue; continue;
} }
@ -725,5 +771,10 @@ public class TailFile extends AbstractProcessor {
public byte[] getBuffer() { public byte[] getBuffer() {
return buffer; return buffer;
} }
/**
 * Builds a single-line description of this state for log messages, listing the filename,
 * position, timestamp and the current checksum value. When no checksum is set, the text
 * "null" is emitted in place of the value.
 *
 * @return the formatted description, in the form {@code TailFileState[filename=..., ...]}
 */
@Override
public String toString() {
    final StringBuilder description = new StringBuilder("TailFileState[filename=");
    description.append(filename);
    description.append(", position=").append(position);
    description.append(", timestamp=").append(timestamp);
    description.append(", checksum=");
    if (checksum == null) {
        // No checksum available; print the literal text rather than dereferencing it.
        description.append("null");
    } else {
        description.append(checksum.getValue());
    }
    return description.append("]").toString();
}
} }
} }

View File

@ -42,6 +42,8 @@ public class TestTailFile {
@Before @Before
public void setup() throws IOException { public void setup() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "TRACE");
final File targetDir = new File("target"); final File targetDir = new File("target");
final File[] files = targetDir.listFiles(new FilenameFilter() { final File[] files = targetDir.listFiles(new FilenameFilter() {
@Override @Override
@ -240,6 +242,89 @@ public class TestTailFile {
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n"); runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
} }
/**
 * Verifies that a single rollover is handled correctly when the tailed file ends with a line that
 * has no trailing new-line: the unterminated text ("world") must be emitted from the rolled-over
 * file, followed by the first line ("1\n") of the newly created file.
 */
@Test
public void testRolloverAfterHavingReadAllData() throws IOException, InterruptedException {
// If we have read all data in a file, and that file does not end with a new-line, then the last line
// in the file will have been read, added to the checksum, and then we would re-seek to "unread" that
// last line since it didn't have a new-line. We need to ensure that if the data is then rolled over
// that our checksum does not take into account those bytes that have been "unread."
// This mimics the case when we are reading a log file that rolls over while the processor is running.
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
// First run happens before anything has been written, so nothing should be transferred.
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
// A complete, new-line terminated line must be picked up as one FlowFile.
raf.write("hello\n".getBytes());
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
runner.clearTransferState();
// Append text WITHOUT a terminating new-line; it must stay unconsumed for now.
raf.write("world".getBytes());
// NOTE(review): presumably this pause lets the file's last-modified time move past the timestamp
// held by the processor before the rollover below - confirm.
Thread.sleep(1000L);
runner.run(1);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); // should not pull in data because no \n
// Simulate the rollover: close the handle and rename the file to a name matching the "log.*" pattern.
// NOTE(review): the boolean result of renameTo is not checked, so a failed rename would go unnoticed here.
raf.close();
file.renameTo(new File("target/log.1"));
// Start a fresh file and give it one complete line.
// NOTE(review): "target/log.txt" is presumably the path the processor is configured to tail; that
// configuration lives outside this method - confirm.
raf = new RandomAccessFile(new File("target/log.txt"), "rw");
raf.write("1\n".getBytes());
// Expect exactly two FlowFiles, in order: the leftover "world" from the rolled-over file,
// then "1\n" from the new file.
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
}
/**
 * Verifies that two rollovers occurring between processor runs are handled without loss or
 * duplication: the unterminated text ("world") from the oldest rolled-over file, the full content
 * ("abc\n") of the intermediate rolled-over file, and the first line ("1\n") of the current file
 * must each be emitted exactly once, in that order.
 */
@Test
public void testMultipleRolloversAfterHavingReadAllData() throws IOException, InterruptedException {
// This mimics the case when we are reading a log file that rolls over while the processor is running.
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
// First run happens before anything has been written, so nothing should be transferred.
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
// A complete, new-line terminated line must be picked up as one FlowFile.
raf.write("hello\n".getBytes());
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
runner.clearTransferState();
// Append text WITHOUT a terminating new-line.
raf.write("world".getBytes());
runner.run(1); // ensure that we've read 'world' but not consumed it into a flowfile.
// NOTE(review): presumably this pause lets the rolled-over files' last-modified times fall after the
// timestamp held by the processor - confirm.
Thread.sleep(1000L);
// First rollover: rename file to log.2
// NOTE(review): the boolean result of renameTo is not checked here or below.
raf.close();
file.renameTo(new File("target/log.2"));
// write to a new file.
file = new File("target/log.txt");
raf = new RandomAccessFile(file, "rw");
raf.write("abc\n".getBytes());
// Second rollover: rename file to log.1
raf.close();
file.renameTo(new File("target/log.1"));
// write to a new file.
file = new File("target/log.txt");
raf = new RandomAccessFile(file, "rw");
raf.write("1\n".getBytes());
raf.close();
// A single run must drain both rolled-over files and the current file, yielding three FlowFiles
// in order: "world" (rest of log.2), "abc\n" (log.1), "1\n" (current file).
runner.run(1);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 3);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("abc\n");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("1\n");
}
@Test @Test
public void testConsumeWhenNewLineFound() throws IOException, InterruptedException { public void testConsumeWhenNewLineFound() throws IOException, InterruptedException {