NIFI-8344: Introduced new Rollover Tail Period property

This closes #4916.

Signed-off-by: Tamas Palfy <tamas.bertalan.palfy@gmail.com>
This commit is contained in:
Mark Payne 2021-03-18 17:21:31 -04:00 committed by Tamas Palfy
parent 3508b5b12f
commit e1b9548ab6
2 changed files with 130 additions and 11 deletions

View File

@ -37,7 +37,6 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
@ -82,6 +81,9 @@ import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream; import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum; import java.util.zip.Checksum;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
// note: it is important that this Processor is not marked as @SupportsBatching because the session commits must complete before persisting state locally; otherwise, data loss may occur // note: it is important that this Processor is not marked as @SupportsBatching because the session commits must complete before persisting state locally; otherwise, data loss may occur
@TriggerSerially @TriggerSerially
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@ -131,7 +133,7 @@ public class TailFile extends AbstractProcessor {
.name("tail-base-directory") .name("tail-base-directory")
.displayName("Base directory") .displayName("Base directory")
.description("Base directory used to look for files to tail. This property is required when using Multifile mode.") .description("Base directory used to look for files to tail. This property is required when using Multifile mode.")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .expressionLanguageSupported(VARIABLE_REGISTRY)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.required(false) .required(false)
.build(); .build();
@ -141,7 +143,7 @@ public class TailFile extends AbstractProcessor {
.displayName("Tailing mode") .displayName("Tailing mode")
.description("Mode to use: single file will tail only one file, multiple file will look for a list of file. In Multiple mode" .description("Mode to use: single file will tail only one file, multiple file will look for a list of file. In Multiple mode"
+ " the Base directory is required.") + " the Base directory is required.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE) .expressionLanguageSupported(NONE)
.required(true) .required(true)
.allowableValues(MODE_SINGLEFILE, MODE_MULTIFILE) .allowableValues(MODE_SINGLEFILE, MODE_MULTIFILE)
.defaultValue(MODE_SINGLEFILE.getValue()) .defaultValue(MODE_SINGLEFILE.getValue())
@ -153,7 +155,7 @@ public class TailFile extends AbstractProcessor {
.description("Path of the file to tail in case of single file mode. If using multifile mode, regular expression to find files " .description("Path of the file to tail in case of single file mode. If using multifile mode, regular expression to find files "
+ "to tail in the base directory. In case recursivity is set to true, the regular expression will be used to match the " + "to tail in the base directory. In case recursivity is set to true, the regular expression will be used to match the "
+ "path starting from the base directory (see additional details for examples).") + "path starting from the base directory (see additional details for examples).")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .expressionLanguageSupported(VARIABLE_REGISTRY)
.addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)) .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
.required(true) .required(true)
.build(); .build();
@ -166,10 +168,21 @@ public class TailFile extends AbstractProcessor {
+ "(without extension), and will assume that the files that have rolled over live in the same directory as the file being tailed. " + "(without extension), and will assume that the files that have rolled over live in the same directory as the file being tailed. "
+ "The same glob pattern will be used for all files.") + "The same glob pattern will be used for all files.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE) .expressionLanguageSupported(NONE)
.required(false) .required(false)
.build(); .build();
static final PropertyDescriptor POST_ROLLOVER_TAIL_PERIOD = new PropertyDescriptor.Builder()
.name("Post-Rollover Tail Period")
.description("When a file is rolled over, the processor will continue tailing the rolled over file until it has not been modified for this amount of time. " +
"This allows for another process to rollover a file, and then flush out any buffered data. Note that when this value is set, and the tailed file rolls over, " +
"the new file will not be tailed until the old file has not been modified for the configured amount of time.")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(NONE)
.defaultValue("0 sec")
.build();
static final PropertyDescriptor STATE_LOCATION = new PropertyDescriptor.Builder() static final PropertyDescriptor STATE_LOCATION = new PropertyDescriptor.Builder()
.displayName("State Location") .displayName("State Location")
.name("File Location") //retained name of property for backward compatibility of configs .name("File Location") //retained name of property for backward compatibility of configs
@ -251,6 +264,7 @@ public class TailFile extends AbstractProcessor {
properties.add(MODE); properties.add(MODE);
properties.add(FILENAME); properties.add(FILENAME);
properties.add(ROLLING_FILENAME_PATTERN); properties.add(ROLLING_FILENAME_PATTERN);
properties.add(POST_ROLLOVER_TAIL_PERIOD);
properties.add(BASE_DIRECTORY); properties.add(BASE_DIRECTORY);
properties.add(START_POSITION); properties.add(START_POSITION);
properties.add(STATE_LOCATION); properties.add(STATE_LOCATION);
@ -691,6 +705,14 @@ public class TailFile extends AbstractProcessor {
} }
rolloverOccurred = recoverRolledFiles(context, session, tailFile, expectedChecksumValue, tfo.getState().getTimestamp(), tfo.getState().getPosition()); rolloverOccurred = recoverRolledFiles(context, session, tailFile, expectedChecksumValue, tfo.getState().getTimestamp(), tfo.getState().getPosition());
if (rolloverOccurred) {
final boolean tailAfterRollover = context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS) > 0;
if (tailAfterRollover) {
getLogger().debug("File {} was rolled over and the Rollover Tail Period is set, so will not consume from new file during this iteration.", tailFile);
return;
}
}
tfo.setExpectedRecoveryChecksum(null); tfo.setExpectedRecoveryChecksum(null);
} }
@ -735,6 +757,7 @@ public class TailFile extends AbstractProcessor {
if (!rotated) { if (!rotated) {
final long fileLength = file.length(); final long fileLength = file.length();
if (length > fileLength) { if (length > fileLength) {
getLogger().debug("Rotated = true because TailFileState Length = {}, File Length = {}", length, fileLength);
rotated = true; rotated = true;
} else { } else {
try { try {
@ -742,6 +765,7 @@ public class TailFile extends AbstractProcessor {
final long readerPosition = reader.position(); final long readerPosition = reader.position();
if (readerSize == readerPosition && readerSize != fileLength) { if (readerSize == readerPosition && readerSize != fileLength) {
getLogger().debug("Rotated = true because readerSize={}, readerPosition={}, fileLength={}", readerSize, readerPosition, fileLength);
rotated = true; rotated = true;
} }
} catch (final IOException e) { } catch (final IOException e) {
@ -1191,19 +1215,22 @@ public class TailFile extends AbstractProcessor {
rolledOffFiles.remove(0); rolledOffFiles.remove(0);
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
final TailFileState currentState = tfo.getState();
final Checksum checksum = currentState.getChecksum() == null ? new CRC32() : currentState.getChecksum();
final ByteBuffer buffer = currentState.getBuffer() == null ? ByteBuffer.allocate(65536) : currentState.getBuffer();
final FileChannel channel = fis.getChannel();
final long timestamp = firstFile.lastModified();
try { try {
flowFile = session.write(flowFile, flowFile = session.write(flowFile, out -> readLines(channel, buffer, out, checksum, reReadOnNul, true));
out -> readLines(fis.getChannel(), ByteBuffer.allocate(65536), out, new CRC32(), reReadOnNul, true));
} catch (NulCharacterEncounteredException ncee) { } catch (NulCharacterEncounteredException ncee) {
rolledOffFiles.add(0, firstFile); rolledOffFiles.add(0, firstFile);
session.remove(flowFile); session.remove(flowFile);
throw ncee; throw ncee;
} }
if (flowFile.getSize() == 0L) { if (flowFile.getSize() == 0L) {
session.remove(flowFile); session.remove(flowFile);
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
cleanup();
tfo.setState(new TailFileState(tailFile, null, null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, tfo.getState().getBuffer()));
} else { } else {
final Map<String, String> attributes = new HashMap<>(3); final Map<String, String> attributes = new HashMap<>(3);
attributes.put(CoreAttributes.FILENAME.key(), firstFile.getName()); attributes.put(CoreAttributes.FILENAME.key(), firstFile.getName());
@ -1215,11 +1242,30 @@ public class TailFile extends AbstractProcessor {
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)); TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, firstFile}); getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, firstFile});
}
// We need to update the state to account for the fact that we just brought data in.
// If we are going to tail a rolled over file for some amount of time, then we need to keep the state pointing to the
// same file, just using an updated position/timestamp/checksum/length. This way, the next iteration will compare against these
// updated values.
// But if we are not going to tail the rolled over file for any period of time, we can essentially reset the state.
final long postRolloverTailMillis = context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
final long millisSinceUpdate = System.currentTimeMillis() - timestamp;
if (postRolloverTailMillis > 0 && millisSinceUpdate < postRolloverTailMillis) {
getLogger().debug("File {} has been rolled over, but it was updated {} millis ago, which is less than the configured {} ({} ms), so will continue tailing",
firstFile, millisSinceUpdate, POST_ROLLOVER_TAIL_PERIOD.getDisplayName(), postRolloverTailMillis);
final long length = currentState.getLength() + flowFile.getSize();
final long updatedPosition = position + flowFile.getSize();
final TailFileState updatedState = new TailFileState(currentState.getFilename(), currentState.getFile(), channel, updatedPosition, timestamp, length, checksum,
buffer);
tfo.setState(updatedState);
persistState(tfo, session, context);
} else {
// use a timestamp of lastModified() + 1 so that we do not ingest this file again. // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
cleanup(); cleanup();
tfo.setState(new TailFileState(tailFile, null, null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, tfo.getState().getBuffer())); tfo.setState(new TailFileState(tailFile, null, null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, tfo.getState().getBuffer()));
persistState(tfo, session, context); persistState(tfo, session, context);
} }
} else { } else {

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processors.standard.TailFile.TailFileState; import org.apache.nifi.processors.standard.TailFile.TailFileState;
@ -25,6 +26,7 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -311,6 +313,77 @@ public class TestTailFile {
} }
@Test
public void testFileWrittenToAfterRollover() throws IOException, InterruptedException {
Assume.assumeTrue("Test requires renaming a file while a file handle is still open to it, so it won't run on Windows", !SystemUtils.IS_OS_WINDOWS);
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
runner.setProperty(TailFile.START_POSITION, TailFile.START_BEGINNING_OF_TIME.getValue());
runner.setProperty(TailFile.REREAD_ON_NUL, "true");
runner.setProperty(TailFile.POST_ROLLOVER_TAIL_PERIOD, "10 mins");
raf.write("a\nb\n".getBytes());
runner.run(1, false, true);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("a\nb\n");
runner.clearTransferState();
raf.write("c\n".getBytes());
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("c\n");
runner.clearTransferState();
// Write additional data to file, then roll file over
raf.write("d\n".getBytes());
final File rolledFile = new File("target/log.1");
final boolean renamed = file.renameTo(rolledFile);
assertTrue(renamed);
raf.getChannel().force(true);
System.out.println("Wrote d\\n and rolled file");
// Create the new file
final RandomAccessFile newFile = new RandomAccessFile(new File("target/log.txt"), "rw");
newFile.write("new file\n".getBytes()); // This should not get consumed until the old file's last modified date indicates it's complete
// Trigger processor and verify data is consumed properly
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("d\n");
runner.clearTransferState();
// Write to the file and trigger again.
raf.write("e\n".getBytes());
System.out.println("Wrote e\\n");
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("e\n");
runner.clearTransferState();
// Ensure that no data comes in for a bit, since the last modified date on the rolled over file isn't old enough.
for (int i=0; i < 100; i++) {
runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
Thread.sleep(1L);
}
// Set last modified time so that processor believes file to have not been modified in a very long time, then run again.
assertTrue(rolledFile.setLastModified(500L));
System.out.println("Set lastModified on " + rolledFile + " to 500");
runner.run(1, false, false);
// Verify results
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("new file\n");
runner.clearTransferState();
raf.close();
}
@Test @Test
public void testConsumeAfterTruncationStartAtBeginningOfFile() throws IOException, InterruptedException { public void testConsumeAfterTruncationStartAtBeginningOfFile() throws IOException, InterruptedException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*"); runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");