NIFI-3255 removed dependency on session.merge from SplitText

NIFI-3255 addressed PR comments

NIFI-3255 fixed linkage for Split creation

This closes #1394
This commit is contained in:
Oleg Zhurakousky 2017-01-04 15:37:30 -05:00
parent ec868362f3
commit ded18b94db

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@ -32,6 +33,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -56,6 +58,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.util.TextLineDemarcator;
import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
@ -282,8 +285,7 @@ public class SplitText extends AbstractProcessor {
* it signifies the header information and its contents will be included in
* each and every computed split.
*/
private List<FlowFile> generateSplitFlowFiles(final String fragmentId, FlowFile sourceFlowFile, SplitInfo splitInfo,
List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
private List<FlowFile> generateSplitFlowFiles(String fragmentId, FlowFile sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
List<FlowFile> splitFlowFiles = new ArrayList<>();
FlowFile headerFlowFile = null;
long headerCrlfLength = 0;
@ -295,19 +297,19 @@ public class SplitText extends AbstractProcessor {
if ((computedSplitsInfo.size() == 0) && (headerFlowFile != null)) {
FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
splitFlowFile = SplitText.this.updateAttributes(processSession, splitFlowFile, 0, splitFlowFile.getSize(),
splitFlowFile = this.updateAttributes(processSession, splitFlowFile, 0, splitFlowFile.getSize(),
fragmentId, fragmentIndex++, 0, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
splitFlowFiles.add(splitFlowFile);
} else {
for (SplitInfo computedSplitInfo : computedSplitsInfo) {
long length = SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
long length = this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
boolean proceedWithClone = headerFlowFile != null || length > 0;
if (proceedWithClone) {
FlowFile splitFlowFile = null;
if (headerFlowFile != null) {
if (length > 0) {
splitFlowFile = processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length);
splitFlowFile = processSession.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
splitFlowFile = this.concatenateContents(sourceFlowFile, processSession, headerFlowFile, splitFlowFile);
} else {
splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER
}
@ -315,7 +317,7 @@ public class SplitText extends AbstractProcessor {
splitFlowFile = processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length);
}
splitFlowFile = SplitText.this.updateAttributes(processSession, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
splitFlowFile = this.updateAttributes(processSession, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
computedSplitsInfo.size(), sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
splitFlowFiles.add(splitFlowFile);
}
@ -329,6 +331,30 @@ public class SplitText extends AbstractProcessor {
return splitFlowFiles;
}
/**
* Will concatenate the contents of the provided array of {@link FlowFile}s
* into a single {@link FlowFile}. While this operation is as general as it
* is described in the previous sentence, in the context of this processor
* there can only be two {@link FlowFile}s with the first {@link FlowFile}
* representing the header content of the split and the second
* {@link FlowFile} represents the split itself.
*/
private FlowFile concatenateContents(FlowFile sourceFlowFile, ProcessSession session, FlowFile... flowFiles) {
FlowFile mergedFlowFile = session.create(sourceFlowFile);
for (FlowFile flowFile : flowFiles) {
mergedFlowFile = session.append(mergedFlowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
try (InputStream is = session.read(flowFile)) {
IOUtils.copy(is, out);
}
}
});
}
session.remove(flowFiles[1]); // in current usage we always have 2 files
return mergedFlowFile;
}
private FlowFile updateAttributes(ProcessSession processSession, FlowFile splitFlowFile, long splitLineCount, long splitFlowFileSize,
String splitId, int splitIndex, int splitCount, String origFileName) {
Map<String, String> attributes = new HashMap<>();