From 58d1a02b8d66b1d2a6ac2158be32bd35ad2e69bd Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 17 Sep 2015 14:30:18 +0000 Subject: [PATCH] MAPREDUCE-6481. LineRecordReader may give incomplete record and wrong position/key information for uncompressed input sometimes. Contributed by Zhihai Xu --- .../org/apache/hadoop/util/LineReader.java | 17 +- hadoop-mapreduce-project/CHANGES.txt | 4 + .../input/UncompressedSplitLineReader.java | 31 +--- .../hadoop/mapred/TestLineRecordReader.java | 138 +++++++++++++++ .../lib/input/TestLineRecordReader.java | 161 ++++++++++++++++++ 5 files changed, 316 insertions(+), 35 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java index 1d1b5693b95..900215af4a9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java @@ -303,7 +303,10 @@ public class LineReader implements Closeable { startPosn = bufferPosn = 0; bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0); if (bufferLength <= 0) { - str.append(recordDelimiterBytes, 0, ambiguousByteCount); + if (ambiguousByteCount > 0) { + str.append(recordDelimiterBytes, 0, ambiguousByteCount); + bytesConsumed += ambiguousByteCount; + } break; // EOF } } @@ -325,13 +328,13 @@ public class LineReader implements Closeable { if (appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } + bytesConsumed += ambiguousByteCount; + if (appendLength >= 0 && ambiguousByteCount > 0) { + //appending the ambiguous characters (refer case 2.2) + str.append(recordDelimiterBytes, 0, ambiguousByteCount); + ambiguousByteCount = 0; + } if (appendLength > 0) { - if (ambiguousByteCount > 0) { - str.append(recordDelimiterBytes, 0, ambiguousByteCount); - //appending the ambiguous characters (refer case 2.2) - bytesConsumed += ambiguousByteCount; - ambiguousByteCount=0; - } str.append(buffer, startPosn, appendLength); txtLength += appendLength; } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 669fee50961..cde6d929c6b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -566,6 +566,10 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6452. NPE when intermediate encrypt enabled for LocalRunner. (Zhihai Xu) + MAPREDUCE-6481. LineRecordReader may give incomplete record and wrong + position/key information for uncompressed input sometimes. (Zhihai Xu via + jlowe) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java index 52fb7b0fc5f..38491b0b0b2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java @@ -40,8 +40,6 @@ public class UncompressedSplitLineReader extends SplitLineReader { private long totalBytesRead = 0; private boolean finished = false; private boolean usingCRLF; - private int unusedBytes = 0; - private int lastBytesRead = 0; public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf, byte[] recordDelimiterBytes, long splitLength) throws IOException { @@ -59,7 +57,6 @@ public class UncompressedSplitLineReader extends SplitLineReader { (int)(splitLength - totalBytesRead)); } int bytesRead = in.read(buffer, 0, maxBytesToRead); - lastBytesRead = bytesRead; // If the split ended in the middle of a record delimiter then we need // to read one additional record, as the consumer of the next split will @@ -83,39 +80,17 @@ public class UncompressedSplitLineReader extends SplitLineReader { @Override public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { - long bytesRead = 0; + int bytesRead = 0; if (!finished) { // only allow at most one more record to be read after the stream // reports the split ended if (totalBytesRead > splitLength) { finished = true; } - bytesRead = totalBytesRead; - int bytesConsumed = super.readLine(str, maxLineLength, maxBytesToConsume); - bytesRead = totalBytesRead - bytesRead; - // No records left. - if (bytesConsumed == 0 && bytesRead == 0) { - return 0; - } - - int bufferSize = getBufferSize(); - - // Add the remaining buffer size not used for the last call - // of fillBuffer method. - if (lastBytesRead <= 0) { - bytesRead += bufferSize; - } else if (bytesRead > 0) { - bytesRead += bufferSize - lastBytesRead; - } - - // Adjust the size of the buffer not used for this record. - // The size is carried over for the next calculation. - bytesRead += unusedBytes; - unusedBytes = bufferSize - getBufferPosn(); - bytesRead -= unusedBytes; + bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume); } - return (int) bytesRead; + return bytesRead; } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java index a5c99334f26..d33a614207c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapred; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -357,4 +358,141 @@ public class TestLineRecordReader { } } } + + @Test + public void testUncompressedInputCustomDelimiterPosValue() + throws Exception { + Configuration conf = new Configuration(); + String inputData = "1234567890ab12ab345"; + Path inputFile = createInputFile(conf, inputData); + conf.setInt("io.file.buffer.size", 10); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + String delimiter = "ab"; + byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); + FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); + LineRecordReader reader = new LineRecordReader(conf, split, + recordDelimiterBytes); + LongWritable key = new LongWritable(); + Text value = new Text(); + reader.next(key, value); + // Get first record:"1234567890" + assertEquals(10, value.getLength()); + // Position should be 12 right after "1234567890ab" + assertEquals(12, reader.getPos()); + reader.next(key, value); + // Get second record:"12" + assertEquals(2, value.getLength()); + // Position should be 16 right after "1234567890ab12ab" + assertEquals(16, reader.getPos()); + reader.next(key, value); + // Get third record:"345" + assertEquals(3, value.getLength()); + // Position should be 19 right after "1234567890ab12ab345" + assertEquals(19, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(19, reader.getPos()); + + split = new FileSplit(inputFile, 15, 4, (String[])null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + // No record is in the second split because the second split dropped + // the first record, which was already reported by the first split. + // The position should be 19 right after "1234567890ab12ab345" + assertEquals(19, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(19, reader.getPos()); + + inputData = "123456789aab"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 12, (String[])null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + reader.next(key, value); + // Get first record:"123456789a" + assertEquals(10, value.getLength()); + // Position should be 12 right after "123456789aab" + assertEquals(12, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(12, reader.getPos()); + + inputData = "123456789a"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 10, (String[])null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + reader.next(key, value); + // Get first record:"123456789a" + assertEquals(10, value.getLength()); + // Position should be 10 right after "123456789a" + assertEquals(10, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(10, reader.getPos()); + + inputData = "123456789ab"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 11, (String[])null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + reader.next(key, value); + // Get first record:"123456789" + assertEquals(9, value.getLength()); + // Position should be 11 right after "123456789ab" + assertEquals(11, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(11, reader.getPos()); + } + + @Test + public void testUncompressedInputDefaultDelimiterPosValue() + throws Exception { + Configuration conf = new Configuration(); + String inputData = "1234567890\r\n12\r\n345"; + Path inputFile = createInputFile(conf, inputData); + conf.setInt("io.file.buffer.size", 10); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); + LineRecordReader reader = new LineRecordReader(conf, split, + null); + LongWritable key = new LongWritable(); + Text value = new Text(); + reader.next(key, value); + // Get first record:"1234567890" + assertEquals(10, value.getLength()); + // Position should be 12 right after "1234567890\r\n" + assertEquals(12, reader.getPos()); + reader.next(key, value); + // Get second record:"12" + assertEquals(2, value.getLength()); + // Position should be 16 right after "1234567890\r\n12\r\n" + assertEquals(16, reader.getPos()); + assertFalse(reader.next(key, value)); + + split = new FileSplit(inputFile, 15, 4, (String[])null); + reader = new LineRecordReader(conf, split, null); + // The second split dropped the first record "\n" + // The position should be 16 right after "1234567890\r\n12\r\n" + assertEquals(16, reader.getPos()); + reader.next(key, value); + // Get third record:"345" + assertEquals(3, value.getLength()); + // Position should be 19 right after "1234567890\r\n12\r\n345" + assertEquals(19, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(19, reader.getPos()); + + inputData = "123456789\r\r\n"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 12, (String[])null); + reader = new LineRecordReader(conf, split, null); + reader.next(key, value); + // Get first record:"123456789" + assertEquals(9, value.getLength()); + // Position should be 10 right after "123456789\r" + assertEquals(10, reader.getPos()); + reader.next(key, value); + // Get second record:"" + assertEquals(0, value.getLength()); + // Position should be 12 right after "123456789\r\r\n" + assertEquals(12, reader.getPos()); + assertFalse(reader.next(key, value)); + assertEquals(12, reader.getPos()); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java index 3c1f28f6f80..dfe8b5d0d81 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.lib.input; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -37,6 +38,8 @@ import org.apache.commons.io.Charsets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.Decompressor; @@ -341,4 +344,162 @@ public class TestLineRecordReader { } } } + + @Test + public void testUncompressedInputCustomDelimiterPosValue() + throws Exception { + Configuration conf = new Configuration(); + String inputData = "1234567890ab12ab345"; + Path inputFile = createInputFile(conf, inputData); + conf.setInt("io.file.buffer.size", 10); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + String delimiter = "ab"; + byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); + FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); + TaskAttemptContext context = new TaskAttemptContextImpl(conf, + new TaskAttemptID()); + LineRecordReader reader = new LineRecordReader(recordDelimiterBytes); + reader.initialize(split, context); + LongWritable key; + Text value; + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get first record:"1234567890" + assertEquals(10, value.getLength()); + assertEquals(0, key.get()); + reader.nextKeyValue(); + // Get second record:"12" + assertEquals(2, value.getLength()); + // Key should be 12 right after "1234567890ab" + assertEquals(12, key.get()); + reader.nextKeyValue(); + // Get third record:"345" + assertEquals(3, value.getLength()); + // Key should be 16 right after "1234567890ab12ab" + assertEquals(16, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 19 right after "1234567890ab12ab345" + assertEquals(19, key.get()); + + split = new FileSplit(inputFile, 15, 4, (String[])null); + reader = new LineRecordReader(recordDelimiterBytes); + reader.initialize(split, context); + // No record is in the second split because the second split dropped + // the first record, which was already reported by the first split. + assertFalse(reader.nextKeyValue()); + + inputData = "123456789aab"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 12, (String[])null); + reader = new LineRecordReader(recordDelimiterBytes); + reader.initialize(split, context); + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get first record:"123456789a" + assertEquals(10, value.getLength()); + assertEquals(0, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 12 right after "123456789aab" + assertEquals(12, key.get()); + + inputData = "123456789a"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 10, (String[])null); + reader = new LineRecordReader(recordDelimiterBytes); + reader.initialize(split, context); + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get first record:"123456789a" + assertEquals(10, value.getLength()); + assertEquals(0, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 10 right after "123456789a" + assertEquals(10, key.get()); + + inputData = "123456789ab"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 11, (String[])null); + reader = new LineRecordReader(recordDelimiterBytes); + reader.initialize(split, context); + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get first record:"123456789" + assertEquals(9, value.getLength()); + assertEquals(0, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 11 right after "123456789ab" + assertEquals(11, key.get()); + } + + @Test + public void testUncompressedInputDefaultDelimiterPosValue() + throws Exception { + Configuration conf = new Configuration(); + String inputData = "1234567890\r\n12\r\n345"; + Path inputFile = createInputFile(conf, inputData); + conf.setInt("io.file.buffer.size", 10); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); + TaskAttemptContext context = new TaskAttemptContextImpl(conf, + new TaskAttemptID()); + LineRecordReader reader = new LineRecordReader(null); + reader.initialize(split, context); + LongWritable key; + Text value; + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get first record:"1234567890" + assertEquals(10, value.getLength()); + assertEquals(0, key.get()); + reader.nextKeyValue(); + // Get second record:"12" + assertEquals(2, value.getLength()); + // Key should be 12 right after "1234567890\r\n" + assertEquals(12, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 16 right after "1234567890\r\n12\r\n" + assertEquals(16, key.get()); + + split = new FileSplit(inputFile, 15, 4, (String[])null); + reader = new LineRecordReader(null); + reader.initialize(split, context); + // The second split dropped the first record "\n" + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get third record:"345" + assertEquals(3, value.getLength()); + // Key should be 16 right after "1234567890\r\n12\r\n" + assertEquals(16, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 19 right after "1234567890\r\n12\r\n345" + assertEquals(19, key.get()); + + inputData = "123456789\r\r\n"; + inputFile = createInputFile(conf, inputData); + split = new FileSplit(inputFile, 0, 12, (String[])null); + reader = new LineRecordReader(null); + reader.initialize(split, context); + reader.nextKeyValue(); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + // Get first record:"123456789" + assertEquals(9, value.getLength()); + assertEquals(0, key.get()); + reader.nextKeyValue(); + // Get second record:"" + assertEquals(0, value.getLength()); + // Key should be 10 right after "123456789\r" + assertEquals(10, key.get()); + assertFalse(reader.nextKeyValue()); + // Key should be 12 right after "123456789\r\r\n" + assertEquals(12, key.get()); + } }