From c6f2d761d5430eac6b9f07f137a7028de4e0660c Mon Sep 17 00:00:00 2001 From: Varun Vasudev Date: Tue, 23 Feb 2016 13:05:18 +0530 Subject: [PATCH] MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader and IndexOutOfBoundsException. Contributed by Junping Du. --- hadoop-mapreduce-project/CHANGES.txt | 9 ++++ .../input/UncompressedSplitLineReader.java | 7 ++- .../hadoop/mapred/TestLineRecordReader.java | 53 +++++++++++++++++++ 3 files changed, 67 insertions(+), 2 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f0ad171c644..9628d4916c3 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -724,6 +724,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6616. Fail to create jobhistory file if there are some multibyte characters in the job name. (Kousuke Saruta via aajisaka) + MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader + and IndexOutOfBoundsException. (Junping Du via vvasudev) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES @@ -763,6 +766,9 @@ Release 2.7.3 - UNRELEASED MAPREDUCE-6191. Improve clearing stale state of Java serialization testcase. (Sam Liu via Eric Yang) + MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader + and IndexOutOfBoundsException. (Junping Du via vvasudev) + Release 2.7.2 - 2016-01-25 INCOMPATIBLE CHANGES @@ -1056,6 +1062,9 @@ Release 2.6.5 - UNRELEASED MAPREDUCE-6191. Improve clearing stale state of Java serialization testcase. (Sam Liu via Eric Yang) + MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader + and IndexOutOfBoundsException. (Junping Du via vvasudev) + Release 2.6.4 - 2016-02-11 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java index 6d495ef3da2..bda02186e88 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java @@ -53,8 +53,11 @@ public class UncompressedSplitLineReader extends SplitLineReader { throws IOException { int maxBytesToRead = buffer.length; if (totalBytesRead < splitLength) { - maxBytesToRead = Math.min(maxBytesToRead, - (int)(splitLength - totalBytesRead)); + long leftBytesForSplit = splitLength - totalBytesRead; + // check if leftBytesForSplit exceed Integer.MAX_VALUE + if (leftBytesForSplit <= Integer.MAX_VALUE) { + maxBytesToRead = Math.min(maxBytesToRead, (int)leftBytesForSplit); + } } int bytesRead = in.read(buffer, 0, maxBytesToRead); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java index f0cf9f5a548..f50e1efb7ba 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java @@ -110,6 +110,43 @@ public class TestLineRecordReader { numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits); } + private void testLargeSplitRecordForFile(Configuration conf, + long firstSplitLength, long testFileSize, Path testFilePath) + throws IOException { + conf.setInt(org.apache.hadoop.mapreduce.lib.input. + LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + assertTrue("unexpected firstSplitLength:" + firstSplitLength, + testFileSize < firstSplitLength); + String delimiter = conf.get("textinputformat.record.delimiter"); + byte[] recordDelimiterBytes = null; + if (null != delimiter) { + recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); + } + // read the data without splitting to count the records + FileSplit split = new FileSplit(testFilePath, 0, testFileSize, + (String[])null); + LineRecordReader reader = new LineRecordReader(conf, split, + recordDelimiterBytes); + LongWritable key = new LongWritable(); + Text value = new Text(); + int numRecordsNoSplits = 0; + while (reader.next(key, value)) { + ++numRecordsNoSplits; + } + reader.close(); + + // count the records in the first split + split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + int numRecordsFirstSplit = 0; + while (reader.next(key, value)) { + ++numRecordsFirstSplit; + } + reader.close(); + assertEquals("Unexpected number of records in split", + numRecordsNoSplits, numRecordsFirstSplit); + } + @Test public void testBzip2SplitEndsAtCR() throws IOException { // the test data contains a carriage-return at the end of the first @@ -331,6 +368,22 @@ public class TestLineRecordReader { return file; } + @Test + public void testUncompressedInputWithLargeSplitSize() throws Exception { + Configuration conf = new Configuration(); + // single char delimiter + String inputData = "abcde +fghij+ klmno+pqrst+uvwxyz"; + Path inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "+"); + // split size over max value of integer + long longSplitSize = (long)Integer.MAX_VALUE + 1; + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testLargeSplitRecordForFile(conf, longSplitSize, inputData.length(), + inputFile); + } + } + @Test public void testUncompressedInput() throws Exception { Configuration conf = new Configuration();