NIFI-994: Ensure that processor is not valid due to the tail file not yet existing

This commit is contained in:
Mark Payne 2015-10-28 13:44:28 -04:00
parent 31f0909bd3
commit bfa9e45079
2 changed files with 186 additions and 30 deletions

View File

@ -49,6 +49,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -72,11 +73,18 @@ import org.apache.nifi.util.LongHolder;
+ "was not running (provided that the data still exists upon restart of NiFi).")
public class TailFile extends AbstractProcessor {
static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time",
"Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail");
static final AllowableValue START_CURRENT_FILE = new AllowableValue("Beginning of File", "Beginning of File",
"Start with the beginning of the File to Tail. Do not ingest any data that has already been rolled over");
static final AllowableValue START_CURRENT_TIME = new AllowableValue("Current Time", "Current Time",
"Start with the data at the end of the File to Tail. Do not ingest any data thas has already been rolled over or any data in the File to Tail that has already been written.");
static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
.name("File to Tail")
.description("Fully-qualified filename of the file that should be tailed")
.expressionLanguageSupported(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new PropertyDescriptor.Builder()
@ -95,6 +103,14 @@ public class TailFile extends AbstractProcessor {
.expressionLanguageSupported(false)
.required(true)
.build();
static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder()
.name("Initial Start Position")
.description("When the Processor first begins to tail data, this property specifies where the Processor should begin reading data. Once data has been ingested from the file, "
+ "the Processor will continue from the last point from which it has received data.")
.allowableValues(START_BEGINNING_OF_TIME, START_CURRENT_FILE, START_CURRENT_TIME)
.defaultValue(START_CURRENT_FILE.getValue())
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -104,6 +120,7 @@ public class TailFile extends AbstractProcessor {
private volatile TailFileState state = new TailFileState(null, null, null, 0L, 0L, null, new byte[65536]);
private volatile boolean recoveredRolledFiles = false;
private volatile Long expectedRecoveryChecksum;
private volatile boolean tailFileChanged = false;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -111,6 +128,7 @@ public class TailFile extends AbstractProcessor {
properties.add(FILENAME);
properties.add(ROLLING_FILENAME_PATTERN);
properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(STATE_FILE).defaultValue("./conf/state/" + getIdentifier()).build());
properties.add(START_POSITION);
return properties;
}
@ -124,6 +142,7 @@ public class TailFile extends AbstractProcessor {
if (FILENAME.equals(descriptor)) {
state = new TailFileState(newValue, null, null, 0L, 0L, null, new byte[65536]);
recoveredRolledFiles = false;
tailFileChanged = true;
} else if (ROLLING_FILENAME_PATTERN.equals(descriptor)) {
recoveredRolledFiles = false;
}
@ -218,7 +237,7 @@ public class TailFile extends AbstractProcessor {
try {
reader = new RandomAccessFile(file, "r");
} catch (final FileNotFoundException fnfe) {
getLogger().debug("File {} does not exist; yielding and returning", new Object[] {file});
getLogger().warn("File {} does not exist; will attempt to access file again after the configured Yield Duration has elapsed", new Object[] {file});
return null;
}
@ -280,18 +299,24 @@ public class TailFile extends AbstractProcessor {
rolledOffFiles.remove(0);
FlowFile flowFile = session.create();
flowFile = session.importFrom(in, flowFile);
flowFile = session.putAttribute(flowFile, "filename", firstFile.getName());
if (flowFile.getSize() == 0L) {
session.remove(flowFile);
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
} else {
flowFile = session.putAttribute(flowFile, "filename", firstFile.getName());
session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + state.getPosition() + " of source file",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + state.getPosition() + " of source file",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS);
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
session.commit();
persistState(state, context.getProperty(STATE_FILE).getValue());
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
session.commit();
persistState(state, context.getProperty(STATE_FILE).getValue());
}
}
}
}
@ -303,16 +328,20 @@ public class TailFile extends AbstractProcessor {
for (final File file : rolledOffFiles) {
FlowFile flowFile = session.create();
flowFile = session.importFrom(file.toPath(), true, flowFile);
flowFile = session.putAttribute(flowFile, "filename", file.getName());
session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
session.transfer(flowFile, REL_SUCCESS);
if (flowFile.getSize() == 0L) {
session.remove(flowFile);
} else {
flowFile = session.putAttribute(flowFile, "filename", file.getName());
session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
session.transfer(flowFile, REL_SUCCESS);
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer());
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer());
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
session.commit();
persistState(state, context.getProperty(STATE_FILE).getValue());
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
session.commit();
persistState(state, context.getProperty(STATE_FILE).getValue());
}
}
} catch (final IOException e) {
getLogger().error("Failed to recover files that have rolled over due to {}", new Object[] {e});
@ -320,13 +349,47 @@ public class TailFile extends AbstractProcessor {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// If this is the first time the processor has run since it was started, we need to check for any files that may have rolled over
// while the processor was stopped. If we find any, we need to import them into the flow.
if (!recoveredRolledFiles) {
recoverRolledFiles(context, session);
if (tailFileChanged) {
final String recoverPosition = context.getProperty(START_POSITION).getValue();
if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
recoverRolledFiles(context, session);
} else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 0L, null, state.getBuffer());
} else {
final String filename = context.getProperty(FILENAME).getValue();
final File file = new File(filename);
try {
final RandomAccessFile raf = new RandomAccessFile(file, "r");
final Checksum checksum = new CRC32();
final long position = file.length();
final long timestamp = file.lastModified();
try (final InputStream fis = new FileInputStream(file);
final CheckedInputStream in = new CheckedInputStream(fis, checksum)) {
StreamUtils.copy(in, new NullOutputStream(), position);
}
raf.seek(position);
state = new TailFileState(filename, file, raf, position, timestamp, checksum, state.getBuffer());
} catch (final IOException ioe) {
getLogger().error("Attempted to position Reader at current position in file {} but failed to do so due to {}", new Object[] {file, ioe.toString()}, ioe);
context.yield();
return;
}
}
tailFileChanged = false;
} else {
recoverRolledFiles(context, session);
}
recoveredRolledFiles = true;
}
@ -372,16 +435,20 @@ public class TailFile extends AbstractProcessor {
FlowFile flowFile = session.create();
flowFile = session.importFrom(fis, flowFile);
flowFile = session.putAttribute(flowFile, "filename", lastRolledOver.getName());
if (flowFile.getSize() == 0) {
session.remove(flowFile);
} else {
flowFile = session.putAttribute(flowFile, "filename", lastRolledOver.getName());
session.getProvenanceReporter().receive(flowFile, lastRolledOver.toURI().toString(), "FlowFile contains bytes " + state.getPosition() + " through " +
lastRolledOver.length() + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS);
this.state = state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, lastRolledOver.lastModified() + 1L, null, state.getBuffer());
session.getProvenanceReporter().receive(flowFile, lastRolledOver.toURI().toString(), "FlowFile contains bytes " + state.getPosition() + " through " +
lastRolledOver.length() + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS);
this.state = state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, lastRolledOver.lastModified() + 1L, null, state.getBuffer());
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
session.commit();
persistState(state, context.getProperty(STATE_FILE).getValue());
// must ensure that we do session.commit() before persisting state in order to avoid data loss.
session.commit();
persistState(state, context.getProperty(STATE_FILE).getValue());
}
}
}
}
@ -429,6 +496,7 @@ public class TailFile extends AbstractProcessor {
if (consumeData) {
// data has been written to file. Stream it to a new FlowFile.
FlowFile flowFile = session.create();
final RandomAccessFile fileReader = reader;
final LongHolder positionHolder = new LongHolder(position);
flowFile = session.write(flowFile, new OutputStreamCallback() {

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.nifi.processors.standard.TailFile.TailFileState;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
@ -78,7 +79,40 @@ public class TestTailFile {
@Test
public void testConsumeAfterTruncation() throws IOException {
public void testConsumeAfterTruncationStartAtBeginningOfFile() throws IOException, InterruptedException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE.getValue());
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
raf.write("hello\n".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
runner.clearTransferState();
// roll over the file
raf.close();
file.renameTo(new File(file.getParentFile(), file.getName() + ".previous"));
raf = new RandomAccessFile(file, "rw");
// truncate file
raf.setLength(0L);
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
// write some bytes to the file.
Thread.sleep(1000L); // we need to wait at least one second because of the granularity of timestamps on many file systems.
raf.write("HELLO\n".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("HELLO\n");
}
@Test
public void testConsumeAfterTruncationStartAtCurrentTime() throws IOException, InterruptedException {
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_TIME.getValue());
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
@ -92,6 +126,8 @@ public class TestTailFile {
raf.setLength(0L);
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
Thread.sleep(1000L); // we need to wait at least one second because of the granularity of timestamps on many file systems.
raf.write("HELLO\n".getBytes());
runner.run();
@ -100,6 +136,58 @@ public class TestTailFile {
}
@Test
public void testStartAtBeginningOfFile() throws IOException, InterruptedException {
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE.getValue());
raf.write("hello world\n".getBytes());
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello world\n");
}
@Test
public void testStartAtCurrentTime() throws IOException, InterruptedException {
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_TIME.getValue());
raf.write("hello world\n".getBytes());
runner.run(100);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
}
@Test
public void testStartAtBeginningOfTime() throws IOException, InterruptedException {
raf.write("hello".getBytes());
raf.close();
file.renameTo(new File(file.getParentFile(), file.getName() + ".previous"));
raf = new RandomAccessFile(file, "rw");
raf.write("world\n".getBytes());
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
runner.setProperty(TailFile.START_POSITION, TailFile.START_BEGINNING_OF_TIME.getValue());
runner.run();
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
boolean world = false;
boolean hello = false;
for (final MockFlowFile mff : runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS)) {
final String content = new String(mff.toByteArray());
if ("world\n".equals(content)) {
world = true;
} else if ("hello".equals(content)) {
hello = true;
} else {
Assert.fail("Got unexpected content: " + content);
}
}
assertTrue(hello);
assertTrue(world);
}
@Test
public void testRemainderOfFileRecoveredAfterRestart() throws IOException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt");