mirror of https://github.com/apache/nifi.git
NIFI-1171: Ensure that we pick up changes when files roll over and ensure that we don't pick up the rolled over file multiple times
parent 8c2323dc8d
commit 2516b1dad2
@@ -47,8 +47,8 @@ import java.util.zip.Checksum;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -71,12 +71,14 @@ import org.apache.nifi.util.LongHolder;
 
 // note: it is important that this Processor is not marked as @SupportsBatching because the session commits must complete before persisting state locally; otherwise, data loss may occur
 @TriggerSerially
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"tail", "file", "log", "text", "source"})
 @CapabilityDescription("\"Tails\" a file, ingesting data from the file as it is written to the file. The file is expected to be textual. Data is ingested only when a "
     + "new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\", as is generally the case "
     + "with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover occurred while NiFi "
-    + "was not running (provided that the data still exists upon restart of NiFi).")
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
+    + "was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds, rather than running "
+    + "with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. At this time, this Processor does not support "
+    + "ingesting files that have been compressed when 'rolled over'.")
 public class TailFile extends AbstractProcessor {
 
     static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time",
@@ -124,7 +126,6 @@ public class TailFile extends AbstractProcessor {
         .build();
 
     private volatile TailFileState state = new TailFileState(null, null, null, 0L, 0L, null, ByteBuffer.allocate(65536));
-    private volatile boolean recoveredRolledFiles = false;
    private volatile Long expectedRecoveryChecksum;
    private volatile boolean tailFileChanged = false;
 
@@ -147,17 +148,12 @@ public class TailFile extends AbstractProcessor {
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         if (FILENAME.equals(descriptor)) {
             state = new TailFileState(newValue, null, null, 0L, 0L, null, ByteBuffer.allocate(65536));
-            recoveredRolledFiles = false;
             tailFileChanged = true;
-        } else if (ROLLING_FILENAME_PATTERN.equals(descriptor)) {
-            recoveredRolledFiles = false;
         }
     }
 
     @OnScheduled
     public void recoverState(final ProcessContext context) throws IOException {
-        recoveredRolledFiles = false;
-
         final String tailFilename = context.getProperty(FILENAME).getValue();
         final String stateFilename = context.getProperty(STATE_FILE).getValue();
 
@@ -172,7 +168,7 @@ public class TailFile extends AbstractProcessor {
 
             if (encodingVersion == 0) {
                 final String filename = dis.readUTF();
-                final long position = dis.readLong();
+                long position = dis.readLong();
                 final long timestamp = dis.readLong();
                 final boolean checksumPresent = dis.readBoolean();
 
@@ -202,18 +198,25 @@ public class TailFile extends AbstractProcessor {
                         getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off.");
                         tailFile = existingTailFile;
                         reader = FileChannel.open(tailFile.toPath(), StandardOpenOption.READ);
-                        getLogger().debug("Created RandomAccessFile {} for {} in recoverState", new Object[] {reader, tailFile});
+                        getLogger().debug("Created FileChannel {} for {} in recoverState", new Object[] {reader, tailFile});
 
                         reader.position(position);
                     } else {
+                        // we don't seek the reader to the position, so our reader will start at beginning of file.
                         getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning.");
                     }
                 }
+            } else {
+                // fewer bytes than our position, so we know we weren't already reading from this file. Keep reader at a position of 0.
+                getLogger().debug("When recovering state, existing file to tail is only {} bytes but position flag is {}; "
+                    + "this indicates that the file has rotated. Will begin tailing current file from beginning.", new Object[] {existingTailFile.length(), position});
             }
+
             state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, checksum, ByteBuffer.allocate(65536));
         } else {
+            // If filename changed or there is no checksum present, then we have no expected checksum to use for recovery.
             expectedRecoveryChecksum = null;
 
             // tailing a new file since the state file was written out. We will reset state.
             state = new TailFileState(tailFilename, null, null, 0L, 0L, null, ByteBuffer.allocate(65536));
         }
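The recovery in this hunk rests on a single invariant: the CRC32 of the first `position` bytes of the tailed file must equal the checksum that was persisted with the state; if it does, reading resumes at `position`, otherwise the file is treated as new. A minimal standalone sketch of that check in plain JDK I/O (class and method names here are illustrative, not part of the commit, and the read loop stands in for the StreamUtils/NullOutputStream copy used above):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.CRC32;
    import java.util.zip.CheckedInputStream;

    class ResumeCheck {
        // Illustrative sketch, not part of the commit: returns true if the first
        // 'position' bytes of 'file' still produce 'expectedChecksum', meaning the
        // file is the same one that was being read before shutdown.
        static boolean canResume(final File file, final long position, final long expectedChecksum) throws IOException {
            try (final InputStream fis = new FileInputStream(file);
                final CheckedInputStream in = new CheckedInputStream(fis, new CRC32())) {
                long remaining = position;
                final byte[] scratch = new byte[8192];
                while (remaining > 0L) {
                    // every byte read through the CheckedInputStream updates the CRC32
                    final int len = in.read(scratch, 0, (int) Math.min(scratch.length, remaining));
                    if (len < 0) {
                        return false; // file is shorter than the stored position; it must have rolled
                    }
                    remaining -= len;
                }
                return in.getChecksum().getValue() == expectedChecksum;
            }
        }
    }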
@@ -234,188 +237,35 @@ public class TailFile extends AbstractProcessor {
             return;
         }
 
-        final FileChannel channel = state.getReader();
-        if (channel == null) {
+        final FileChannel reader = state.getReader();
+        if (reader == null) {
             return;
         }
 
         try {
-            channel.close();
+            reader.close();
         } catch (final IOException ioe) {
             getLogger().warn("Failed to close file handle during cleanup");
         }
 
-        getLogger().debug("Closed FileChannel {}", new Object[] {channel});
+        getLogger().debug("Closed FileChannel {}", new Object[] {reader});
 
         this.state = new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(), state.getTimestamp(), state.getChecksum(), state.getBuffer());
     }
 
 
-    public void persistState(final TailFileState state, final String stateFilename) throws IOException {
-        getLogger().debug("Persisting state {} to {}", new Object[] {state, stateFilename});
-
-        final File stateFile = new File(stateFilename);
-        File directory = stateFile.getParentFile();
-        if (directory != null && !directory.exists() && !directory.mkdirs()) {
-            getLogger().warn("Failed to persist state to {} because the parent directory does not exist and could not be created. This may result in data being duplicated upon restart of NiFi");
-            return;
-        }
-        try (final FileOutputStream fos = new FileOutputStream(stateFile);
-            final DataOutputStream dos = new DataOutputStream(fos)) {
-
-            dos.writeInt(0); // version
-            dos.writeUTF(state.getFilename());
-            dos.writeLong(state.getPosition());
-            dos.writeLong(state.getTimestamp());
-            if (state.getChecksum() == null) {
-                dos.writeBoolean(false);
-            } else {
-                dos.writeBoolean(true);
-                dos.writeLong(state.getChecksum().getValue());
-            }
-        }
-    }
-
-    private FileChannel createReader(final File file, final long position) {
-        final FileChannel reader;
-
-        try {
-            reader = FileChannel.open(file.toPath(), StandardOpenOption.READ);
-        } catch (final IOException ioe) {
-            getLogger().warn("Could not open {} due to {}; will attempt to access file again after the configured Yield Duration has elapsed", new Object[] {file, ioe});
-            return null;
-        }
-
-        getLogger().debug("Created RandomAccessFile {} for {}", new Object[] {reader, file});
-
-        try {
-            reader.position(position);
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to read from {} due to {}", new Object[] {file, ioe});
-
-            try {
-                reader.close();
-                getLogger().debug("Closed RandomAccessFile {}", new Object[] {reader});
-            } catch (final IOException ioe2) {
-            }
-
-            return null;
-        }
-
-        return reader;
-    }
-
-    // for testing purposes
-    TailFileState getState() {
-        return state;
-    }
-
-
-
-    /**
-     * Finds any files that have rolled over and have not yet been ingested by this Processor. Each of these files that is found will be
-     * ingested as its own FlowFile. If a file is found that has been partially ingested, the rest of the file will be ingested as a
-     * single FlowFile but the data that already has been ingested will not be ingested again.
-     *
-     * @param context the ProcessContext to use in order to obtain Processor configuration
-     * @param session the ProcessSession to use in order to interact with FlowFile creation and content.
-     */
-    private void recoverRolledFiles(final ProcessContext context, final ProcessSession session) {
-        try {
-            // Find all files that match our rollover pattern, if any, and order them based on their timestamp and filename.
-            // Ignore any file that has a timestamp earlier than the state that we have persisted. If we were reading from
-            // a file when we stopped running, then that file that we were reading from should be the first file in this list,
-            // assuming that the file still exists on the file system.
-            final List<File> rolledOffFiles = getRolledOffFiles(context, state.getTimestamp());
-            getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[] {rolledOffFiles.size()});
-
-            // For first file that we find, it may or may not be the file that we were last reading from.
-            // As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match,
-            // then we know we've already processed this file. If the checksums do not match, then we have not
-            // processed this file and we need to seek back to position 0 and ingest the entire file.
-            // For all other files that have been rolled over, we need to just ingest the entire file.
-            if (!rolledOffFiles.isEmpty() && expectedRecoveryChecksum != null && rolledOffFiles.get(0).length() >= state.getPosition()) {
-                final File firstFile = rolledOffFiles.get(0);
-
-                final long startNanos = System.nanoTime();
-                try (final InputStream fis = new FileInputStream(firstFile);
-                    final CheckedInputStream in = new CheckedInputStream(fis, new CRC32())) {
-                    StreamUtils.copy(in, new NullOutputStream(), state.getPosition());
-
-                    final long checksumResult = in.getChecksum().getValue();
-                    if (checksumResult == expectedRecoveryChecksum) {
-                        getLogger().debug("Checksum for {} matched expected checksum. Will skip first {} bytes", new Object[] {firstFile, state.getPosition()});
-
-                        // This is the same file that we were reading when we shutdown. Start reading from this point on.
-                        rolledOffFiles.remove(0);
-                        FlowFile flowFile = session.create();
-                        flowFile = session.importFrom(in, flowFile);
-                        if (flowFile.getSize() == 0L) {
-                            session.remove(flowFile);
-                            // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
-                            cleanup();
-                            state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
-                        } else {
-                            flowFile = session.putAttribute(flowFile, "filename", firstFile.getName());
-
-                            session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + state.getPosition() + " of source file",
-                                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
-                            session.transfer(flowFile, REL_SUCCESS);
-
-                            // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
-                            cleanup();
-                            state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
-
-                            // must ensure that we do session.commit() before persisting state in order to avoid data loss.
-                            session.commit();
-                            persistState(state, context.getProperty(STATE_FILE).getValue());
-                        }
-                    } else {
-                        getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file",
-                            new Object[] {firstFile, checksumResult, expectedRecoveryChecksum});
-                    }
-                }
-            }
-
-            // For each file that we found that matches our Rollover Pattern, and has a last modified date later than the timestamp
-            // that we recovered from the state file, we need to consume the entire file. The only exception to this is the file that
-            // we were reading when we last stopped, as it may already have been partially consumed. That is taken care of in the
-            // above block of code.
-            for (final File file : rolledOffFiles) {
-                FlowFile flowFile = session.create();
-                flowFile = session.importFrom(file.toPath(), true, flowFile);
-                if (flowFile.getSize() == 0L) {
-                    session.remove(flowFile);
-                } else {
-                    flowFile = session.putAttribute(flowFile, "filename", file.getName());
-                    session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
-                    session.transfer(flowFile, REL_SUCCESS);
-
-                    // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
-                    cleanup();
-                    state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer());
-
-                    // must ensure that we do session.commit() before persisting state in order to avoid data loss.
-                    session.commit();
-                    persistState(state, context.getProperty(STATE_FILE).getValue());
-                }
-            }
-        } catch (final IOException e) {
-            getLogger().error("Failed to recover files that have rolled over due to {}", new Object[] {e});
-        }
-    }
 
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // If this is the first time the processor has run since it was started, we need to check for any files that may have rolled over
-        // while the processor was stopped. If we find any, we need to import them into the flow.
-        if (!recoveredRolledFiles) {
+        // If user changes the file that is being tailed, we need to consume the already-rolled-over data according
+        // to the Initial Start Position property
+        boolean rolloverOccurred;
         if (tailFileChanged) {
+            rolloverOccurred = false;
             final String recoverPosition = context.getProperty(START_POSITION).getValue();
 
             if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
-                recoverRolledFiles(context, session);
+                recoverRolledFiles(context, session, this.expectedRecoveryChecksum, state.getTimestamp(), state.getPosition());
             } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
                 cleanup();
                 state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 0L, null, state.getBuffer());
@@ -424,8 +274,8 @@ public class TailFile extends AbstractProcessor {
                 final File file = new File(filename);
 
                 try {
-                    final FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
-                    getLogger().debug("Created FileChannel {} for {}", new Object[] {channel, file});
+                    final FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+                    getLogger().debug("Created FileChannel {} for {}", new Object[] {fileChannel, file});
 
                     final Checksum checksum = new CRC32();
                     final long position = file.length();
@@ -436,9 +286,9 @@ public class TailFile extends AbstractProcessor {
                         StreamUtils.copy(in, new NullOutputStream(), position);
                     }
 
-                    channel.position(position);
+                    fileChannel.position(position);
                     cleanup();
-                    state = new TailFileState(filename, file, channel, position, timestamp, checksum, state.getBuffer());
+                    state = new TailFileState(filename, file, fileChannel, position, timestamp, checksum, state.getBuffer());
                 } catch (final IOException ioe) {
                     getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[] {file, ioe.toString()}, ioe);
                     context.yield();
@@ -448,10 +298,18 @@ public class TailFile extends AbstractProcessor {
 
             tailFileChanged = false;
         } else {
-            recoverRolledFiles(context, session);
+            // Recover any data that may have rolled over since the last time that this processor ran.
+            // If expectedRecoveryChecksum != null, that indicates that this is the first iteration since processor was started, so use whatever checksum value
+            // was present when the state was last persisted. In this case, we must then null out the value so that the next iteration won't keep using the "recovered"
+            // value. If the value is null, then we know that either the processor has already recovered that data, or there was no state persisted. In either case,
+            // use whatever checksum value is currently in the state.
+            Long expectedChecksumValue = expectedRecoveryChecksum;
+            if (expectedChecksumValue == null) {
+                expectedChecksumValue = state.getChecksum() == null ? null : state.getChecksum().getValue();
             }
 
-            recoveredRolledFiles = true;
+            rolloverOccurred = recoverRolledFiles(context, session, expectedChecksumValue, state.getTimestamp(), state.getPosition());
+            expectedRecoveryChecksum = null;
         }
 
         // initialize local variables from state object; this is done so that we can easily change the values throughout
@@ -479,57 +337,11 @@ public class TailFile extends AbstractProcessor {
         final long startNanos = System.nanoTime();
 
         // Check if file has rotated
-        long fileLength = file.length();
-        final long lastModified = file.lastModified();
-        if (fileLength < position) {
-            // File has rotated. It's possible that it rotated before we finished reading all of the data. As a result, we need
-            // to check the last rolled-over file and see if it is longer than our position. If so, consume the data past our
-            // marked position.
-            try {
-                final List<File> updatedRolledOverFiles = getRolledOffFiles(context, timestamp);
-                getLogger().debug("Tailed file has rotated. Total number of rolled off files to check for un-consumed modifications: {}", new Object[] {updatedRolledOverFiles.size()});
-
-                if (!updatedRolledOverFiles.isEmpty()) {
-                    final File lastRolledOver = updatedRolledOverFiles.get(updatedRolledOverFiles.size() - 1);
-
-                    // there is more data in the file that has not yet been consumed.
-                    if (lastRolledOver.length() > state.getPosition()) {
-                        getLogger().debug("Last rolled over file {} is larger than our last position; will consume data from it after offset {}", new Object[] {lastRolledOver, state.getPosition()});
-
-                        try (final FileInputStream fis = new FileInputStream(lastRolledOver)) {
-                            StreamUtils.skip(fis, state.getPosition());
-
-                            FlowFile flowFile = session.create();
-                            flowFile = session.importFrom(fis, flowFile);
-                            if (flowFile.getSize() == 0) {
-                                session.remove(flowFile);
-                            } else {
-                                flowFile = session.putAttribute(flowFile, "filename", lastRolledOver.getName());
-
-                                session.getProvenanceReporter().receive(flowFile, lastRolledOver.toURI().toString(), "FlowFile contains bytes " + state.getPosition() + " through " +
-                                    lastRolledOver.length() + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
-                                session.transfer(flowFile, REL_SUCCESS);
-                                this.state = state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, lastRolledOver.lastModified() + 1L, null, state.getBuffer());
-
-                                // must ensure that we do session.commit() before persisting state in order to avoid data loss.
-                                session.commit();
-                                persistState(state, context.getProperty(STATE_FILE).getValue());
-                            }
-                        }
-                    } else {
-                        getLogger().debug("Last rolled over file {} is not larger than our last position; will not consume data from it", new Object[] {lastRolledOver});
-                    }
-                }
-            } catch (final IOException ioe) {
-                getLogger().error("File being tailed was rolled over. However, was unable to determine which \"Rollover Files\" exist or read the last one due to {}. "
-                    + "It is possible that data at the end of the last file will be skipped as a result.", new Object[] {ioe});
-            }
-
-
+        if (rolloverOccurred) {
             // Since file has rotated, we close the reader, create a new one, and then reset our state.
             try {
                 reader.close();
-                getLogger().debug("Closed RandomAccessFile {}", new Object[] {reader, reader});
+                getLogger().debug("Closed FileChannel {}", new Object[] {reader, reader});
             } catch (final IOException ioe) {
                 getLogger().warn("Failed to close reader for {} due to {}", new Object[] {file, ioe});
             }
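The inline logic removed above detected rotation by length alone: a tailed file that is suddenly shorter than the stored read position must have been replaced, and a branch further down treated an unchanged length with a newer last-modified time as truncation or same-size replacement. A hypothetical helper restating those two conditions outside the processor (not part of the commit):

    import java.io.File;

    final class RotationCheck {
        // Hypothetical restatement of the removed checks. A file shorter than the
        // stored offset means it was rolled over; an unchanged length with a newer
        // modification time means it was truncated or replaced with the same
        // number of bytes.
        static boolean rolledOrTruncated(final File file, final long position, final long timestamp) {
            final long length = file.length();
            return length < position || (length == position && file.lastModified() > timestamp);
        }
    }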
@@ -537,32 +349,22 @@ public class TailFile extends AbstractProcessor {
             reader = createReader(file, 0L);
             position = 0L;
             checksum.reset();
-            fileLength = file.length();
         }
 
-        // check if there is any data to consume by checking if file has grown or last modified timestamp has changed.
-        boolean consumeData = false;
-        if (fileLength > position) {
-            consumeData = true;
-        } else if (lastModified > timestamp) {
-            // This can happen if file is truncated, or is replaced with the same amount of data as the old file.
-            position = 0;
+        if (file.length() == position) {
+            // no data to consume so rather than continually running, yield to allow other processors to use the thread.
+            // In this case, the state should not have changed, and we will have created no FlowFiles, so we don't have to
+            // persist the state or commit the session; instead, just return here.
+            getLogger().debug("No data to consume; created no FlowFiles");
+            state = this.state = new TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, timestamp, checksum, state.getBuffer());
+            persistState(state, context);
 
-            try {
-                reader.position(0L);
-            } catch (final IOException ioe) {
-                getLogger().error("Failed to seek to beginning of file due to {}", new Object[] {ioe});
             context.yield();
             return;
         }
-
-            consumeData = true;
-        }
-
         // If there is data to consume, read as much as we can.
         final TailFileState currentState = state;
         final Checksum chksum = checksum;
-        if (consumeData) {
         // data has been written to file. Stream it to a new FlowFile.
         FlowFile flowFile = session.create();
 
@@ -601,40 +403,31 @@ public class TailFile extends AbstractProcessor {
                 TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
             session.transfer(flowFile, REL_SUCCESS);
             position = positionHolder.get();
-            timestamp = lastModified;
+
+            // Set timestamp to the latest of when the file was modified and the current timestamp stored in the state.
+            // We do this because when we read a file that has been rolled over, we set the state to 1 millisecond later than the last mod date
+            // in order to avoid ingesting that file again. If we then read from this file during the same second (or millisecond, depending on the
+            // operating system file last mod precision), then we could set the timestamp to a smaller value, which could result in reading in the
+            // rotated file a second time.
+            timestamp = Math.max(state.getTimestamp(), file.lastModified());
             getLogger().debug("Created {} and routed to success", new Object[] {flowFile});
         }
-        }
 
         // Create a new state object to represent our current position, timestamp, etc.
         final TailFileState updatedState = new TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, timestamp, checksum, state.getBuffer());
         this.state = updatedState;
 
-        if (!consumeData) {
-            // no data to consume so rather than continually running, yield to allow other processors to use the thread.
-            // In this case, the state should not have changed, and we will have created no FlowFiles, so we don't have to
-            // persist the state or commit the session; instead, just return here.
-            getLogger().debug("No data to consume; created no FlowFiles");
-            context.yield();
-            return;
-        }
 
         // We must commit session before persisting state in order to avoid data loss on restart
         session.commit();
-        final String stateFilename = context.getProperty(STATE_FILE).getValue();
-        try {
-            persistState(updatedState, stateFilename);
-        } catch (final IOException e) {
-            getLogger().warn("Failed to update state file {} due to {}; some data may be duplicated on restart of NiFi", new Object[] {stateFilename, e});
-        }
+        persistState(updatedState, context);
     }
 
 
     /**
-     * Read new lines from the given RandomAccessFile, copying it to the given Output Stream. The Checksum is used in order to later determine whether or not
+     * Read new lines from the given FileChannel, copying it to the given Output Stream. The Checksum is used in order to later determine whether or not
      * data has been consumed.
      *
-     * @param reader The RandomAccessFile to read data from
+     * @param reader The FileChannel to read data from
      * @param buffer the buffer to use for copying data
     * @param out the OutputStream to copy the data to
     * @param checksum the Checksum object to use in order to calculate checksum for recovery purposes
@@ -643,6 +436,8 @@ public class TailFile extends AbstractProcessor {
      * @throws java.io.IOException if an I/O error occurs.
      */
     private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum) throws IOException {
+        getLogger().debug("Reading lines starting at position {}", new Object[] {reader.position()});
+
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
             long pos = reader.position();
             long rePos = pos; // position to re-read
@@ -651,11 +446,12 @@ public class TailFile extends AbstractProcessor {
             int linesRead = 0;
             boolean seenCR = false;
             buffer.clear();
 
             while (((num = reader.read(buffer)) != -1)) {
                 buffer.flip();
+
                 for (int i = 0; i < num; i++) {
-                    byte ch = buffer.get();
+                    byte ch = buffer.get(i);
 
                     switch (ch) {
                         case '\n':
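The switch from `buffer.get()` to `buffer.get(i)` fixes a subtle ByteBuffer bug: the relative `get()` advances the buffer's position as a side effect, so driving it from an index-based `for` loop walks the data twice as fast as the loop intends, while the absolute `get(i)` reads by index and leaves the position untouched. A self-contained demonstration of the difference (not from the commit):

    import java.nio.ByteBuffer;

    public class BufferGetDemo {
        public static void main(final String[] args) {
            final ByteBuffer buffer = ByteBuffer.wrap(new byte[] {'a', 'b', 'c', 'd'});

            // absolute get(i): inspects every byte, position stays at 0
            for (int i = 0; i < buffer.limit(); i++) {
                buffer.get(i);
            }
            System.out.println("position after absolute gets: " + buffer.position()); // prints 0

            // relative get(): each call moves the position forward by one
            buffer.get();
            buffer.get();
            System.out.println("position after two relative gets: " + buffer.position()); // prints 2
        }
    }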
@@ -697,9 +493,11 @@ public class TailFile extends AbstractProcessor {
                 pos = reader.position();
             }
 
+            if (rePos < reader.position()) {
                 getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[] {linesRead, pos, rePos});
-            reader.position(rePos);
-            buffer.clear();
+                reader.position(rePos); // Ensure we can re-read if necessary
+            }
+
             return rePos;
         }
     }
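The guard added here only rewinds the channel when data was actually read past the last complete line: `rePos` marks the offset just past the last newline, and repositioning to it makes a trailing partial line get re-read on the next trigger instead of being emitted as a fragment. A compressed standalone sketch of that reposition-and-re-read pattern (class name and args-based path are hypothetical):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class RepositionDemo {
        public static void main(final String[] args) throws IOException {
            try (final FileChannel channel = FileChannel.open(Paths.get(args[0]), StandardOpenOption.READ)) {
                final ByteBuffer buffer = ByteBuffer.allocate(64);
                long rePos = channel.position(); // offset of the last fully consumed line

                final int num = channel.read(buffer); // advances the channel position by 'num' bytes
                if (num > 0) {
                    for (int i = 0; i < num; i++) {
                        if (buffer.get(i) == '\n') {
                            // absolute offset just past this newline
                            rePos = channel.position() - (num - 1 - i);
                        }
                    }
                    // rewind past any trailing partial line so it is re-read next time
                    if (rePos < channel.position()) {
                        channel.position(rePos);
                    }
                }
            }
        }
    }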
@@ -773,25 +571,239 @@ public class TailFile extends AbstractProcessor {
     }
 
 
+
+    private void persistState(final TailFileState state, final ProcessContext context) {
+        final String stateFilename = context.getProperty(STATE_FILE).getValue();
+        try {
+            persistState(state, stateFilename);
+        } catch (final IOException e) {
+            getLogger().warn("Failed to update state file {} due to {}; some data may be duplicated on restart of NiFi", new Object[] {stateFilename, e});
+        }
+    }
+
+    private void persistState(final TailFileState state, final String stateFilename) throws IOException {
+        getLogger().debug("Persisting state {} to {}", new Object[] {state, stateFilename});
+
+        final File stateFile = new File(stateFilename);
+        File directory = stateFile.getParentFile();
+        if (directory != null && !directory.exists() && !directory.mkdirs()) {
+            getLogger().warn("Failed to persist state to {} because the parent directory does not exist and could not be created. This may result in data being duplicated upon restart of NiFi");
+            return;
+        }
+        try (final FileOutputStream fos = new FileOutputStream(stateFile);
+            final DataOutputStream dos = new DataOutputStream(fos)) {
+
+            dos.writeInt(0); // version
+            dos.writeUTF(state.getFilename());
+            dos.writeLong(state.getPosition());
+            dos.writeLong(state.getTimestamp());
+            if (state.getChecksum() == null) {
+                dos.writeBoolean(false);
+            } else {
+                dos.writeBoolean(true);
+                dos.writeLong(state.getChecksum().getValue());
+            }
+        }
+    }
+
+    private FileChannel createReader(final File file, final long position) {
+        final FileChannel reader;
+
+        try {
+            reader = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+        } catch (final IOException ioe) {
+            getLogger().warn("Unable to open file {}; will attempt to access file again after the configured Yield Duration has elapsed: {}", new Object[] {file, ioe});
+            return null;
+        }
+
+        getLogger().debug("Created FileChannel {} for {}", new Object[] {reader, file});
+
+        try {
+            reader.position(position);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to read from {} due to {}", new Object[] {file, ioe});
+
+            try {
+                reader.close();
+                getLogger().debug("Closed FileChannel {}", new Object[] {reader});
+            } catch (final IOException ioe2) {
+            }
+
+            return null;
+        }
+
+        return reader;
+    }
+
+    // for testing purposes
+    TailFileState getState() {
+        return state;
+    }
+
     /**
-     * A simple Java class to hold information about our state so that we can maintain this state across multiple invocations of the Processor.
+     * Finds any files that have rolled over and have not yet been ingested by this Processor. Each of these files that is found will be
+     * ingested as its own FlowFile. If a file is found that has been partially ingested, the rest of the file will be ingested as a
+     * single FlowFile but the data that already has been ingested will not be ingested again.
      *
-     * We use a FileChannel to read from the file, rather than a BufferedInputStream, etc. because we want to be able to read any amount of data
-     * and then reposition the reader if we need to, as a result of a line not being terminated (i.e., no new-line).
+     * @param context the ProcessContext to use in order to obtain Processor configuration.
+     * @param session the ProcessSession to use in order to interact with FlowFile creation and content.
+     * @param expectedChecksum the checksum value that is expected for the oldest file from offset 0 through <position>.
+     * @param timestamp the latest Last Modified Timestamp that has been consumed. Any data that was written before this data will not be ingested.
+     * @param position the byte offset in the file being tailed, where tailing last left off.
+     *
+     * @return <code>true</code> if the file being tailed has rolled over, <code>false</code> otherwise
+     */
+    private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final Long expectedChecksum, final long timestamp, final long position) {
+        try {
+            // Find all files that match our rollover pattern, if any, and order them based on their timestamp and filename.
+            // Ignore any file that has a timestamp earlier than the state that we have persisted. If we were reading from
+            // a file when we stopped running, then that file that we were reading from should be the first file in this list,
+            // assuming that the file still exists on the file system.
+            final List<File> rolledOffFiles = getRolledOffFiles(context, timestamp);
+            return recoverRolledFiles(context, session, rolledOffFiles, expectedChecksum, timestamp, position);
+        } catch (final IOException e) {
+            getLogger().error("Failed to recover files that have rolled over due to {}", new Object[] {e});
+            return false;
+        }
+    }
+
+    /**
+     * Finds any files that have rolled over and have not yet been ingested by this Processor. Each of these files that is found will be
+     * ingested as its own FlowFile. If a file is found that has been partially ingested, the rest of the file will be ingested as a
+     * single FlowFile but the data that already has been ingested will not be ingested again.
+     *
+     * @param context the ProcessContext to use in order to obtain Processor configuration.
+     * @param session the ProcessSession to use in order to interact with FlowFile creation and content.
+     * @param expectedChecksum the checksum value that is expected for the oldest file from offset 0 through <position>.
+     * @param timestamp the latest Last Modified Timestamp that has been consumed. Any data that was written before this data will not be ingested.
+     * @param position the byte offset in the file being tailed, where tailing last left off.
+     *
+     * @return <code>true</code> if the file being tailed has rolled over, false otherwise
+     */
+    private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final List<File> rolledOffFiles, final Long expectedChecksum,
+        final long timestamp, final long position) {
+        try {
+            getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[] {rolledOffFiles.size()});
+
+            // For first file that we find, it may or may not be the file that we were last reading from.
+            // As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match,
+            // then we know we've already processed this file. If the checksums do not match, then we have not
+            // processed this file and we need to seek back to position 0 and ingest the entire file.
+            // For all other files that have been rolled over, we need to just ingest the entire file.
+            boolean rolloverOccurred = !rolledOffFiles.isEmpty();
+            if (rolloverOccurred && expectedChecksum != null && rolledOffFiles.get(0).length() >= position) {
+                final File firstFile = rolledOffFiles.get(0);
+
+                final long startNanos = System.nanoTime();
+                if (position > 0) {
+                    try (final InputStream fis = new FileInputStream(firstFile);
+                        final CheckedInputStream in = new CheckedInputStream(fis, new CRC32())) {
+                        StreamUtils.copy(in, new NullOutputStream(), position);
+
+                        final long checksumResult = in.getChecksum().getValue();
+                        if (checksumResult == expectedChecksum) {
+                            getLogger().debug("Checksum for {} matched expected checksum. Will skip first {} bytes", new Object[] {firstFile, position});
+
+                            // This is the same file that we were reading when we shutdown. Start reading from this point on.
+                            rolledOffFiles.remove(0);
+                            FlowFile flowFile = session.create();
+                            flowFile = session.importFrom(in, flowFile);
+                            if (flowFile.getSize() == 0L) {
+                                session.remove(flowFile);
+                                // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
+                                cleanup();
+                                state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
+                            } else {
+                                flowFile = session.putAttribute(flowFile, "filename", firstFile.getName());
+
+                                session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + position + " of source file",
+                                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                                session.transfer(flowFile, REL_SUCCESS);
+                                getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[] {flowFile, firstFile});
+
+                                // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
+                                cleanup();
+                                state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
+
+                                // must ensure that we do session.commit() before persisting state in order to avoid data loss.
+                                session.commit();
+                                persistState(state, context.getProperty(STATE_FILE).getValue());
+                            }
+                        } else {
+                            getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file",
+                                new Object[] {firstFile, checksumResult, expectedChecksum});
+                        }
+                    }
+                }
+            }
+
+            // For each file that we found that matches our Rollover Pattern, and has a last modified date later than the timestamp
+            // that we recovered from the state file, we need to consume the entire file. The only exception to this is the file that
+            // we were reading when we last stopped, as it may already have been partially consumed. That is taken care of in the
+            // above block of code.
+            for (final File file : rolledOffFiles) {
+                state = consumeFileFully(file, context, session, state);
+            }
+
+            return rolloverOccurred;
+        } catch (final IOException e) {
+            getLogger().error("Failed to recover files that have rolled over due to {}", new Object[] {e});
+            return false;
+        }
+    }
+
+
+    /**
+     * Creates a new FlowFile that contains the entire contents of the given file and transfers that FlowFile to success. This method
+     * will commit the given session and emit an appropriate Provenance Event.
+     *
+     * @param file the file to ingest
+     * @param context the ProcessContext
+     * @param session the ProcessSession
+     * @param state the current state
+     *
+     * @return the new, updated state that reflects that the given file has been ingested.
+     */
+    private TailFileState consumeFileFully(final File file, final ProcessContext context, final ProcessSession session, TailFileState state) {
+        FlowFile flowFile = session.create();
+        flowFile = session.importFrom(file.toPath(), true, flowFile);
+        if (flowFile.getSize() == 0L) {
+            session.remove(flowFile);
+        } else {
+            flowFile = session.putAttribute(flowFile, "filename", file.getName());
+            session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
+            session.transfer(flowFile, REL_SUCCESS);
+            getLogger().debug("Created {} from {} and routed to success", new Object[] {flowFile, file});
+
+            // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
+            cleanup();
+            state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer());
+
+            // must ensure that we do session.commit() before persisting state in order to avoid data loss.
+            session.commit();
+            persistState(state, context);
+        }
+
+        return state;
+    }
+
+    /**
+     * A simple Java class to hold information about our state so that we can maintain this state across multiple invocations of the Processor
      */
     static class TailFileState {
         private final String filename; // hold onto filename and not just File because we want to match that against the user-defined filename to recover from
         private final File file;
-        private final FileChannel fileChannel;
+        private final FileChannel reader;
         private final long position;
         private final long timestamp;
         private final Checksum checksum;
         private final ByteBuffer buffer;
 
-        public TailFileState(final String filename, final File file, final FileChannel fileChannel, final long position, final long timestamp, final Checksum checksum, final ByteBuffer buffer) {
+        public TailFileState(final String filename, final File file, final FileChannel reader, final long position, final long timestamp, final Checksum checksum, final ByteBuffer buffer) {
             this.filename = filename;
             this.file = file;
-            this.fileChannel = fileChannel;
+            this.reader = reader;
             this.position = position;
             this.timestamp = timestamp; // many operating systems will use only second-level precision for last-modified times so cut off milliseconds
             this.checksum = checksum;
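persistState above pins down the on-disk layout of the state file: a version int, the filename as modified UTF, two longs for position and timestamp, then a boolean flag and, when present, the checksum long. For reference, a standalone reader for that version-0 layout, mirroring the DataInputStream reads that recoverState performs (the class name and the args-based path are placeholders, not part of the commit):

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;

    public class StateFileDump {
        public static void main(final String[] args) throws IOException {
            // args[0] is a placeholder for the processor's configured State File path
            try (final DataInputStream dis = new DataInputStream(new FileInputStream(args[0]))) {
                final int encodingVersion = dis.readInt();          // written by dos.writeInt(0)
                if (encodingVersion != 0) {
                    throw new IOException("Unknown state file encoding version: " + encodingVersion);
                }
                final String filename = dis.readUTF();              // dos.writeUTF(...)
                final long position = dis.readLong();               // dos.writeLong(position)
                final long timestamp = dis.readLong();              // dos.writeLong(timestamp)
                final boolean checksumPresent = dis.readBoolean();  // dos.writeBoolean(...)
                final Long checksum = checksumPresent ? dis.readLong() : null;

                System.out.println(filename + " @ offset " + position + ", timestamp " + timestamp + ", checksum " + checksum);
            }
        }
    }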
@@ -807,7 +819,7 @@ public class TailFile extends AbstractProcessor {
         }
 
         public FileChannel getReader() {
-            return fileChannel;
+            return reader;
         }
 
         public long getPosition() {
@@ -95,12 +95,14 @@ public class TestTailFile {
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
         runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        System.out.println("Ingested 6 bytes");
         runner.clearTransferState();
 
         // roll over the file
         raf.close();
         file.renameTo(new File(file.getParentFile(), file.getName() + ".previous"));
         raf = new RandomAccessFile(file, "rw");
+        System.out.println("Rolled over file to " + file.getName() + ".previous");
 
         // truncate file
         raf.setLength(0L);
@@ -111,6 +113,7 @@ public class TestTailFile {
         Thread.sleep(1000L); // we need to wait at least one second because of the granularity of timestamps on many file systems.
         raf.write("HELLO\n".getBytes());
 
+        System.out.println("Wrote out 6 bytes to tailed file");
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
         runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("HELLO\n");
@@ -119,6 +122,7 @@ public class TestTailFile {
     @Test
     public void testConsumeAfterTruncationStartAtCurrentTime() throws IOException, InterruptedException {
         runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_TIME.getValue());
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
@@ -129,7 +133,10 @@ public class TestTailFile {
         runner.clearTransferState();
 
         // truncate and then write same number of bytes
-        raf.setLength(0L);
+        raf.close();
+        assertTrue(file.renameTo(new File("target/log.txt.1")));
+        raf = new RandomAccessFile(file, "rw");
+
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
@@ -157,7 +164,7 @@ public class TestTailFile {
         runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_TIME.getValue());
 
         raf.write("hello world\n".getBytes());
-        Thread.sleep(1000);
+        Thread.sleep(1000L);
         runner.run(100);
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
     }
@@ -225,7 +232,7 @@ public class TestTailFile {
     public void testRemainderOfFileRecoveredIfRolledOverWhileRunning() throws IOException {
         // this mimics the case when we are reading a log file that rolls over while processor is running.
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt");
-        runner.run(1, false, false);
+        runner.run(1, false, true);
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
         raf.write("hello\n".getBytes());
@@ -236,13 +243,11 @@ public class TestTailFile {
 
         raf.write("world".getBytes());
         raf.close();
-        processor.cleanup(); // Need to do this for Windows because otherwise we cannot rename the file because we have the file open still in the same process.
-        assertTrue(file.renameTo(new File("target/log1.txt")));
+        file.renameTo(new File("target/log1.txt"));
 
         raf = new RandomAccessFile(new File("target/log.txt"), "rw");
         raf.write("1\n".getBytes());
-        runner.run(1, false, false);
+        runner.run(1, true, false);
 
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
         runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
@@ -258,7 +263,7 @@ public class TestTailFile {
 
         // this mimics the case when we are reading a log file that rolls over while processor is running.
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
-        runner.run(1, false, false);
+        runner.run(1, false, true);
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
         raf.write("hello\n".getBytes());
@@ -279,7 +284,7 @@ public class TestTailFile {
 
         raf = new RandomAccessFile(new File("target/log.txt"), "rw");
         raf.write("1\n".getBytes());
-        runner.run(1, false, false);
+        runner.run(1);
 
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
         runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
@@ -291,7 +296,7 @@ public class TestTailFile {
     public void testMultipleRolloversAfterHavingReadAllData() throws IOException, InterruptedException {
         // this mimics the case when we are reading a log file that rolls over while processor is running.
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
-        runner.run(1, false, false);
+        runner.run(1, false, true);
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
         raf.write("hello\n".getBytes());
@@ -312,8 +317,6 @@ public class TestTailFile {
         // write to a new file.
         file = new File("target/log.txt");
         raf = new RandomAccessFile(file, "rw");
-
-        Thread.sleep(1000L);
         raf.write("abc\n".getBytes());
 
         // rename file to log.1
@@ -335,11 +338,105 @@ public class TestTailFile {
     }
 
+    @Test
+    public void testMultipleRolloversAfterHavingReadAllDataWhileStillRunning() throws IOException, InterruptedException {
+        // this mimics the case when we are reading a log file that rolls over while processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.run(1, false, true);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+        runner.run(1, false, false); // ensure that we've read 'world' but not consumed it into a flowfile.
+
+        Thread.sleep(1000L);
+
+        // rename file to log.2
+        raf.close();
+        file.renameTo(new File("target/log.2"));
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("abc\n".getBytes());
+
+        // rename file to log.1
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("1\n".getBytes());
+        raf.close();
+
+        runner.run(1, true, false); // perform shutdown but do not perform initialization because last iteration didn't shutdown.
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 3);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("abc\n");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("1\n");
+    }
+
+    @Test
+    public void testMultipleRolloversWithLongerFileLength() throws IOException, InterruptedException {
+        // this mimics the case when we are reading a log file that rolls over while processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.run(1, false, true);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+
+        // rename file to log.2
+        raf.close();
+        file.renameTo(new File("target/log.2"));
+
+        Thread.sleep(1200L);
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("abc\n".getBytes());
+
+        // rename file to log.1
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+        Thread.sleep(1200L);
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("This is a longer line than the other files had.\n".getBytes());
+        raf.close();
+
+        runner.run(1, true, false); // perform shutdown but do not perform initialization because last iteration didn't shutdown.
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 3);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("abc\n");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("This is a longer line than the other files had.\n");
+    }
+
     @Test
     public void testConsumeWhenNewLineFound() throws IOException, InterruptedException {
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
+        Thread.sleep(1100L);
+
         raf.write("Hello, World".getBytes());
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
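Both new tests above repeat the same rollover simulation inline: close the writer, rename target/log.txt to a rolled name, then recreate log.txt and continue writing. A hypothetical helper distilling that sequence (illustrative only; the tests deliberately keep the steps inline):

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

final class RolloverSimulator {
    // Close the current writer, rename the tailed file to its rolled name, and
    // return a writer on a freshly created file at the original path.
    static RandomAccessFile rollOver(final File file, final RandomAccessFile raf, final String rolledName) throws IOException {
        raf.close();
        if (!file.renameTo(new File(file.getParentFile(), rolledName))) {
            throw new IOException("Unable to rename " + file + " to " + rolledName);
        }
        return new RandomAccessFile(file, "rw");
    }
}

Used in a test, this would read, e.g., raf = RolloverSimulator.rollOver(file, raf, "log.2");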
@@ -377,7 +474,39 @@ public class TestTailFile {
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
         runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("return\r\r\n");
     }
 
+    @Test
+    public void testRolloverAndUpdateAtSameTime() throws IOException {
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+
+        // write out some data and ingest it.
+        raf.write("hello there\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        // roll the file over and write data to the new log.txt file.
+        raf.write("another".getBytes());
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("new file\n".getBytes());
+
+        // Run the processor. We should get 2 files because we should get the rest of what was
+        // written to log.txt before it rolled, and then we should get some data from the new log.txt.
+        runner.run(1, false, true);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("another");
+
+        // If we run again, we should get nothing.
+        // We did have an issue where we were recognizing the previously rolled over file again because the timestamps
+        // were still the same (second-level precision on many file systems). As a result, we verified the checksum of the
+        // already-rolled file against the checksum of the new file and they didn't match, so we ingested the entire rolled
+        // file as well as the new file again. Instead, we should ingest nothing!
+        runner.clearTransferState();
+        runner.run(1, true, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+    }
 }
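The comment block in testRolloverAndUpdateAtSameTime spells out the core of the fix: when a candidate rolled file carries the same second-precision timestamp as the file being tailed, the checksum of its first N bytes (N = bytes already consumed) must match the checksum recorded during tailing before the file can be treated as already-read data. A minimal sketch of that comparison using java.util.zip.CRC32 (the names here are hypothetical; the processor's actual bookkeeping is more involved):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

final class RolledFileCheckSketch {
    // True if the first consumedBytes of candidate hash to the checksum that was
    // recorded while tailing, i.e. the candidate holds the already-read data
    // (plus, possibly, a remainder that still needs to be ingested).
    static boolean matchesConsumedPrefix(final File candidate, final long consumedBytes, final long expectedChecksum) throws IOException {
        if (candidate.length() < consumedBytes) {
            return false; // shorter than what was read: cannot be the same data
        }
        final CRC32 crc = new CRC32();
        try (CheckedInputStream in = new CheckedInputStream(new FileInputStream(candidate), crc)) {
            final byte[] buffer = new byte[4096];
            long remaining = consumedBytes;
            while (remaining > 0) {
                final int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (read < 0) {
                    return false; // unexpected EOF before the consumed prefix ended
                }
                remaining -= read;
            }
        }
        return crc.getValue() == expectedChecksum;
    }
}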