From 8e777203a02c6e6d9772c9dec86966c9516d56fe Mon Sep 17 00:00:00 2001 From: "a.durov" Date: Fri, 25 Jan 2019 16:57:13 +0300 Subject: [PATCH] NIFI-5974 fix: Fragment Attributes are populated in case no split has occured. Unit test is implemented: testNoSplitterInString NIFI-5974: Fixed Checkstyle violations Signed-off-by: Matthew Burgess This closes #3275 --- .../processors/standard/SplitContent.java | 48 +++++++++---------- .../processors/standard/TestSplitContent.java | 29 +++++++++++ 2 files changed, 53 insertions(+), 24 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java index e80f76a2f8..4877d6a979 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java @@ -257,36 +257,36 @@ public class SplitContent extends AbstractProcessor { }); long lastOffsetPlusSize = -1L; + final ArrayList splitList = new ArrayList<>(); + if (splits.isEmpty()) { FlowFile clone = session.clone(flowFile); - session.transfer(flowFile, REL_ORIGINAL); - session.transfer(clone, REL_SPLITS); + // finishFragmentAttributes performs .clear() so List must be mutable + splitList.add(clone); logger.info("Found no match for {}; transferring original 'original' and transferring clone {} to 'splits'", new Object[]{flowFile, clone}); - return; - } + } else { + for (final Tuple tuple : splits) { + long offset = tuple.getKey(); + long size = tuple.getValue(); + if (size > 0) { + FlowFile split = session.clone(flowFile, offset, size); + splitList.add(split); + } - final ArrayList splitList = new ArrayList<>(); - for (final Tuple tuple : splits) { - long offset = tuple.getKey(); - long size = tuple.getValue(); - if (size > 0) { - FlowFile split = session.clone(flowFile, offset, size); - splitList.add(split); + lastOffsetPlusSize = offset + size; } - lastOffsetPlusSize = offset + size; - } - - // lastOffsetPlusSize indicates the ending position of the last split. - // if the data didn't end with the byte sequence, we need one final split to run from the end - // of the last split to the end of the content. - long finalSplitOffset = lastOffsetPlusSize; - if (!keepTrailingSequence && !keepLeadingSequence) { - finalSplitOffset += byteSequence.length; - } - if (finalSplitOffset > -1L && finalSplitOffset < flowFile.getSize()) { - FlowFile finalSplit = session.clone(flowFile, finalSplitOffset, flowFile.getSize() - finalSplitOffset); - splitList.add(finalSplit); + // lastOffsetPlusSize indicates the ending position of the last split. + // if the data didn't end with the byte sequence, we need one final split to run from the end + // of the last split to the end of the content. + long finalSplitOffset = lastOffsetPlusSize; + if (!keepTrailingSequence && !keepLeadingSequence) { + finalSplitOffset += byteSequence.length; + } + if (finalSplitOffset > -1L && finalSplitOffset < flowFile.getSize()) { + FlowFile finalSplit = session.clone(flowFile, finalSplitOffset, flowFile.getSize() - finalSplitOffset); + splitList.add(finalSplit); + } } final String fragmentId = finishFragmentAttributes(session, flowFile, splitList); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java index 0dec5a12c0..dce8f43c64 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java @@ -27,6 +27,8 @@ import org.junit.Test; import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT; import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID; +import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_INDEX; +import static org.apache.nifi.processors.standard.SplitContent.SEGMENT_ORIGINAL_FILENAME; public class TestSplitContent { @@ -371,4 +373,31 @@ public class TestSplitContent { final List packed = mergeRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED); packed.get(0).assertContentEquals(new byte[]{1, 2, 3, 4, 5, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 5, 4, 3, 2, 1}); } + + @Test + public void testNoSplitterInString() { + + String content = "UVAT"; + + final TestRunner runner = TestRunners.newTestRunner(new SplitContent()); + runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue()); + runner.setProperty(SplitContent.BYTE_SEQUENCE, ","); + runner.setProperty(SplitContent.KEEP_SEQUENCE, "false"); + runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue()); + + runner.enqueue(content.getBytes()); + runner.run(); + + runner.assertTransferCount(SplitContent.REL_SPLITS, 1); + MockFlowFile splitResult = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS).get(0); + splitResult.assertAttributeExists(FRAGMENT_ID); + splitResult.assertAttributeExists(SEGMENT_ORIGINAL_FILENAME); + splitResult.assertAttributeEquals(FRAGMENT_COUNT, "1"); + splitResult.assertAttributeEquals(FRAGMENT_INDEX, "1"); + runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1); + + runner.assertQueueEmpty(); + final List splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS); + splits.get(0).assertContentEquals(content); + } }