NIFI-2851 initial commit of perf improvements on SplitText

- introduced org.apache.nifi.stream.io.util.TextLineDemarcator
- refactored SplitText to use org.apache.nifi.stream.io.util.TextLineDemarcator
- updated SplitText's capability discription to provide more clarity around splits with headers.
This commit is contained in:
Oleg Zhurakousky 2016-10-07 11:37:32 -04:00 committed by Mark Payne
parent b73ba7f8d4
commit 41f519e84c
4 changed files with 779 additions and 520 deletions

View File

@ -0,0 +1,253 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io.util;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
/**
* Implementation of demarcator of text lines in the provided
* {@link InputStream}. It works similar to the {@link BufferedReader} and its
* {@link BufferedReader#readLine()} methods except that it does not create a
* String representing the text line and instead returns the offset info for the
* computed text line. See {@link #nextOffsetInfo()} and
* {@link #nextOffsetInfo(byte[])} for more details.
* <p>
* This class is NOT thread-safe.
* </p>
*/
public class TextLineDemarcator {
private final static int INIT_BUFFER_SIZE = 8192;
private final InputStream is;
private final int initialBufferSize;
private byte[] buffer;
private int index;
private int mark;
private long offset;
private int bufferLength;
/**
* Constructs an instance of demarcator with provided {@link InputStream}
* and default buffer size.
*/
public TextLineDemarcator(InputStream is) {
this(is, INIT_BUFFER_SIZE);
}
/**
* Constructs an instance of demarcator with provided {@link InputStream}
* and initial buffer size.
*/
public TextLineDemarcator(InputStream is, int initialBufferSize) {
if (is == null) {
throw new IllegalArgumentException("'is' must not be null.");
}
if (initialBufferSize < 1) {
throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
}
this.is = is;
this.initialBufferSize = initialBufferSize;
this.buffer = new byte[initialBufferSize];
}
/**
* Will compute the next <i>offset info</i> for a text line (line terminated
* by either '\r', '\n' or '\r\n'). <br>
* The <i>offset info</i> computed and returned as {@link OffsetInfo} where
* {@link OffsetInfo#isStartsWithMatch()} will always return true.
*
* @return offset info
*/
public OffsetInfo nextOffsetInfo() {
return this.nextOffsetInfo(null);
}
/**
* Will compute the next <i>offset info</i> for a text line (line terminated
* by either '\r', '\n' or '\r\n'). <br>
* The <i>offset info</i> computed and returned as {@link OffsetInfo} where
* {@link OffsetInfo#isStartsWithMatch()} will return true if
* <code>startsWith</code> was successfully matched with the stsarting bytes
* of the text line.
*
* @return offset info
*/
public OffsetInfo nextOffsetInfo(byte[] startsWith) {
OffsetInfo offsetInfo = null;
int lineLength = 0;
byte[] token = null;
lineLoop:
while (this.bufferLength != -1) {
if (this.index >= this.bufferLength) {
this.fill();
}
if (this.bufferLength != -1) {
int i;
byte byteVal;
for (i = this.index; i < this.bufferLength; i++) {
byteVal = this.buffer[i];
lineLength++;
int crlfLength = isEol(byteVal, i);
if (crlfLength > 0) {
i += crlfLength;
if (crlfLength == 2) {
lineLength++;
}
offsetInfo = new OffsetInfo(this.offset, lineLength, crlfLength);
if (startsWith != null) {
token = this.extractDataToken(lineLength);
}
this.index = i;
this.mark = this.index;
break lineLoop;
}
}
this.index = i;
}
}
// EOF where last char(s) are not CRLF.
if (lineLength > 0 && offsetInfo == null) {
offsetInfo = new OffsetInfo(this.offset, lineLength, 0);
if (startsWith != null) {
token = this.extractDataToken(lineLength);
}
}
this.offset += lineLength;
// checks if the new line starts with 'startsWith' chars
if (startsWith != null) {
for (int i = 0; i < startsWith.length; i++) {
byte sB = startsWith[i];
if (token != null && sB != token[i]) {
offsetInfo.setStartsWithMatch(0);
break;
}
}
}
return offsetInfo;
}
private int isEol(byte currentByte, int currentIndex) {
int crlfLength = 0;
if (currentByte == '\n') {
crlfLength = 1;
} else if (currentByte == '\r') {
if ((currentIndex + 1) >= this.bufferLength) {
this.index = currentIndex + 1;
this.fill();
}
currentByte = this.buffer[currentIndex + 1];
crlfLength = currentByte == '\n' ? 2 : 1;
}
return crlfLength;
}
private byte[] extractDataToken(int length) {
byte[] data = null;
if (length > 0) {
data = new byte[length];
System.arraycopy(this.buffer, this.mark, data, 0, data.length);
}
return data;
}
/**
* Will fill the current buffer from current 'index' position, expanding it
* and or shuffling it if necessary
*/
private void fill() {
if (this.index >= this.buffer.length) {
if (this.mark == 0) { // expand
byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
this.buffer = newBuff;
} else { // shuffle
int length = this.index - this.mark;
System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
this.index = length;
this.mark = 0;
}
}
try {
int bytesRead;
do {
bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
} while (bytesRead == 0);
this.bufferLength = bytesRead != -1 ? this.index + bytesRead : -1;
} catch (IOException e) {
throw new IllegalStateException("Failed while reading InputStream", e);
}
}
/**
* Container to hold offset and meta info for a computed text line.
* The offset and meta info is represented with the following 4 values:
* <ul>
* <li><i>startOffset</i> - the offset in the overall stream which represents the beginning of the text line</li>
* <li><i>length</i> - length of the text line including CRLF characters</li>
* <li><i>crlfLength</i> - the length of the CRLF.
* Value 0 is returned if text line represents the last text line in the
* {@link InputStream} (i.e., EOF) and such line does not terminate with CR or LF or the combination of the two.
* Value 1 is returned if text line ends with '\n' or '\r'.
* Value 2 is returned if line ends with '\r\n').</li>
* <li><i>startsWithMatch</i> - <code>true</code> by default unless <code>startWith</code> bytes are provided and not matched.
* See {@link #nextOffsetInfo(byte[])} for more info.</li>
* </ul>
**/
public static class OffsetInfo {
private final long startOffset, length;
private final int crlfLength;
private boolean startsWithMatch = true;
OffsetInfo(long startOffset, long length, int crlfLength) {
this.startOffset = startOffset;
this.length = length;
this.crlfLength = crlfLength;
}
public long getStartOffset() {
return startOffset;
}
public long getLength() {
return length;
}
public int getCrlfLength() {
return this.crlfLength;
}
public boolean isStartsWithMatch() {
return this.startsWithMatch;
}
void setStartsWithMatch(int startsWithMatch) {
this.startsWithMatch = startsWithMatch == 1 ? true : false;
}
}
}

