From 16ca767f0fbaf2bf6d3f803f24d92f0212d744f0 Mon Sep 17 00:00:00 2001 From: Robert Joseph Evans Date: Thu, 23 Aug 2012 16:59:26 +0000 Subject: [PATCH] svn merge -c 1376592 FIXES: HADOOP-8655. Fix TextInputFormat for large deliminators. (Gelesh via bobby) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1376594 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../org/apache/hadoop/util/LineReader.java | 82 ++++++++++-- .../apache/hadoop/util/TestLineReader.java | 124 +++++++++++++++--- 3 files changed, 179 insertions(+), 30 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index d8f17a63b13..7123ec62202 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -648,6 +648,9 @@ Release 2.0.0-alpha - 05-23-2012 HADOOP-7868. Hadoop native fails to compile when default linker option is -Wl,--as-needed. (Trevor Robinson via eli) + HADOOP-8655. Fix TextInputFormat for large deliminators. (Gelesh via + bobby) + Release 0.23.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java index 254fc68b764..1681d6dfe70 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java @@ -204,11 +204,13 @@ public class LineReader { int startPosn = bufferPosn; //starting from where we left off the last time if (bufferPosn >= bufferLength) { startPosn = bufferPosn = 0; - if (prevCharCR) + if (prevCharCR) { ++bytesConsumed; //account for CR from previous read + } bufferLength = in.read(buffer); - if (bufferLength <= 0) + if (bufferLength <= 0) { break; // EOF + } } for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline if (buffer[bufferPosn] == LF) { @@ -223,8 +225,9 @@ public class LineReader { prevCharCR = (buffer[bufferPosn] == CR); } int readLength = bufferPosn - startPosn; - if (prevCharCR && newlineLength == 0) + if (prevCharCR && newlineLength == 0) { --readLength; //CR at the end of the buffer + } bytesConsumed += readLength; int appendLength = readLength - newlineLength; if (appendLength > maxLineLength - txtLength) { @@ -236,8 +239,9 @@ public class LineReader { } } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume); - if (bytesConsumed > (long)Integer.MAX_VALUE) - throw new IOException("Too many bytes before newline: " + bytesConsumed); + if (bytesConsumed > (long)Integer.MAX_VALUE) { + throw new IOException("Too many bytes before newline: " + bytesConsumed); + } return (int)bytesConsumed; } @@ -246,18 +250,56 @@ public class LineReader { */ private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { + /* We're reading data from inputStream, but the head of the stream may be + * already captured in the previous buffer, so we have several cases: + * + * 1. The buffer tail does not contain any character sequence which + * matches with the head of delimiter. We count it as a + * ambiguous byte count = 0 + * + * 2. The buffer tail contains a X number of characters, + * that forms a sequence, which matches with the + * head of delimiter. We count ambiguous byte count = X + * + * // *** eg: A segment of input file is as follows + * + * " record 1792: I found this bug very interesting and + * I have completely read about it. record 1793: This bug + * can be solved easily record 1794: This ." + * + * delimiter = "record"; + * + * supposing:- String at the end of buffer = + * "I found this bug very interesting and I have completely re" + * There for next buffer = "ad about it. record 179 ...." + * + * The matching characters in the input + * buffer tail and delimiter head = "re" + * Therefore, ambiguous byte count = 2 **** // + * + * 2.1 If the following bytes are the remaining characters of + * the delimiter, then we have to capture only up to the starting + * position of delimiter. That means, we need not include the + * ambiguous characters in str. + * + * 2.2 If the following bytes are not the remaining characters of + * the delimiter ( as mentioned in the example ), + * then we have to include the ambiguous characters in str. + */ str.clear(); int txtLength = 0; // tracks str.getLength(), as an optimization long bytesConsumed = 0; int delPosn = 0; + int ambiguousByteCount=0; // To capture the ambiguous characters count do { - int startPosn = bufferPosn; // starting from where we left off the last - // time + int startPosn = bufferPosn; // Start from previous end position if (bufferPosn >= bufferLength) { startPosn = bufferPosn = 0; bufferLength = in.read(buffer); - if (bufferLength <= 0) + if (bufferLength <= 0) { + str.append(recordDelimiterBytes, 0, ambiguousByteCount); break; // EOF + } } for (; bufferPosn < bufferLength; ++bufferPosn) { if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) { @@ -267,7 +309,7 @@ public class LineReader { break; } } else if (delPosn != 0) { - bufferPosn--; // recheck if bufferPosn matches start of delimiter + bufferPosn--; delPosn = 0; } } @@ -278,14 +320,27 @@ public class LineReader { appendLength = maxLineLength - txtLength; } if (appendLength > 0) { + if (ambiguousByteCount > 0) { + str.append(recordDelimiterBytes, 0, ambiguousByteCount); + //appending the ambiguous characters (refer case 2.2) + bytesConsumed += ambiguousByteCount; + ambiguousByteCount=0; + } str.append(buffer, startPosn, appendLength); txtLength += appendLength; } - } while (delPosn < recordDelimiterBytes.length + if (bufferPosn >= bufferLength) { + if (delPosn > 0 && delPosn < recordDelimiterBytes.length) { + ambiguousByteCount = delPosn; + bytesConsumed -= ambiguousByteCount; //to be consumed in next + } + } + } while (delPosn < recordDelimiterBytes.length && bytesConsumed < maxBytesToConsume); - if (bytesConsumed > (long) Integer.MAX_VALUE) + if (bytesConsumed > (long) Integer.MAX_VALUE) { throw new IOException("Too many bytes before delimiter: " + bytesConsumed); - return (int) bytesConsumed; + } + return (int) bytesConsumed; } /** @@ -297,7 +352,7 @@ public class LineReader { */ public int readLine(Text str, int maxLineLength) throws IOException { return readLine(str, maxLineLength, Integer.MAX_VALUE); -} + } /** * Read from the InputStream into the given Text. @@ -308,5 +363,4 @@ public class LineReader { public int readLine(Text str) throws IOException { return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE); } - } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java index dc98357ef28..cbb06eabf05 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLineReader.java @@ -21,29 +21,121 @@ package org.apache.hadoop.util; import java.io.ByteArrayInputStream; import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.LineReader; import org.junit.Test; import junit.framework.Assert; public class TestLineReader { + private LineReader lineReader; + private String TestData; + private String Delimiter; + private Text line; @Test public void testCustomDelimiter() throws Exception { - String data = "record Bangalorrecord recorrecordrecord Kerala"; - String delimiter = "record"; - LineReader reader = new LineReader( - new ByteArrayInputStream(data.getBytes()), - delimiter.getBytes()); - Text line = new Text(); - reader.readLine(line); - Assert.assertEquals("", line.toString()); - reader.readLine(line); - Assert.assertEquals(" Bangalor", line.toString()); - reader.readLine(line); - Assert.assertEquals(" recor", line.toString()); - reader.readLine(line); - Assert.assertEquals("", line.toString()); - reader.readLine(line); - Assert.assertEquals(" Kerala", line.toString()); + /* TEST_1 + * The test scenario is the tail of the buffer + * equals the starting character/s of delimiter + * + * The Test Data is such that, + * + * 1) we will have "" as delimiter + * + * 2) The tail of the current buffer would be "" + * which does NOT match with the remaining characters of delimiter. + * + * 4) Input data would be prefixed by char 'a' + * about numberOfCharToFillTheBuffer times. + * So that, one iteration to buffer the input data, + * would end at '" from next token + */ + + Delimiter=""; + + String CurrentBufferTailToken= + "GeleshOmathil"; + // Supposing the start of next buffer is this + + String Expected = + (CurrentBufferTailToken+NextBufferHeadToken) + .replace(Delimiter, ""); + // Expected ,must capture from both the buffer, excluding Delimiter + + String TestPartOfInput = CurrentBufferTailToken+NextBufferHeadToken; + + int BufferSize=64 * 1024; + int numberOfCharToFillTheBuffer=BufferSize-CurrentBufferTailToken.length(); + StringBuilder fillerString=new StringBuilder(); + for (int i=0;i