mirror of https://github.com/apache/nifi.git
NIFI-1165: Use FileChannel instead of RandomAccessFile in order to avoid locking files in Windows
Reviewed by Tony Kurc (tkurc@apache.org)
This commit is contained in:
parent
1e5cc070a3
commit
e862f7ff03
|
@ -27,10 +27,12 @@ import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.RandomAccessFile;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
import java.nio.file.DirectoryStream;
|
import java.nio.file.DirectoryStream;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.StandardOpenOption;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
@ -50,6 +52,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
import org.apache.nifi.components.AllowableValue;
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
@ -120,7 +123,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
.description("All FlowFiles are routed to this Relationship.")
|
.description("All FlowFiles are routed to this Relationship.")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
private volatile TailFileState state = new TailFileState(null, null, null, 0L, 0L, null, new byte[65536]);
|
private volatile TailFileState state = new TailFileState(null, null, null, 0L, 0L, null, ByteBuffer.allocate(65536));
|
||||||
private volatile boolean recoveredRolledFiles = false;
|
private volatile boolean recoveredRolledFiles = false;
|
||||||
private volatile Long expectedRecoveryChecksum;
|
private volatile Long expectedRecoveryChecksum;
|
||||||
private volatile boolean tailFileChanged = false;
|
private volatile boolean tailFileChanged = false;
|
||||||
|
@ -143,7 +146,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
@Override
|
@Override
|
||||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||||
if (FILENAME.equals(descriptor)) {
|
if (FILENAME.equals(descriptor)) {
|
||||||
state = new TailFileState(newValue, null, null, 0L, 0L, null, new byte[65536]);
|
state = new TailFileState(newValue, null, null, 0L, 0L, null, ByteBuffer.allocate(65536));
|
||||||
recoveredRolledFiles = false;
|
recoveredRolledFiles = false;
|
||||||
tailFileChanged = true;
|
tailFileChanged = true;
|
||||||
} else if (ROLLING_FILENAME_PATTERN.equals(descriptor)) {
|
} else if (ROLLING_FILENAME_PATTERN.equals(descriptor)) {
|
||||||
|
@ -173,7 +176,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
final long timestamp = dis.readLong();
|
final long timestamp = dis.readLong();
|
||||||
final boolean checksumPresent = dis.readBoolean();
|
final boolean checksumPresent = dis.readBoolean();
|
||||||
|
|
||||||
RandomAccessFile reader = null;
|
FileChannel reader = null;
|
||||||
File tailFile = null;
|
File tailFile = null;
|
||||||
|
|
||||||
if (checksumPresent && tailFilename.equals(filename)) {
|
if (checksumPresent && tailFilename.equals(filename)) {
|
||||||
|
@ -198,19 +201,21 @@ public class TailFile extends AbstractProcessor {
|
||||||
// beginning of the file.
|
// beginning of the file.
|
||||||
getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off.");
|
getLogger().debug("When recovering state, checksum of tailed file matches the stored checksum. Will resume where left off.");
|
||||||
tailFile = existingTailFile;
|
tailFile = existingTailFile;
|
||||||
reader = new RandomAccessFile(tailFile, "r");
|
reader = FileChannel.open(tailFile.toPath(), StandardOpenOption.READ);
|
||||||
reader.seek(position);
|
getLogger().debug("Created RandomAccessFile {} for {} in recoverState", new Object[] {reader, tailFile});
|
||||||
|
|
||||||
|
reader.position(position);
|
||||||
} else {
|
} else {
|
||||||
getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning.");
|
getLogger().debug("When recovering state, checksum of tailed file does not match the stored checksum. Will begin tailing current file from beginning.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, checksum, new byte[65536]);
|
state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, checksum, ByteBuffer.allocate(65536));
|
||||||
} else {
|
} else {
|
||||||
expectedRecoveryChecksum = null;
|
expectedRecoveryChecksum = null;
|
||||||
// tailing a new file since the state file was written out. We will reset state.
|
// tailing a new file since the state file was written out. We will reset state.
|
||||||
state = new TailFileState(tailFilename, null, null, 0L, 0L, null, new byte[65536]);
|
state = new TailFileState(tailFilename, null, null, 0L, 0L, null, ByteBuffer.allocate(65536));
|
||||||
}
|
}
|
||||||
|
|
||||||
getLogger().debug("Recovered state {}", new Object[] {state});
|
getLogger().debug("Recovered state {}", new Object[] {state});
|
||||||
|
@ -222,6 +227,30 @@ public class TailFile extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@OnStopped
|
||||||
|
public void cleanup() {
|
||||||
|
final TailFileState state = this.state;
|
||||||
|
if (state == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final FileChannel channel = state.getReader();
|
||||||
|
if (channel == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
channel.close();
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
getLogger().warn("Failed to close file handle during cleanup");
|
||||||
|
}
|
||||||
|
|
||||||
|
getLogger().debug("Closed FileChannel {}", new Object[] {channel});
|
||||||
|
|
||||||
|
this.state = new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(), state.getTimestamp(), state.getChecksum(), state.getBuffer());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void persistState(final TailFileState state, final String stateFilename) throws IOException {
|
public void persistState(final TailFileState state, final String stateFilename) throws IOException {
|
||||||
getLogger().debug("Persisting state {} to {}", new Object[] {state, stateFilename});
|
getLogger().debug("Persisting state {} to {}", new Object[] {state, stateFilename});
|
||||||
|
|
||||||
|
@ -247,23 +276,26 @@ public class TailFile extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private RandomAccessFile createReader(final File file, final long position) {
|
private FileChannel createReader(final File file, final long position) {
|
||||||
final RandomAccessFile reader;
|
final FileChannel reader;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
reader = new RandomAccessFile(file, "r");
|
reader = FileChannel.open(file.toPath(), StandardOpenOption.READ);
|
||||||
} catch (final FileNotFoundException fnfe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().warn("File {} does not exist; will attempt to access file again after the configured Yield Duration has elapsed", new Object[] {file});
|
getLogger().warn("Could not open {} due to {}; will attempt to access file again after the configured Yield Duration has elapsed", new Object[] {file, ioe});
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getLogger().debug("Created RandomAccessFile {} for {}", new Object[] {reader, file});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
reader.seek(position);
|
reader.position(position);
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().error("Failed to read from {} due to {}", new Object[] {file, ioe});
|
getLogger().error("Failed to read from {} due to {}", new Object[] {file, ioe});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
reader.close();
|
reader.close();
|
||||||
|
getLogger().debug("Closed RandomAccessFile {}", new Object[] {reader});
|
||||||
} catch (final IOException ioe2) {
|
} catch (final IOException ioe2) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -321,6 +353,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
if (flowFile.getSize() == 0L) {
|
if (flowFile.getSize() == 0L) {
|
||||||
session.remove(flowFile);
|
session.remove(flowFile);
|
||||||
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
|
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
|
||||||
|
cleanup();
|
||||||
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
|
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
|
||||||
} else {
|
} else {
|
||||||
flowFile = session.putAttribute(flowFile, "filename", firstFile.getName());
|
flowFile = session.putAttribute(flowFile, "filename", firstFile.getName());
|
||||||
|
@ -330,6 +363,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
|
||||||
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
|
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
|
||||||
|
cleanup();
|
||||||
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
|
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
|
||||||
|
|
||||||
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
|
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
|
||||||
|
@ -358,6 +392,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
|
||||||
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
|
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
|
||||||
|
cleanup();
|
||||||
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer());
|
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer());
|
||||||
|
|
||||||
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
|
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
|
||||||
|
@ -382,13 +417,16 @@ public class TailFile extends AbstractProcessor {
|
||||||
if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
|
if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
|
||||||
recoverRolledFiles(context, session);
|
recoverRolledFiles(context, session);
|
||||||
} else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
|
} else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
|
||||||
|
cleanup();
|
||||||
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 0L, null, state.getBuffer());
|
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 0L, null, state.getBuffer());
|
||||||
} else {
|
} else {
|
||||||
final String filename = context.getProperty(FILENAME).getValue();
|
final String filename = context.getProperty(FILENAME).getValue();
|
||||||
final File file = new File(filename);
|
final File file = new File(filename);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final RandomAccessFile raf = new RandomAccessFile(file, "r");
|
final FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
|
||||||
|
getLogger().debug("Created FileChannel {} for {}", new Object[] {channel, file});
|
||||||
|
|
||||||
final Checksum checksum = new CRC32();
|
final Checksum checksum = new CRC32();
|
||||||
final long position = file.length();
|
final long position = file.length();
|
||||||
final long timestamp = file.lastModified();
|
final long timestamp = file.lastModified();
|
||||||
|
@ -398,8 +436,9 @@ public class TailFile extends AbstractProcessor {
|
||||||
StreamUtils.copy(in, new NullOutputStream(), position);
|
StreamUtils.copy(in, new NullOutputStream(), position);
|
||||||
}
|
}
|
||||||
|
|
||||||
raf.seek(position);
|
channel.position(position);
|
||||||
state = new TailFileState(filename, file, raf, position, timestamp, checksum, state.getBuffer());
|
cleanup();
|
||||||
|
state = new TailFileState(filename, file, channel, position, timestamp, checksum, state.getBuffer());
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[] {file, ioe.toString()}, ioe);
|
getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[] {file, ioe.toString()}, ioe);
|
||||||
context.yield();
|
context.yield();
|
||||||
|
@ -419,7 +458,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
// the onTrigger method and then create a new state object after we finish processing the files.
|
// the onTrigger method and then create a new state object after we finish processing the files.
|
||||||
TailFileState state = this.state;
|
TailFileState state = this.state;
|
||||||
File file = state.getFile();
|
File file = state.getFile();
|
||||||
RandomAccessFile reader = state.getReader();
|
FileChannel reader = state.getReader();
|
||||||
Checksum checksum = state.getChecksum();
|
Checksum checksum = state.getChecksum();
|
||||||
if (checksum == null) {
|
if (checksum == null) {
|
||||||
checksum = new CRC32();
|
checksum = new CRC32();
|
||||||
|
@ -441,6 +480,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
|
|
||||||
// Check if file has rotated
|
// Check if file has rotated
|
||||||
long fileLength = file.length();
|
long fileLength = file.length();
|
||||||
|
final long lastModified = file.lastModified();
|
||||||
if (fileLength < position) {
|
if (fileLength < position) {
|
||||||
// File has rotated. It's possible that it rotated before we finished reading all of the data. As a result, we need
|
// File has rotated. It's possible that it rotated before we finished reading all of the data. As a result, we need
|
||||||
// to check the last rolled-over file and see if it is longer than our position. If so, consume the data past our
|
// to check the last rolled-over file and see if it is longer than our position. If so, consume the data past our
|
||||||
|
@ -489,6 +529,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
// Since file has rotated, we close the reader, create a new one, and then reset our state.
|
// Since file has rotated, we close the reader, create a new one, and then reset our state.
|
||||||
try {
|
try {
|
||||||
reader.close();
|
reader.close();
|
||||||
|
getLogger().debug("Closed RandomAccessFile {}", new Object[] {reader, reader});
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().warn("Failed to close reader for {} due to {}", new Object[] {file, ioe});
|
getLogger().warn("Failed to close reader for {} due to {}", new Object[] {file, ioe});
|
||||||
}
|
}
|
||||||
|
@ -503,12 +544,12 @@ public class TailFile extends AbstractProcessor {
|
||||||
boolean consumeData = false;
|
boolean consumeData = false;
|
||||||
if (fileLength > position) {
|
if (fileLength > position) {
|
||||||
consumeData = true;
|
consumeData = true;
|
||||||
} else if (file.lastModified() > timestamp) {
|
} else if (lastModified > timestamp) {
|
||||||
// This can happen if file is truncated, or is replaced with the same amount of data as the old file.
|
// This can happen if file is truncated, or is replaced with the same amount of data as the old file.
|
||||||
position = 0;
|
position = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
reader.seek(0L);
|
reader.position(0L);
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().error("Failed to seek to beginning of file due to {}", new Object[] {ioe});
|
getLogger().error("Failed to seek to beginning of file due to {}", new Object[] {ioe});
|
||||||
context.yield();
|
context.yield();
|
||||||
|
@ -525,7 +566,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
// data has been written to file. Stream it to a new FlowFile.
|
// data has been written to file. Stream it to a new FlowFile.
|
||||||
FlowFile flowFile = session.create();
|
FlowFile flowFile = session.create();
|
||||||
|
|
||||||
final RandomAccessFile fileReader = reader;
|
final FileChannel fileReader = reader;
|
||||||
final LongHolder positionHolder = new LongHolder(position);
|
final LongHolder positionHolder = new LongHolder(position);
|
||||||
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -539,6 +580,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
// If there ended up being no data, just remove the FlowFile
|
// If there ended up being no data, just remove the FlowFile
|
||||||
if (flowFile.getSize() == 0) {
|
if (flowFile.getSize() == 0) {
|
||||||
session.remove(flowFile);
|
session.remove(flowFile);
|
||||||
|
getLogger().debug("No data to consume; removed created FlowFile");
|
||||||
} else {
|
} else {
|
||||||
// determine filename for FlowFile by using <base filename of log file>.<initial offset>-<final offset>.<extension>
|
// determine filename for FlowFile by using <base filename of log file>.<initial offset>-<final offset>.<extension>
|
||||||
final String tailFilename = file.getName();
|
final String tailFilename = file.getName();
|
||||||
|
@ -559,7 +601,8 @@ public class TailFile extends AbstractProcessor {
|
||||||
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
|
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
position = positionHolder.get();
|
position = positionHolder.get();
|
||||||
timestamp = System.currentTimeMillis();
|
timestamp = lastModified;
|
||||||
|
getLogger().debug("Created {} and routed to success", new Object[] {flowFile});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -571,6 +614,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
// no data to consume so rather than continually running, yield to allow other processors to use the thread.
|
// no data to consume so rather than continually running, yield to allow other processors to use the thread.
|
||||||
// In this case, the state should not have changed, and we will have created no FlowFiles, so we don't have to
|
// In this case, the state should not have changed, and we will have created no FlowFiles, so we don't have to
|
||||||
// persist the state or commit the session; instead, just return here.
|
// persist the state or commit the session; instead, just return here.
|
||||||
|
getLogger().debug("No data to consume; created no FlowFiles");
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -598,17 +642,20 @@ public class TailFile extends AbstractProcessor {
|
||||||
* @return The new position after the lines have been read
|
* @return The new position after the lines have been read
|
||||||
* @throws java.io.IOException if an I/O error occurs.
|
* @throws java.io.IOException if an I/O error occurs.
|
||||||
*/
|
*/
|
||||||
private long readLines(final RandomAccessFile reader, final byte[] buffer, final OutputStream out, final Checksum checksum) throws IOException {
|
private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum) throws IOException {
|
||||||
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
|
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
|
||||||
long pos = reader.getFilePointer();
|
long pos = reader.position();
|
||||||
long rePos = pos; // position to re-read
|
long rePos = pos; // position to re-read
|
||||||
|
|
||||||
int num;
|
int num;
|
||||||
int linesRead = 0;
|
int linesRead = 0;
|
||||||
boolean seenCR = false;
|
boolean seenCR = false;
|
||||||
|
buffer.clear();
|
||||||
while (((num = reader.read(buffer)) != -1)) {
|
while (((num = reader.read(buffer)) != -1)) {
|
||||||
|
buffer.flip();
|
||||||
|
|
||||||
for (int i = 0; i < num; i++) {
|
for (int i = 0; i < num; i++) {
|
||||||
byte ch = buffer[i];
|
byte ch = buffer.get();
|
||||||
|
|
||||||
switch (ch) {
|
switch (ch) {
|
||||||
case '\n':
|
case '\n':
|
||||||
|
@ -647,11 +694,12 @@ public class TailFile extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pos = reader.getFilePointer();
|
pos = reader.position();
|
||||||
}
|
}
|
||||||
|
|
||||||
getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[] {linesRead, pos, rePos});
|
getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[] {linesRead, pos, rePos});
|
||||||
reader.seek(rePos); // Ensure we can re-read if necessary
|
reader.position(rePos);
|
||||||
|
buffer.clear();
|
||||||
return rePos;
|
return rePos;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -726,23 +774,26 @@ public class TailFile extends AbstractProcessor {
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A simple Java class to hold information about our state so that we can maintain this state across multiple invocations of the Processor
|
* A simple Java class to hold information about our state so that we can maintain this state across multiple invocations of the Processor.
|
||||||
|
*
|
||||||
|
* We use a FileChannel to read from the file, rather than a BufferedInputStream, etc. because we want to be able to read any amount of data
|
||||||
|
* and then reposition the reader if we need to, as a result of a line not being terminated (i.e., no new-line).
|
||||||
*/
|
*/
|
||||||
static class TailFileState {
|
static class TailFileState {
|
||||||
private final String filename; // hold onto filename and not just File because we want to match that against the user-defined filename to recover from
|
private final String filename; // hold onto filename and not just File because we want to match that against the user-defined filename to recover from
|
||||||
private final File file;
|
private final File file;
|
||||||
private final RandomAccessFile raf;
|
private final FileChannel fileChannel;
|
||||||
private final long position;
|
private final long position;
|
||||||
private final long timestamp;
|
private final long timestamp;
|
||||||
private final Checksum checksum;
|
private final Checksum checksum;
|
||||||
private final byte[] buffer;
|
private final ByteBuffer buffer;
|
||||||
|
|
||||||
public TailFileState(final String filename, final File file, final RandomAccessFile raf, final long position, final long timestamp, final Checksum checksum, final byte[] buffer) {
|
public TailFileState(final String filename, final File file, final FileChannel fileChannel, final long position, final long timestamp, final Checksum checksum, final ByteBuffer buffer) {
|
||||||
this.filename = filename;
|
this.filename = filename;
|
||||||
this.file = file;
|
this.file = file;
|
||||||
this.raf = raf;
|
this.fileChannel = fileChannel;
|
||||||
this.position = position;
|
this.position = position;
|
||||||
this.timestamp = (timestamp / 1000) * 1000; // many operating systems will use only second-level precision for last-modified times so cut off milliseconds
|
this.timestamp = timestamp; // many operating systems will use only second-level precision for last-modified times so cut off milliseconds
|
||||||
this.checksum = checksum;
|
this.checksum = checksum;
|
||||||
this.buffer = buffer;
|
this.buffer = buffer;
|
||||||
}
|
}
|
||||||
|
@ -755,8 +806,8 @@ public class TailFile extends AbstractProcessor {
|
||||||
return file;
|
return file;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RandomAccessFile getReader() {
|
public FileChannel getReader() {
|
||||||
return raf;
|
return fileChannel;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getPosition() {
|
public long getPosition() {
|
||||||
|
@ -771,7 +822,7 @@ public class TailFile extends AbstractProcessor {
|
||||||
return checksum;
|
return checksum;
|
||||||
}
|
}
|
||||||
|
|
||||||
public byte[] getBuffer() {
|
public ByteBuffer getBuffer() {
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.junit.Test;
|
||||||
|
|
||||||
public class TestTailFile {
|
public class TestTailFile {
|
||||||
private File file;
|
private File file;
|
||||||
|
private TailFile processor;
|
||||||
private RandomAccessFile raf;
|
private RandomAccessFile raf;
|
||||||
private TestRunner runner;
|
private TestRunner runner;
|
||||||
|
|
||||||
|
@ -64,7 +65,8 @@ public class TestTailFile {
|
||||||
stateFile.delete();
|
stateFile.delete();
|
||||||
Assert.assertFalse(stateFile.exists());
|
Assert.assertFalse(stateFile.exists());
|
||||||
|
|
||||||
runner = TestRunners.newTestRunner(new TailFile());
|
processor = new TailFile();
|
||||||
|
runner = TestRunners.newTestRunner(processor);
|
||||||
runner.setProperty(TailFile.FILENAME, "target/log.txt");
|
runner.setProperty(TailFile.FILENAME, "target/log.txt");
|
||||||
runner.setProperty(TailFile.STATE_FILE, "target/tail-file.state");
|
runner.setProperty(TailFile.STATE_FILE, "target/tail-file.state");
|
||||||
runner.assertValid();
|
runner.assertValid();
|
||||||
|
@ -77,6 +79,8 @@ public class TestTailFile {
|
||||||
if (raf != null) {
|
if (raf != null) {
|
||||||
raf.close();
|
raf.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
processor.cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -153,6 +157,7 @@ public class TestTailFile {
|
||||||
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_TIME.getValue());
|
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_TIME.getValue());
|
||||||
|
|
||||||
raf.write("hello world\n".getBytes());
|
raf.write("hello world\n".getBytes());
|
||||||
|
Thread.sleep(1000);
|
||||||
runner.run(100);
|
runner.run(100);
|
||||||
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
||||||
}
|
}
|
||||||
|
@ -231,7 +236,9 @@ public class TestTailFile {
|
||||||
|
|
||||||
raf.write("world".getBytes());
|
raf.write("world".getBytes());
|
||||||
raf.close();
|
raf.close();
|
||||||
file.renameTo(new File("target/log1.txt"));
|
|
||||||
|
processor.cleanup(); // Need to do this for Windows because otherwise we cannot rename the file because we have the file open still in the same process.
|
||||||
|
assertTrue(file.renameTo(new File("target/log1.txt")));
|
||||||
|
|
||||||
raf = new RandomAccessFile(new File("target/log.txt"), "rw");
|
raf = new RandomAccessFile(new File("target/log.txt"), "rw");
|
||||||
raf.write("1\n".getBytes());
|
raf.write("1\n".getBytes());
|
||||||
|
@ -255,7 +262,7 @@ public class TestTailFile {
|
||||||
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
||||||
|
|
||||||
raf.write("hello\n".getBytes());
|
raf.write("hello\n".getBytes());
|
||||||
runner.run(1, false, false);
|
runner.run(1, true, false);
|
||||||
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
|
||||||
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
|
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
|
||||||
runner.clearTransferState();
|
runner.clearTransferState();
|
||||||
|
@ -288,7 +295,7 @@ public class TestTailFile {
|
||||||
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
||||||
|
|
||||||
raf.write("hello\n".getBytes());
|
raf.write("hello\n".getBytes());
|
||||||
runner.run(1, false, false);
|
runner.run(1, true, false);
|
||||||
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
|
||||||
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
|
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
|
||||||
runner.clearTransferState();
|
runner.clearTransferState();
|
||||||
|
@ -305,6 +312,8 @@ public class TestTailFile {
|
||||||
// write to a new file.
|
// write to a new file.
|
||||||
file = new File("target/log.txt");
|
file = new File("target/log.txt");
|
||||||
raf = new RandomAccessFile(file, "rw");
|
raf = new RandomAccessFile(file, "rw");
|
||||||
|
|
||||||
|
Thread.sleep(1000L);
|
||||||
raf.write("abc\n".getBytes());
|
raf.write("abc\n".getBytes());
|
||||||
|
|
||||||
// rename file to log.1
|
// rename file to log.1
|
||||||
|
@ -331,9 +340,6 @@ public class TestTailFile {
|
||||||
runner.run();
|
runner.run();
|
||||||
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
||||||
|
|
||||||
final long start = System.currentTimeMillis();
|
|
||||||
Thread.sleep(1100L);
|
|
||||||
|
|
||||||
raf.write("Hello, World".getBytes());
|
raf.write("Hello, World".getBytes());
|
||||||
runner.run();
|
runner.run();
|
||||||
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
||||||
|
@ -346,7 +352,6 @@ public class TestTailFile {
|
||||||
assertNotNull(state);
|
assertNotNull(state);
|
||||||
assertEquals("target/log.txt", state.getFilename());
|
assertEquals("target/log.txt", state.getFilename());
|
||||||
assertTrue(state.getTimestamp() <= System.currentTimeMillis());
|
assertTrue(state.getTimestamp() <= System.currentTimeMillis());
|
||||||
assertTrue(state.getTimestamp() >= start);
|
|
||||||
assertEquals(14, state.getPosition());
|
assertEquals(14, state.getPosition());
|
||||||
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("Hello, World\r\n");
|
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("Hello, World\r\n");
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue