From e307078e78058ed8afb756b413539f07faf0d475 Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Mon, 23 Nov 2015 21:07:39 +0000
Subject: [PATCH] MAPREDUCE-5948. org.apache.hadoop.mapred.LineRecordReader
 does not handle multibyte record delimiters well. Contributed by Vinayakumar
 B, Rushabh Shah, and Akira AJISAKA

(cherry picked from commit 077250d8d7b4b757543a39a6ce8bb6e3be356c6f)

Conflicts:
	hadoop-mapreduce-project/CHANGES.txt
	hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
	hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
---
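Background: the failure mode fixed here is an input split whose boundary
lands inside a multibyte record delimiter. The reader for the first split
used to stop at the split end without recognizing the partial delimiter,
while the reader for the next split discards everything up to the first
complete delimiter it sees, so the straddling record could be lost or
duplicated. A minimal sketch of a configuration that exercises this path
(the "+++" delimiter and data are borrowed from the new tests below):

    Configuration conf = new Configuration();
    // Custom multibyte record delimiter; a split may now end mid-delimiter.
    conf.set("textinputformat.record.delimiter", "+++");
    // e.g. input "abc+++def" with a split ending between two '+' bytes:
    // the first split's reader must emit both "abc" and "def".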
 .../org/apache/hadoop/util/LineReader.java    |   8 ++
 hadoop-mapreduce-project/CHANGES.txt          |   4 +
 .../hadoop/mapred/LineRecordReader.java       |   4 +-
 .../mapreduce/lib/input/LineRecordReader.java |   3 +-
 .../input/UncompressedSplitLineReader.java    | 125 ++++++++++++++++++
 .../hadoop/mapred/TestLineRecordReader.java   |  77 ++++++++++-
 .../lib/input/TestLineRecordReader.java       |  79 ++++++++++-
 7 files changed, 286 insertions(+), 14 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
index 3188cb5fe95..1d1b5693b95 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -369,4 +369,12 @@ public class LineReader implements Closeable {
   public int readLine(Text str) throws IOException {
     return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
   }
+
+  protected int getBufferPosn() {
+    return bufferPosn;
+  }
+
+  protected int getBufferSize() {
+    return bufferSize;
+  }
 }
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index b80039ed2cd..49d26e47a4f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -24,6 +24,10 @@ Release 2.6.3 - UNRELEASED
     MAPREDUCE-6377. JHS sorting on state column not working in webUi.
     (zhihai xu via devaraj)
 
+    MAPREDUCE-5948. org.apache.hadoop.mapred.LineRecordReader does not handle
+    multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira
+    AJISAKA via jlowe)
+
 Release 2.6.2 - 2015-10-28
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index 6b5c26ee47b..d9ee26fafd1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.io.compress.SplitCompressionInputStream;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
 import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
+import org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -124,7 +125,8 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
       }
     } else {
       fileIn.seek(start);
-      in = new SplitLineReader(fileIn, job, recordDelimiter);
+      in = new UncompressedSplitLineReader(
+          fileIn, job, recordDelimiter, split.getLength());
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index 880a1a21949..fa86823618a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -105,7 +105,8 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
       }
     } else {
       fileIn.seek(start);
-      in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
+      in = new UncompressedSplitLineReader(
+          fileIn, job, this.recordDelimiterBytes, split.getLength());
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
new file mode 100644
index 00000000000..52fb7b0fc5f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+/**
+ * SplitLineReader for uncompressed files.
+ * This class can split the file correctly even if the delimiter is multi-bytes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class UncompressedSplitLineReader extends SplitLineReader {
+  private boolean needAdditionalRecord = false;
+  private long splitLength;
+  /** Total bytes read from the input stream. */
+  private long totalBytesRead = 0;
+  private boolean finished = false;
+  private boolean usingCRLF;
+  private int unusedBytes = 0;
+  private int lastBytesRead = 0;
+
+  public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf,
+      byte[] recordDelimiterBytes, long splitLength) throws IOException {
+    super(in, conf, recordDelimiterBytes);
+    this.splitLength = splitLength;
+    usingCRLF = (recordDelimiterBytes == null);
+  }
+
+  @Override
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    int maxBytesToRead = buffer.length;
+    if (totalBytesRead < splitLength) {
+      maxBytesToRead = Math.min(maxBytesToRead,
+          (int)(splitLength - totalBytesRead));
+    }
+    int bytesRead = in.read(buffer, 0, maxBytesToRead);
+    lastBytesRead = bytesRead;
+
+    // If the split ended in the middle of a record delimiter then we need
+    // to read one additional record, as the consumer of the next split will
+    // not recognize the partial delimiter as a record.
+    // However if using the default delimiter and the next character is a
+    // linefeed then next split will treat it as a delimiter all by itself
+    // and the additional record read should not be performed.
+    if (totalBytesRead == splitLength && inDelimiter && bytesRead > 0) {
+      if (usingCRLF) {
+        needAdditionalRecord = (buffer[0] != '\n');
+      } else {
+        needAdditionalRecord = true;
+      }
+    }
+    if (bytesRead > 0) {
+      totalBytesRead += bytesRead;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    long bytesRead = 0;
+    if (!finished) {
+      // only allow at most one more record to be read after the stream
+      // reports the split ended
+      if (totalBytesRead > splitLength) {
+        finished = true;
+      }
+      bytesRead = totalBytesRead;
+      int bytesConsumed = super.readLine(str, maxLineLength, maxBytesToConsume);
+      bytesRead = totalBytesRead - bytesRead;
+
+      // No records left.
+      if (bytesConsumed == 0 && bytesRead == 0) {
+        return 0;
+      }
+
+      int bufferSize = getBufferSize();
+
+      // Add the remaining buffer size not used for the last call
+      // of fillBuffer method.
+      if (lastBytesRead <= 0) {
+        bytesRead += bufferSize;
+      } else if (bytesRead > 0) {
+        bytesRead += bufferSize - lastBytesRead;
+      }
+
+      // Adjust the size of the buffer not used for this record.
+      // The size is carried over for the next calculation.
+      bytesRead += unusedBytes;
+      unusedBytes = bufferSize - getBufferPosn();
+      bytesRead -= unusedBytes;
+    }
+    return (int) bytesRead;
+  }
+
+  @Override
+  public boolean needAdditionalRecordAfterSplit() {
+    return !finished && needAdditionalRecord;
+  }
+}
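The fillBuffer() override above is the heart of the fix: it caps each read
at the split boundary so the reader can notice when the boundary falls
inside a delimiter. A standalone sketch of that decision, with hypothetical
names for illustration only (the real logic is the needAdditionalRecord
assignment above):

    class SplitBoundarySketch {
      // Should the reader owning the previous split consume one extra record?
      static boolean needExtraRecord(boolean splitEndedInDelimiter,
          boolean usingDefaultCRLF, byte firstByteAfterSplit) {
        if (!splitEndedInDelimiter) {
          return false;              // boundary fell between two records
        }
        if (usingDefaultCRLF) {
          // "\r" | split | "\n": the next split's reader treats the bare
          // '\n' as a delimiter by itself, so no extra record is needed.
          return firstByteAfterSplit != '\n';
        }
        // A partial custom delimiter (e.g. the "+" of "+++") straddles
        // the split, which the next split's reader cannot recognize.
        return true;
      }

      public static void main(String[] args) {
        // "a\r" | split | "\nb": no extra record for the first split.
        System.out.println(needExtraRecord(true, true, (byte) '\n'));  // false
        // "abc+" | split | "++def": the first split must also emit "def".
        System.out.println(needExtraRecord(true, false, (byte) '+'));  // true
      }
    }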
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index 7b664e93ace..768aa051248 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -25,17 +25,24 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.net.URL;
 import java.util.ArrayList;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
 public class TestLineRecordReader {
+  private static Path workDir = new Path(new Path(System.getProperty(
+      "test.build.data", "target"), "data"), "TestTextInputFormat");
+  private static Path inputDir = new Path(workDir, "input");
 
   private void testSplitRecords(String testFileName, long firstSplitLength)
       throws IOException {
@@ -45,15 +52,27 @@ public class TestLineRecordReader {
     long testFileSize = testFile.length();
     Path testFilePath = new Path(testFile.getAbsolutePath());
     Configuration conf = new Configuration();
+    testSplitRecordsForFile(conf, firstSplitLength, testFileSize, testFilePath);
+  }
+
+  private void testSplitRecordsForFile(Configuration conf,
+      long firstSplitLength, long testFileSize, Path testFilePath)
+      throws IOException {
     conf.setInt(org.apache.hadoop.mapreduce.lib.input.
         LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    assertTrue("unexpected test data at " + testFile,
+    assertTrue("unexpected test data at " + testFilePath,
         testFileSize > firstSplitLength);
 
+    String delimiter = conf.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
+
     // read the data without splitting to count the records
     FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
         (String[])null);
-    LineRecordReader reader = new LineRecordReader(conf, split);
+    LineRecordReader reader = new LineRecordReader(conf, split,
+        recordDelimiterBytes);
     LongWritable key = new LongWritable();
     Text value = new Text();
     int numRecordsNoSplits = 0;
@@ -64,7 +83,7 @@ public class TestLineRecordReader {
 
     // count the records in the first split
     split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
-    reader = new LineRecordReader(conf, split);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
     int numRecordsFirstSplit = 0;
     while (reader.next(key, value)) {
       ++numRecordsFirstSplit;
@@ -74,14 +93,14 @@ public class TestLineRecordReader {
     // count the records in the second split
     split = new FileSplit(testFilePath, firstSplitLength,
         testFileSize - firstSplitLength, (String[])null);
-    reader = new LineRecordReader(conf, split);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
     int numRecordsRemainingSplits = 0;
     while (reader.next(key, value)) {
       ++numRecordsRemainingSplits;
     }
     reader.close();
 
-    assertEquals("Unexpected number of records in bzip2 compressed split",
+    assertEquals("Unexpected number of records in split",
         numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
   }
 
@@ -225,4 +244,52 @@ public class TestLineRecordReader {
 
     assertTrue("BOM is not skipped", skipBOM);
   }
+
+  /**
+   * Writes the input test file
+   *
+   * @param conf
+   * @return Path of the file created
+   * @throws IOException
+   */
+  private Path createInputFile(Configuration conf, String data)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path file = new Path(inputDir, "test.txt");
+    Writer writer = new OutputStreamWriter(localFs.create(file));
+    try {
+      writer.write(data);
+    } finally {
+      writer.close();
+    }
+    return file;
+  }
+
+  @Test
+  public void testUncompressedInput() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "abc+++def+++ghi+++" +
+        "jkl+++mno+++pqr+++stu+++vw +++xyz";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+++");
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedInputContainingCRLF() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "a\r\nb\rc\nd\r\n";
+    Path inputFile = createInputFile(conf, inputData);
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
 }
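The mapreduce-side test below mirrors the mapred-side one above. In both,
sweeping io.file.buffer.size from 1 byte upward is what provides the
coverage: with a 1-byte buffer every fillBuffer() call returns a single
byte, so the delimiter is guaranteed to straddle buffer refills and, for
some split sizes, the split boundary itself.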
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
index a1b5147c0c6..70908a6baca 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -25,11 +25,15 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.net.URL;
 import java.util.ArrayList;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -37,6 +41,9 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.junit.Test;
 
 public class TestLineRecordReader {
+  private static Path workDir = new Path(new Path(System.getProperty(
+      "test.build.data", "target"), "data"), "TestTextInputFormat");
+  private static Path inputDir = new Path(workDir, "input");
 
   private void testSplitRecords(String testFileName, long firstSplitLength)
       throws IOException {
@@ -46,17 +53,28 @@ public class TestLineRecordReader {
     long testFileSize = testFile.length();
     Path testFilePath = new Path(testFile.getAbsolutePath());
     Configuration conf = new Configuration();
+    testSplitRecordsForFile(conf, firstSplitLength, testFileSize, testFilePath);
+  }
+
+  private void testSplitRecordsForFile(Configuration conf,
+      long firstSplitLength, long testFileSize, Path testFilePath)
+      throws IOException {
     conf.setInt(org.apache.hadoop.mapreduce.lib.input.
         LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    assertTrue("unexpected test data at " + testFile,
+    assertTrue("unexpected test data at " + testFilePath,
         testFileSize > firstSplitLength);
 
+    String delimiter = conf.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
     TaskAttemptContext context = new TaskAttemptContextImpl(conf,
         new TaskAttemptID());
 
     // read the data without splitting to count the records
     FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
         (String[])null);
-    LineRecordReader reader = new LineRecordReader();
+    LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsNoSplits = 0;
     while (reader.nextKeyValue()) {
@@ -66,7 +84,7 @@ public class TestLineRecordReader {
 
     // count the records in the first split
     split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
-    reader = new LineRecordReader();
+    reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsFirstSplit = 0;
     while (reader.nextKeyValue()) {
@@ -77,16 +95,15 @@ public class TestLineRecordReader {
 
     // count the records in the second split
     split = new FileSplit(testFilePath, firstSplitLength,
         testFileSize - firstSplitLength, (String[])null);
-    reader = new LineRecordReader();
+    reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsRemainingSplits = 0;
     while (reader.nextKeyValue()) {
       ++numRecordsRemainingSplits;
     }
     reader.close();
-
-    assertEquals("Unexpected number of records in bzip2 compressed split",
-      numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
+    assertEquals("Unexpected number of records in split ", numRecordsNoSplits,
+        numRecordsFirstSplit + numRecordsRemainingSplits);
   }
 
@@ -231,4 +248,52 @@ public class TestLineRecordReader {
 
     assertTrue("BOM is not skipped", skipBOM);
   }
+
+  /**
+   * Writes the input test file
+   *
+   * @param conf
+   * @return Path of the file created
+   * @throws IOException
+   */
+  private Path createInputFile(Configuration conf, String data)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path file = new Path(inputDir, "test.txt");
+    Writer writer = new OutputStreamWriter(localFs.create(file));
+    try {
+      writer.write(data);
+    } finally {
+      writer.close();
+    }
+    return file;
+  }
+
+  @Test
+  public void testUncompressedInput() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "abc+++def+++ghi+++" +
+        "jkl+++mno+++pqr+++stu+++vw +++xyz";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+++");
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedInputContainingCRLF() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "a\r\nb\rc\nd\r\n";
+    Path inputFile = createInputFile(conf, inputData);
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
 }
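The new cases can be run on their own from the
hadoop-mapreduce-client/hadoop-mapreduce-client-core module with the usual
surefire test selector (both test classes share the same simple name):

    mvn test -Dtest=TestLineRecordReader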