NIFI-7972 Add a boolean property by which the user can tell the processor to yield (and try again later) whenever it encounters a NUL character.

This commit is contained in:
Tamas Palfy 2020-11-11 14:51:56 +01:00 committed by Mark Payne
parent 5c925371c2
commit 47cc2867ba
2 changed files with 139 additions and 3 deletions

View File

@ -76,6 +76,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
@ -220,6 +221,20 @@ public class TailFile extends AbstractProcessor {
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor REREAD_ON_NUL = new PropertyDescriptor.Builder()
.name("reread-on-nul")
.displayName("Reread when NUL encountered")
.description("If this option is set to 'true', when a NUL character is read, the processor will yield and try to read the same part again later. "
+ "(Note: Yielding may delay the processing of other files tailed by this processor, not just the one with the NUL character.) "
+ "The purpose of this flag is to allow users to handle cases where reading a file may return temporary NUL values. "
+ "NFS for example may send file contents out of order. In this case the missing parts are temporarily replaced by NUL values. "
+ "CAUTION! If the file contains legitimate NUL values, setting this flag causes this processor to get stuck indefinitely. "
+ "For this reason users should refrain from using this feature if they can help it and try to avoid having the target file on a file system where reads are unreliable.")
.required(false)
.allowableValues("true", "false")
.defaultValue("false")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles are routed to this Relationship.")
@ -242,6 +257,7 @@ public class TailFile extends AbstractProcessor {
properties.add(RECURSIVE);
properties.add(LOOKUP_FREQUENCY);
properties.add(MAXIMUM_AGE);
properties.add(REREAD_ON_NUL);
return properties;
}
@ -610,7 +626,13 @@ public class TailFile extends AbstractProcessor {
}
for (String tailFile : states.keySet()) {
processTailFile(context, session, tailFile);
try {
processTailFile(context, session, tailFile);
} catch (NulCharacterEncounteredException e) {
getLogger().warn("NUL character encountered in " + tailFile + " and '" + REREAD_ON_NUL.getDisplayName() + "' is set to 'true', yielding.");
context.yield();
return;
}
}
}
@ -760,15 +782,26 @@ public class TailFile extends AbstractProcessor {
final FileChannel fileReader = reader;
final AtomicLong positionHolder = new AtomicLong(position);
final Boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();
AtomicReference<NulCharacterEncounteredException> abort = new AtomicReference<>();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final OutputStream out = new BufferedOutputStream(rawOut)) {
positionHolder.set(readLines(fileReader, currentState.getBuffer(), out, chksum));
positionHolder.set(readLines(fileReader, currentState.getBuffer(), out, chksum, reReadOnNul));
} catch (NulCharacterEncounteredException e) {
positionHolder.set(e.getRePos());
abort.set(e);
}
}
});
if (abort.get() != null) {
session.remove(flowFile);
throw abort.get();
}
// If there ended up being no data, just remove the FlowFile
if (flowFile.getSize() == 0) {
session.remove(flowFile);
@ -823,11 +856,15 @@ public class TailFile extends AbstractProcessor {
* @param out the OutputStream to copy the data to
* @param checksum the Checksum object to use in order to calculate checksum
* for recovery purposes
* @param reReadOnNul If set to 'true', ASCII NUL characters will be treated as
* temporary values and a NulCharacterEncounteredException is thrown.
* This allows the caller to re-attempt a read from the same position.
* If set to 'false' these characters will be treated as regular content.
*
* @return The new position after the lines have been read
* @throws java.io.IOException if an I/O error occurs.
*/
private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum) throws IOException {
private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum, Boolean reReadOnNul) throws IOException {
getLogger().debug("Reading lines starting at position {}", new Object[]{reader.position()});
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
@ -866,6 +903,11 @@ public class TailFile extends AbstractProcessor {
seenCR = true;
break;
}
case '\0': {
if (reReadOnNul) {
throw new NulCharacterEncounteredException(rePos);
}
}
default: {
if (seenCR) {
seenCR = false;
@ -1365,4 +1407,21 @@ public class TailFile extends AbstractProcessor {
return map;
}
}
static class NulCharacterEncounteredException extends RuntimeException {
private final long rePos;
public NulCharacterEncounteredException(long rePos) {
this.rePos = rePos;
}
public long getRePos() {
return rePos;
}
@Override
public Throwable fillInStackTrace() {
return this;
}
}
}

View File

@ -36,6 +36,7 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -127,6 +128,82 @@ public class TestTailFile {
}
}
@Test
public void testNULContentWithReReadOnNulFalseLeaveNul() throws Exception {
// GIVEN
runner.setProperty(TailFile.REREAD_ON_NUL, "false");
// WHEN
// THEN
testNULContentWithReReadOnNulDefault();
}
@Test
public void testNULContentWithReReadOnNulDefault() throws Exception {
// GIVEN
String content1 = "first_line_with_nul\0\n";
Integer reposition = null;
String content2 = "second_line\n";
List<String> expected = Arrays.asList("first_line_with_nul\0\n", "second_line\n");
// WHEN
// THEN
testNULContent(content1, reposition, content2, expected);
}
@Test
public void testNULContentWithReReadOnNulFalseOverwriteNul() throws Exception {
// GIVEN
runner.setProperty(TailFile.REREAD_ON_NUL, "false");
String content1 = "first_line_with_nul\0\n";
Integer reposition = "first_line_with_nul".length();
String content2 = "!!overwrite_nul_and_continue_first_line_but_end_up_in_second_line_anyway\n";
List<String> expected = Arrays.asList("first_line_with_nul\0\n", "overwrite_nul_and_continue_first_line_but_end_up_in_second_line_anyway\n");
// WHEN
// THEN
testNULContent(content1, reposition, content2, expected);
}
@Test
public void testNULContentWithReReadOnNulTrue() throws Exception {
// GIVEN
runner.setProperty(TailFile.REREAD_ON_NUL, "true");
String content1 = "first_line_with_nul\0\n";
Integer reposition = "first_line_with_nul".length();
String content2 = " overwrite_nul_and_continue_first_line\n";
List<String> expected = Arrays.asList("first_line_with_nul overwrite_nul_and_continue_first_line\n");
// WHEN
// THEN
testNULContent(content1, reposition, content2, expected);
}
private void testNULContent(String content1, Integer reposition, String content2, List<String> expected) throws IOException {
// GIVEN
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE.getValue());
raf.write(content1.getBytes());
// WHEN
runner.run();
if (reposition != null) {
raf.seek(reposition);
}
raf.write(content2.getBytes());
runner.run();
// THEN
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, expected.size());
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS);
List<String> lines = flowFiles.stream().map(MockFlowFile::toByteArray).map(String::new).collect(Collectors.toList());
assertEquals(expected, lines);
}
@Test
public void testRotateMultipleBeforeConsuming() throws IOException {