NIFI-4156: Fixed fragment.count in SplitText to equal emitted flow files

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2014.
This commit is contained in:
Matt Burgess 2017-07-17 09:23:55 -04:00 committed by Pierre Villard
parent a6e94de0bb
commit 844dbe4edb
2 changed files with 42 additions and 5 deletions

View File

@ -298,7 +298,7 @@ public class SplitText extends AbstractProcessor {
if ((computedSplitsInfo.size() == 0) && (headerFlowFile != null)) {
FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
splitFlowFile = this.updateAttributes(processSession, splitFlowFile, 0, splitFlowFile.getSize(),
fragmentId, fragmentIndex++, 0, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
fragmentId, fragmentIndex++, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
splitFlowFiles.add(splitFlowFile);
} else {
for (SplitInfo computedSplitInfo : computedSplitsInfo) {
@ -318,10 +318,14 @@ public class SplitText extends AbstractProcessor {
}
splitFlowFile = this.updateAttributes(processSession, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
computedSplitsInfo.size(), sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
splitFlowFiles.add(splitFlowFile);
}
}
// Update fragment.count with real split count (i.e. don't count files for which there was no clone)
for (FlowFile splitFlowFile : splitFlowFiles) {
splitFlowFile = processSession.putAttribute(splitFlowFile, FRAGMENT_COUNT, String.valueOf(fragmentIndex - 1)); // -1 because the index starts at 1 (see above)
}
}
getLogger().info("Split " + sourceFlowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
@ -356,13 +360,12 @@ public class SplitText extends AbstractProcessor {
}
private FlowFile updateAttributes(ProcessSession processSession, FlowFile splitFlowFile, long splitLineCount, long splitFlowFileSize,
String splitId, int splitIndex, int splitCount, String origFileName) {
String splitId, int splitIndex, String origFileName) {
Map<String, String> attributes = new HashMap<>();
attributes.put(SPLIT_LINE_COUNT, String.valueOf(splitLineCount));
attributes.put(FRAGMENT_SIZE, String.valueOf(splitFlowFile.getSize()));
attributes.put(FRAGMENT_ID, splitId);
attributes.put(FRAGMENT_INDEX, String.valueOf(splitIndex));
attributes.put(FRAGMENT_COUNT, String.valueOf(splitCount));
attributes.put(SEGMENT_ORIGINAL_FILENAME, origFileName);
return processSession.putAllAttributes(splitFlowFile, attributes);
}
@ -424,6 +427,7 @@ public class SplitText extends AbstractProcessor {
SplitInfo remainderSplitInfo, long startingLength) throws IOException {
long length = 0;
long trailingCrlfLength = 0;
long trailingLineCount = 0;
long actualLineCount = 0;
OffsetInfo offsetInfo = null;
SplitInfo splitInfo = null;
@ -440,6 +444,7 @@ public class SplitText extends AbstractProcessor {
if (offsetInfo.getLength() == offsetInfo.getCrlfLength()) {
trailingCrlfLength += offsetInfo.getCrlfLength();
trailingLineCount++;
} else if (offsetInfo.getLength() > offsetInfo.getCrlfLength()) {
trailingCrlfLength = 0; // non-empty line came in, thus resetting counter
}
@ -465,6 +470,7 @@ public class SplitText extends AbstractProcessor {
if (length - trailingCrlfLength >= lastCrlfLength) {
trailingCrlfLength += lastCrlfLength; // trim CRLF from the last line
}
actualLineCount -= trailingLineCount;
splitInfo = new SplitInfo(startOffset, length, length - trailingCrlfLength, actualLineCount, remaningOffsetInfo);
}
return splitInfo;

View File

@ -771,7 +771,38 @@ public class TestSplitText {
splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
final List<MockFlowFile> splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
splits.get(0).assertContentEquals("1\n2");
MockFlowFile split0 = splits.get(0);
split0.assertContentEquals("1\n2");
split0.assertAttributeEquals(SplitText.FRAGMENT_INDEX, "1");
split0.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "1");
split0.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "2");
}
@Test
public void testFragmentCountIsActualFlowFileCount() {
final TestRunner splitRunner = TestRunners.newTestRunner(new SplitText());
splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "1");
splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true");
splitRunner.enqueue("1\n2\n\n\n\n\n\n\n\n");
splitRunner.run();
splitRunner.assertTransferCount(SplitText.REL_SPLITS, 2);
splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
final List<MockFlowFile> splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
MockFlowFile split0 = splits.get(0);
split0.assertContentEquals("1");
split0.assertAttributeEquals(SplitText.FRAGMENT_INDEX, "1");
split0.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
split0.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1");
MockFlowFile split1 = splits.get(1);
split1.assertContentEquals("2");
split1.assertAttributeEquals(SplitText.FRAGMENT_INDEX, "2");
split1.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
split1.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1");
}