View File

@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
import org.junit.Test;
public class TextLineDemarcatorTest {
@Test(expected = IllegalArgumentException.class)
public void nullStream() {
new TextLineDemarcator(null);
}
@Test(expected = IllegalArgumentException.class)
public void illegalBufferSize() {
new TextLineDemarcator(mock(InputStream.class), -234);
}
@Test
public void emptyStreamNoStartWithFilter() {
String data = "";
InputStream is = stringToIs(data);
TextLineDemarcator demarcator = new TextLineDemarcator(is);
assertNull(demarcator.nextOffsetInfo());
}
@Test
public void emptyStreamAndStartWithFilter() {
String data = "";
InputStream is = stringToIs(data);
TextLineDemarcator demarcator = new TextLineDemarcator(is);
assertNull(demarcator.nextOffsetInfo("hello".getBytes()));
}
@Test
public void singleCR() {
InputStream is = stringToIs("\r");
TextLineDemarcator demarcator = new TextLineDemarcator(is);
OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
assertEquals(0, offsetInfo.getStartOffset());
assertEquals(1, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
assertTrue(offsetInfo.isStartsWithMatch());
}
@Test
public void singleLF() {
InputStream is = stringToIs("\n");
TextLineDemarcator demarcator = new TextLineDemarcator(is);
OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
assertEquals(0, offsetInfo.getStartOffset());
assertEquals(1, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
assertTrue(offsetInfo.isStartsWithMatch());
}
@Test
// essentially validates the internal 'isEol()' operation to ensure it will perform read-ahead
public void crlfWhereLFdoesNotFitInInitialBuffer() throws Exception {
InputStream is = stringToIs("oleg\r\njoe");
TextLineDemarcator demarcator = new TextLineDemarcator(is, 5);
OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
assertEquals(0, offsetInfo.getStartOffset());
assertEquals(6, offsetInfo.getLength());
assertEquals(2, offsetInfo.getCrlfLength());
assertTrue(offsetInfo.isStartsWithMatch());
offsetInfo = demarcator.nextOffsetInfo();
assertEquals(6, offsetInfo.getStartOffset());
assertEquals(3, offsetInfo.getLength());
assertEquals(0, offsetInfo.getCrlfLength());
assertTrue(offsetInfo.isStartsWithMatch());
}
@Test
public void mixedCRLF() throws Exception {
InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n");
TextLineDemarcator demarcator = new TextLineDemarcator(is, 4);
OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
assertEquals(0, offsetInfo.getStartOffset());
assertEquals(5, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
assertTrue(offsetInfo.isStartsWithMatch());
offsetInfo = demarcator.nextOffsetInfo();
assertEquals(5, offsetInfo.getStartOffset());
assertEquals(4, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
assertTrue(offsetInfo.isStartsWithMatch());
offsetInfo = demarcator.nextOffsetInfo();
assertEquals(9, offsetInfo.getStartOffset());
assertEquals(6, offsetInfo.getLength());
assertEquals(2, offsetInfo.getCrlfLength());
assertTrue(offsetInfo.isStartsWithMatch());
offsetInfo = demarcator.nextOffsetInfo();
assertEquals(15, offsetInfo.getStartOffset());
assertEquals(11, offsetInfo.getLength());
assertEquals(2, offsetInfo.getCrlfLength());
assertTrue(offsetInfo.isStartsWithMatch());
}
@Test
public void consecutiveAndMixedCRLF() throws Exception {
InputStream is = stringToIs("oleg\r\r\njoe\n\n\rjack\n\r\nstacymike\r\n\n\n\r");
TextLineDemarcator demarcator = new TextLineDemarcator(is, 4);
OffsetInfo offsetInfo = demarcator.nextOffsetInfo(); // oleg\r
assertEquals(5, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
offsetInfo = demarcator.nextOffsetInfo(); // \r\n
assertEquals(2, offsetInfo.getLength());
assertEquals(2, offsetInfo.getCrlfLength());
offsetInfo = demarcator.nextOffsetInfo(); // joe\n
assertEquals(4, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
offsetInfo = demarcator.nextOffsetInfo(); // \n
assertEquals(1, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
offsetInfo = demarcator.nextOffsetInfo(); // \r
assertEquals(1, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
offsetInfo = demarcator.nextOffsetInfo(); // jack\n
assertEquals(5, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
offsetInfo = demarcator.nextOffsetInfo(); // \r\n
assertEquals(2, offsetInfo.getLength());
assertEquals(2, offsetInfo.getCrlfLength());
offsetInfo = demarcator.nextOffsetInfo(); // stacymike\r\n
assertEquals(11, offsetInfo.getLength());
assertEquals(2, offsetInfo.getCrlfLength());
offsetInfo = demarcator.nextOffsetInfo(); // \n
assertEquals(1, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
offsetInfo = demarcator.nextOffsetInfo(); // \n
assertEquals(1, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
offsetInfo = demarcator.nextOffsetInfo(); // \r
assertEquals(1, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
}
@Test
public void startWithNoMatchOnWholeStream() throws Exception {
InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n");
TextLineDemarcator demarcator = new TextLineDemarcator(is, 4);
OffsetInfo offsetInfo = demarcator.nextOffsetInfo("foojhkj".getBytes());
assertEquals(0, offsetInfo.getStartOffset());
assertEquals(5, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
assertFalse(offsetInfo.isStartsWithMatch());
offsetInfo = demarcator.nextOffsetInfo("foo".getBytes());
assertEquals(5, offsetInfo.getStartOffset());
assertEquals(4, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
assertFalse(offsetInfo.isStartsWithMatch());
offsetInfo = demarcator.nextOffsetInfo("joe".getBytes());
assertEquals(9, offsetInfo.getStartOffset());
assertEquals(6, offsetInfo.getLength());
assertEquals(2, offsetInfo.getCrlfLength());
assertFalse(offsetInfo.isStartsWithMatch());
offsetInfo = demarcator.nextOffsetInfo("stasy".getBytes());
assertEquals(15, offsetInfo.getStartOffset());
assertEquals(11, offsetInfo.getLength());
assertEquals(2, offsetInfo.getCrlfLength());
assertFalse(offsetInfo.isStartsWithMatch());
}
@Test
public void startWithSomeMatches() throws Exception {
InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n");
TextLineDemarcator demarcator = new TextLineDemarcator(is, 7);
OffsetInfo offsetInfo = demarcator.nextOffsetInfo("foojhkj".getBytes());
assertEquals(0, offsetInfo.getStartOffset());
assertEquals(5, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
assertFalse(offsetInfo.isStartsWithMatch());
offsetInfo = demarcator.nextOffsetInfo("jo".getBytes());
assertEquals(5, offsetInfo.getStartOffset());
assertEquals(4, offsetInfo.getLength());
assertEquals(1, offsetInfo.getCrlfLength());
assertTrue(offsetInfo.isStartsWithMatch());
offsetInfo = demarcator.nextOffsetInfo("joe".getBytes());
assertEquals(9, offsetInfo.getStartOffset());
assertEquals(6, offsetInfo.getLength());
assertEquals(2, offsetInfo.getCrlfLength());
assertFalse(offsetInfo.isStartsWithMatch());
offsetInfo = demarcator.nextOffsetInfo("stacy".getBytes());
assertEquals(15, offsetInfo.getStartOffset());
assertEquals(11, offsetInfo.getLength());
assertEquals(2, offsetInfo.getCrlfLength());
assertTrue(offsetInfo.isStartsWithMatch());
}
private InputStream stringToIs(String data) {
return new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
}
}

View File

@ -16,13 +16,11 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.nio.charset.StandardCharsets;
import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.BitSet; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -31,9 +29,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
@ -46,26 +42,22 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.util.TextLineDemarcator;
import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
@EventDriven @EventDriven
@SideEffectFree @SideEffectFree
@ -76,7 +68,12 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream;
+ "or total size of fragment. Each output split file will contain no more than the configured number of lines or bytes. " + "or total size of fragment. Each output split file will contain no more than the configured number of lines or bytes. "
+ "If both Line Split Count and Maximum Fragment Size are specified, the split occurs at whichever limit is reached first. " + "If both Line Split Count and Maximum Fragment Size are specified, the split occurs at whichever limit is reached first. "
+ "If the first line of a fragment exceeds the Maximum Fragment Size, that line will be output in a single split file which " + "If the first line of a fragment exceeds the Maximum Fragment Size, that line will be output in a single split file which "
+" exceeds the configured maximum size limit.") + "exceeds the configured maximum size limit. This component also allows one to specify that each split should include a header "
+ "lines. Header lines can be computed by either specifying the amount of lines that should constitute a header or by using header "
+ "marker to match against the read lines. If such match happens then the corresponding line will be treated as header. Keep in mind "
+ "that upon the first failure of header marker match, no more marches will be performed and the rest of the data will be parsed as "
+ "regular lines for a given split. If after computation of the header there are no more data, the resulting split will consists "
+ "of only header lines.")
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text from the original FlowFile that were copied to this FlowFile"), @WritesAttribute(attribute = "text.line.count", description = "The number of lines of text from the original FlowFile that were copied to this FlowFile"),
@WritesAttribute(attribute = "fragment.size", description = "The number of bytes from the original FlowFile that were copied to this FlowFile, " + @WritesAttribute(attribute = "fragment.size", description = "The number of bytes from the original FlowFile that were copied to this FlowFile, " +
@ -87,7 +84,6 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream;
@WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile")}) @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile")})
@SeeAlso(MergeContent.class) @SeeAlso(MergeContent.class)
public class SplitText extends AbstractProcessor { public class SplitText extends AbstractProcessor {
// attribute keys // attribute keys
public static final String SPLIT_LINE_COUNT = "text.line.count"; public static final String SPLIT_LINE_COUNT = "text.line.count";
public static final String FRAGMENT_SIZE = "fragment.size"; public static final String FRAGMENT_SIZE = "fragment.size";
@ -150,548 +146,316 @@ public class SplitText extends AbstractProcessor {
.description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
.build(); .build();
private List<PropertyDescriptor> properties; private static final List<PropertyDescriptor> properties;
private Set<Relationship> relationships; private static final Set<Relationship> relationships;
static {
properties = Collections.unmodifiableList(Arrays.asList(new PropertyDescriptor[]{
LINE_SPLIT_COUNT,
FRAGMENT_MAX_SIZE,
HEADER_LINE_COUNT,
HEADER_MARKER,
REMOVE_TRAILING_NEWLINES
}));
relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(new Relationship[]{
REL_ORIGINAL,
REL_SPLITS,
REL_FAILURE
})));
}
private volatile boolean removeTrailingNewLines;
private volatile long maxSplitSize;
private volatile int lineCount;
private volatile int headerLineCount;
private volatile String headerMarker;
@Override @Override
protected void init(final ProcessorInitializationContext context) { public Set<Relationship> getRelationships() {
final List<PropertyDescriptor> properties = new ArrayList<>(); return relationships;
properties.add(LINE_SPLIT_COUNT); }
properties.add(FRAGMENT_MAX_SIZE);
properties.add(HEADER_LINE_COUNT);
properties.add(HEADER_MARKER);
properties.add(REMOVE_TRAILING_NEWLINES);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>(); @OnScheduled
relationships.add(REL_ORIGINAL); public void onSchedule(ProcessContext context) {
relationships.add(REL_SPLITS); this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
relationships.add(REL_FAILURE); ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
this.relationships = Collections.unmodifiableSet(relationships); this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
}
/**
* Will split the incoming stream releasing all splits as FlowFile at once.
*/
@Override
public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
FlowFile sourceFlowFile = processSession.get();
if (sourceFlowFile == null) {
return;
}
AtomicBoolean error = new AtomicBoolean();
List<SplitInfo> computedSplitsInfo = new ArrayList<>();
AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
processSession.read(sourceFlowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
TextLineDemarcator demarcator = new TextLineDemarcator(in);
SplitInfo splitInfo = null;
long startOffset = 0;
// Compute fragment representing the header (if available)
long start = System.nanoTime();
try {
if (SplitText.this.headerLineCount > 0) {
splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
if (splitInfo.lineCount < SplitText.this.headerLineCount) {
error.set(true);
getLogger().error("Unable to split " + sourceFlowFile + " due to insufficient amount of header lines. Required "
+ SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
}
} else if (SplitText.this.headerMarker != null) {
splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
}
headerSplitInfoRef.set(splitInfo);
} catch (IllegalStateException e) {
error.set(true);
getLogger().error(e.getMessage() + " Routing to failure.", e);
}
// Compute and collect fragments representing the individual splits
if (!error.get()) {
if (headerSplitInfoRef.get() != null) {
startOffset = headerSplitInfoRef.get().length;
}
long preAccumulatedLength = startOffset;
while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
computedSplitsInfo.add(splitInfo);
startOffset += splitInfo.length;
}
long stop = System.nanoTime();
if (getLogger().isDebugEnabled()) {
getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
}
}
}
});
if (error.get()){
processSession.transfer(sourceFlowFile, REL_FAILURE);
} else {
List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
processSession.transfer(sourceFlowFile, REL_ORIGINAL);
processSession.transfer(splitFlowFiles, REL_SPLITS);
}
} }
@Override @Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
List<ValidationResult> results = new ArrayList<>(); List<ValidationResult> results = new ArrayList<>();
boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
&& !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
results.add(new ValidationResult.Builder().subject("Maximum Fragment Size").valid(!invalidState)
results.add(new ValidationResult.Builder() .explanation("Property must be specified when Line Split Count is 0").build());
.subject("Maximum Fragment Size")
.valid(!invalidState)
.explanation("Property must be specified when Line Split Count is 0")
.build()
);
return results; return results;
} }
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties; return properties;
} }
private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out, /**
final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { * Generates the list of {@link FlowFile}s representing splits. If
final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); * {@link SplitInfo} provided as an argument to this operation is not null
* it signifies the header information and its contents will be included in
* each and every computed split.
*/
private List<FlowFile> generateSplitFlowFiles(FlowFile sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
List<FlowFile> splitFlowFiles = new ArrayList<>();
FlowFile headerFlowFile = null;
long headerCrlfLength = 0;
if (splitInfo != null) {
headerFlowFile = processSession.clone(sourceFlowFile, splitInfo.startOffset, splitInfo.length);
headerCrlfLength = splitInfo.trimmedLength;
}
int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
String fragmentId = UUID.randomUUID().toString();
byte[] leadingBytes = leadingNewLineBytes; if (computedSplitsInfo.size() == 0) {
int numLines = 0; FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
long totalBytes = 0L; splitFlowFile = SplitText.this.updateAttributes(processSession, splitFlowFile, 0, splitFlowFile.getSize(),
for (int i = 0; i < maxNumLines; i++) { fragmentId, fragmentIndex++, 0, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); splitFlowFiles.add(splitFlowFile);
final long bytes = eolMarker.getBytesConsumed();
leadingBytes = eolMarker.getLeadingNewLineBytes();
if (includeLineDelimiter && out != null) {
if (leadingBytes != null) {
out.write(leadingBytes);
leadingBytes = null;
}
eolBuffer.drainTo(out);
}
totalBytes += bytes;
if (bytes <= 0) {
return numLines;
}
numLines++;
if (totalBytes >= maxByteCount) {
break;
}
}
return numLines;
}
private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
long bytesRead = 0L;
final ByteArrayOutputStream buffer;
if (out != null) {
buffer = new ByteArrayOutputStream();
} else { } else {
buffer = null; for (SplitInfo computedSplitInfo : computedSplitsInfo) {
} long length = SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
byte[] bytesToWriteFirst = leadingNewLineBytes; boolean proceedWithClone = headerFlowFile != null || length > 0;
if (proceedWithClone) {
in.mark(Integer.MAX_VALUE); FlowFile splitFlowFile = null;
while (true) { if (headerFlowFile != null) {
final int nextByte = in.read(); if (length > 0) {
splitFlowFile = processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length);
// if we hit end of stream we're done splitFlowFile = processSession.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
if (nextByte == -1) {
if (buffer != null) {
buffer.writeTo(out);
buffer.close();
}
return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); // bytesToWriteFirst should be "null"?
}
// Verify leading bytes do not violate size limitation
if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
}
// Write leadingNewLines, if appropriate
if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
bytesRead += bytesToWriteFirst.length;
buffer.write(bytesToWriteFirst);
bytesToWriteFirst = null;
}
// buffer the output
bytesRead++;
if (buffer != null && nextByte != '\n' && nextByte != '\r') {
if (bytesToWriteFirst != null) {
buffer.write(bytesToWriteFirst);
}
bytesToWriteFirst = null;
eolBuffer.drainTo(buffer);
eolBuffer.clear();
buffer.write(nextByte);
}
// check the size limit
if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
in.reset();
if (buffer != null) {
buffer.close();
}
return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
}
// if we have a new line, then we're done
if (nextByte == '\n') {
if (buffer != null) {
buffer.writeTo(out);
buffer.close();
eolBuffer.addEndOfLine(false, true);
}
return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
}
// Determine if \n follows \r; in either case, end of line has been reached
if (nextByte == '\r') {
if (buffer != null) {
buffer.writeTo(out);
buffer.close();
}
in.mark(1);
final int lookAheadByte = in.read();
if (lookAheadByte == '\n') {
eolBuffer.addEndOfLine(true, true);
return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
} else { } else {
in.reset(); splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER
eolBuffer.addEndOfLine(true, false);
return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
}
}
}
}
private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
final long bufferedBytes) throws IOException {
final SplitInfo info = new SplitInfo();
final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
int lastByte = -1;
info.lengthBytes = bufferedBytes;
long lastEolBufferLength = 0L;
while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
&& (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
&& eolBuffer.length() < maxSize) {
in.mark(1);
final int nextByte = in.read();
// Check for \n following \r on last line
if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
in.reset();
break;
}
switch (nextByte) {
case -1:
info.endOfStream = true;
if (keepAllNewLines) {
info.lengthBytes += eolBuffer.length();
}
if (lastByte != '\r') {
info.lengthLines++;
}
info.bufferedBytes = 0;
return info;
case '\r':
eolBuffer.addEndOfLine(true, false);
info.lengthLines++;
info.bufferedBytes = 0;
break;
case '\n':
eolBuffer.addEndOfLine(false, true);
if (lastByte != '\r') {
info.lengthLines++;
}
info.bufferedBytes = 0;
break;
default:
if (eolBuffer.length() > 0) {
info.lengthBytes += eolBuffer.length();
lastEolBufferLength = eolBuffer.length();
eolBuffer.clear();
}
info.lengthBytes++;
info.bufferedBytes++;
break;
}
lastByte = nextByte;
}
// if current line exceeds size and not keeping eol characters, remove previously applied eol characters
if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
info.lengthBytes -= lastEolBufferLength;
}
if (keepAllNewLines) {
info.lengthBytes += eolBuffer.length();
}
return info;
}
private int countHeaderLines(final ByteCountingInputStream in,
final String headerMarker) throws IOException {
int headerInfo = 0;
final BufferedReader br = new BufferedReader(new InputStreamReader(in));
in.mark(Integer.MAX_VALUE);
String line = br.readLine();
while (line != null) {
// if line is not a header line, reset stream and return header counts
if (!line.startsWith(headerMarker)) {
in.reset();
return headerInfo;
} else {
headerInfo++;
}
line = br.readLine();
}
in.reset();
return headerInfo;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ComponentLog logger = getLogger();
final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
final AtomicReference<String> errorMessage = new AtomicReference<>(null);
final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
final long startNanos = System.nanoTime();
final List<FlowFile> splits = new ArrayList<>();
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
long bufferedPartialLine = 0;
// if we have header lines, copy them into a ByteArrayOutputStream
final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
// Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
int headerInfoLineCount = 0;
if (headerCount > 0) {
headerInfoLineCount = headerCount;
} else {
if (headerMarker != null) {
headerInfoLineCount = countHeaderLines(in, headerMarker);
}
}
final byte[] headerNewLineBytes;
final byte[] headerBytesWithoutTrailingNewLines;
if (headerInfoLineCount > 0) {
final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
if (headerLinesCopied < headerInfoLineCount) {
errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
return;
}
// Break header apart into trailing newlines and remaining text
final byte[] headerBytes = headerStream.toByteArray();
int headerNewLineByteCount = 0;
for (int i = headerBytes.length - 1; i >= 0; i--) {
final byte headerByte = headerBytes[i];
if (headerByte == '\r' || headerByte == '\n') {
headerNewLineByteCount++;
} else {
break;
}
}
if (headerNewLineByteCount == 0) {
headerNewLineBytes = null;
headerBytesWithoutTrailingNewLines = headerBytes;
} else {
headerNewLineBytes = new byte[headerNewLineByteCount];
System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
} }
} else { } else {
headerBytesWithoutTrailingNewLines = null; splitFlowFile = processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length);
headerNewLineBytes = null;
} }
while (true) { splitFlowFile = SplitText.this.updateAttributes(processSession, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
if (headerInfoLineCount > 0) { computedSplitsInfo.size(), sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
// if we have header lines, create a new FlowFile, copy the header lines to that file, splitFlowFiles.add(splitFlowFile);
// and then start copying lines
final AtomicInteger linesCopied = new AtomicInteger(0);
final AtomicLong bytesCopied = new AtomicLong(0L);
FlowFile splitFile = session.create(flowFile);
try {
splitFile = session.write(splitFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream rawOut) throws IOException {
try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
countingOut.write(headerBytesWithoutTrailingNewLines);
//readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
includeLineDelimiter, headerNewLineBytes));
bytesCopied.set(countingOut.getBytesWritten());
} }
} }
});
splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
} finally {
if (linesCopied.get() > 0) {
splits.add(splitFile);
} else {
// if the number of content lines is a multiple of the SPLIT_LINE_COUNT,
// the last flow file will contain just a header; don't forward that one
session.remove(splitFile);
}
} }
// Check for EOF getLogger().info("Split " + sourceFlowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
in.mark(1); if (headerFlowFile != null) {
if (in.read() == -1) { processSession.remove(headerFlowFile);
break;
} }
in.reset(); return splitFlowFiles;
} else {
// We have no header lines, so we can simply demarcate the original File via the
// ProcessSession#clone method.
long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine;
final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
bufferedPartialLine = info.bufferedBytes;
}
if (info.endOfStream) {
// stream is out of data
if (info.lengthBytes > 0) {
info.offsetBytes = beforeReadingLines;
splitInfos.add(info);
final long procNanos = System.nanoTime() - startNanos;
final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
+ "total splits = {}; total processing time = {} ms",
new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
}
break;
} else {
if (info.lengthBytes != 0) {
info.offsetBytes = beforeReadingLines;
info.lengthBytes -= bufferedPartialLine;
splitInfos.add(info);
final long procNanos = System.nanoTime() - startNanos;
final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
+ "total splits = {}; total processing time = {} ms",
new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
}
}
}
}
}
}
});
if (errorMessage.get() != null) {
logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
session.transfer(flowFile, REL_FAILURE);
if (!splits.isEmpty()) {
session.remove(splits);
}
return;
} }
if (!splitInfos.isEmpty()) { private FlowFile updateAttributes(ProcessSession processSession, FlowFile splitFlowFile, long splitLineCount, long splitFlowFileSize,
// Create the splits String splitId, int splitIndex, int splitCount, String origFileName) {
for (final SplitInfo info : splitInfos) { Map<String, String> attributes = new HashMap<>();
FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes); attributes.put(SPLIT_LINE_COUNT, String.valueOf(splitLineCount));
split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines)); attributes.put(FRAGMENT_SIZE, String.valueOf(splitFlowFile.getSize()));
split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes)); attributes.put(FRAGMENT_ID, splitId);
splits.add(split); attributes.put(FRAGMENT_INDEX, String.valueOf(splitIndex));
} attributes.put(FRAGMENT_COUNT, String.valueOf(splitCount));
} attributes.put(SEGMENT_ORIGINAL_FILENAME, origFileName);
finishFragmentAttributes(session, flowFile, splits); return processSession.putAllAttributes(splitFlowFile, attributes);
if (splits.size() > 10) {
logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
} else {
logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
}
session.transfer(flowFile, REL_ORIGINAL);
session.transfer(splits, REL_SPLITS);
}
private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
final String fragmentId = UUID.randomUUID().toString();
final ArrayList<FlowFile> newList = new ArrayList<>(splits);
splits.clear();
for (int i = 1; i <= newList.size(); i++) {
FlowFile ff = newList.get(i - 1);
final Map<String, String> attributes = new HashMap<>();
attributes.put(FRAGMENT_ID, fragmentId);
attributes.put(FRAGMENT_INDEX, String.valueOf(i));
attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
FlowFile newFF = session.putAllAttributes(ff, attributes);
splits.add(newFF);
}
}
private static class SplitInfo {
public long offsetBytes;
public long lengthBytes;
public long lengthLines;
public long bufferedBytes;
public boolean endOfStream;
public SplitInfo() {
this.offsetBytes = 0L;
this.lengthBytes = 0L;
this.lengthLines = 0L;
this.bufferedBytes = 0L;
this.endOfStream = false;
}
}
public static class EndOfLineBuffer {
private static final byte CARRIAGE_RETURN = (byte) '\r';
private static final byte NEWLINE = (byte) '\n';
private final BitSet buffer = new BitSet();
private int index = 0;
public void clear() {
index = 0;
}
public void addEndOfLine(final boolean carriageReturn, final boolean newLine) {
buffer.set(index++, carriageReturn);
buffer.set(index++, newLine);
}
private void drainTo(final OutputStream out) throws IOException {
for (int i = 0; i < index; i += 2) {
final boolean cr = buffer.get(i);
final boolean nl = buffer.get(i + 1);
// we've consumed all data in the buffer
if (!cr && !nl) {
return;
}
if (cr) {
out.write(CARRIAGE_RETURN);
}
if (nl) {
out.write(NEWLINE);
}
}
clear();
} }
/** /**
* @return the number of line endings in the buffer * Will generate {@link SplitInfo} for the next fragment that represents the
* header of the future split.
*
* If split size is controlled by the amount of lines in the split then the
* resulting {@link SplitInfo} line count will always be <= 'splitMaxLineCount'. It can only be less IF it reaches the EOF.
* If split size is controlled by the {@link #maxSplitSize}, then the resulting {@link SplitInfo} line count
* will vary but the length of the split will never be > {@link #maxSplitSize} and {@link IllegalStateException} will be thrown.
* This method also allows one to provide 'startsWithFilter' to allow headers to be determined via such filter (see {@link #HEADER_MARKER}.
*/ */
public int length() { private SplitInfo computeHeader(TextLineDemarcator demarcator, long startOffset, long splitMaxLineCount, byte[] startsWithFilter, SplitInfo previousSplitInfo) {
return index / 2; long length = 0;
long actualLineCount = 0;
OffsetInfo offsetInfo = null;
SplitInfo splitInfo = null;
OffsetInfo previousOffsetInfo = null;
long lastCrlfLength = 0;
while ((offsetInfo = demarcator.nextOffsetInfo(startsWithFilter)) != null) {
lastCrlfLength = offsetInfo.getCrlfLength();
if (startsWithFilter != null && !offsetInfo.isStartsWithMatch()) {
if (offsetInfo.getCrlfLength() != -1) {
previousOffsetInfo = offsetInfo;
}
break;
} else {
if (length + offsetInfo.getLength() > this.maxSplitSize) {
throw new IllegalStateException( "Computing header resulted in header size being > MAX split size of " + this.maxSplitSize + ".");
} else {
length += offsetInfo.getLength();
actualLineCount++;
if (actualLineCount == splitMaxLineCount) {
break;
}
}
} }
} }
public static class EndOfLineMarker { if (actualLineCount > 0) {
private final long bytesConsumed; splitInfo = new SplitInfo(startOffset, length, lastCrlfLength, actualLineCount, previousOffsetInfo);
private final EndOfLineBuffer eolBuffer; }
private final boolean streamEnded; return splitInfo;
private final byte[] leadingNewLineBytes;
public EndOfLineMarker(final long bytesCounted, final EndOfLineBuffer eolBuffer, final boolean streamEnded, final byte[] leadingNewLineBytes) {
this.bytesConsumed = bytesCounted;
this.eolBuffer = eolBuffer;
this.streamEnded = streamEnded;
this.leadingNewLineBytes = leadingNewLineBytes;
} }
public long getBytesConsumed() { /**
return bytesConsumed; * Will generate {@link SplitInfo} for the next split.
*
* If split size is controlled by the amount of lines in the split then the resulting
* {@link SplitInfo} line count will always be <= 'splitMaxLineCount'.
* If split size is controlled by the {@link #maxSplitSize}, then the resulting {@link SplitInfo}
* line count will vary but the length of the split will never be > {@link #maxSplitSize}.
*/
private SplitInfo nextSplit(TextLineDemarcator demarcator, long startOffset, long splitMaxLineCount, SplitInfo remainderSplitInfo, long startingLength) {
long length = 0;
long trailingCrlfLength = 0;
long actualLineCount = 0;
OffsetInfo offsetInfo = null;
SplitInfo splitInfo = null;
// the remainder from the previous read after which it was determined that adding it would make
// the split size > 'maxSplitSize'. So it's being carried over to the next line.
if (remainderSplitInfo != null && remainderSplitInfo.remaningOffsetInfo != null) {
length += remainderSplitInfo.remaningOffsetInfo.getLength();
actualLineCount++;
}
OffsetInfo remaningOffsetInfo = null;
long lastCrlfLength = 0;
while ((offsetInfo = demarcator.nextOffsetInfo()) != null) {
lastCrlfLength = offsetInfo.getCrlfLength();
if (offsetInfo.getLength() == offsetInfo.getCrlfLength()) {
trailingCrlfLength += offsetInfo.getCrlfLength();
} else if (offsetInfo.getLength() > offsetInfo.getCrlfLength()) {
trailingCrlfLength = 0; // non-empty line came in, thus resetting counter
} }
public EndOfLineBuffer getEndOfLineBuffer() { if (length + offsetInfo.getLength() + startingLength > this.maxSplitSize) {
return eolBuffer; if (length == 0) { // single line per split
length += offsetInfo.getLength();
actualLineCount++;
} else {
remaningOffsetInfo = offsetInfo;
}
break;
} else {
length += offsetInfo.getLength();
actualLineCount++;
if (splitMaxLineCount > 0 && actualLineCount >= splitMaxLineCount) {
break;
}
}
} }
public boolean isStreamEnded() { if (actualLineCount > 0) {
return streamEnded; if (length - trailingCrlfLength >= lastCrlfLength) {
trailingCrlfLength += lastCrlfLength; // trim CRLF from the last line
}
splitInfo = new SplitInfo(startOffset, length, length - trailingCrlfLength, actualLineCount, remaningOffsetInfo);
}
return splitInfo;
} }
public byte[] getLeadingNewLineBytes() { /**
return leadingNewLineBytes; * Container for hosting meta-information pertaining to the split so it can
* be used later to create {@link FlowFile} representing the split.
*/
private class SplitInfo {
final long startOffset, length, trimmedLength, lineCount;
OffsetInfo remaningOffsetInfo;
SplitInfo(long startOffset, long length, long trimmedLength, long lineCount, OffsetInfo remaningOffsetInfo) {
this.startOffset = startOffset;
this.length = length;
this.lineCount = lineCount;
this.remaningOffsetInfo = remaningOffsetInfo;
this.trimmedLength = trimmedLength;
}
@Override
public String toString() {
return "offset:" + startOffset + "; length:" + length + "; lineCount:" + lineCount;
} }
} }
} }

