diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index b9c1ea65489..8d56f5667fe 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -435,6 +435,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6616. Fail to create jobhistory file if there are some multibyte
     characters in the job name. (Kousuke Saruta via aajisaka)
 
+    MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader
+    and IndexOutOfBoundsException. (Junping Du via vvasudev)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -474,6 +477,9 @@ Release 2.7.3 - UNRELEASED
     MAPREDUCE-6191. Improve clearing stale state of Java serialization
     testcase. (Sam Liu via Eric Yang)
 
+    MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader
+    and IndexOutOfBoundsException. (Junping Du via vvasudev)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES
@@ -764,6 +770,9 @@ Release 2.6.5 - UNRELEASED
     MAPREDUCE-6191. Improve clearing stale state of Java serialization
     testcase. (Sam Liu via Eric Yang)
 
+    MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader
+    and IndexOutOfBoundsException. (Junping Du via vvasudev)
+
 Release 2.6.4 - 2016-02-11
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
index 6d495ef3da2..bda02186e88 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java
@@ -53,8 +53,11 @@ protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
       throws IOException {
     int maxBytesToRead = buffer.length;
     if (totalBytesRead < splitLength) {
-      maxBytesToRead = Math.min(maxBytesToRead,
-          (int)(splitLength - totalBytesRead));
+      long leftBytesForSplit = splitLength - totalBytesRead;
+      // check if leftBytesForSplit exceeds Integer.MAX_VALUE
+      if (leftBytesForSplit <= Integer.MAX_VALUE) {
+        maxBytesToRead = Math.min(maxBytesToRead, (int)leftBytesForSplit);
+      }
     }
     int bytesRead = in.read(buffer, 0, maxBytesToRead);
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index f0cf9f5a548..f50e1efb7ba 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -110,6 +110,43 @@ private void testSplitRecordsForFile(Configuration conf,
         numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
   }
 
+  private void testLargeSplitRecordForFile(Configuration conf,
+      long firstSplitLength, long testFileSize, Path testFilePath)
+      throws IOException {
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    assertTrue("unexpected firstSplitLength:" + firstSplitLength,
+        testFileSize < firstSplitLength);
+    String delimiter = conf.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
+    // read the data without splitting to count the records
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
+        (String[])null);
+    LineRecordReader reader = new LineRecordReader(conf, split,
+        recordDelimiterBytes);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    int numRecordsNoSplits = 0;
+    while (reader.next(key, value)) {
+      ++numRecordsNoSplits;
+    }
+    reader.close();
+
+    // count the records in the first split
+    split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
+    int numRecordsFirstSplit = 0;
+    while (reader.next(key, value)) {
+      ++numRecordsFirstSplit;
+    }
+    reader.close();
+    assertEquals("Unexpected number of records in split",
+        numRecordsNoSplits, numRecordsFirstSplit);
+  }
+
   @Test
   public void testBzip2SplitEndsAtCR() throws IOException {
     // the test data contains a carriage-return at the end of the first
@@ -331,6 +368,22 @@ private Path createInputFile(Configuration conf, String data)
     return file;
   }
 
+  @Test
+  public void testUncompressedInputWithLargeSplitSize() throws Exception {
+    Configuration conf = new Configuration();
+    // single char delimiter
+    String inputData = "abcde +fghij+ klmno+pqrst+uvwxyz";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+");
+    // split size over max value of integer
+    long longSplitSize = (long)Integer.MAX_VALUE + 1;
+    for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      conf.setInt("io.file.buffer.size", bufferSize);
+      testLargeSplitRecordForFile(conf, longSplitSize, inputData.length(),
+          inputFile);
+    }
+  }
+
   @Test
   public void testUncompressedInput() throws Exception {
     Configuration conf = new Configuration();
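
Reviewer note (not part of the patch): a minimal, self-contained sketch of the
failure this fix addresses, for anyone verifying the arithmetic. When a split is
longer than Integer.MAX_VALUE bytes, the old cast in fillBuffer() wraps to a
negative int, and InputStream.read(buffer, 0, negativeLength) then throws
IndexOutOfBoundsException. The class name and the 64KB buffer size below are
illustrative assumptions; the Integer.MAX_VALUE + 1 split length mirrors
longSplitSize in testUncompressedInputWithLargeSplitSize above.

public class CastOverflowSketch {
  public static void main(String[] args) {
    long splitLength = (long) Integer.MAX_VALUE + 1; // 2,147,483,648-byte split
    long totalBytesRead = 0;                         // nothing consumed yet
    int bufferLength = 65536;                        // assumed io.file.buffer.size

    // Old logic: narrow the remaining byte count straight to int.
    // 2,147,483,648 does not fit in 32 bits, so the cast wraps negative.
    int narrowed = (int) (splitLength - totalBytesRead);
    System.out.println(narrowed);                    // -2147483648

    // Math.min then prefers the negative value over the buffer length, and
    // in.read(buffer, 0, maxBytesToRead) with a negative length throws
    // IndexOutOfBoundsException per the java.io.InputStream contract.
    int oldMaxBytesToRead = Math.min(bufferLength, narrowed);
    System.out.println(oldMaxBytesToRead);           // -2147483648

    // Patched logic: narrow only when the remainder fits in an int;
    // otherwise buffer.length is already the tighter bound.
    long leftBytesForSplit = splitLength - totalBytesRead;
    int newMaxBytesToRead = bufferLength;
    if (leftBytesForSplit <= Integer.MAX_VALUE) {
      newMaxBytesToRead = Math.min(newMaxBytesToRead, (int) leftBytesForSplit);
    }
    System.out.println(newMaxBytesToRead);           // 65536
  }
}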