NIFI-5974 fix: Fragment Attributes are populated in case no split has occured.

Unit test is implemented: testNoSplitterInString

NIFI-5974: Fixed Checkstyle violations

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3275
This commit is contained in:
a.durov 2019-01-25 16:57:13 +03:00 committed by Matthew Burgess
parent 2eac0e96c7
commit 8e777203a0
2 changed files with 53 additions and 24 deletions

View File

@ -257,36 +257,36 @@ public class SplitContent extends AbstractProcessor {
});
long lastOffsetPlusSize = -1L;
final ArrayList<FlowFile> splitList = new ArrayList<>();
if (splits.isEmpty()) {
FlowFile clone = session.clone(flowFile);
session.transfer(flowFile, REL_ORIGINAL);
session.transfer(clone, REL_SPLITS);
// finishFragmentAttributes performs .clear() so List must be mutable
splitList.add(clone);
logger.info("Found no match for {}; transferring original 'original' and transferring clone {} to 'splits'", new Object[]{flowFile, clone});
return;
}
} else {
for (final Tuple<Long, Long> tuple : splits) {
long offset = tuple.getKey();
long size = tuple.getValue();
if (size > 0) {
FlowFile split = session.clone(flowFile, offset, size);
splitList.add(split);
}
final ArrayList<FlowFile> splitList = new ArrayList<>();
for (final Tuple<Long, Long> tuple : splits) {
long offset = tuple.getKey();
long size = tuple.getValue();
if (size > 0) {
FlowFile split = session.clone(flowFile, offset, size);
splitList.add(split);
lastOffsetPlusSize = offset + size;
}
lastOffsetPlusSize = offset + size;
}
// lastOffsetPlusSize indicates the ending position of the last split.
// if the data didn't end with the byte sequence, we need one final split to run from the end
// of the last split to the end of the content.
long finalSplitOffset = lastOffsetPlusSize;
if (!keepTrailingSequence && !keepLeadingSequence) {
finalSplitOffset += byteSequence.length;
}
if (finalSplitOffset > -1L && finalSplitOffset < flowFile.getSize()) {
FlowFile finalSplit = session.clone(flowFile, finalSplitOffset, flowFile.getSize() - finalSplitOffset);
splitList.add(finalSplit);
// lastOffsetPlusSize indicates the ending position of the last split.
// if the data didn't end with the byte sequence, we need one final split to run from the end
// of the last split to the end of the content.
long finalSplitOffset = lastOffsetPlusSize;
if (!keepTrailingSequence && !keepLeadingSequence) {
finalSplitOffset += byteSequence.length;
}
if (finalSplitOffset > -1L && finalSplitOffset < flowFile.getSize()) {
FlowFile finalSplit = session.clone(flowFile, finalSplitOffset, flowFile.getSize() - finalSplitOffset);
splitList.add(finalSplit);
}
}
final String fragmentId = finishFragmentAttributes(session, flowFile, splitList);

View File

@ -27,6 +27,8 @@ import org.junit.Test;
import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT;
import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID;
import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_INDEX;
import static org.apache.nifi.processors.standard.SplitContent.SEGMENT_ORIGINAL_FILENAME;
public class TestSplitContent {
@ -371,4 +373,31 @@ public class TestSplitContent {
final List<MockFlowFile> packed = mergeRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
packed.get(0).assertContentEquals(new byte[]{1, 2, 3, 4, 5, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 5, 4, 3, 2, 1});
}
@Test
public void testNoSplitterInString() {
String content = "UVAT";
final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue());
runner.setProperty(SplitContent.BYTE_SEQUENCE, ",");
runner.setProperty(SplitContent.KEEP_SEQUENCE, "false");
runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue());
runner.enqueue(content.getBytes());
runner.run();
runner.assertTransferCount(SplitContent.REL_SPLITS, 1);
MockFlowFile splitResult = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS).get(0);
splitResult.assertAttributeExists(FRAGMENT_ID);
splitResult.assertAttributeExists(SEGMENT_ORIGINAL_FILENAME);
splitResult.assertAttributeEquals(FRAGMENT_COUNT, "1");
splitResult.assertAttributeEquals(FRAGMENT_INDEX, "1");
runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
runner.assertQueueEmpty();
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
splits.get(0).assertContentEquals(content);
}
}