MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader and IndexOutOfBoundsException. Contributed by Junping Du.
(cherry picked from commit c6f2d761d5
)
This commit is contained in:
parent
2c218ca8a8
commit
f1999fe275
|
@ -435,6 +435,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
MAPREDUCE-6616. Fail to create jobhistory file if there are some multibyte
|
MAPREDUCE-6616. Fail to create jobhistory file if there are some multibyte
|
||||||
characters in the job name. (Kousuke Saruta via aajisaka)
|
characters in the job name. (Kousuke Saruta via aajisaka)
|
||||||
|
|
||||||
|
MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader
|
||||||
|
and IndexOutOfBoundsException. (Junping Du via vvasudev)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -474,6 +477,9 @@ Release 2.7.3 - UNRELEASED
|
||||||
MAPREDUCE-6191. Improve clearing stale state of Java serialization
|
MAPREDUCE-6191. Improve clearing stale state of Java serialization
|
||||||
testcase. (Sam Liu via Eric Yang)
|
testcase. (Sam Liu via Eric Yang)
|
||||||
|
|
||||||
|
MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader
|
||||||
|
and IndexOutOfBoundsException. (Junping Du via vvasudev)
|
||||||
|
|
||||||
Release 2.7.2 - 2016-01-25
|
Release 2.7.2 - 2016-01-25
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -764,6 +770,9 @@ Release 2.6.5 - UNRELEASED
|
||||||
MAPREDUCE-6191. Improve clearing stale state of Java serialization
|
MAPREDUCE-6191. Improve clearing stale state of Java serialization
|
||||||
testcase. (Sam Liu via Eric Yang)
|
testcase. (Sam Liu via Eric Yang)
|
||||||
|
|
||||||
|
MAPREDUCE-6635. Unsafe long to int conversion in UncompressedSplitLineReader
|
||||||
|
and IndexOutOfBoundsException. (Junping Du via vvasudev)
|
||||||
|
|
||||||
Release 2.6.4 - 2016-02-11
|
Release 2.6.4 - 2016-02-11
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -53,8 +53,11 @@ public class UncompressedSplitLineReader extends SplitLineReader {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
int maxBytesToRead = buffer.length;
|
int maxBytesToRead = buffer.length;
|
||||||
if (totalBytesRead < splitLength) {
|
if (totalBytesRead < splitLength) {
|
||||||
maxBytesToRead = Math.min(maxBytesToRead,
|
long leftBytesForSplit = splitLength - totalBytesRead;
|
||||||
(int)(splitLength - totalBytesRead));
|
// check if leftBytesForSplit exceed Integer.MAX_VALUE
|
||||||
|
if (leftBytesForSplit <= Integer.MAX_VALUE) {
|
||||||
|
maxBytesToRead = Math.min(maxBytesToRead, (int)leftBytesForSplit);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
int bytesRead = in.read(buffer, 0, maxBytesToRead);
|
int bytesRead = in.read(buffer, 0, maxBytesToRead);
|
||||||
|
|
||||||
|
|
|
@ -110,6 +110,43 @@ public class TestLineRecordReader {
|
||||||
numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
|
numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testLargeSplitRecordForFile(Configuration conf,
|
||||||
|
long firstSplitLength, long testFileSize, Path testFilePath)
|
||||||
|
throws IOException {
|
||||||
|
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
|
||||||
|
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
|
||||||
|
assertTrue("unexpected firstSplitLength:" + firstSplitLength,
|
||||||
|
testFileSize < firstSplitLength);
|
||||||
|
String delimiter = conf.get("textinputformat.record.delimiter");
|
||||||
|
byte[] recordDelimiterBytes = null;
|
||||||
|
if (null != delimiter) {
|
||||||
|
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
|
||||||
|
}
|
||||||
|
// read the data without splitting to count the records
|
||||||
|
FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
|
||||||
|
(String[])null);
|
||||||
|
LineRecordReader reader = new LineRecordReader(conf, split,
|
||||||
|
recordDelimiterBytes);
|
||||||
|
LongWritable key = new LongWritable();
|
||||||
|
Text value = new Text();
|
||||||
|
int numRecordsNoSplits = 0;
|
||||||
|
while (reader.next(key, value)) {
|
||||||
|
++numRecordsNoSplits;
|
||||||
|
}
|
||||||
|
reader.close();
|
||||||
|
|
||||||
|
// count the records in the first split
|
||||||
|
split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
|
||||||
|
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
|
||||||
|
int numRecordsFirstSplit = 0;
|
||||||
|
while (reader.next(key, value)) {
|
||||||
|
++numRecordsFirstSplit;
|
||||||
|
}
|
||||||
|
reader.close();
|
||||||
|
assertEquals("Unexpected number of records in split",
|
||||||
|
numRecordsNoSplits, numRecordsFirstSplit);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBzip2SplitEndsAtCR() throws IOException {
|
public void testBzip2SplitEndsAtCR() throws IOException {
|
||||||
// the test data contains a carriage-return at the end of the first
|
// the test data contains a carriage-return at the end of the first
|
||||||
|
@ -331,6 +368,22 @@ public class TestLineRecordReader {
|
||||||
return file;
|
return file;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUncompressedInputWithLargeSplitSize() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
// single char delimiter
|
||||||
|
String inputData = "abcde +fghij+ klmno+pqrst+uvwxyz";
|
||||||
|
Path inputFile = createInputFile(conf, inputData);
|
||||||
|
conf.set("textinputformat.record.delimiter", "+");
|
||||||
|
// split size over max value of integer
|
||||||
|
long longSplitSize = (long)Integer.MAX_VALUE + 1;
|
||||||
|
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||||
|
conf.setInt("io.file.buffer.size", bufferSize);
|
||||||
|
testLargeSplitRecordForFile(conf, longSplitSize, inputData.length(),
|
||||||
|
inputFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUncompressedInput() throws Exception {
|
public void testUncompressedInput() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
|
Loading…
Reference in New Issue