NIFI-2876 refactored demarcators into a common abstract class

This closes #1214.
This commit is contained in:
Oleg Zhurakousky 2016-11-11 17:36:18 -05:00 committed by Mark Payne
parent fba17159a9
commit 8e17929d6a
6 changed files with 377 additions and 261 deletions

View File

@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stream.io.util;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferOverflowException;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
/**
 * Base class for implementing streaming demarcators. It owns the read buffer
 * and the bookkeeping fields shared by concrete demarcators, and provides the
 * {@link #fill()} and {@link #extractDataToken(int)} primitives they build on.
 * <p>
 * NOTE: Not intended for multi-thread usage hence not Thread-safe.
 * </p>
 */
abstract class AbstractDemarcator implements Closeable {

    static final int INIT_BUFFER_SIZE = 8192;

    private final InputStream is;

    /*
     * The size of the initial buffer. Its value is also used as the increment
     * when the buffer needs to be expanded.
     */
    private final int initialBufferSize;

    /*
     * The maximum allowed size of the token. In the event such size is
     * exceeded TokenTooLargeException is thrown by extractDataToken(..).
     */
    private final int maxDataSize;

    /*
     * Buffer into which the bytes are read from the provided stream. Its
     * initial size is the 'initialBufferSize' provided in the constructor
     * (INIT_BUFFER_SIZE when the two argument constructor is used). It is
     * replaced with a larger array when fill() has to expand it.
     */
    byte[] buffer;

    /*
     * Current position within 'buffer'. fill() stores newly read bytes
     * starting at this position.
     */
    int index;

    /*
     * Starting offset of the demarcated token within the current 'buffer'
     * (extractDataToken(..) copies from here). When it is 0 and the buffer is
     * full, it serves as a signal to the fill() operation that the buffer
     * needs to be expanded since the end of the token is not reached; when it
     * is greater than 0 fill() shuffles the pending bytes to the beginning of
     * the buffer instead (see fill() operation for more details).
     */
    int mark;

    /*
     * Starting offset (from the beginning of the stream) of the demarcated
     * token. It is not modified by this class.
     */
    long offset;

    /*
     * The length of the bytes valid for reading. It is different from the
     * buffer length, since this number may be smaller (e.g., at the end of
     * the stream) than the actual buffer length. It is set by the fill()
     * operation every time more bytes are read into the buffer and is set to
     * -1 once the end of the stream is reached.
     */
    int availableBytesLength;

    /**
     * Constructs an instance of demarcator with provided {@link InputStream}
     * and max data size, using {@link #INIT_BUFFER_SIZE} as the initial
     * buffer size. Each demarcated token must fit within max data size,
     * otherwise the exception will be raised.
     *
     * @param is
     *            stream to read from; must not be null
     * @param maxDataSize
     *            maximum allowed token size in bytes; must be &gt; 0
     * @throws IllegalArgumentException
     *             if any argument violates the constraints above
     */
    AbstractDemarcator(InputStream is, int maxDataSize) {
        this(is, maxDataSize, INIT_BUFFER_SIZE);
    }

    /**
     * Constructs an instance of demarcator with provided {@link InputStream},
     * max data size and initial buffer size. Each demarcated token must fit
     * within max data size, otherwise the exception will be raised.
     *
     * @param is
     *            stream to read from; must not be null
     * @param maxDataSize
     *            maximum allowed token size in bytes; must be &gt; 0
     * @param initialBufferSize
     *            initial size of the buffer, also used as the expansion
     *            increment; must be &gt; 0
     * @throws IllegalArgumentException
     *             if any argument violates the constraints above
     */
    AbstractDemarcator(InputStream is, int maxDataSize, int initialBufferSize) {
        this.validate(is, maxDataSize, initialBufferSize);
        this.is = is;
        this.initialBufferSize = initialBufferSize;
        this.buffer = new byte[initialBufferSize];
        this.maxDataSize = maxDataSize;
    }

    /**
     * Closes the underlying {@link InputStream}.
     */
    @Override
    public void close() throws IOException {
        this.is.close();
    }

