NIFI-1118 adding SplitText features of size limit and header marker characters

Signed-off-by: Mike Moser <mosermw@apache.org>

This closes #444
This commit is contained in:
Mark Bean 2016-05-15 20:07:04 -04:00 committed by Mike Moser
parent a490c29a4f
commit 1ac44a0a4b
2 changed files with 592 additions and 141 deletions

View File

@ -16,11 +16,14 @@
*/
package org.apache.nifi.processors.standard;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -41,10 +44,13 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
@ -56,7 +62,9 @@ import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.util.IntegerHolder;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.ObjectHolder;
@EventDriven
@ -64,9 +72,15 @@ import org.apache.nifi.util.ObjectHolder;
@SupportsBatching
@Tags({"split", "text"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Splits a text file into multiple smaller text files on line boundaries, each having up to a configured number of lines.")
@CapabilityDescription("Splits a text file into multiple smaller text files on line boundaries limited by maximum number of lines "
+ "or total size of fragment. Each output split file will contain no more than the configured number of lines or bytes. "
+ "If both Line Split Count and Maximum Fragment Size are specified, the split occurs at whichever limit is reached first. "
+ "If the first line of a fragment exceeds the Maximum Fragment Size, that line will be output in a single split file which "
+" exceeds the configured maximum size limit.")
@WritesAttributes({
@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text from the original FlowFile that were copied to this FlowFile"),
@WritesAttribute(attribute = "fragment.size", description = "The number of bytes from the original FlowFile that were copied to this FlowFile, " +
"including header, if applicable, which is duplicated in each split FlowFile"),
@WritesAttribute(attribute = "fragment.identifier", description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
@WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"),
@WritesAttribute(attribute = "fragment.count", description = "The number of split FlowFiles generated from the parent FlowFile"),
@ -76,6 +90,7 @@ public class SplitText extends AbstractProcessor {
// attribute keys
public static final String SPLIT_LINE_COUNT = "text.line.count";
public static final String FRAGMENT_SIZE = "fragment.size";
public static final String FRAGMENT_ID = "fragment.identifier";
public static final String FRAGMENT_INDEX = "fragment.index";
public static final String FRAGMENT_COUNT = "fragment.count";
@ -83,9 +98,18 @@ public class SplitText extends AbstractProcessor {
public static final PropertyDescriptor LINE_SPLIT_COUNT = new PropertyDescriptor.Builder()
.name("Line Split Count")
.description("The number of lines that will be added to each split file (excluding the header, if the Header Line Count property is greater than 0).")
.description("The number of lines that will be added to each split file, excluding header lines. " +
"A value of zero requires Maximum Fragment Size to be set, and line count will not be considered in determining splits.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor FRAGMENT_MAX_SIZE = new PropertyDescriptor.Builder()
.name("Maximum Fragment Size")
.description("The maximum size of each split file, including header lines. NOTE: in the case where a " +
"single line exceeds this property (including headers, if applicable), that line will be output " +
"in a split of its own which exceeds this Maximum Fragment Size setting.")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor HEADER_LINE_COUNT = new PropertyDescriptor.Builder()
.name("Header Line Count")
@ -94,12 +118,19 @@ public class SplitText extends AbstractProcessor {
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.defaultValue("0")
.build();
public static final PropertyDescriptor HEADER_MARKER = new PropertyDescriptor.Builder()
.name("Header Line Marker Characters")
.description("The first character(s) on the line of the datafile which signifies a header line. This value is ignored when Header Line Count is non-zero. " +
"The first line not containing the Header Line Marker Characters and all subsequent lines are considered non-header")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor REMOVE_TRAILING_NEWLINES = new PropertyDescriptor.Builder()
.name("Remove Trailing Newlines")
.description("Whether to remove newlines at the end of each split file. This should be false if you intend to merge the split files later. If this is set to "
+ "'true' and a FlowFile is generated that contains only 'empty lines' (i.e., consists only of \r and \n characters), the FlowFile will not be emitted. "
+ "Note, however, that if the Header Line Count is greater than 0, the resultant FlowFile will never be empty as it will consist of the header lines, so "
+ "a FlowFile may be emitted that contians only the header lines.")
+ "'true' and a FlowFile is generated that contains only 'empty lines' (i.e., consists only of \r and \n characters), the FlowFile will not be emitted. "
+ "Note, however, that if header lines are specified, the resultant FlowFile will never be empty as it will consist of the header lines, so "
+ "a FlowFile may be emitted that contains only the header lines.")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
@ -126,7 +157,9 @@ public class SplitText extends AbstractProcessor {
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(LINE_SPLIT_COUNT);
properties.add(FRAGMENT_MAX_SIZE);
properties.add(HEADER_LINE_COUNT);
properties.add(HEADER_MARKER);
properties.add(REMOVE_TRAILING_NEWLINES);
this.properties = Collections.unmodifiableList(properties);
@ -137,6 +170,22 @@ public class SplitText extends AbstractProcessor {
this.relationships = Collections.unmodifiableSet(relationships);
}
// Cross-property validation: Line Split Count of 0 disables line-based splitting,
// so Maximum Fragment Size must then be set or no split boundary would ever be reached.
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
List<ValidationResult> results = new ArrayList<>();
// Invalid only when BOTH limits are absent: split count is 0 AND no max size is configured.
// NOTE(review): asInteger() could return null here if the Line Split Count value is not a
// parseable integer at validation time, which would NPE on unboxing — TODO confirm NiFi
// guarantees a parseable value before customValidate runs.
final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
// A result is always added; it simply carries valid=true when the combination is acceptable.
results.add(new ValidationResult.Builder()
.subject("Maximum Fragment Size")
.valid(!invalidState)
.explanation("Property must be specified when Line Split Count is 0")
.build()
);
return results;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
@ -147,140 +196,206 @@ public class SplitText extends AbstractProcessor {
return properties;
}
private int readLines(final InputStream in, final int maxNumLines, final OutputStream out, final boolean keepAllNewLines, final byte[] leadingNewLineBytes) throws IOException {
private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
int numLines = 0;
byte[] leadingBytes = leadingNewLineBytes;
int numLines = 0;
long totalBytes = 0L;
for (int i = 0; i < maxNumLines; i++) {
final EndOfLineMarker eolMarker = locateEndOfLine(in, out, false, eolBuffer, leadingBytes);
final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
final long bytes = eolMarker.getBytesConsumed();
leadingBytes = eolMarker.getLeadingNewLineBytes();
if (keepAllNewLines && out != null) {
if (includeLineDelimiter && out != null) {
if (leadingBytes != null) {
out.write(leadingBytes);
leadingBytes = null;
}
eolBuffer.drainTo(out);
}
if (eolBuffer.length() > 0 || eolMarker.getBytesConsumed() > 0L) {
numLines++;
totalBytes += bytes;
if (bytes <= 0) {
return numLines;
}
if (eolMarker.isStreamEnded()) {
numLines++;
if (totalBytes >= maxByteCount) {
break;
}
}
return numLines;
}
private EndOfLineMarker locateEndOfLine(final InputStream in, final OutputStream out, final boolean includeLineDelimiter,
final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
int lastByte = -1;
private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
long bytesRead = 0L;
final ByteArrayOutputStream buffer;
if (out != null) {
buffer = new ByteArrayOutputStream();
} else {
buffer = null;
}
byte[] bytesToWriteFirst = leadingNewLineBytes;
in.mark(Integer.MAX_VALUE);
while (true) {
in.mark(1);
final int nextByte = in.read();
final boolean isNewLineChar = nextByte == '\r' || nextByte == '\n';
// if we hit end of stream or new line we're done
// if we hit end of stream we're done
if (nextByte == -1) {
if (lastByte == '\r') {
eolBuffer.addEndOfLine(true, false);
if (buffer != null) {
buffer.writeTo(out);
buffer.close();
}
return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);
return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); // bytesToWriteFirst should be "null"?
}
// If we get a character that's not an end-of-line char, then
// we need to write out the EOL's that we have buffered (if out != null).
// Then, we need to reset our EOL buffer because we no longer have consecutive EOL's
if (!isNewLineChar) {
if (bytesToWriteFirst != null) {
if (out != null) {
out.write(bytesToWriteFirst);
}
bytesToWriteFirst = null;
}
if (out != null) {
eolBuffer.drainTo(out);
}
eolBuffer.clear();
// Verify leading bytes do not violate size limitation
if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
}
// if there's an OutputStream to copy the data to, copy it, if appropriate.
// "if appropriate" means that it's not a line delimiter or that we want to copy line delimiters
// Write leadingNewLines, if appropriate
if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
bytesRead += bytesToWriteFirst.length;
buffer.write(bytesToWriteFirst);
bytesToWriteFirst = null;
}
// buffer the output
bytesRead++;
if (out != null && (includeLineDelimiter || !isNewLineChar)) {
if (buffer != null && nextByte != '\n' && nextByte != '\r') {
if (bytesToWriteFirst != null) {
out.write(bytesToWriteFirst);
bytesToWriteFirst = null;
buffer.write(bytesToWriteFirst);
}
bytesToWriteFirst = null;
eolBuffer.drainTo(buffer);
eolBuffer.clear();
buffer.write(nextByte);
}
out.write(nextByte);
// check the size limit
if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
in.reset();
if (buffer != null) {
buffer.close();
}
return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
}
// if we have a new line, then we're done
if (nextByte == '\n') {
eolBuffer.addEndOfLine(lastByte == '\r', true);
if (buffer != null) {
buffer.writeTo(out);
buffer.close();
eolBuffer.addEndOfLine(false, true);
}
return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
}
// we didn't get a new line but if last byte was carriage return we've reached a new-line.
// so we roll back the last byte that we read and return
if (lastByte == '\r') {
in.reset();
bytesRead--; // we reset the stream by 1 byte so decrement the number of bytes read by 1
eolBuffer.addEndOfLine(true, false);
return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
// Determine if \n follows \r; in either case, end of line has been reached
if (nextByte == '\r') {
if (buffer != null) {
buffer.writeTo(out);
buffer.close();
}
in.mark(1);
final int lookAheadByte = in.read();
if (lookAheadByte == '\n') {
eolBuffer.addEndOfLine(true, true);
return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
} else {
in.reset();
eolBuffer.addEndOfLine(true, false);
return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
}
}
// keep track of what the last byte was that we read so that we can detect \r followed by some other
// character.
lastByte = nextByte;
}
}
private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines) throws IOException {
private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
final long bufferedBytes) throws IOException {
final SplitInfo info = new SplitInfo();
final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
int lastByte = -1;
info.lengthBytes = bufferedBytes;
long lastEolBufferLength = 0L;
while (info.lengthLines < numLines) {
final boolean keepNewLine = keepAllNewLines || (info.lengthLines != numLines - 1);
final EndOfLineMarker eolMarker = locateEndOfLine(in, null, keepNewLine, eolBuffer, null);
long bytesTillNext = eolMarker.getBytesConsumed();
info.lengthLines++;
// if this is the last line in the split and we don't want to keep all lines
// (i.e., we want to remove trailing new lines), then decrement out lengthBytes
final boolean isLastLine = eolMarker.isStreamEnded() || info.lengthLines >= numLines;
if (isLastLine && !keepAllNewLines) {
bytesTillNext -= eolBuffer.length();
}
info.lengthBytes += bytesTillNext;
if (eolMarker.isStreamEnded()) {
info.endOfStream = true;
while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
&& (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
&& eolBuffer.length() < maxSize) {
in.mark(1);
final int nextByte = in.read();
// Check for \n following \r on last line
if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
in.reset();
break;
}
switch (nextByte) {
case -1:
info.endOfStream = true;
if (keepAllNewLines) {
info.lengthBytes += eolBuffer.length();
}
if (lastByte != '\r') {
info.lengthLines++;
}
info.bufferedBytes = 0;
return info;
case '\r':
eolBuffer.addEndOfLine(true, false);
info.lengthLines++;
info.bufferedBytes = 0;
break;
case '\n':
eolBuffer.addEndOfLine(false, true);
if (lastByte != '\r') {
info.lengthLines++;
}
info.bufferedBytes = 0;
break;
default:
if (eolBuffer.length() > 0) {
info.lengthBytes += eolBuffer.length();
lastEolBufferLength = eolBuffer.length();
eolBuffer.clear();
}
info.lengthBytes++;
info.bufferedBytes++;
break;
}
lastByte = nextByte;
}
// if current line exceeds size and not keeping eol characters, remove previously applied eol characters
if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
info.lengthBytes -= lastEolBufferLength;
}
if (keepAllNewLines) {
info.lengthBytes += eolBuffer.length();
}
return info;
}
/**
 * Counts the number of consecutive leading lines that begin with the configured
 * header marker characters. The stream is marked before reading and reset before
 * returning, so the caller re-reads the header bytes from the original position.
 * NOTE(review): InputStreamReader is created without an explicit Charset, so the
 * platform default charset is used for the startsWith comparison — presumably safe
 * for ASCII markers; confirm for non-ASCII header markers.
 */
private int countHeaderLines(final ByteCountingInputStream in,
final String headerMarker) throws IOException {
int headerInfo = 0;
final BufferedReader br = new BufferedReader(new InputStreamReader(in));
// Mark the underlying counting stream so all bytes consumed while counting
// (including the BufferedReader's read-ahead) can be rewound afterwards.
in.mark(Integer.MAX_VALUE);
String line = br.readLine();
while (line != null) {
// if line is not a header line, reset stream and return header counts
if (!line.startsWith(headerMarker)) {
in.reset();
return headerInfo;
} else {
headerInfo++;
}
line = br.readLine();
}
// Entire stream consisted of header lines; rewind so the caller can copy them.
in.reset();
return headerInfo;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final FlowFile flowFile = session.get();
@ -290,8 +405,12 @@ public class SplitText extends AbstractProcessor {
final ComponentLog logger = getLogger();
final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
final int splitCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
final boolean removeTrailingNewlines = context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
final ObjectHolder<String> errorMessage = new ObjectHolder<>(null);
final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
@ -304,59 +423,82 @@ public class SplitText extends AbstractProcessor {
try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
long bufferedPartialLine = 0;
// if we have header lines, copy them into a ByteArrayOutputStream
final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
final int headerLinesCopied = readLines(in, headerCount, headerStream, true, null);
if (headerLinesCopied < headerCount) {
errorMessage.set("Header Line Count is set to " + headerCount + " but file had only " + headerLinesCopied + " lines");
return;
}
// Break header apart into trailing newlines and
final byte[] headerBytes = headerStream.toByteArray();
int headerNewLineByteCount = 0;
for (int i = headerBytes.length - 1; i >= 0; i--) {
final byte headerByte = headerBytes[i];
if (headerByte == '\r' || headerByte == '\n') {
headerNewLineByteCount++;
} else {
break;
// Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
int headerInfoLineCount = 0;
if (headerCount > 0) {
headerInfoLineCount = headerCount;
} else {
if (headerMarker != null) {
headerInfoLineCount = countHeaderLines(in, headerMarker);
}
}
final byte[] headerNewLineBytes;
final byte[] headerBytesWithoutTrailingNewLines;
if (headerNewLineByteCount == 0) {
headerNewLineBytes = null;
headerBytesWithoutTrailingNewLines = headerBytes;
} else {
headerNewLineBytes = new byte[headerNewLineByteCount];
System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
if (headerInfoLineCount > 0) {
final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
if (headerLinesCopied < headerInfoLineCount) {
errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
return;
}
// Break header apart into trailing newlines and remaining text
final byte[] headerBytes = headerStream.toByteArray();
int headerNewLineByteCount = 0;
for (int i = headerBytes.length - 1; i >= 0; i--) {
final byte headerByte = headerBytes[i];
if (headerByte == '\r' || headerByte == '\n') {
headerNewLineByteCount++;
} else {
break;
}
}
if (headerNewLineByteCount == 0) {
headerNewLineBytes = null;
headerBytesWithoutTrailingNewLines = headerBytes;
} else {
headerNewLineBytes = new byte[headerNewLineByteCount];
System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
}
} else {
headerBytesWithoutTrailingNewLines = null;
headerNewLineBytes = null;
}
while (true) {
if (headerCount > 0) {
if (headerInfoLineCount > 0) {
// if we have header lines, create a new FlowFile, copy the header lines to that file,
// and then start copying lines
final IntegerHolder linesCopied = new IntegerHolder(0);
final LongHolder bytesCopied = new LongHolder(0L);
FlowFile splitFile = session.create(flowFile);
try {
splitFile = session.write(splitFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
out.write(headerBytesWithoutTrailingNewLines);
linesCopied.set(readLines(in, splitCount, out, !removeTrailingNewlines, headerNewLineBytes));
try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
countingOut.write(headerBytesWithoutTrailingNewLines);
//readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
includeLineDelimiter, headerNewLineBytes));
bytesCopied.set(countingOut.getBytesWritten());
}
}
});
splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
logger.debug("Created Split File {} with {} lines", new Object[]{splitFile, linesCopied.get()});
splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
} finally {
if (linesCopied.get() > 0) {
splits.add(splitFile);
@ -367,27 +509,44 @@ public class SplitText extends AbstractProcessor {
}
}
// If we copied fewer lines than what we want, then we're done copying data (we've hit EOF).
if (linesCopied.get() < splitCount) {
// Check for EOF
in.mark(1);
if (in.read() == -1) {
break;
}
in.reset();
} else {
// We have no header lines, so we can simply demarcate the original File via the
// ProcessSession#clone method.
long beforeReadingLines = in.getBytesConsumed();
final SplitInfo info = locateSplitPoint(in, splitCount, !removeTrailingNewlines);
if (info.lengthBytes > 0) {
info.offsetBytes = beforeReadingLines;
splitInfos.add(info);
final long procNanos = System.nanoTime() - startNanos;
final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
+ "total splits = {}; total processing time = {} ms",
new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine;
final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
bufferedPartialLine = info.bufferedBytes;
}
if (info.endOfStream) {
// stream is out of data
if (info.lengthBytes > 0) {
info.offsetBytes = beforeReadingLines;
splitInfos.add(info);
final long procNanos = System.nanoTime() - startNanos;
final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
+ "total splits = {}; total processing time = {} ms",
new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
}
break;
} else {
if (info.lengthBytes != 0) {
info.offsetBytes = beforeReadingLines;
info.lengthBytes -= bufferedPartialLine;
splitInfos.add(info);
final long procNanos = System.nanoTime() - startNanos;
final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
+ "total splits = {}; total processing time = {} ms",
new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
}
}
}
}
@ -398,7 +557,7 @@ public class SplitText extends AbstractProcessor {
if (errorMessage.get() != null) {
logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
session.transfer(flowFile, REL_FAILURE);
if (splits != null && !splits.isEmpty()) {
if (!splits.isEmpty()) {
session.remove(splits);
}
return;
@ -409,6 +568,7 @@ public class SplitText extends AbstractProcessor {
for (final SplitInfo info : splitInfos) {
FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes));
splits.add(split);
}
}
@ -443,26 +603,20 @@ public class SplitText extends AbstractProcessor {
}
// Value holder describing one split region of the source FlowFile, used when
// splits are demarcated via ProcessSession#clone (the no-header code path).
private static class SplitInfo {
// byte offset of this split within the original FlowFile content
public long offsetBytes;
// total length of this split in bytes (header excluded on this path)
public long lengthBytes;
// number of text lines contained in this split
public long lengthLines;
// bytes of a partially-read line buffered past the split point, to be
// carried into the next split when a size limit cuts mid-line
public long bufferedBytes;
// true when the end of the source stream was reached while locating this split
public boolean endOfStream;
public SplitInfo() {
super();
this.offsetBytes = 0L;
this.lengthBytes = 0L;
this.lengthLines = 0L;
this.bufferedBytes = 0L;
this.endOfStream = false;
}
// Convenience constructor; bufferedBytes/endOfStream keep their field defaults (0/false).
@SuppressWarnings("unused")
public SplitInfo(long offsetBytes, long lengthBytes, long lengthLines) {
super();
this.offsetBytes = offsetBytes;
this.lengthBytes = lengthBytes;
this.lengthLines = lengthLines;
}
}
public static class EndOfLineBuffer {

View File

@ -38,6 +38,303 @@ public class TestSplitText {
final static String TEST_INPUT_DATA = "HeaderLine1\nLine2SpacesAtEnd \nLine3\nLine4\n\n\nLine8\nLine9\n\n\n13\n14\n15 EndofLine15\n16\n"
+ "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nLastLine\n";
// Verifies that a single line larger than Maximum Fragment Size is still emitted
// (in a split of its own that exceeds the limit) rather than routed to failure:
// 4 input lines with max 2 lines / 20 bytes per split yield 3 splits.
@Test
public void testLastLineExceedsSizeLimit() {
final TestRunner runner = TestRunners.newTestRunner(new SplitText());
runner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");
runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "20 B");
runner.enqueue("Line #1\nLine #2\nLine #3\nLong line exceeding limit");
runner.run();
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitText.REL_SPLITS, 3);
}
// Verifies that a FlowFile containing fewer lines than Header Line Count
// is routed to failure with no splits produced.
@Test
public void testIncompleteHeader() {
final TestRunner runner = TestRunners.newTestRunner(new SplitText());
runner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");
runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "50 B");
runner.enqueue("Header Line #1");
runner.run();
runner.assertTransferCount(SplitText.REL_FAILURE, 1);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 0);
runner.assertTransferCount(SplitText.REL_SPLITS, 0);
}
// Verifies header detection via a single-character Header Line Marker ("H"):
// splits carry the marker-detected header, correct line counts, fragment sizes,
// and the standard fragment.* attributes.
// NOTE(review): `file` is a test fixture declared elsewhere in this class (not
// visible in this excerpt) — presumably a Path to a sample data file.
@Test
public void testSingleCharacterHeaderMarker() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitText());
runner.setProperty(SplitText.LINE_SPLIT_COUNT, "7");
runner.setProperty(SplitText.HEADER_MARKER, "H");
runner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false");
runner.enqueue(file);
runner.run();
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitText.REL_SPLITS, 2);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
splits.get(0).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "7");
splits.get(0).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "86");
splits.get(1).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "3");
splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "54");
// All splits must share the parent's fragment identifier and be indexed 1..N.
final String fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
for (int i = 0; i < splits.size(); i++) {
final MockFlowFile split = splits.get(i);
split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, String.valueOf(i+1));
split.assertAttributeEquals(SplitText.FRAGMENT_ID, fragmentUUID);
split.assertAttributeEquals(SplitText.FRAGMENT_COUNT, String.valueOf(splits.size()));
split.assertAttributeEquals(SplitText.SEGMENT_ORIGINAL_FILENAME, file.getFileName().toString());
}
}
// Verifies precedence when both Header Line Count and Header Line Marker are set:
// the explicit count (1) wins over the marker ("Head").
// NOTE(review): `file` is a test fixture declared elsewhere in this class (not
// visible in this excerpt) — presumably a Path to a sample data file.
@Test
public void testMultipleHeaderIndicators() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitText());
runner.setProperty(SplitText.HEADER_LINE_COUNT, "1");
runner.setProperty(SplitText.HEADER_MARKER, "Head");
runner.setProperty(SplitText.LINE_SPLIT_COUNT, "5");
runner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false");
runner.enqueue(file);
runner.run();
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitText.REL_SPLITS, 3);
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
splits.get(0).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "5");
splits.get(0).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "62");
splits.get(1).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "5");
splits.get(1).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "55");
splits.get(2).assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1");
splits.get(2).assertAttributeEquals(SplitText.FRAGMENT_SIZE, "23");
// All splits must share the parent's fragment identifier and be indexed 1..N.
final String fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
for (int i = 0; i < splits.size(); i++) {
final MockFlowFile split = splits.get(i);
split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, String.valueOf(i + 1));
split.assertAttributeEquals(SplitText.FRAGMENT_ID, fragmentUUID);
split.assertAttributeEquals(SplitText.FRAGMENT_COUNT, String.valueOf(splits.size()));
split.assertAttributeEquals(SplitText.SEGMENT_ORIGINAL_FILENAME, file.getFileName().toString());
}
}
// Verifies customValidate: Line Split Count of 0 without a Maximum Fragment Size
// leaves the processor in an invalid configuration.
@Test
public void testZeroLinesNoMaxSize() {
final TestRunner runner = TestRunners.newTestRunner(new SplitText());
runner.setProperty(SplitText.LINE_SPLIT_COUNT, "0");
runner.assertNotValid();
}
// Verifies combined limits: with both Line Split Count (2) and Maximum Fragment
// Size (50 B) set, splitting occurs at whichever limit is reached first, and an
// oversized single line (split index 1, 119 bytes) is still emitted on its own.
@Test
public void testMultipleSplitDirectives() {
final TestRunner runner = TestRunners.newTestRunner(new SplitText());
runner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");
runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "50 B");
runner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false");
runner.enqueue("Header Line #1\nHeader Line #2\nLine #1\nLine #2\n"
+ "Line #3 This line has additional text added so that it exceeds the maximum fragment size\n"
+ "Line #4\nLine #5\nLine #6\nLine #7\nLine #8\nLine #9\nLine #10\n");
runner.run();
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitText.REL_SPLITS, 6);
// Sizes include the 30-byte header duplicated into every split.
final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
assertEquals(46, splits.get(0).getSize());
assertEquals(119, splits.get(1).getSize());
assertEquals(46, splits.get(2).getSize());
assertEquals(46, splits.get(3).getSize());
assertEquals(46, splits.get(4).getSize());
assertEquals(39, splits.get(5).getSize());
}
// Verifies that a FlowFile consisting entirely of header content produces no
// splits, for all three ways a header can be identified: header marker text,
// explicit header line count, and a header line with no trailing newline.
@Test
public void testFlowFileIsOnlyHeader() {
final TestRunner runner = TestRunners.newTestRunner(new SplitText());
runner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");
runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "50 B");
// header lines recognized by their "Head" prefix rather than a count
runner.setProperty(SplitText.HEADER_MARKER, "Head");
runner.enqueue("Header Line #1\nHeaderLine#2\n");
runner.run();
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitText.REL_SPLITS, 0);
// repeat with header count versus header marker
runner.clearTransferState();
runner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");
runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "50 B");
runner.enqueue("Header Line #1\nHeaderLine #2\n");
runner.run();
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitText.REL_SPLITS, 0);
// repeat single header line with no newline characters
runner.clearTransferState();
runner.setProperty(SplitText.HEADER_LINE_COUNT, "1");
runner.setProperty(SplitText.LINE_SPLIT_COUNT, "2");
runner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "50 B");
runner.enqueue("Header Line #1");
runner.run();
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitText.REL_SPLITS, 0);
}
@Test
public void testMaxSizeExceeded() throws IOException {
    // With LINE_SPLIT_COUNT set to 0, splitting is driven entirely by the
    // 71 B maximum fragment size; run once with a 2-line header and once
    // without any header.
    final TestRunner splitRunner = TestRunners.newTestRunner(new SplitText());
    splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
    splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "0");
    splitRunner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "71 B");
    splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false");
    splitRunner.enqueue(file);
    splitRunner.run();

    splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
    splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    splitRunner.assertTransferCount(SplitText.REL_SPLITS, 2);

    List<MockFlowFile> splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    String fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
    // both splits share identical line count and size when the header is repeated
    int index = 0;
    for (final MockFlowFile split : splits) {
        index++;
        split.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "5");
        split.assertAttributeEquals(SplitText.FRAGMENT_SIZE, "70");
        split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, String.valueOf(index));
        split.assertAttributeEquals(SplitText.FRAGMENT_ID, fragmentUUID);
        split.assertAttributeEquals(SplitText.FRAGMENT_COUNT, String.valueOf(splits.size()));
        split.assertAttributeEquals(SplitText.SEGMENT_ORIGINAL_FILENAME, file.getFileName().toString());
    }

    // Repeat test without header: the former header lines count as content,
    // so the two splits now differ in line count and size.
    splitRunner.clearTransferState();
    splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
    splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "0");
    splitRunner.setProperty(SplitText.FRAGMENT_MAX_SIZE, "71 B");
    splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "false");
    splitRunner.enqueue(file);
    splitRunner.run();

    splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
    splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    splitRunner.assertTransferCount(SplitText.REL_SPLITS, 2);

    splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    fragmentUUID = splits.get(0).getAttribute("fragment.identifier");
    splits.get(0).assertContentEquals("Header Line #1\nHeader Line #2\nLine #1\nLine #2\nLine #3\nLine #4\nLine #5\n");
    splits.get(1).assertContentEquals("Line #6\nLine #7\nLine #8\nLine #9\nLine #10");
    // expected {line count, fragment size, fragment index} per split
    final String[][] expectedAttributes = {
            {"7", "70", "1"},
            {"5", "40", "2"},
    };
    for (int i = 0; i < expectedAttributes.length; i++) {
        final MockFlowFile split = splits.get(i);
        split.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, expectedAttributes[i][0]);
        split.assertAttributeEquals(SplitText.FRAGMENT_SIZE, expectedAttributes[i][1]);
        split.assertAttributeEquals(SplitText.FRAGMENT_INDEX, expectedAttributes[i][2]);
        split.assertAttributeEquals(SplitText.FRAGMENT_ID, fragmentUUID);
        split.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
        split.assertAttributeEquals(SplitText.SEGMENT_ORIGINAL_FILENAME, file.getFileName().toString());
    }
}
@Test
public void testSplitWithOnlyCarriageReturn() {
    // Input lines terminated only by \r; the assertions pin that the header
    // is prepended to each split and that the run of blank lines in the
    // middle yields header-only (or no) splits rather than empty content.
    final TestRunner splitRunner = TestRunners.newTestRunner(new SplitText());
    splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
    splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
    splitRunner.enqueue("H1\rH2\r1\r2\r3\r\r\r\r\r\r\r10\r11\r12\r");
    splitRunner.run();

    splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
    splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    splitRunner.assertTransferCount(SplitText.REL_SPLITS, 4);

    final List<MockFlowFile> headerSplits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    final String[] expectedWithHeader = {"H1\rH2\r1\r2\r3", "H1\rH2", "H1\rH2", "H1\rH2\r10\r11\r12"};
    for (int i = 0; i < expectedWithHeader.length; i++) {
        headerSplits.get(i).assertContentEquals(expectedWithHeader[i]);
    }

    // Same content without a header: only two splits with real content remain.
    splitRunner.clearTransferState();
    splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
    splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
    splitRunner.enqueue("1\r2\r3\r\r\r\r\r\r\r10\r11\r12\r");
    splitRunner.run();

    splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
    splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    splitRunner.assertTransferCount(SplitText.REL_SPLITS, 2);

    final List<MockFlowFile> plainSplits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    plainSplits.get(0).assertContentEquals("1\r2\r3");
    plainSplits.get(1).assertContentEquals("10\r11\r12");
}
@Test
public void testSplitWithCarriageReturnAndNewLines() {
    // Input lines terminated by \r\n; mirrors testSplitWithOnlyCarriageReturn
    // for Windows-style line endings.
    final TestRunner splitRunner = TestRunners.newTestRunner(new SplitText());
    splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "2");
    splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
    splitRunner.enqueue("H1\r\nH2\r\n1\r\n2\r\n3\r\n\r\n\r\n\r\n\r\n\r\n\r\n10\r\n11\r\n12\r\n");
    splitRunner.run();

    splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
    splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    splitRunner.assertTransferCount(SplitText.REL_SPLITS, 4);

    final List<MockFlowFile> headerSplits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    final String[] expectedWithHeader = {
            "H1\r\nH2\r\n1\r\n2\r\n3", "H1\r\nH2", "H1\r\nH2", "H1\r\nH2\r\n10\r\n11\r\n12"};
    for (int i = 0; i < expectedWithHeader.length; i++) {
        headerSplits.get(i).assertContentEquals(expectedWithHeader[i]);
    }

    // Same content without a header: only two splits with real content remain.
    splitRunner.clearTransferState();
    splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
    splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "3");
    splitRunner.enqueue("1\r\n2\r\n3\r\n\r\n\r\n\r\n\r\n\r\n\r\n10\r\n11\r\n12\r\n");
    splitRunner.run();

    splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
    splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
    splitRunner.assertTransferCount(SplitText.REL_SPLITS, 2);

    final List<MockFlowFile> plainSplits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
    plainSplits.get(0).assertContentEquals("1\r\n2\r\n3");
    plainSplits.get(1).assertContentEquals("10\r\n11\r\n12");
}
@Test
public void testRoutesToFailureIfHeaderLinesNotAllPresent() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitText());