MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader and IndexOutOfBoundsException. Contributed by Junping Du.

(cherry picked from commit c6f2d761d5)
(cherry picked from commit f1999fe275)

 Conflicts:
	hadoop-mapreduce-project/CHANGES.txt
This commit is contained in:
Varun Vasudev 2016-02-23 13:05:18 +05:30
parent 33b961ee88
commit 0edc764184
3 changed files with 64 additions and 2 deletions

View File

@ -52,6 +52,9 @@ Release 2.7.3 - UNRELEASED
MAPREDUCE-6460. TestRMContainerAllocator.
testAttemptNotFoundCausesRMCommunicatorException fails. (Zhihai Xu)
MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader
and IndexOutOfBoundsException. (Junping Du via vvasudev)
Release 2.7.2 - 2016-01-25
INCOMPATIBLE CHANGES
@ -342,6 +345,9 @@ Release 2.6.5 - UNRELEASED
MAPREDUCE-6191. Improve clearing stale state of Java serialization
testcase. (Sam Liu via Eric Yang)
MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader
and IndexOutOfBoundsException. (Junping Du via vvasudev)
Release 2.6.4 - 2016-02-11
INCOMPATIBLE CHANGES

View File

@ -53,8 +53,11 @@ public class UncompressedSplitLineReader extends SplitLineReader {
throws IOException {
int maxBytesToRead = buffer.length;
if (totalBytesRead < splitLength) {
maxBytesToRead = Math.min(maxBytesToRead,
(int)(splitLength - totalBytesRead));
long leftBytesForSplit = splitLength - totalBytesRead;
// check if leftBytesForSplit exceeds Integer.MAX_VALUE
if (leftBytesForSplit <= Integer.MAX_VALUE) {
maxBytesToRead = Math.min(maxBytesToRead, (int)leftBytesForSplit);
}
}
int bytesRead = in.read(buffer, 0, maxBytesToRead);

View File

@ -110,6 +110,43 @@ public class TestLineRecordReader {
numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
}
/**
 * Checks that reading a file through one split whose declared length is
 * larger than the file yields the same number of records as reading the
 * file through a split of exactly the file's size.
 *
 * <p>Note that this sets {@code LineRecordReader.MAX_LINE_LENGTH} to
 * {@code Integer.MAX_VALUE} on the supplied configuration.
 *
 * @param conf configuration used to build the readers; the record delimiter
 *        is taken from {@code textinputformat.record.delimiter} if present
 * @param firstSplitLength declared length of the oversized split; must be
 *        greater than {@code testFileSize}
 * @param testFileSize size of the test file
 * @param testFilePath location of the test file
 * @throws IOException if a reader cannot be created, read or closed
 */
private void testLargeSplitRecordForFile(Configuration conf,
    long firstSplitLength, long testFileSize, Path testFilePath)
    throws IOException {
  conf.setInt(org.apache.hadoop.mapreduce.lib.input.
      LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
  assertTrue("unexpected firstSplitLength:" + firstSplitLength,
      testFileSize < firstSplitLength);
  String delimiter = conf.get("textinputformat.record.delimiter");
  byte[] recordDelimiterBytes = null;
  if (null != delimiter) {
    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
  }
  // read the data without splitting to count the records
  int numRecordsNoSplits = countRecordsInSplit(conf,
      new FileSplit(testFilePath, 0, testFileSize, (String[])null),
      recordDelimiterBytes);
  // count the records in the first split
  int numRecordsFirstSplit = countRecordsInSplit(conf,
      new FileSplit(testFilePath, 0, firstSplitLength, (String[])null),
      recordDelimiterBytes);
  assertEquals("Unexpected number of records in split",
      numRecordsNoSplits, numRecordsFirstSplit);
}

/**
 * Reads the given split to exhaustion and returns how many records it held.
 * The reader is closed in a finally block so that it is released even when
 * reading fails part-way through.
 */
private int countRecordsInSplit(Configuration conf, FileSplit split,
    byte[] recordDelimiterBytes) throws IOException {
  LineRecordReader reader = new LineRecordReader(conf, split,
      recordDelimiterBytes);
  try {
    LongWritable key = new LongWritable();
    Text value = new Text();
    int numRecords = 0;
    while (reader.next(key, value)) {
      ++numRecords;
    }
    return numRecords;
  } finally {
    reader.close();
  }
}
@Test
public void testBzip2SplitEndsAtCR() throws IOException {
// the test data contains a carriage-return at the end of the first
@ -324,6 +361,22 @@ public class TestLineRecordReader {
return file;
}
/**
 * Runs the large-split record check against a small uncompressed input,
 * using a split length one greater than {@code Integer.MAX_VALUE} and
 * every buffer size from 1 up to the length of the input string.
 */
@Test
public void testUncompressedInputWithLargeSplitSize() throws Exception {
  final Configuration conf = new Configuration();
  // records in this input are separated by a one-character delimiter
  final String inputData = "abcde +fghij+ klmno+pqrst+uvwxyz";
  final Path inputFile = createInputFile(conf, inputData);
  conf.set("textinputformat.record.delimiter", "+");
  // a split length that no longer fits in an int
  final long longSplitSize = Integer.MAX_VALUE + 1L;
  final int dataLength = inputData.length();
  int bufferSize = 1;
  while (bufferSize <= dataLength) {
    conf.setInt("io.file.buffer.size", bufferSize);
    testLargeSplitRecordForFile(conf, longSplitSize, dataLength, inputFile);
    bufferSize++;
  }
}
@Test
public void testUncompressedInput() throws Exception {
Configuration conf = new Configuration();