    /**
     * Will fill the current buffer from current 'index' position, expanding it
     * and or shuffling it if necessary. Updates 'availableBytesLength' to
     * <code>index + bytesRead</code>, or to -1 when the end of the stream is
     * reached.
     * <p>
     * NOTE: this operation does not enforce the max data size; that check is
     * performed by {@link #extractDataToken(int)}.
     * </p>
     *
     * @throws IOException
     *             if unable to read from the stream
     * @throws BufferOverflowException
     *             if the buffer would have to grow beyond
     *             {@link Integer#MAX_VALUE} bytes
     */
    void fill() throws IOException {
        if (this.index >= this.buffer.length) {
            if (this.mark == 0) { // expand
                // Widen BEFORE adding. An int + int sum would wrap to a negative
                // value on overflow, making the guard below unreachable and
                // surfacing as NegativeArraySizeException instead.
                long expandedSize = (long) this.buffer.length + this.initialBufferSize;
                if (expandedSize > Integer.MAX_VALUE) {
                    throw new BufferOverflowException(); // will probably OOM before this will ever happen, but just in case.
                }
                byte[] newBuff = new byte[(int) expandedSize];
                System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
                this.buffer = newBuff;
            } else { // shuffle
                // Move the bytes of the pending token [mark, index) to the
                // beginning of the buffer to make room without reallocating.
                int length = this.index - this.mark;
                System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
                this.index = length;
                this.mark = 0;
            }
        }

        int bytesRead;
        /*
         * The do/while pattern is used here similar to the way it is used in
         * BufferedReader essentially protecting from assuming the EOS until it
         * actually is since not every implementation of InputStream guarantees
         * that bytes are always available while the stream is open.
         */
        do {
            bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
        } while (bytesRead == 0);
        this.availableBytesLength = bytesRead != -1 ? this.index + bytesRead : -1;
    }

    /**
     * Will extract data token of the provided length from the current buffer
     * starting at the 'mark'.
     *
     * @param length
     *            number of bytes to copy out of the buffer
     * @return a new array holding the token bytes, or null if 'length' is not
     *         greater than 0
     * @throws IOException
     *             a {@link TokenTooLargeException} if 'length' exceeds the
     *             max data size
     */
    byte[] extractDataToken(int length) throws IOException {
        if (length > this.maxDataSize) {
            throw new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxDataSize + " bytes.");
        }
        byte[] data = null;
        if (length > 0) {
            data = new byte[length];
            System.arraycopy(this.buffer, this.mark, data, 0, data.length);
        }
        return data;
    }

    /**
     * Validates prerequisites for constructor arguments
     *
     * @throws IllegalArgumentException
     *             if 'is' is null, or 'maxDataSize' or 'initialBufferSize' is
     *             not greater than 0
     */
    private void validate(InputStream is, int maxDataSize, int initialBufferSize) {
        if (is == null) {
            throw new IllegalArgumentException("'is' must not be null");
        } else if (maxDataSize <= 0) {
            throw new IllegalArgumentException("'maxDataSize' must be > 0");
        } else if (initialBufferSize <= 0) {
            throw new IllegalArgumentException("'initialBufferSize' must be > 0");
        }
    }
}

View File

@ -16,40 +16,24 @@
*/ */
package org.apache.nifi.stream.io.util; package org.apache.nifi.stream.io.util;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
/** /**
* The <code>StreamDemarcator</code> class takes an input stream and demarcates * The <code>StreamDemarcator</code> class takes an input stream and demarcates
* it so it could be read (see {@link #nextToken()}) as individual byte[] * it so it could be read (see {@link #nextToken()}) as individual byte[]
* demarcated by the provided delimiter. If delimiter is not provided the entire * demarcated by the provided delimiter (see 'delimiterBytes'). If delimiter is
* stream will be read into a single token which may result in * not provided the entire stream will be read into a single token which may
* {@link OutOfMemoryError} if stream is too large. * result in {@link OutOfMemoryError} if stream is too large. The 'maxDataSize'
* controls the maximum size of the buffer that accumulates a token.
* <p>
* NOTE: Not intended for multi-thread usage hence not Thread-safe.
* </p>
*/ */
public class StreamDemarcator implements Closeable { public class StreamDemarcator extends AbstractDemarcator {
private final static int INIT_BUFFER_SIZE = 8192;
private final InputStream is;
private final byte[] delimiterBytes; private final byte[] delimiterBytes;
private final int maxDataSize;
private final int initialBufferSize;
private byte[] buffer;
private int index;
private int mark;
private int readAheadLength;
/** /**
* Constructs a new instance * Constructs a new instance
* *
@ -57,10 +41,14 @@ public class StreamDemarcator implements Closeable {
* instance of {@link InputStream} representing the data * instance of {@link InputStream} representing the data
* @param delimiterBytes * @param delimiterBytes
* byte array representing delimiter bytes used to split the * byte array representing delimiter bytes used to split the
* input stream. Can be null * input stream. Can be 'null'. NOTE: the 'null' is allowed only
* for convenience and consistency since without delimiter this
* class is no different then BufferedReader which reads the
* entire stream into a byte array and there may be a more
* efficient ways to do that (if that is the case).
* @param maxDataSize * @param maxDataSize
* maximum size of data derived from the input stream. This means * maximum size of data derived from the input stream. This means
* that neither {@link InputStream} nor its individual chunks (if * that neither {@link InputStream} nor its individual tokens (if
* delimiter is used) can ever be greater then this size. * delimiter is used) can ever be greater then this size.
*/ */
public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize) { public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize) {
@ -74,10 +62,14 @@ public class StreamDemarcator implements Closeable {
* instance of {@link InputStream} representing the data * instance of {@link InputStream} representing the data
* @param delimiterBytes * @param delimiterBytes
* byte array representing delimiter bytes used to split the * byte array representing delimiter bytes used to split the
* input stream. Can be null * input stream. Can be 'null'. NOTE: the 'null' is allowed only
* for convenience and consistency since without delimiter this
* class is no different then BufferedReader which reads the
* entire stream into a byte array and there may be a more
* efficient ways to do that (if that is the case).
* @param maxDataSize * @param maxDataSize
* maximum size of data derived from the input stream. This means * maximum size of data derived from the input stream. This means
* that neither {@link InputStream} nor its individual chunks (if * that neither {@link InputStream} nor its individual tokens (if
* delimiter is used) can ever be greater then this size. * delimiter is used) can ever be greater then this size.
* @param initialBufferSize * @param initialBufferSize
* initial size of the buffer used to buffer {@link InputStream} * initial size of the buffer used to buffer {@link InputStream}
@ -87,12 +79,9 @@ public class StreamDemarcator implements Closeable {
* *
*/ */
public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) { public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) {
this.validateInput(is, delimiterBytes, maxDataSize, initialBufferSize); super(is, maxDataSize, initialBufferSize);
this.is = is; this.validate(delimiterBytes);
this.delimiterBytes = delimiterBytes; this.delimiterBytes = delimiterBytes;
this.initialBufferSize = initialBufferSize;
this.buffer = new byte[initialBufferSize];
this.maxDataSize = maxDataSize;
} }
/** /**
@ -102,99 +91,55 @@ public class StreamDemarcator implements Closeable {
* @throws IOException if unable to read from the stream * @throws IOException if unable to read from the stream
*/ */
public byte[] nextToken() throws IOException { public byte[] nextToken() throws IOException {
byte[] data = null; byte[] token = null;
int j = 0; int j = 0;
nextTokenLoop:
while (data == null && this.buffer != null) { while (token == null && this.availableBytesLength != -1) {
if (this.index >= this.readAheadLength) { if (this.index >= this.availableBytesLength) {
this.fill(); this.fill();
} }
if (this.index >= this.readAheadLength) { if (this.availableBytesLength != -1) {
data = this.extractDataToken(0); byte byteVal;
this.buffer = null; int i;
} else { for (i = this.index; i < this.availableBytesLength; i++) {
byte byteVal = this.buffer[this.index++]; byteVal = this.buffer[i];
if (this.delimiterBytes != null && this.delimiterBytes[j] == byteVal) {
if (++j == this.delimiterBytes.length) { boolean delimiterFound = false;
data = this.extractDataToken(this.delimiterBytes.length); if (this.delimiterBytes != null && this.delimiterBytes[j] == byteVal) {
this.mark = this.index; if (++j == this.delimiterBytes.length) {
delimiterFound = true;
}
} else {
j = 0; j = 0;
} }
} else {
j = 0; if (delimiterFound) {
this.index = i + 1;
int size = this.index - this.mark - this.delimiterBytes.length;
token = this.extractDataToken(size);
this.mark = this.index;
j = 0;
if (token != null) {
break nextTokenLoop;
}
}
} }
} this.index = i;
} } else {
return data; token = this.extractDataToken(this.index - this.mark);
}
/**
* Will fill the current buffer from current 'index' position, expanding it
* and or shuffling it if necessary
*
* @throws IOException if unable to read from the stream
*/
private void fill() throws IOException {
if (this.index >= this.buffer.length) {
if (this.mark == 0) { // expand
byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
this.buffer = newBuff;
} else { // shuffle
int length = this.index - this.mark;
System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
this.index = length;
this.mark = 0;
this.readAheadLength = length;
} }
} }
int bytesRead; return token;
do {
bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
} while (bytesRead == 0);
if (bytesRead != -1) {
this.readAheadLength = this.index + bytesRead;
if (this.readAheadLength > this.maxDataSize) {
throw new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxDataSize + " bytes.");
}
}
} }
/**
* Will extract data token from the current buffer. The length of the data
* token is between the current 'mark' and 'index' minus 'lengthSubtract'
* which signifies the length of the delimiter (if any). If the above
* subtraction results in length 0, null is returned.
*/
private byte[] extractDataToken(int lengthSubtract) {
byte[] data = null;
int length = this.index - this.mark - lengthSubtract;
if (length > 0) {
data = new byte[length];
System.arraycopy(this.buffer, this.mark, data, 0, data.length);
}
return data;
}
/** /**
* * Validates prerequisites for constructor arguments
*/ */
private void validateInput(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) { private void validate(byte[] delimiterBytes) {
if (is == null) { if (delimiterBytes != null && delimiterBytes.length == 0) {
throw new IllegalArgumentException("'is' must not be null");
} else if (maxDataSize <= 0) {
throw new IllegalArgumentException("'maxDataSize' must be > 0");
} else if (initialBufferSize <= 0) {
throw new IllegalArgumentException("'initialBufferSize' must be > 0");
} else if (delimiterBytes != null && delimiterBytes.length == 0){
throw new IllegalArgumentException("'delimiterBytes' is an optional argument, but when provided its length must be > 0"); throw new IllegalArgumentException("'delimiterBytes' is an optional argument, but when provided its length must be > 0");
} }
} }
@Override
public void close() throws IOException {
is.close();
}
} }

View File

@ -28,26 +28,14 @@ import java.io.InputStream;
* computed text line. See {@link #nextOffsetInfo()} and * computed text line. See {@link #nextOffsetInfo()} and
* {@link #nextOffsetInfo(byte[])} for more details. * {@link #nextOffsetInfo(byte[])} for more details.
* <p> * <p>
* This class is NOT thread-safe. * NOTE: Not intended for multi-thread usage hence not Thread-safe.
* </p> * </p>
*/ */
public class TextLineDemarcator { public class TextLineDemarcator extends AbstractDemarcator {
private final static int INIT_BUFFER_SIZE = 8192; private static int CR = 13; // \r
private final InputStream is; private static int LF = 10; // \n
private final int initialBufferSize;
private byte[] buffer;
private int index;
private int mark;
private long offset;
private int bufferLength;
/** /**
* Constructs an instance of demarcator with provided {@link InputStream} * Constructs an instance of demarcator with provided {@link InputStream}
@ -62,15 +50,7 @@ public class TextLineDemarcator {
* and initial buffer size. * and initial buffer size.
*/ */
public TextLineDemarcator(InputStream is, int initialBufferSize) { public TextLineDemarcator(InputStream is, int initialBufferSize) {
if (is == null) { super(is, Integer.MAX_VALUE, initialBufferSize);
throw new IllegalArgumentException("'is' must not be null.");
}
if (initialBufferSize < 1) {
throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
}
this.is = is;
this.initialBufferSize = initialBufferSize;
this.buffer = new byte[initialBufferSize];
} }
/** /**
@ -81,7 +61,7 @@ public class TextLineDemarcator {
* *
* @return offset info * @return offset info
*/ */
public OffsetInfo nextOffsetInfo() { public OffsetInfo nextOffsetInfo() throws IOException {
return this.nextOffsetInfo(null); return this.nextOffsetInfo(null);
} }
@ -90,140 +70,84 @@ public class TextLineDemarcator {
* by either '\r', '\n' or '\r\n'). <br> * by either '\r', '\n' or '\r\n'). <br>
* The <i>offset info</i> computed and returned as {@link OffsetInfo} where * The <i>offset info</i> computed and returned as {@link OffsetInfo} where
* {@link OffsetInfo#isStartsWithMatch()} will return true if * {@link OffsetInfo#isStartsWithMatch()} will return true if
* <code>startsWith</code> was successfully matched with the stsarting bytes * <code>startsWith</code> was successfully matched with the starting bytes
* of the text line. * of the text line.
* *
* NOTE: The reason for 2 'nextOffsetInfo(..)' operations is that the
* 'startsWith' argument will force the actual token to be extracted and
* then matched introducing the overhead for System.arrayCopy and matching
* logic which is an optional scenario and is avoided all together if
* 'startsWith' is not provided (i.e., null).
*
* @return offset info * @return offset info
*/ */
public OffsetInfo nextOffsetInfo(byte[] startsWith) { public OffsetInfo nextOffsetInfo(byte[] startsWith) throws IOException {
OffsetInfo offsetInfo = null; OffsetInfo offsetInfo = null;
int lineLength = 0; byte previousByteVal = 0;
byte[] token = null; byte[] data = null;
lineLoop: nextTokenLoop:
while (this.bufferLength != -1) { while (data == null && this.availableBytesLength != -1) {
if (this.index >= this.bufferLength) { if (this.index >= this.availableBytesLength) {
this.fill(); this.fill();
} }
if (this.bufferLength != -1) { int delimiterSize = 0;
int i; if (this.availableBytesLength != -1) {
byte byteVal; byte byteVal;
for (i = this.index; i < this.bufferLength; i++) { int i;
for (i = this.index; i < this.availableBytesLength; i++) {
byteVal = this.buffer[i]; byteVal = this.buffer[i];
lineLength++;
int crlfLength = computeEol(byteVal, i + 1); if (byteVal == LF) {
if (crlfLength > 0) { delimiterSize = previousByteVal == CR ? 2 : 1;
i += crlfLength; } else if (previousByteVal == CR) {
if (crlfLength == 2) { delimiterSize = 1;
lineLength++; i--;
} }
offsetInfo = new OffsetInfo(this.offset, lineLength, crlfLength); previousByteVal = byteVal;
if (delimiterSize > 0) {
this.index = i + 1;
int size = Math.max(1, this.index - this.mark);
offsetInfo = new OffsetInfo(this.offset, size, delimiterSize);
this.offset += size;
if (startsWith != null) { if (startsWith != null) {
token = this.extractDataToken(lineLength); data = this.extractDataToken(size);
} }
this.mark = this.index; this.mark = this.index;
break lineLoop; break nextTokenLoop;
} }
} }
this.index = i; this.index = i;
} else {
delimiterSize = previousByteVal == CR || previousByteVal == LF ? 1 : 0;
if (offsetInfo == null) {
int size = this.index - this.mark;
if (size > 0) {
offsetInfo = new OffsetInfo(this.offset, size, delimiterSize);
this.offset += size;
}
}
if (startsWith != null) {
data = this.extractDataToken(this.index - this.mark);
}
} }
} }
// EOF where last char(s) are not CRLF.
if (lineLength > 0 && offsetInfo == null) {
offsetInfo = new OffsetInfo(this.offset, lineLength, 0);
if (startsWith != null) {
token = this.extractDataToken(lineLength);
}
}
this.offset += lineLength;
// checks if the new line starts with 'startsWith' chars if (startsWith != null && data != null) {
if (startsWith != null) { if (startsWith.length > data.length) {
for (int i = 0; i < startsWith.length; i++) { offsetInfo.setStartsWithMatch(false);
byte sB = startsWith[i]; } else {
if (token != null && sB != token[i]) { for (int i = 0; i < startsWith.length; i++) {
offsetInfo.setStartsWithMatch(0); byte sB = startsWith[i];
break; if (sB != data[i]) {
offsetInfo.setStartsWithMatch(false);
break;
}
} }
} }
} }
return offsetInfo; return offsetInfo;
} }
/**
* Determines if the line terminates. Returns int specifying the length of
* the CRLF (i.e., only CR or LF or CR and LF) and therefore can only have
* values of:
* 0 - not the end of the line
* 1 - the end of the line either via CR or LF
* 2 - the end of the line with both CR and LF
*
* It performs the read ahead on the buffer if need to.
*/
private int computeEol(byte currentByte, int providedIndex) {
int actualIndex = providedIndex - 1;
boolean readAhead = false;
int crlfLength = 0;
if (currentByte == '\n') {
crlfLength = 1;
} else if (currentByte == '\r') {
if (providedIndex >= this.bufferLength) {
this.index = this.bufferLength;
this.fill();
providedIndex = this.index;
readAhead = true;
}
crlfLength = 1;
if (providedIndex < this.buffer.length - 1) {
currentByte = this.buffer[providedIndex];
crlfLength = currentByte == '\n' ? 2 : 1;
}
}
if (crlfLength > 0) {
this.index = readAhead ? this.index + (crlfLength - 1) : (actualIndex + crlfLength);
}
return crlfLength;
}
private byte[] extractDataToken(int length) {
byte[] data = null;
if (length > 0) {
data = new byte[length];
System.arraycopy(this.buffer, this.mark, data, 0, data.length);
}
return data;
}
/**
* Will fill the current buffer from current 'index' position, expanding it
* and or shuffling it if necessary
*/
private void fill() {
if (this.index >= this.buffer.length) {
if (this.mark == 0) { // expand
byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
this.buffer = newBuff;
} else { // shuffle
int length = this.index - this.mark;
System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
this.index = length;
this.mark = 0;
}
}
try {
int bytesRead;
do {
bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
} while (bytesRead == 0);
this.bufferLength = bytesRead != -1 ? this.index + bytesRead : -1;
} catch (IOException e) {
throw new IllegalStateException("Failed while reading InputStream", e);
}
}
/** /**
* Container to hold offset and meta info for a computed text line. * Container to hold offset and meta info for a computed text line.
* The offset and meta info is represented with the following 4 values: * The offset and meta info is represented with the following 4 values:
@ -245,7 +169,7 @@ public class TextLineDemarcator {
private boolean startsWithMatch = true; private boolean startsWithMatch = true;
OffsetInfo(long startOffset, long length, int crlfLength) { private OffsetInfo(long startOffset, long length, int crlfLength) {
this.startOffset = startOffset; this.startOffset = startOffset;
this.length = length; this.length = length;
this.crlfLength = crlfLength; this.crlfLength = crlfLength;
@ -267,8 +191,13 @@ public class TextLineDemarcator {
return this.startsWithMatch; return this.startsWithMatch;
} }
void setStartsWithMatch(int startsWithMatch) { void setStartsWithMatch(boolean startsWithMatch) {
this.startsWithMatch = startsWithMatch == 1 ? true : false; this.startsWithMatch = startsWithMatch;
}
@Override
public String toString() {
return "offset:" + this.startOffset + "; length:" + this.length + "; crlfLength:" + this.crlfLength;
} }
} }
} }

View File

@ -32,8 +32,8 @@ import java.util.Arrays;
import org.junit.Test; import org.junit.Test;
@SuppressWarnings("resource")
public class StreamDemarcatorTest { public class StreamDemarcatorTest {
@Test @Test
public void validateInitializationFailure() { public void validateInitializationFailure() {
try { try {
@ -65,6 +65,41 @@ public class StreamDemarcatorTest {
} }
} }
@Test
public void validateLargeBufferSmallMaxSize() throws IOException {
final byte[] inputData = "A Great Benefit To Us All".getBytes(StandardCharsets.UTF_8);
try (final InputStream is = new ByteArrayInputStream(inputData);
final StreamDemarcator demarcator = new StreamDemarcator(is, "B".getBytes(StandardCharsets.UTF_8), 24, 4096)) {
final byte[] first = demarcator.nextToken();
assertNotNull(first);
assertEquals("A Great ", new String(first));
final byte[] second = demarcator.nextToken();
assertNotNull(second);
assertEquals("enefit To Us All", new String(second));
assertNull(demarcator.nextToken());
}
}
@Test
public void vaidateOnPartialMatchThenSubsequentPartialMatch() throws IOException {
final byte[] inputData = "A Great Big Boy".getBytes(StandardCharsets.UTF_8);
final byte[] delimBytes = "AB".getBytes(StandardCharsets.UTF_8);
try (final InputStream is = new ByteArrayInputStream(inputData);
final StreamDemarcator demarcator = new StreamDemarcator(is, delimBytes, 4096)) {
final byte[] bytes = demarcator.nextToken();
assertArrayEquals(inputData, bytes);
assertNull(demarcator.nextToken());
}
}
@Test @Test
public void validateNoDelimiter() throws IOException { public void validateNoDelimiter() throws IOException {
String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning."; String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
@ -196,10 +231,11 @@ public class StreamDemarcatorTest {
@Test(expected = IOException.class) @Test(expected = IOException.class)
public void validateMaxBufferSize() throws IOException { public void validateMaxBufferSize() throws IOException {
String data = "THIS IS MY TEXT<MY DELIMITER>THIS IS MY NEW TEXT<MY DELIMITER>THIS IS MY NEWEST TEXT"; String data = "THIS IS MY TEXT<MY DELIMITER>THIS IS MY NEW TEXT THEN<MY DELIMITER>THIS IS MY NEWEST TEXT";
ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
StreamDemarcator scanner = new StreamDemarcator(is, "<MY DELIMITER>".getBytes(StandardCharsets.UTF_8), 20); StreamDemarcator scanner = new StreamDemarcator(is, "<MY DELIMITER>".getBytes(StandardCharsets.UTF_8), 20);
scanner.nextToken(); scanner.nextToken();
scanner.nextToken();
} }
@Test @Test

View File

@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets;
import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo; import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
import org.junit.Test; import org.junit.Test;
@SuppressWarnings("resource")
public class TextLineDemarcatorTest { public class TextLineDemarcatorTest {
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
@ -44,7 +45,7 @@ public class TextLineDemarcatorTest {
} }
@Test @Test
public void emptyStreamNoStartWithFilter() { public void emptyStreamNoStartWithFilter() throws IOException {
String data = ""; String data = "";
InputStream is = stringToIs(data); InputStream is = stringToIs(data);
TextLineDemarcator demarcator = new TextLineDemarcator(is); TextLineDemarcator demarcator = new TextLineDemarcator(is);
@ -53,7 +54,7 @@ public class TextLineDemarcatorTest {
@Test @Test
public void emptyStreamAndStartWithFilter() { public void emptyStreamAndStartWithFilter() throws IOException {
String data = ""; String data = "";
InputStream is = stringToIs(data); InputStream is = stringToIs(data);
TextLineDemarcator demarcator = new TextLineDemarcator(is); TextLineDemarcator demarcator = new TextLineDemarcator(is);
@ -63,7 +64,7 @@ public class TextLineDemarcatorTest {
// this test has no assertions. It's success criteria is validated by lack // this test has no assertions. It's success criteria is validated by lack
// of failure (see NIFI-3278) // of failure (see NIFI-3278)
@Test @Test
public void endsWithCRWithBufferLengthEqualStringLengthA() { public void endsWithCRWithBufferLengthEqualStringLengthA() throws Exception {
String str = "\r"; String str = "\r";
InputStream is = stringToIs(str); InputStream is = stringToIs(str);
TextLineDemarcator demarcator = new TextLineDemarcator(is, str.length()); TextLineDemarcator demarcator = new TextLineDemarcator(is, str.length());
@ -72,7 +73,7 @@ public class TextLineDemarcatorTest {
} }
@Test @Test
public void endsWithCRWithBufferLengthEqualStringLengthB() { public void endsWithCRWithBufferLengthEqualStringLengthB() throws Exception {
String str = "abc\r"; String str = "abc\r";
InputStream is = stringToIs(str); InputStream is = stringToIs(str);
TextLineDemarcator demarcator = new TextLineDemarcator(is, str.length()); TextLineDemarcator demarcator = new TextLineDemarcator(is, str.length());
@ -81,7 +82,7 @@ public class TextLineDemarcatorTest {
} }
@Test @Test
public void singleCR() { public void singleCR() throws IOException {
InputStream is = stringToIs("\r"); InputStream is = stringToIs("\r");
TextLineDemarcator demarcator = new TextLineDemarcator(is); TextLineDemarcator demarcator = new TextLineDemarcator(is);
OffsetInfo offsetInfo = demarcator.nextOffsetInfo(); OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
@ -92,7 +93,7 @@ public class TextLineDemarcatorTest {
} }
@Test @Test
public void singleLF() { public void singleLF() throws IOException {
InputStream is = stringToIs("\n"); InputStream is = stringToIs("\n");
TextLineDemarcator demarcator = new TextLineDemarcator(is); TextLineDemarcator demarcator = new TextLineDemarcator(is);
OffsetInfo offsetInfo = demarcator.nextOffsetInfo(); OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
@ -121,7 +122,7 @@ public class TextLineDemarcatorTest {
} }
@Test @Test
public void validateNiFi_3495() { public void validateNiFi_3495() throws IOException {
String str = "he\ra-to-a\rb-to-b\rc-to-c\r\nd-to-d"; String str = "he\ra-to-a\rb-to-b\rc-to-c\r\nd-to-d";
InputStream is = stringToIs(str); InputStream is = stringToIs(str);
TextLineDemarcator demarcator = new TextLineDemarcator(is, 10); TextLineDemarcator demarcator = new TextLineDemarcator(is, 10);
@ -313,6 +314,30 @@ public class TextLineDemarcatorTest {
assertEquals(0, second.getCrlfLength()); assertEquals(0, second.getCrlfLength());
} }
@Test
public void validateStartsWithLongerThanLastToken() throws IOException {
final byte[] inputData = "This is going to be a spectacular test\nThis is".getBytes(StandardCharsets.UTF_8);
final byte[] startsWith = "This is going to be".getBytes(StandardCharsets.UTF_8);
try (final InputStream is = new ByteArrayInputStream(inputData);
final TextLineDemarcator demarcator = new TextLineDemarcator(is)) {
final OffsetInfo first = demarcator.nextOffsetInfo(startsWith);
assertNotNull(first);
assertEquals(0, first.getStartOffset());
assertEquals(39, first.getLength());
assertEquals(1, first.getCrlfLength());
assertTrue(first.isStartsWithMatch());
final OffsetInfo second = demarcator.nextOffsetInfo(startsWith);
assertNotNull(second);
assertEquals(39, second.getStartOffset());
assertEquals(7, second.getLength());
assertEquals(0, second.getCrlfLength());
assertFalse(second.isStartsWithMatch());
}
}
private InputStream stringToIs(String data) { private InputStream stringToIs(String data) {
return new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); return new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
} }

View File

@ -377,7 +377,8 @@ public class SplitText extends AbstractProcessor {
* will vary but the length of the split will never be > {@link #maxSplitSize} and {@link IllegalStateException} will be thrown. * will vary but the length of the split will never be > {@link #maxSplitSize} and {@link IllegalStateException} will be thrown.
* This method also allows one to provide 'startsWithFilter' to allow headers to be determined via such filter (see {@link #HEADER_MARKER}. * This method also allows one to provide 'startsWithFilter' to allow headers to be determined via such filter (see {@link #HEADER_MARKER}.
*/ */
private SplitInfo computeHeader(TextLineDemarcator demarcator, long startOffset, long splitMaxLineCount, byte[] startsWithFilter, SplitInfo previousSplitInfo) { private SplitInfo computeHeader(TextLineDemarcator demarcator, long startOffset, long splitMaxLineCount,
byte[] startsWithFilter, SplitInfo previousSplitInfo) throws IOException {
long length = 0; long length = 0;
long actualLineCount = 0; long actualLineCount = 0;
OffsetInfo offsetInfo = null; OffsetInfo offsetInfo = null;
@ -419,7 +420,8 @@ public class SplitText extends AbstractProcessor {
* If split size is controlled by the {@link #maxSplitSize}, then the resulting {@link SplitInfo} * If split size is controlled by the {@link #maxSplitSize}, then the resulting {@link SplitInfo}
* line count will vary but the length of the split will never be > {@link #maxSplitSize}. * line count will vary but the length of the split will never be > {@link #maxSplitSize}.
*/ */
private SplitInfo nextSplit(TextLineDemarcator demarcator, long startOffset, long splitMaxLineCount, SplitInfo remainderSplitInfo, long startingLength) { private SplitInfo nextSplit(TextLineDemarcator demarcator, long startOffset, long splitMaxLineCount,
SplitInfo remainderSplitInfo, long startingLength) throws IOException {
long length = 0; long length = 0;
long trailingCrlfLength = 0; long trailingCrlfLength = 0;
long actualLineCount = 0; long actualLineCount = 0;