NIFI-3495 fixed the index issue with TextLineDemarcator

This closes #1518.
This commit is contained in:
Oleg Zhurakousky 2017-02-16 21:05:59 -05:00 committed by Mark Payne
parent 095c04eda0
commit ec868362f3
2 changed files with 57 additions and 7 deletions

View File

@ -110,7 +110,7 @@ public class TextLineDemarcator {
for (i = this.index; i < this.bufferLength; i++) {
byteVal = this.buffer[i];
lineLength++;
int crlfLength = isEol(byteVal, i);
int crlfLength = computeEol(byteVal, i + 1);
if (crlfLength > 0) {
i += crlfLength;
if (crlfLength == 2) {
@ -120,7 +120,6 @@ public class TextLineDemarcator {
if (startsWith != null) {
token = this.extractDataToken(lineLength);
}
this.index = i;
this.mark = this.index;
break lineLoop;
}
@ -150,21 +149,40 @@ public class TextLineDemarcator {
return offsetInfo;
}
private int isEol(byte currentByte, int currentIndex) {
/**
* Determines if the line terminates. Returns int specifying the length of
* the CRLF (i.e., only CR or LF or CR and LF) and therefore can only have
* values of:
* 0 - not the end of the line
* 1 - the end of the line either via CR or LF
* 2 - the end of the line with both CR and LF
*
* It performs a read-ahead on the buffer if needed.
*/
private int computeEol(byte currentByte, int providedIndex) {
int actualIndex = providedIndex - 1;
boolean readAhead = false;
int crlfLength = 0;
if (currentByte == '\n') {
crlfLength = 1;
} else if (currentByte == '\r') {
if ((currentIndex + 1) >= this.bufferLength) {
this.index = currentIndex + 1;
if (providedIndex >= this.bufferLength) {
this.index = this.bufferLength;
this.fill();
providedIndex = this.index;
readAhead = true;
}
crlfLength = 1;
if (currentIndex < this.buffer.length - 1) {
currentByte = this.buffer[currentIndex + 1];
if (providedIndex < this.buffer.length - 1) {
currentByte = this.buffer[providedIndex];
crlfLength = currentByte == '\n' ? 2 : 1;
}
}
if (crlfLength > 0) {
this.index = readAhead ? this.index + (crlfLength - 1) : (actualIndex + crlfLength);
}
return crlfLength;
}

View File

@ -120,6 +120,38 @@ public class TextLineDemarcatorTest {
assertTrue(offsetInfo.isStartsWithMatch());
}
/**
 * Regression test for NIFI-3495: walks an input whose lines end in a lone CR,
 * a CR+LF pair, or no terminator at all, and checks the start offset, length
 * and CRLF length reported for each line in turn.
 */
@Test
public void validateNiFi_3495() {
    InputStream input = stringToIs("he\ra-to-a\rb-to-b\rc-to-c\r\nd-to-d");
    // NOTE(review): the second argument (10) is presumably the initial buffer
    // size, chosen so that line terminators land on buffer boundaries - confirm
    // against the TextLineDemarcator constructor.
    TextLineDemarcator demarcator = new TextLineDemarcator(input, 10);

    // One row per expected line, in order: { startOffset, length, crlfLength }.
    int[][] expectedLines = {
        {0, 3, 1},   // "he\r"
        {3, 7, 1},   // "a-to-a\r"
        {10, 7, 1},  // "b-to-b\r"
        {17, 8, 2},  // "c-to-c\r\n"
        {25, 6, 0},  // "d-to-d" (no terminator)
    };

    for (int[] expected : expectedLines) {
        OffsetInfo info = demarcator.nextOffsetInfo();
        assertEquals(expected[0], info.getStartOffset());
        assertEquals(expected[1], info.getLength());
        assertEquals(expected[2], info.getCrlfLength());
    }
}
@Test
public void mixedCRLF() throws Exception {
InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n");