diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
index 3188cb5fe95..1d1b5693b95 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -369,4 +369,12 @@ public class LineReader implements Closeable {
   public int readLine(Text str) throws IOException {
     return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
   }
+
+  protected int getBufferPosn() {
+    return bufferPosn;
+  }
+
+  protected int getBufferSize() {
+    return bufferSize;
+  }
 }
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index bae2674b85e..da1a2f33db5 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -498,6 +498,10 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6403. Fix typo in the usage of NNBench.
     (Jagadesh Kiran N via aajisaka)
 
+    MAPREDUCE-5948. org.apache.hadoop.mapred.LineRecordReader does not handle
+    multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira
+    AJISAKA via jlowe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index 45263c4256c..980269700ae 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.io.compress.SplitCompressionInputStream;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
 import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
+import org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
@@ -131,7 +132,8 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
       }
     } else {
       fileIn.seek(start);
-      in = new SplitLineReader(fileIn, job, recordDelimiter);
+      in = new UncompressedSplitLineReader(
+          fileIn, job, recordDelimiter, split.getLength());
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
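For context, a minimal usage sketch (not part of this patch; the driver class name is hypothetical) of how a job opts in to a multibyte delimiter via textinputformat.record.delimiter — the uncompressed code path that the change above reroutes through the new reader:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class MultibyteDelimiterJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Records are separated by "+++" instead of newlines.
    conf.set("textinputformat.record.delimiter", "+++");
    Job job = Job.getInstance(conf, "multibyte-delimiter-example");
    // TextInputFormat hands uncompressed splits to LineRecordReader, which
    // now wraps them in UncompressedSplitLineReader.
    job.setInputFormatClass(TextInputFormat.class);
    // ... set mapper, reducer, and input/output paths before submitting.
  }
}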
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index 5af8f43bbec..9e1ca2a6391 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -112,7 +112,8 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
       }
     } else {
       fileIn.seek(start);
-      in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
+      in = new UncompressedSplitLineReader(
+          fileIn, job, this.recordDelimiterBytes, split.getLength());
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
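The new class introduced below caps every read at the split boundary so it can tell exactly when a record delimiter straddles two splits. A simplified sketch of that capping rule (hypothetical helper with example numbers, mirroring fillBuffer() below):

// Simplified model of the read cap in fillBuffer().
static int capAtSplit(long totalBytesRead, long splitLength, int bufferLength) {
  int maxBytesToRead = bufferLength;
  if (totalBytesRead < splitLength) {
    // e.g. splitLength = 10, totalBytesRead = 8, buffer = 4096: read at most
    // 2 bytes, so the next call starts exactly at the split boundary.
    maxBytesToRead = Math.min(maxBytesToRead,
        (int) (splitLength - totalBytesRead));
  }
  return maxBytesToRead;
}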
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
new file mode 100644
index 00000000000..52fb7b0fc5f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+/**
+ * SplitLineReader for uncompressed files.
+ * This class can split the file correctly even if the delimiter is
+ * multi-byte.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class UncompressedSplitLineReader extends SplitLineReader {
+  private boolean needAdditionalRecord = false;
+  private long splitLength;
+  /** Total bytes read from the input stream. */
+  private long totalBytesRead = 0;
+  private boolean finished = false;
+  private boolean usingCRLF;
+  private int unusedBytes = 0;
+  private int lastBytesRead = 0;
+
+  public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf,
+      byte[] recordDelimiterBytes, long splitLength) throws IOException {
+    super(in, conf, recordDelimiterBytes);
+    this.splitLength = splitLength;
+    usingCRLF = (recordDelimiterBytes == null);
+  }
+
+  @Override
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    int maxBytesToRead = buffer.length;
+    if (totalBytesRead < splitLength) {
+      maxBytesToRead = Math.min(maxBytesToRead,
+          (int)(splitLength - totalBytesRead));
+    }
+    int bytesRead = in.read(buffer, 0, maxBytesToRead);
+    lastBytesRead = bytesRead;
+
+    // If the split ended in the middle of a record delimiter then we need
+    // to read one additional record, as the consumer of the next split will
+    // not recognize the partial delimiter as a record.
+    // However if using the default delimiter and the next character is a
+    // linefeed then the next split will treat it as a delimiter all by itself
+    // and the additional record read should not be performed.
+    if (totalBytesRead == splitLength && inDelimiter && bytesRead > 0) {
+      if (usingCRLF) {
+        needAdditionalRecord = (buffer[0] != '\n');
+      } else {
+        needAdditionalRecord = true;
+      }
+    }
+    if (bytesRead > 0) {
+      totalBytesRead += bytesRead;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    long bytesRead = 0;
+    if (!finished) {
+      // only allow at most one more record to be read after the stream
+      // reports the split ended
+      if (totalBytesRead > splitLength) {
+        finished = true;
+      }
+      bytesRead = totalBytesRead;
+      int bytesConsumed = super.readLine(str, maxLineLength, maxBytesToConsume);
+      bytesRead = totalBytesRead - bytesRead;
+
+      // No records left.
+      if (bytesConsumed == 0 && bytesRead == 0) {
+        return 0;
+      }
+
+      int bufferSize = getBufferSize();
+
+      // Add the remaining buffer size not used for the last call
+      // of the fillBuffer method.
+      if (lastBytesRead <= 0) {
+        bytesRead += bufferSize;
+      } else if (bytesRead > 0) {
+        bytesRead += bufferSize - lastBytesRead;
+      }
+
+      // Adjust the size of the buffer not used for this record.
+      // The size is carried over for the next calculation.
+      bytesRead += unusedBytes;
+      unusedBytes = bufferSize - getBufferPosn();
+      bytesRead -= unusedBytes;
+    }
+    return (int) bytesRead;
+  }
+
+  @Override
+  public boolean needAdditionalRecordAfterSplit() {
+    return !finished && needAdditionalRecord;
+  }
+}
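The accounting in readLine() above is subtle: the method must report bytes consumed from the stream, not bytes copied into the record, so stale buffer residue is added back and still-buffered bytes are deferred to the next call. A standalone model of that arithmetic (a hypothetical helper mirroring the fields above; unused[0] stands in for the unusedBytes field carried between calls):

// streamDelta is how far totalBytesRead advanced during super.readLine().
static long adjustBytesRead(long streamDelta, int bufferSize,
    int lastBytesRead, int bufferPosn, long[] unused) {
  long bytesRead = streamDelta;
  if (lastBytesRead <= 0) {
    bytesRead += bufferSize;                  // EOF: whole buffer is residue
  } else if (bytesRead > 0) {
    bytesRead += bufferSize - lastBytesRead;  // short fill left stale bytes
  }
  bytesRead += unused[0];                     // residue carried from last call
  unused[0] = bufferSize - bufferPosn;
  bytesRead -= unused[0];                     // defer still-buffered bytes
  return bytesRead;
}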
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index cbbbeaaa9c1..a5c99334f26 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -25,13 +25,17 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -41,6 +45,9 @@ import org.apache.hadoop.io.compress.Decompressor;
 import org.junit.Test;
 
 public class TestLineRecordReader {
+  private static Path workDir = new Path(new Path(System.getProperty(
+      "test.build.data", "target"), "data"), "TestTextInputFormat");
+  private static Path inputDir = new Path(workDir, "input");
 
   private void testSplitRecords(String testFileName,
       long firstSplitLength) throws IOException {
@@ -50,15 +57,27 @@ public class TestLineRecordReader {
     long testFileSize = testFile.length();
     Path testFilePath = new Path(testFile.getAbsolutePath());
     Configuration conf = new Configuration();
+    testSplitRecordsForFile(conf,
+        firstSplitLength, testFileSize, testFilePath);
+  }
+
+  private void testSplitRecordsForFile(Configuration conf,
+      long firstSplitLength, long testFileSize, Path testFilePath)
+      throws IOException {
     conf.setInt(org.apache.hadoop.mapreduce.lib.input.
         LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    assertTrue("unexpected test data at " + testFile,
+    assertTrue("unexpected test data at " + testFilePath,
         testFileSize > firstSplitLength);
+    String delimiter = conf.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
 
     // read the data without splitting to count the records
     FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
         (String[])null);
-    LineRecordReader reader = new LineRecordReader(conf, split);
+    LineRecordReader reader = new LineRecordReader(conf, split,
+        recordDelimiterBytes);
     LongWritable key = new LongWritable();
     Text value = new Text();
     int numRecordsNoSplits = 0;
@@ -69,7 +88,7 @@ public class TestLineRecordReader {
 
     // count the records in the first split
     split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
-    reader = new LineRecordReader(conf, split);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
     int numRecordsFirstSplit = 0;
     while (reader.next(key, value)) {
       ++numRecordsFirstSplit;
@@ -79,14 +98,14 @@ public class TestLineRecordReader {
     // count the records in the second split
     split = new FileSplit(testFilePath, firstSplitLength,
         testFileSize - firstSplitLength, (String[])null);
-    reader = new LineRecordReader(conf, split);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
     int numRecordsRemainingSplits = 0;
     while (reader.next(key, value)) {
       ++numRecordsRemainingSplits;
     }
     reader.close();
 
-    assertEquals("Unexpected number of records in bzip2 compressed split",
+    assertEquals("Unexpected number of records in split",
         numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
   }
 
@@ -290,4 +309,52 @@ public class TestLineRecordReader {
     }
     assertEquals(10, decompressors.size());
   }
+
+  /**
+   * Writes the input test file.
+   *
+   * @param conf configuration used to obtain the local filesystem
+   * @return Path of the file created
+   * @throws IOException
+   */
+  private Path createInputFile(Configuration conf, String data)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path file = new Path(inputDir, "test.txt");
+    Writer writer = new OutputStreamWriter(localFs.create(file));
+    try {
+      writer.write(data);
+    } finally {
+      writer.close();
+    }
+    return file;
+  }
+
+  @Test
+  public void testUncompressedInput() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "abc+++def+++ghi+++" +
+        "jkl+++mno+++pqr+++stu+++vw +++xyz";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+++");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedInputContainingCRLF() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "a\r\nb\rc\nd\r\n";
+    Path inputFile = createInputFile(conf, inputData);
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
 }
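The sweeps above exercise every (bufferSize, splitSize) combination. If a single focused regression case is ever wanted, a variant along these lines (a hypothetical test reusing the helpers added above) pins the split boundary inside the delimiter:

@Test
public void testSplitInsideMultibyteDelimiter() throws Exception {
  Configuration conf = new Configuration();
  conf.set("textinputformat.record.delimiter", "+++");
  conf.setInt("io.file.buffer.size", 1);   // worst case: one byte per fill
  String inputData = "abc+++def+++ghi";
  Path inputFile = createInputFile(conf, inputData);
  // First split ends at offset 4, between the first and second '+'.
  testSplitRecordsForFile(conf, 4, inputData.length(), inputFile);
}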
conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java index 8b385a0103e..3c1f28f6f80 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java @@ -25,13 +25,17 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; import java.net.URL; import java.util.ArrayList; import java.util.HashSet; import java.util.Set; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.apache.commons.io.Charsets; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CodecPool; @@ -42,6 +46,9 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.junit.Test; public class TestLineRecordReader { + private static Path workDir = new Path(new Path(System.getProperty( + "test.build.data", "target"), "data"), "TestTextInputFormat"); + private static Path inputDir = new Path(workDir, "input"); private void testSplitRecords(String testFileName, long firstSplitLength) throws IOException { @@ -51,17 +58,28 @@ public class TestLineRecordReader { long testFileSize = testFile.length(); Path testFilePath = new Path(testFile.getAbsolutePath()); Configuration conf = new Configuration(); + testSplitRecordsForFile(conf, firstSplitLength, testFileSize, testFilePath); + } + + private void testSplitRecordsForFile(Configuration conf, + long firstSplitLength, long testFileSize, Path testFilePath) + throws IOException { conf.setInt(org.apache.hadoop.mapreduce.lib.input. 
@@ -71,7 +89,7 @@ public class TestLineRecordReader {
 
     // count the records in the first split
     split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
-    reader = new LineRecordReader();
+    reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsFirstSplit = 0;
     while (reader.nextKeyValue()) {
@@ -82,16 +100,15 @@ public class TestLineRecordReader {
     // count the records in the second split
     split = new FileSplit(testFilePath, firstSplitLength,
         testFileSize - firstSplitLength, (String[])null);
-    reader = new LineRecordReader();
+    reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsRemainingSplits = 0;
     while (reader.nextKeyValue()) {
       ++numRecordsRemainingSplits;
     }
     reader.close();
-
-    assertEquals("Unexpected number of records in bzip2 compressed split",
-        numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
+    assertEquals("Unexpected number of records in split", numRecordsNoSplits,
+        numRecordsFirstSplit + numRecordsRemainingSplits);
   }
 
@@ -276,4 +293,52 @@ public class TestLineRecordReader {
     }
     assertEquals(10, decompressors.size());
   }
+
+  /**
+   * Writes the input test file.
+   *
+   * @param conf configuration used to obtain the local filesystem
+   * @return Path of the file created
+   * @throws IOException
+   */
+  private Path createInputFile(Configuration conf, String data)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path file = new Path(inputDir, "test.txt");
+    Writer writer = new OutputStreamWriter(localFs.create(file));
+    try {
+      writer.write(data);
+    } finally {
+      writer.close();
+    }
+    return file;
+  }
+
+  @Test
+  public void testUncompressedInput() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "abc+++def+++ghi+++" +
+        "jkl+++mno+++pqr+++stu+++vw +++xyz";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+++");
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedInputContainingCRLF() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "a\r\nb\rc\nd\r\n";
+    Path inputFile = createInputFile(conf, inputData);
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
 }
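For context on how the needAdditionalRecordAfterSplit() flag added by this patch is consumed, a paraphrased sketch of the existing record loop in LineRecordReader (simplified from memory of the surrounding reader code; not part of this diff):

// After the split's byte range is exhausted, the reader keeps going for one
// more record while the line reader reports that the split ended inside a
// delimiter.
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
  int newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
  if (newSize == 0) {
    break;  // end of stream
  }
  pos += newSize;
  // ... emit (key, value) unless the line exceeded maxLineLength ...
}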