NIFI-478: Fixed bug that caused byte sequence to be dropped for last split under certain circumstances; added new unit tests

This commit is contained in:
Mark Payne 2015-03-31 08:34:23 -04:00
parent 878e950acc
commit 509933f631
2 changed files with 202 additions and 14 deletions

View File

@ -18,7 +18,9 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -28,6 +30,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -35,8 +38,10 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@ -50,6 +55,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.NaiveSearchRingBuffer;
import org.apache.nifi.util.Tuple;
@ -72,19 +78,39 @@ public class SplitContent extends AbstractProcessor {
public static final String FRAGMENT_COUNT = "fragment.count";
public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
static final AllowableValue HEX_FORMAT = new AllowableValue("Hexadecimal", "Hexadecimal", "The Byte Sequence will be interpreted as a hexadecimal representation of bytes");
static final AllowableValue UTF8_FORMAT = new AllowableValue("Text", "Text", "The Byte Sequence will be interpreted as UTF-8 Encoded text");
static final AllowableValue TRAILING_POSITION = new AllowableValue("Trailing", "Trailing", "Keep the Byte Sequence at the end of the first split if <Keep Byte Sequence> is true");
static final AllowableValue LEADING_POSITION = new AllowableValue("Leading", "Leading", "Keep the Byte Sequence at the beginning of the second split if <Keep Byte Sequence> is true");
public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
.name("Byte Sequence Format")
.description("Specifies how the <Byte Sequence> property should be interpreted")
.required(true)
.allowableValues(HEX_FORMAT, UTF8_FORMAT)
.defaultValue(HEX_FORMAT.getValue())
.build();
public static final PropertyDescriptor BYTE_SEQUENCE = new PropertyDescriptor.Builder()
.name("Byte Sequence")
.description("A hex representation of bytes to look for and upon which to split the source file into separate files")
.addValidator(new HexStringPropertyValidator())
.description("A representation of bytes to look for and upon which to split the source file into separate files")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
public static final PropertyDescriptor KEEP_SEQUENCE = new PropertyDescriptor.Builder()
.name("Keep Byte Sequence")
.description("Determines whether or not the Byte Sequence should be included at the end of each Split")
.description("Determines whether or not the Byte Sequence should be included with each Split")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor BYTE_SEQUENCE_LOCATION = new PropertyDescriptor.Builder()
.name("Byte Sequence Location")
.description("If <Keep Byte Sequence> is set to true, specifies whether the byte sequence should be added to the end of the first split or the beginning of the second; if <Keep Byte Sequence> is false, this property is ignored.")
.required(true)
.allowableValues(TRAILING_POSITION, LEADING_POSITION)
.defaultValue(TRAILING_POSITION.getValue())
.build();
public static final Relationship REL_SPLITS = new Relationship.Builder()
.name("splits")
@ -108,8 +134,10 @@ public class SplitContent extends AbstractProcessor {
this.relationships = Collections.unmodifiableSet(relationships);
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(FORMAT);
properties.add(BYTE_SEQUENCE);
properties.add(KEEP_SEQUENCE);
properties.add(BYTE_SEQUENCE_LOCATION);
this.properties = Collections.unmodifiableList(properties);
}
@ -124,13 +152,27 @@ public class SplitContent extends AbstractProcessor {
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (descriptor.equals(BYTE_SEQUENCE)) {
try {
this.byteSequence.set(Hex.decodeHex(newValue.toCharArray()));
} catch (final Exception e) {
this.byteSequence.set(null);
}
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(1);
final String format = validationContext.getProperty(FORMAT).getValue();
if ( HEX_FORMAT.getValue().equals(format) ) {
final String byteSequence = validationContext.getProperty(BYTE_SEQUENCE).getValue();
final ValidationResult result = new HexStringPropertyValidator().validate(BYTE_SEQUENCE.getName(), byteSequence, validationContext);
results.add(result);
}
return results;
}
@OnScheduled
public void initializeByteSequence(final ProcessContext context) throws DecoderException {
final String bytePattern = context.getProperty(BYTE_SEQUENCE).getValue();
final String format = context.getProperty(FORMAT).getValue();
if ( HEX_FORMAT.getValue().equals(format) ) {
this.byteSequence.set(Hex.decodeHex(bytePattern.toCharArray()));
} else {
this.byteSequence.set(bytePattern.getBytes(StandardCharsets.UTF_8));
}
}
@ -143,6 +185,21 @@ public class SplitContent extends AbstractProcessor {
final ProcessorLog logger = getLogger();
final boolean keepSequence = context.getProperty(KEEP_SEQUENCE).asBoolean();
final boolean keepTrailingSequence;
final boolean keepLeadingSequence;
if ( keepSequence ) {
if ( context.getProperty(BYTE_SEQUENCE_LOCATION).getValue().equals(TRAILING_POSITION.getValue()) ) {
keepTrailingSequence = true;
keepLeadingSequence = false;
} else {
keepTrailingSequence = false;
keepLeadingSequence = true;
}
} else {
keepTrailingSequence = false;
keepLeadingSequence = false;
}
final byte[] byteSequence = this.byteSequence.get();
if (byteSequence == null) { // should never happen. But just in case...
logger.error("{} Unable to obtain Byte Sequence", new Object[]{this});
@ -169,15 +226,20 @@ public class SplitContent extends AbstractProcessor {
bytesRead++;
boolean matched = buffer.addAndCompare((byte) (nextByte & 0xFF));
if (matched) {
final long splitLength;
long splitLength;
if (keepSequence) {
if (keepTrailingSequence) {
splitLength = bytesRead - startOffset;
} else {
splitLength = bytesRead - startOffset - byteSequence.length;
}
splits.add(new Tuple<>(startOffset, splitLength));
if ( keepLeadingSequence && startOffset > 0 ) {
splitLength += byteSequence.length;
}
final long splitStart = (keepLeadingSequence && startOffset > 0) ? startOffset - byteSequence.length : startOffset;
splits.add(new Tuple<>(splitStart, splitLength));
startOffset = bytesRead;
buffer.clear();
}
@ -207,8 +269,11 @@ public class SplitContent extends AbstractProcessor {
lastOffsetPlusSize = offset + size;
}
// lastOffsetPlusSize indicates the ending position of the last split.
// if the data didn't end with the byte sequence, we need one final split to run from the end
// of the last split to the end of the content.
long finalSplitOffset = lastOffsetPlusSize;
if (!keepSequence) {
if (!keepTrailingSequence && !keepLeadingSequence) {
finalSplitOffset += byteSequence.length;
}
if (finalSplitOffset > -1L && finalSplitOffset < flowFile.getSize()) {

View File

@ -29,6 +29,129 @@ import org.junit.Test;
public class TestSplitContent {
@Test
public void testTextFormatLeadingPosition() {
final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue());
runner.setProperty(SplitContent.BYTE_SEQUENCE, "ub");
runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.LEADING_POSITION.getValue());
runner.enqueue("rub-a-dub-dub".getBytes());
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("r");
splits.get(1).assertContentEquals("ub-a-d");
splits.get(2).assertContentEquals("ub-d");
splits.get(3).assertContentEquals("ub");
}
@Test
public void testTextFormatSplits() {
final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue());
runner.setProperty(SplitContent.BYTE_SEQUENCE, "test");
runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.LEADING_POSITION.getValue());
final byte[] input = "This is a test. This is another test. And this is yet another test. Finally this is the last Test.".getBytes();
runner.enqueue(input);
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
runner.assertQueueEmpty();
List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a ");
splits.get(1).assertContentEquals("test. This is another ");
splits.get(2).assertContentEquals("test. And this is yet another ");
splits.get(3).assertContentEquals("test. Finally this is the last Test.");
runner.clearTransferState();
runner.setProperty(SplitContent.KEEP_SEQUENCE, "false");
runner.enqueue(input);
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a ");
splits.get(1).assertContentEquals(". This is another ");
splits.get(2).assertContentEquals(". And this is yet another ");
splits.get(3).assertContentEquals(". Finally this is the last Test.");
runner.clearTransferState();
runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue());
runner.enqueue(input);
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a test");
splits.get(1).assertContentEquals(". This is another test");
splits.get(2).assertContentEquals(". And this is yet another test");
splits.get(3).assertContentEquals(". Finally this is the last Test.");
runner.clearTransferState();
runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue());
runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes());
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a test");
splits.get(1).assertContentEquals(". This is another test");
splits.get(2).assertContentEquals(". And this is yet another test");
splits.get(3).assertContentEquals(". Finally this is the last test");
runner.clearTransferState();
runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.LEADING_POSITION.getValue());
runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes());
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitContent.REL_SPLITS, 5);
splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("This is a ");
splits.get(1).assertContentEquals("test. This is another ");
splits.get(2).assertContentEquals("test. And this is yet another ");
splits.get(3).assertContentEquals("test. Finally this is the last ");
splits.get(4).assertContentEquals("test");
runner.clearTransferState();
}
@Test
public void testTextFormatTrailingPosition() {
final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue());
runner.setProperty(SplitContent.BYTE_SEQUENCE, "ub");
runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue());
runner.enqueue("rub-a-dub-dub".getBytes());
runner.run();
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitContent.REL_SPLITS, 3);
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals("rub");
splits.get(1).assertContentEquals("-a-dub");
splits.get(2).assertContentEquals("-dub");
}
@Test
public void testSmallSplits() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitContent());