From 41f519e84cb5d0bf8be4ce93e26a042d16cda273 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Fri, 7 Oct 2016 11:37:32 -0400 Subject: [PATCH] NIFI-2851 initial commit of perf improvements on SplitText - introduced org.apache.nifi.stream.io.util.TextLineDemarcator - refactored SplitText to use org.apache.nifi.stream.io.util.TextLineDemarcator - updated SplitText's capability discription to provide more clarity around splits with headers. --- .../stream/io/util/TextLineDemarcator.java | 253 ++++++ .../io/util/TextLineDemarcatorTest.java | 242 ++++++ .../nifi/processors/standard/SplitText.java | 784 ++++++------------ .../processors/standard/TestSplitText.java | 20 +- 4 files changed, 779 insertions(+), 520 deletions(-) create mode 100644 nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java create mode 100644 nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java new file mode 100644 index 0000000000..e4c213a2c2 --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.io.InputStream;

/**
 * Demarcates text lines in the provided {@link InputStream}. It works
 * similarly to {@code BufferedReader.readLine()} except that it does not
 * create a String for each text line; instead it returns the offset info for
 * the computed line. See {@link #nextOffsetInfo()} and
 * {@link #nextOffsetInfo(byte[])} for more details.
 * <p>
 * This class is NOT thread-safe.
 */
public class TextLineDemarcator {

    private final static int INIT_BUFFER_SIZE = 8192;

    private final InputStream is;

    private final int initialBufferSize;

    // Internal read buffer; grows by 'initialBufferSize' increments whenever a
    // single line does not fit, and is compacted when the current line can be
    // shifted to the front to make room for more data.
    private byte[] buffer;

    // Position of the next unread byte within 'buffer'.
    private int index;

    // Position within 'buffer' where the current line begins. Used both for
    // token extraction and by fill() when compacting the buffer.
    private int mark;

    // Absolute offset (within the stream) of the start of the current line.
    private long offset;

    // Number of valid bytes in 'buffer', or -1 once EOF has been reached.
    private int bufferLength;

    /**
     * Constructs an instance of demarcator with the provided
     * {@link InputStream} and the default buffer size.
     */
    public TextLineDemarcator(InputStream is) {
        this(is, INIT_BUFFER_SIZE);
    }

    /**
     * Constructs an instance of demarcator with the provided
     * {@link InputStream} and initial buffer size.
     *
     * @throws IllegalArgumentException if 'is' is null or 'initialBufferSize' is < 1
     */
    public TextLineDemarcator(InputStream is, int initialBufferSize) {
        if (is == null) {
            throw new IllegalArgumentException("'is' must not be null.");
        }
        if (initialBufferSize < 1) {
            throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
        }
        this.is = is;
        this.initialBufferSize = initialBufferSize;
        this.buffer = new byte[initialBufferSize];
    }

    /**
     * Computes the next offset info for a text line (a line terminated by
     * '\r', '\n' or '\r\n'). The returned {@link OffsetInfo} always reports
     * {@link OffsetInfo#isStartsWithMatch()} as true.
     *
     * @return offset info for the next line, or null when the stream is exhausted
     */
    public OffsetInfo nextOffsetInfo() {
        return this.nextOffsetInfo(null);
    }

    /**
     * Computes the next offset info for a text line (a line terminated by
     * '\r', '\n' or '\r\n'). The returned {@link OffsetInfo} reports
     * {@link OffsetInfo#isStartsWithMatch()} as true only if 'startsWith' was
     * successfully matched against the starting bytes of the text line. A line
     * shorter than 'startsWith' is reported as a non-match (it cannot contain
     * the full prefix).
     *
     * @param startsWith prefix to match against the line, may be null to skip matching
     * @return offset info for the next line, or null when the stream is exhausted
     */
    public OffsetInfo nextOffsetInfo(byte[] startsWith) {
        this.mark = this.index; // current line starts at the next unread byte
        int lineLength = 0;
        int crlfLength = 0;
        boolean eol = false;
        boolean pendingCR = false; // a '\r' was consumed; a following '\n' would form CRLF

        while (!eol) {
            if (this.index >= this.bufferLength) {
                if (this.bufferLength == -1) {
                    break; // EOF reached
                }
                // fill() may relocate the buffer; 'mark' and 'index' fields are
                // adjusted there, so we simply re-enter the loop afterwards.
                this.fill();
                continue;
            }
            byte b = this.buffer[this.index++];
            lineLength++;
            if (pendingCR) {
                if (b == '\n') {
                    crlfLength = 2; // '\r\n'
                } else {
                    // this byte belongs to the next line; push it back
                    this.index--;
                    lineLength--;
                    crlfLength = 1; // lone '\r'
                }
                eol = true;
            } else if (b == '\n') {
                crlfLength = 1;
                eol = true;
            } else if (b == '\r') {
                // cannot decide between '\r' and '\r\n' until the next byte is
                // seen; the look-ahead safely spans buffer refills.
                pendingCR = true;
            }
        }

        if (pendingCR && !eol) {
            crlfLength = 1; // stream ended immediately after '\r'
        }
        if (lineLength == 0) {
            return null; // nothing left in the stream
        }

        OffsetInfo offsetInfo = new OffsetInfo(this.offset, lineLength, crlfLength);
        this.offset += lineLength;

        if (startsWith != null) {
            byte[] token = this.extractDataToken(lineLength);
            if (token == null || token.length < startsWith.length) {
                // line shorter than the prefix can never match
                offsetInfo.setStartsWithMatch(0);
            } else {
                for (int i = 0; i < startsWith.length; i++) {
                    if (startsWith[i] != token[i]) {
                        offsetInfo.setStartsWithMatch(0);
                        break;
                    }
                }
            }
        }
        return offsetInfo;
    }

    /**
     * Copies the current line (buffer range [mark, mark + length)) into a new
     * array, or returns null for a zero length.
     */
    private byte[] extractDataToken(int length) {
        byte[] data = null;
        if (length > 0) {
            data = new byte[length];
            System.arraycopy(this.buffer, this.mark, data, 0, data.length);
        }
        return data;
    }

    /**
     * Fills the buffer from the current 'index' position, expanding it (when
     * the current line already occupies the whole buffer) or compacting it
     * (shifting the current line to the front) as necessary. Sets
     * 'bufferLength' to -1 once the underlying stream is exhausted.
     */
    private void fill() {
        if (this.index >= this.buffer.length) {
            if (this.mark == 0) { // line fills the whole buffer -> expand
                byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
                System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
                this.buffer = newBuff;
            } else { // compact: move the current line to the front of the buffer
                int length = this.index - this.mark;
                System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
                this.index = length;
                this.mark = 0;
            }
        }

        try {
            int bytesRead;
            do { // loop since read() may legally return 0 for a non-exhausted stream
                bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
            } while (bytesRead == 0);
            this.bufferLength = bytesRead != -1 ? this.index + bytesRead : -1;
        } catch (IOException e) {
            throw new IllegalStateException("Failed while reading InputStream", e);
        }
    }

    /**
     * Container holding offset and meta info for a computed text line:
     * <ul>
     * <li>startOffset - absolute offset of the line within the stream</li>
     * <li>length - line length including EOL characters</li>
     * <li>crlfLength - 0 (EOF with no EOL), 1 ('\n' or lone '\r') or 2 ('\r\n')</li>
     * <li>startsWithMatch - whether the line starts with the supplied prefix</li>
     * </ul>
     */
    public static class OffsetInfo {
        private final long startOffset, length;
        private final int crlfLength;

        // defaults to true; only cleared when a supplied prefix fails to match
        private boolean startsWithMatch = true;

        OffsetInfo(long startOffset, long length, int crlfLength) {
            this.startOffset = startOffset;
            this.length = length;
            this.crlfLength = crlfLength;
        }

        public long getStartOffset() {
            return startOffset;
        }

        public long getLength() {
            return length;
        }

        public int getCrlfLength() {
            return this.crlfLength;
        }

        public boolean isStartsWithMatch() {
            return this.startsWithMatch;
        }

        // package-private on purpose; 1 means match, anything else clears the flag
        void setStartsWithMatch(int startsWithMatch) {
            this.startsWithMatch = startsWithMatch == 1;
        }
    }
}
+ */ +package org.apache.nifi.stream.io.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo; +import org.junit.Test; + +public class TextLineDemarcatorTest { + + @Test(expected = IllegalArgumentException.class) + public void nullStream() { + new TextLineDemarcator(null); + } + + @Test(expected = IllegalArgumentException.class) + public void illegalBufferSize() { + new TextLineDemarcator(mock(InputStream.class), -234); + } + + @Test + public void emptyStreamNoStartWithFilter() { + String data = ""; + InputStream is = stringToIs(data); + TextLineDemarcator demarcator = new TextLineDemarcator(is); + assertNull(demarcator.nextOffsetInfo()); + } + + @Test + public void emptyStreamAndStartWithFilter() { + String data = ""; + InputStream is = stringToIs(data); + TextLineDemarcator demarcator = new TextLineDemarcator(is); + assertNull(demarcator.nextOffsetInfo("hello".getBytes())); + } + + @Test + public void singleCR() { + InputStream is = stringToIs("\r"); + TextLineDemarcator demarcator = new TextLineDemarcator(is); + OffsetInfo offsetInfo = demarcator.nextOffsetInfo(); + assertEquals(0, offsetInfo.getStartOffset()); + assertEquals(1, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + assertTrue(offsetInfo.isStartsWithMatch()); + } + + @Test + public void singleLF() { + InputStream is = stringToIs("\n"); + TextLineDemarcator demarcator = new TextLineDemarcator(is); + OffsetInfo offsetInfo = demarcator.nextOffsetInfo(); + assertEquals(0, offsetInfo.getStartOffset()); + assertEquals(1, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + 
assertTrue(offsetInfo.isStartsWithMatch()); + } + + @Test + // essentially validates the internal 'isEol()' operation to ensure it will perform read-ahead + public void crlfWhereLFdoesNotFitInInitialBuffer() throws Exception { + InputStream is = stringToIs("oleg\r\njoe"); + TextLineDemarcator demarcator = new TextLineDemarcator(is, 5); + OffsetInfo offsetInfo = demarcator.nextOffsetInfo(); + assertEquals(0, offsetInfo.getStartOffset()); + assertEquals(6, offsetInfo.getLength()); + assertEquals(2, offsetInfo.getCrlfLength()); + assertTrue(offsetInfo.isStartsWithMatch()); + + offsetInfo = demarcator.nextOffsetInfo(); + assertEquals(6, offsetInfo.getStartOffset()); + assertEquals(3, offsetInfo.getLength()); + assertEquals(0, offsetInfo.getCrlfLength()); + assertTrue(offsetInfo.isStartsWithMatch()); + } + + @Test + public void mixedCRLF() throws Exception { + InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n"); + TextLineDemarcator demarcator = new TextLineDemarcator(is, 4); + OffsetInfo offsetInfo = demarcator.nextOffsetInfo(); + assertEquals(0, offsetInfo.getStartOffset()); + assertEquals(5, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + assertTrue(offsetInfo.isStartsWithMatch()); + + offsetInfo = demarcator.nextOffsetInfo(); + assertEquals(5, offsetInfo.getStartOffset()); + assertEquals(4, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + assertTrue(offsetInfo.isStartsWithMatch()); + + offsetInfo = demarcator.nextOffsetInfo(); + assertEquals(9, offsetInfo.getStartOffset()); + assertEquals(6, offsetInfo.getLength()); + assertEquals(2, offsetInfo.getCrlfLength()); + assertTrue(offsetInfo.isStartsWithMatch()); + + offsetInfo = demarcator.nextOffsetInfo(); + assertEquals(15, offsetInfo.getStartOffset()); + assertEquals(11, offsetInfo.getLength()); + assertEquals(2, offsetInfo.getCrlfLength()); + assertTrue(offsetInfo.isStartsWithMatch()); + } + + @Test + public void consecutiveAndMixedCRLF() throws Exception 
{ + InputStream is = stringToIs("oleg\r\r\njoe\n\n\rjack\n\r\nstacymike\r\n\n\n\r"); + TextLineDemarcator demarcator = new TextLineDemarcator(is, 4); + + OffsetInfo offsetInfo = demarcator.nextOffsetInfo(); // oleg\r + assertEquals(5, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + + offsetInfo = demarcator.nextOffsetInfo(); // \r\n + assertEquals(2, offsetInfo.getLength()); + assertEquals(2, offsetInfo.getCrlfLength()); + + offsetInfo = demarcator.nextOffsetInfo(); // joe\n + assertEquals(4, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + + offsetInfo = demarcator.nextOffsetInfo(); // \n + assertEquals(1, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + + offsetInfo = demarcator.nextOffsetInfo(); // \r + assertEquals(1, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + + offsetInfo = demarcator.nextOffsetInfo(); // jack\n + assertEquals(5, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + + offsetInfo = demarcator.nextOffsetInfo(); // \r\n + assertEquals(2, offsetInfo.getLength()); + assertEquals(2, offsetInfo.getCrlfLength()); + + offsetInfo = demarcator.nextOffsetInfo(); // stacymike\r\n + assertEquals(11, offsetInfo.getLength()); + assertEquals(2, offsetInfo.getCrlfLength()); + + offsetInfo = demarcator.nextOffsetInfo(); // \n + assertEquals(1, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + + offsetInfo = demarcator.nextOffsetInfo(); // \n + assertEquals(1, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + + offsetInfo = demarcator.nextOffsetInfo(); // \r + assertEquals(1, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + } + + @Test + public void startWithNoMatchOnWholeStream() throws Exception { + InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n"); + TextLineDemarcator demarcator = new TextLineDemarcator(is, 4); + + OffsetInfo offsetInfo = 
demarcator.nextOffsetInfo("foojhkj".getBytes()); + assertEquals(0, offsetInfo.getStartOffset()); + assertEquals(5, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + assertFalse(offsetInfo.isStartsWithMatch()); + + offsetInfo = demarcator.nextOffsetInfo("foo".getBytes()); + assertEquals(5, offsetInfo.getStartOffset()); + assertEquals(4, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + assertFalse(offsetInfo.isStartsWithMatch()); + + offsetInfo = demarcator.nextOffsetInfo("joe".getBytes()); + assertEquals(9, offsetInfo.getStartOffset()); + assertEquals(6, offsetInfo.getLength()); + assertEquals(2, offsetInfo.getCrlfLength()); + assertFalse(offsetInfo.isStartsWithMatch()); + + offsetInfo = demarcator.nextOffsetInfo("stasy".getBytes()); + assertEquals(15, offsetInfo.getStartOffset()); + assertEquals(11, offsetInfo.getLength()); + assertEquals(2, offsetInfo.getCrlfLength()); + assertFalse(offsetInfo.isStartsWithMatch()); + } + + @Test + public void startWithSomeMatches() throws Exception { + InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n"); + TextLineDemarcator demarcator = new TextLineDemarcator(is, 7); + + OffsetInfo offsetInfo = demarcator.nextOffsetInfo("foojhkj".getBytes()); + assertEquals(0, offsetInfo.getStartOffset()); + assertEquals(5, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + assertFalse(offsetInfo.isStartsWithMatch()); + + offsetInfo = demarcator.nextOffsetInfo("jo".getBytes()); + assertEquals(5, offsetInfo.getStartOffset()); + assertEquals(4, offsetInfo.getLength()); + assertEquals(1, offsetInfo.getCrlfLength()); + assertTrue(offsetInfo.isStartsWithMatch()); + + offsetInfo = demarcator.nextOffsetInfo("joe".getBytes()); + assertEquals(9, offsetInfo.getStartOffset()); + assertEquals(6, offsetInfo.getLength()); + assertEquals(2, offsetInfo.getCrlfLength()); + assertFalse(offsetInfo.isStartsWithMatch()); + + offsetInfo = demarcator.nextOffsetInfo("stacy".getBytes()); 
+ assertEquals(15, offsetInfo.getStartOffset()); + assertEquals(11, offsetInfo.getLength()); + assertEquals(2, offsetInfo.getCrlfLength()); + assertTrue(offsetInfo.isStartsWithMatch()); + } + + private InputStream stringToIs(String data) { + return new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java index 81c06f88da..2cbcd449fb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java @@ -16,13 +16,11 @@ */ package org.apache.nifi.processors.standard; -import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.BitSet; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -31,9 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.annotation.behavior.EventDriven; @@ -46,26 +42,22 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; 
+import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.ByteArrayOutputStream; -import org.apache.nifi.stream.io.ByteCountingInputStream; -import org.apache.nifi.stream.io.ByteCountingOutputStream; +import org.apache.nifi.stream.io.util.TextLineDemarcator; +import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo; @EventDriven @SideEffectFree @@ -76,7 +68,12 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream; + "or total size of fragment. Each output split file will contain no more than the configured number of lines or bytes. " + "If both Line Split Count and Maximum Fragment Size are specified, the split occurs at whichever limit is reached first. " + "If the first line of a fragment exceeds the Maximum Fragment Size, that line will be output in a single split file which " - +" exceeds the configured maximum size limit.") + + "exceeds the configured maximum size limit. This component also allows one to specify that each split should include a header " + + "lines. 
Header lines can be computed by either specifying the amount of lines that should constitute a header or by using header " + + "marker to match against the read lines. If such match happens then the corresponding line will be treated as header. Keep in mind " + + "that upon the first failure of header marker match, no more marches will be performed and the rest of the data will be parsed as " + + "regular lines for a given split. If after computation of the header there are no more data, the resulting split will consists " + + "of only header lines.") @WritesAttributes({ @WritesAttribute(attribute = "text.line.count", description = "The number of lines of text from the original FlowFile that were copied to this FlowFile"), @WritesAttribute(attribute = "fragment.size", description = "The number of bytes from the original FlowFile that were copied to this FlowFile, " + @@ -87,7 +84,6 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream; @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile")}) @SeeAlso(MergeContent.class) public class SplitText extends AbstractProcessor { - // attribute keys public static final String SPLIT_LINE_COUNT = "text.line.count"; public static final String FRAGMENT_SIZE = "fragment.size"; @@ -150,548 +146,316 @@ public class SplitText extends AbstractProcessor { .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere") .build(); - private List properties; - private Set relationships; + private static final List properties; + private static final Set relationships; + + static { + properties = Collections.unmodifiableList(Arrays.asList(new PropertyDescriptor[]{ + LINE_SPLIT_COUNT, + FRAGMENT_MAX_SIZE, + HEADER_LINE_COUNT, + HEADER_MARKER, + REMOVE_TRAILING_NEWLINES + })); + + relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(new Relationship[]{ + REL_ORIGINAL, + 
REL_SPLITS, + REL_FAILURE + }))); + } + + private volatile boolean removeTrailingNewLines; + + private volatile long maxSplitSize; + + private volatile int lineCount; + + private volatile int headerLineCount; + + private volatile String headerMarker; @Override - protected void init(final ProcessorInitializationContext context) { - final List properties = new ArrayList<>(); - properties.add(LINE_SPLIT_COUNT); - properties.add(FRAGMENT_MAX_SIZE); - properties.add(HEADER_LINE_COUNT); - properties.add(HEADER_MARKER); - properties.add(REMOVE_TRAILING_NEWLINES); - this.properties = Collections.unmodifiableList(properties); + public Set getRelationships() { + return relationships; + } - final Set relationships = new HashSet<>(); - relationships.add(REL_ORIGINAL); - relationships.add(REL_SPLITS); - relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); + @OnScheduled + public void onSchedule(ProcessContext context) { + this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet() + ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false; + this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet() + ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE; + this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger(); + this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger(); + this.headerMarker = context.getProperty(HEADER_MARKER).getValue(); + } + + /** + * Will split the incoming stream releasing all splits as FlowFile at once. 
+ */ + @Override + public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException { + FlowFile sourceFlowFile = processSession.get(); + if (sourceFlowFile == null) { + return; + } + AtomicBoolean error = new AtomicBoolean(); + List computedSplitsInfo = new ArrayList<>(); + AtomicReference headerSplitInfoRef = new AtomicReference<>(); + processSession.read(sourceFlowFile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + TextLineDemarcator demarcator = new TextLineDemarcator(in); + SplitInfo splitInfo = null; + long startOffset = 0; + + // Compute fragment representing the header (if available) + long start = System.nanoTime(); + try { + if (SplitText.this.headerLineCount > 0) { + splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null); + if (splitInfo.lineCount < SplitText.this.headerLineCount) { + error.set(true); + getLogger().error("Unable to split " + sourceFlowFile + " due to insufficient amount of header lines. Required " + + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". 
Routing to failure."); + } + } else if (SplitText.this.headerMarker != null) { + splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null); + } + headerSplitInfoRef.set(splitInfo); + } catch (IllegalStateException e) { + error.set(true); + getLogger().error(e.getMessage() + " Routing to failure.", e); + } + + // Compute and collect fragments representing the individual splits + if (!error.get()) { + if (headerSplitInfoRef.get() != null) { + startOffset = headerSplitInfoRef.get().length; + } + long preAccumulatedLength = startOffset; + while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) { + computedSplitsInfo.add(splitInfo); + startOffset += splitInfo.length; + } + long stop = System.nanoTime(); + if (getLogger().isDebugEnabled()) { + getLogger().debug("Computed splits in " + (stop - start) + " milliseconds."); + } + } + } + }); + + if (error.get()){ + processSession.transfer(sourceFlowFile, REL_FAILURE); + } else { + List splitFlowFiles = this.generateSplitFlowFiles(sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession); + processSession.transfer(sourceFlowFile, REL_ORIGINAL); + processSession.transfer(splitFlowFiles, REL_SPLITS); + } } @Override protected Collection customValidate(ValidationContext validationContext) { List results = new ArrayList<>(); - - final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 + boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0 && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet()); - - results.add(new ValidationResult.Builder() - .subject("Maximum Fragment Size") - .valid(!invalidState) - .explanation("Property must be specified when Line Split Count is 0") - .build() - ); + results.add(new ValidationResult.Builder().subject("Maximum Fragment 
Size").valid(!invalidState) + .explanation("Property must be specified when Line Split Count is 0").build()); return results; } - @Override - public Set getRelationships() { - return relationships; - } - @Override protected List getSupportedPropertyDescriptors() { return properties; } - private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out, - final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException { - final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - - byte[] leadingBytes = leadingNewLineBytes; - int numLines = 0; - long totalBytes = 0L; - for (int i = 0; i < maxNumLines; i++) { - final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes); - final long bytes = eolMarker.getBytesConsumed(); - leadingBytes = eolMarker.getLeadingNewLineBytes(); - - if (includeLineDelimiter && out != null) { - if (leadingBytes != null) { - out.write(leadingBytes); - leadingBytes = null; - } - eolBuffer.drainTo(out); - } - totalBytes += bytes; - if (bytes <= 0) { - return numLines; - } - numLines++; - if (totalBytes >= maxByteCount) { - break; - } + /** + * Generates the list of {@link FlowFile}s representing splits. If + * {@link SplitInfo} provided as an argument to this operation is not null + * it signifies the header information and its contents will be included in + * each and every computed split. 
+ */ + private List generateSplitFlowFiles(FlowFile sourceFlowFile, SplitInfo splitInfo, List computedSplitsInfo, ProcessSession processSession){ + List splitFlowFiles = new ArrayList<>(); + FlowFile headerFlowFile = null; + long headerCrlfLength = 0; + if (splitInfo != null) { + headerFlowFile = processSession.clone(sourceFlowFile, splitInfo.startOffset, splitInfo.length); + headerCrlfLength = splitInfo.trimmedLength; } - return numLines; - } + int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme + String fragmentId = UUID.randomUUID().toString(); - private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize, - final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException { - long bytesRead = 0L; - final ByteArrayOutputStream buffer; - if (out != null) { - buffer = new ByteArrayOutputStream(); + if (computedSplitsInfo.size() == 0) { + FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength); + splitFlowFile = SplitText.this.updateAttributes(processSession, splitFlowFile, 0, splitFlowFile.getSize(), + fragmentId, fragmentIndex++, 0, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); + splitFlowFiles.add(splitFlowFile); } else { - buffer = null; - } - byte[] bytesToWriteFirst = leadingNewLineBytes; + for (SplitInfo computedSplitInfo : computedSplitsInfo) { + long length = SplitText.this.removeTrailingNewLines ? 
computedSplitInfo.trimmedLength : computedSplitInfo.length; + boolean proceedWithClone = headerFlowFile != null || length > 0; + if (proceedWithClone) { + FlowFile splitFlowFile = null; + if (headerFlowFile != null) { + if (length > 0) { + splitFlowFile = processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length); + splitFlowFile = processSession.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile); + } else { + splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER + } + } else { + splitFlowFile = processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length); + } - in.mark(Integer.MAX_VALUE); - while (true) { - final int nextByte = in.read(); - - // if we hit end of stream we're done - if (nextByte == -1) { - if (buffer != null) { - buffer.writeTo(out); - buffer.close(); - } - return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); // bytesToWriteFirst should be "null"? 
- } - - // Verify leading bytes do not violate size limitation - if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) { - return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes); - } - // Write leadingNewLines, if appropriate - if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) { - bytesRead += bytesToWriteFirst.length; - buffer.write(bytesToWriteFirst); - bytesToWriteFirst = null; - } - // buffer the output - bytesRead++; - if (buffer != null && nextByte != '\n' && nextByte != '\r') { - if (bytesToWriteFirst != null) { - buffer.write(bytesToWriteFirst); - } - bytesToWriteFirst = null; - eolBuffer.drainTo(buffer); - eolBuffer.clear(); - buffer.write(nextByte); - } - - // check the size limit - if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) { - in.reset(); - if (buffer != null) { - buffer.close(); - } - return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes); - } - - // if we have a new line, then we're done - if (nextByte == '\n') { - if (buffer != null) { - buffer.writeTo(out); - buffer.close(); - eolBuffer.addEndOfLine(false, true); - } - return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst); - } - - // Determine if \n follows \r; in either case, end of line has been reached - if (nextByte == '\r') { - if (buffer != null) { - buffer.writeTo(out); - buffer.close(); - } - in.mark(1); - final int lookAheadByte = in.read(); - if (lookAheadByte == '\n') { - eolBuffer.addEndOfLine(true, true); - return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst); - } else { - in.reset(); - eolBuffer.addEndOfLine(true, false); - return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst); + splitFlowFile = SplitText.this.updateAttributes(processSession, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++, + computedSplitsInfo.size(), 
sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); + splitFlowFiles.add(splitFlowFile); } } } + + getLogger().info("Split " + sourceFlowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : ".")); + if (headerFlowFile != null) { + processSession.remove(headerFlowFile); + } + return splitFlowFiles; } - private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize, - final long bufferedBytes) throws IOException { - final SplitInfo info = new SplitInfo(); - final EndOfLineBuffer eolBuffer = new EndOfLineBuffer(); - int lastByte = -1; - info.lengthBytes = bufferedBytes; - long lastEolBufferLength = 0L; + private FlowFile updateAttributes(ProcessSession processSession, FlowFile splitFlowFile, long splitLineCount, long splitFlowFileSize, + String splitId, int splitIndex, int splitCount, String origFileName) { + Map attributes = new HashMap<>(); + attributes.put(SPLIT_LINE_COUNT, String.valueOf(splitLineCount)); + attributes.put(FRAGMENT_SIZE, String.valueOf(splitFlowFile.getSize())); + attributes.put(FRAGMENT_ID, splitId); + attributes.put(FRAGMENT_INDEX, String.valueOf(splitIndex)); + attributes.put(FRAGMENT_COUNT, String.valueOf(splitCount)); + attributes.put(SEGMENT_ORIGINAL_FILENAME, origFileName); + return processSession.putAllAttributes(splitFlowFile, attributes); + } - while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r')) - && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0) - && eolBuffer.length() < maxSize) { - in.mark(1); - final int nextByte = in.read(); - // Check for \n following \r on last line - if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') { - in.reset(); + /** + * Will generate {@link SplitInfo} for the next fragment that represents the + * header of the future split. 
+ * + * If split size is controlled by the amount of lines in the split then the + * resulting {@link SplitInfo} line count will always be <= 'splitMaxLineCount'. It can only be less IF it reaches the EOF. + * If split size is controlled by the {@link #maxSplitSize}, then the resulting {@link SplitInfo} line count + * will vary but the length of the split will never be > {@link #maxSplitSize} and {@link IllegalStateException} will be thrown. + * This method also allows one to provide 'startsWithFilter' to allow headers to be determined via such filter (see {@link #HEADER_MARKER}. + */ + private SplitInfo computeHeader(TextLineDemarcator demarcator, long startOffset, long splitMaxLineCount, byte[] startsWithFilter, SplitInfo previousSplitInfo) { + long length = 0; + long actualLineCount = 0; + OffsetInfo offsetInfo = null; + SplitInfo splitInfo = null; + OffsetInfo previousOffsetInfo = null; + long lastCrlfLength = 0; + while ((offsetInfo = demarcator.nextOffsetInfo(startsWithFilter)) != null) { + lastCrlfLength = offsetInfo.getCrlfLength(); + + if (startsWithFilter != null && !offsetInfo.isStartsWithMatch()) { + if (offsetInfo.getCrlfLength() != -1) { + previousOffsetInfo = offsetInfo; + } break; - } - switch (nextByte) { - case -1: - info.endOfStream = true; - if (keepAllNewLines) { - info.lengthBytes += eolBuffer.length(); - } - if (lastByte != '\r') { - info.lengthLines++; - } - info.bufferedBytes = 0; - return info; - case '\r': - eolBuffer.addEndOfLine(true, false); - info.lengthLines++; - info.bufferedBytes = 0; - break; - case '\n': - eolBuffer.addEndOfLine(false, true); - if (lastByte != '\r') { - info.lengthLines++; - } - info.bufferedBytes = 0; - break; - default: - if (eolBuffer.length() > 0) { - info.lengthBytes += eolBuffer.length(); - lastEolBufferLength = eolBuffer.length(); - eolBuffer.clear(); - } - info.lengthBytes++; - info.bufferedBytes++; - break; - } - lastByte = nextByte; - } - // if current line exceeds size and not keeping eol characters, 
remove previously applied eol characters - if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) { - info.lengthBytes -= lastEolBufferLength; - } - if (keepAllNewLines) { - info.lengthBytes += eolBuffer.length(); - } - return info; - } - - private int countHeaderLines(final ByteCountingInputStream in, - final String headerMarker) throws IOException { - int headerInfo = 0; - - final BufferedReader br = new BufferedReader(new InputStreamReader(in)); - in.mark(Integer.MAX_VALUE); - String line = br.readLine(); - while (line != null) { - // if line is not a header line, reset stream and return header counts - if (!line.startsWith(headerMarker)) { - in.reset(); - return headerInfo; } else { - headerInfo++; - } - line = br.readLine(); - } - in.reset(); - return headerInfo; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - final FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final ComponentLog logger = getLogger(); - final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger(); - final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0) - ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger(); - final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet() - ? 
context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE; - final String headerMarker = context.getProperty(HEADER_MARKER).getValue(); - final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean(); - - final AtomicReference errorMessage = new AtomicReference<>(null); - final ArrayList splitInfos = new ArrayList<>(); - - final long startNanos = System.nanoTime(); - final List splits = new ArrayList<>(); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn); - final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { - - long bufferedPartialLine = 0; - - // if we have header lines, copy them into a ByteArrayOutputStream - final ByteArrayOutputStream headerStream = new ByteArrayOutputStream(); - // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property - int headerInfoLineCount = 0; - if (headerCount > 0) { - headerInfoLineCount = headerCount; - } else { - if (headerMarker != null) { - headerInfoLineCount = countHeaderLines(in, headerMarker); - } - } - - final byte[] headerNewLineBytes; - final byte[] headerBytesWithoutTrailingNewLines; - if (headerInfoLineCount > 0) { - final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null); - - if (headerLinesCopied < headerInfoLineCount) { - errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines"); - return; - } - - // Break header apart into trailing newlines and remaining text - final byte[] headerBytes = headerStream.toByteArray(); - int headerNewLineByteCount = 0; - for (int i = headerBytes.length - 1; i >= 0; i--) { - final byte headerByte = headerBytes[i]; - - if (headerByte == '\r' || headerByte == '\n') { - headerNewLineByteCount++; 
- } else { - break; - } - } - - if (headerNewLineByteCount == 0) { - headerNewLineBytes = null; - headerBytesWithoutTrailingNewLines = headerBytes; - } else { - headerNewLineBytes = new byte[headerNewLineByteCount]; - System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount); - - headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount]; - System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount); - } - } else { - headerBytesWithoutTrailingNewLines = null; - headerNewLineBytes = null; - } - - while (true) { - if (headerInfoLineCount > 0) { - // if we have header lines, create a new FlowFile, copy the header lines to that file, - // and then start copying lines - final AtomicInteger linesCopied = new AtomicInteger(0); - final AtomicLong bytesCopied = new AtomicLong(0L); - FlowFile splitFile = session.create(flowFile); - try { - splitFile = session.write(splitFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final BufferedOutputStream out = new BufferedOutputStream(rawOut); - final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) { - countingOut.write(headerBytesWithoutTrailingNewLines); - //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already - linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut, - includeLineDelimiter, headerNewLineBytes)); - bytesCopied.set(countingOut.getBytesWritten()); - } - } - }); - splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get())); - splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get())); - logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), 
bytesCopied.get()}); - } finally { - if (linesCopied.get() > 0) { - splits.add(splitFile); - } else { - // if the number of content lines is a multiple of the SPLIT_LINE_COUNT, - // the last flow file will contain just a header; don't forward that one - session.remove(splitFile); - } - } - - // Check for EOF - in.mark(1); - if (in.read() == -1) { - break; - } - in.reset(); - - } else { - // We have no header lines, so we can simply demarcate the original File via the - // ProcessSession#clone method. - long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine; - final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine); - if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) { - bufferedPartialLine = info.bufferedBytes; - } - if (info.endOfStream) { - // stream is out of data - if (info.lengthBytes > 0) { - info.offsetBytes = beforeReadingLines; - splitInfos.add(info); - final long procNanos = System.nanoTime() - startNanos; - final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS); - logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; " - + "total splits = {}; total processing time = {} ms", - new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis}); - } - break; - } else { - if (info.lengthBytes != 0) { - info.offsetBytes = beforeReadingLines; - info.lengthBytes -= bufferedPartialLine; - splitInfos.add(info); - final long procNanos = System.nanoTime() - startNanos; - final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS); - logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; " - + "total splits = {}; total processing time = {} ms", - new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis}); - } - } - } + if (length + offsetInfo.getLength() > this.maxSplitSize) { + throw new 
IllegalStateException( "Computing header resulted in header size being > MAX split size of " + this.maxSplitSize + "."); + } else { + length += offsetInfo.getLength(); + actualLineCount++; + if (actualLineCount == splitMaxLineCount) { + break; } } } - }); - - if (errorMessage.get() != null) { - logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()}); - session.transfer(flowFile, REL_FAILURE); - if (!splits.isEmpty()) { - session.remove(splits); - } - return; } - if (!splitInfos.isEmpty()) { - // Create the splits - for (final SplitInfo info : splitInfos) { - FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes); - split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines)); - split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes)); - splits.add(split); - } + if (actualLineCount > 0) { + splitInfo = new SplitInfo(startOffset, length, lastCrlfLength, actualLineCount, previousOffsetInfo); } - finishFragmentAttributes(session, flowFile, splits); - - if (splits.size() > 10) { - logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()}); - } else { - logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits}); - } - - session.transfer(flowFile, REL_ORIGINAL); - session.transfer(splits, REL_SPLITS); + return splitInfo; } - private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List splits) { - final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key()); - - final String fragmentId = UUID.randomUUID().toString(); - final ArrayList newList = new ArrayList<>(splits); - splits.clear(); - for (int i = 1; i <= newList.size(); i++) { - FlowFile ff = newList.get(i - 1); - final Map attributes = new HashMap<>(); - attributes.put(FRAGMENT_ID, fragmentId); - attributes.put(FRAGMENT_INDEX, String.valueOf(i)); - attributes.put(FRAGMENT_COUNT, 
String.valueOf(newList.size())); - attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename); - FlowFile newFF = session.putAllAttributes(ff, attributes); - splits.add(newFF); + /** + * Will generate {@link SplitInfo} for the next split. + * + * If split size is controlled by the amount of lines in the split then the resulting + * {@link SplitInfo} line count will always be <= 'splitMaxLineCount'. + * If split size is controlled by the {@link #maxSplitSize}, then the resulting {@link SplitInfo} + * line count will vary but the length of the split will never be > {@link #maxSplitSize}. + */ + private SplitInfo nextSplit(TextLineDemarcator demarcator, long startOffset, long splitMaxLineCount, SplitInfo remainderSplitInfo, long startingLength) { + long length = 0; + long trailingCrlfLength = 0; + long actualLineCount = 0; + OffsetInfo offsetInfo = null; + SplitInfo splitInfo = null; + // the remainder from the previous read after which it was determined that adding it would make + // the split size > 'maxSplitSize'. So it's being carried over to the next line. 
+ if (remainderSplitInfo != null && remainderSplitInfo.remaningOffsetInfo != null) { + length += remainderSplitInfo.remaningOffsetInfo.getLength(); + actualLineCount++; } - } + OffsetInfo remaningOffsetInfo = null; + long lastCrlfLength = 0; + while ((offsetInfo = demarcator.nextOffsetInfo()) != null) { + lastCrlfLength = offsetInfo.getCrlfLength(); - private static class SplitInfo { - - public long offsetBytes; - public long lengthBytes; - public long lengthLines; - public long bufferedBytes; - public boolean endOfStream; - - public SplitInfo() { - this.offsetBytes = 0L; - this.lengthBytes = 0L; - this.lengthLines = 0L; - this.bufferedBytes = 0L; - this.endOfStream = false; - } - } - - public static class EndOfLineBuffer { - private static final byte CARRIAGE_RETURN = (byte) '\r'; - private static final byte NEWLINE = (byte) '\n'; - - private final BitSet buffer = new BitSet(); - private int index = 0; - - public void clear() { - index = 0; - } - - public void addEndOfLine(final boolean carriageReturn, final boolean newLine) { - buffer.set(index++, carriageReturn); - buffer.set(index++, newLine); - } - - private void drainTo(final OutputStream out) throws IOException { - for (int i = 0; i < index; i += 2) { - final boolean cr = buffer.get(i); - final boolean nl = buffer.get(i + 1); - - // we've consumed all data in the buffer - if (!cr && !nl) { - return; - } - - if (cr) { - out.write(CARRIAGE_RETURN); - } - - if (nl) { - out.write(NEWLINE); - } + if (offsetInfo.getLength() == offsetInfo.getCrlfLength()) { + trailingCrlfLength += offsetInfo.getCrlfLength(); + } else if (offsetInfo.getLength() > offsetInfo.getCrlfLength()) { + trailingCrlfLength = 0; // non-empty line came in, thus resetting counter } - clear(); + if (length + offsetInfo.getLength() + startingLength > this.maxSplitSize) { + if (length == 0) { // single line per split + length += offsetInfo.getLength(); + actualLineCount++; + } else { + remaningOffsetInfo = offsetInfo; + } + break; + } else { + 
length += offsetInfo.getLength(); + actualLineCount++; + if (splitMaxLineCount > 0 && actualLineCount >= splitMaxLineCount) { + break; + } + } } - /** - * @return the number of line endings in the buffer - */ - public int length() { - return index / 2; + if (actualLineCount > 0) { + if (length - trailingCrlfLength >= lastCrlfLength) { + trailingCrlfLength += lastCrlfLength; // trim CRLF from the last line + } + splitInfo = new SplitInfo(startOffset, length, length - trailingCrlfLength, actualLineCount, remaningOffsetInfo); } + return splitInfo; } - public static class EndOfLineMarker { - private final long bytesConsumed; - private final EndOfLineBuffer eolBuffer; - private final boolean streamEnded; - private final byte[] leadingNewLineBytes; + /** + * Container for hosting meta-information pertaining to the split so it can + * be used later to create {@link FlowFile} representing the split. + */ + private class SplitInfo { + final long startOffset, length, trimmedLength, lineCount; + OffsetInfo remaningOffsetInfo; - public EndOfLineMarker(final long bytesCounted, final EndOfLineBuffer eolBuffer, final boolean streamEnded, final byte[] leadingNewLineBytes) { - this.bytesConsumed = bytesCounted; - this.eolBuffer = eolBuffer; - this.streamEnded = streamEnded; - this.leadingNewLineBytes = leadingNewLineBytes; + SplitInfo(long startOffset, long length, long trimmedLength, long lineCount, OffsetInfo remaningOffsetInfo) { + this.startOffset = startOffset; + this.length = length; + this.lineCount = lineCount; + this.remaningOffsetInfo = remaningOffsetInfo; + this.trimmedLength = trimmedLength; } - public long getBytesConsumed() { - return bytesConsumed; - } - - public EndOfLineBuffer getEndOfLineBuffer() { - return eolBuffer; - } - - public boolean isStreamEnded() { - return streamEnded; - } - - public byte[] getLeadingNewLineBytes() { - return leadingNewLineBytes; + @Override + public String toString() { + return "offset:" + startOffset + "; length:" + length + "; 
lineCount:" + lineCount; } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java index c7ac4f6e13..1a2089c97b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java @@ -17,12 +17,6 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertEquals; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; - -import org.junit.Test; import java.io.IOException; import java.nio.file.Files; @@ -30,6 +24,12 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + public class TestSplitText { final String originalFilename = "original.txt"; @@ -176,7 +176,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); - runner.assertTransferCount(SplitText.REL_SPLITS, 0); + runner.assertTransferCount(SplitText.REL_SPLITS, 1); // repeat with header cou8nt versus header marker runner.clearTransferState(); @@ -189,7 +189,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); - runner.assertTransferCount(SplitText.REL_SPLITS, 0); + runner.assertTransferCount(SplitText.REL_SPLITS, 1); 
// repeat single header line with no newline characters runner.clearTransferState(); @@ -202,7 +202,7 @@ public class TestSplitText { runner.assertTransferCount(SplitText.REL_FAILURE, 0); runner.assertTransferCount(SplitText.REL_ORIGINAL, 1); - runner.assertTransferCount(SplitText.REL_SPLITS, 0); + runner.assertTransferCount(SplitText.REL_SPLITS, 1); } @Test @@ -813,7 +813,7 @@ public class TestSplitText { final List splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); splits.get(0).assertContentEquals("\n\n1"); - splits.get(1).assertContentEquals(""); + splits.get(1).assertContentEquals("\n"); } }