View File

@ -17,12 +17,6 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
@ -30,6 +24,12 @@ import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.List; import java.util.List;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
public class TestSplitText { public class TestSplitText {
final String originalFilename = "original.txt"; final String originalFilename = "original.txt";
@ -176,7 +176,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitText.REL_SPLITS, 0); runner.assertTransferCount(SplitText.REL_SPLITS, 1);
// repeat with header cou8nt versus header marker // repeat with header cou8nt versus header marker
runner.clearTransferState(); runner.clearTransferState();
@ -189,7 +189,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitText.REL_SPLITS, 0); runner.assertTransferCount(SplitText.REL_SPLITS, 1);
// repeat single header line with no newline characters // repeat single header line with no newline characters
runner.clearTransferState(); runner.clearTransferState();
@ -202,7 +202,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitText.REL_SPLITS, 0); runner.assertTransferCount(SplitText.REL_SPLITS, 1);
} }
@Test @Test
@ -813,7 +813,7 @@ public class TestSplitText {
final List<MockFlowFile> splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); final List<MockFlowFile> splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
splits.get(0).assertContentEquals("\n\n1"); splits.get(0).assertContentEquals("\n\n1");
splits.get(1).assertContentEquals(""); splits.get(1).assertContentEquals("\n");
} }
} }