From 4bb4de93d67873b47fd90c61396f52315165c7bf Mon Sep 17 00:00:00 2001
From: Jason Darrell Lowe <jlowe@apache.org>
Date: Wed, 28 May 2014 19:37:52 +0000
Subject: [PATCH] MAPREDUCE-5862. Line records longer than 2x split size
 aren't handled correctly. Contributed by bc Wong

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1598111 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |  3 +
 .../hadoop-mapreduce-client-core/pom.xml      |  9 ++
 .../hadoop/mapred/LineRecordReader.java       |  5 +-
 .../mapreduce/lib/input/LineRecordReader.java |  5 +-
 .../hadoop/mapred/TestLineRecordReader.java   | 91 +++++++++++++++++
 .../lib/input/TestLineRecordReader.java       | 92 ++++++++++++++++++
 .../recordSpanningMultipleSplits.txt          |  4 +
 .../recordSpanningMultipleSplits.txt.bz2      | Bin 0 -> 99 bytes
 8 files changed, 203 insertions(+), 6 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt.bz2

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 82e90984420..5491d1cd5e1 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -241,6 +241,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5309. 2.0.4 JobHistoryParser can't parse certain failed job
     history files generated by 2.0.3 history server (Rushabh S Shah via jlowe)
 
+    MAPREDUCE-5862. Line records longer than 2x split size aren't handled
+    correctly (bc Wong via jlowe)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
index 8b0c4887e84..3ca6dc75e13 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
@@ -85,6 +85,15 @@
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index 8a1624cd0e6..8b26fbd16d8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -184,7 +184,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
   private int maxBytesToConsume(long pos) {
     return isCompressedInput()
       ? Integer.MAX_VALUE
-      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+      : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
   }
 
   private long getFilePosition() throws IOException {
@@ -206,8 +206,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
       key.set(pos);
 
-      int newSize = in.readLine(value, maxLineLength,
-          Math.max(maxBytesToConsume(pos), maxLineLength));
+      int newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
       if (newSize == 0) {
         return false;
       }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index 0c7635e4f30..6d12d82d177 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -121,7 +121,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
   private int maxBytesToConsume(long pos) {
     return isCompressedInput
       ? Integer.MAX_VALUE
-      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+      : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
   }
 
   private long getFilePosition() throws IOException {
@@ -146,8 +146,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
-      newSize = in.readLine(value, maxLineLength,
-          Math.max(maxBytesToConsume(pos), maxLineLength));
+      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
       pos += newSize;
       if (newSize < maxLineLength) {
         break;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index b8df069fccf..ee066e218f1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -97,4 +100,92 @@ public class TestLineRecordReader {
     // character is a linefeed
     testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
   }
+
+  // Use the LineRecordReader to read records from the file
+  public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
+      throws IOException {
+
+    // Set up context
+    File testFile = new File(testFileUrl.getFile());
+    long testFileSize = testFile.length();
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.setInt("io.file.buffer.size", 1);
+
+    // Gather the records returned by the record reader
+    ArrayList<String> records = new ArrayList<String>();
+
+    long offset = 0;
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    while (offset < testFileSize) {
+      FileSplit split =
+          new FileSplit(testFilePath, offset, splitSize, (String[]) null);
+      LineRecordReader reader = new LineRecordReader(conf, split);
+
+      while (reader.next(key, value)) {
+        records.add(value.toString());
+      }
+      offset += splitSize;
+    }
+    return records;
+  }
+
+  // Gather the records by just splitting on new lines
+  public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
+      throws IOException {
+    int MAX_DATA_SIZE = 1024 * 1024;
+    byte[] data = new byte[MAX_DATA_SIZE];
+    FileInputStream fis = new FileInputStream(testFileUrl.getFile());
+    int count;
+    if (bzip) {
+      BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
+      count = bzIn.read(data);
+      bzIn.close();
+    } else {
+      count = fis.read(data);
+    }
+    fis.close();
+    assertTrue("Test file data too big for buffer", count < data.length);
+    return new String(data, 0, count, "UTF-8").split("\n");
+  }
+
+  public void checkRecordSpanningMultipleSplits(String testFile,
+                                                int splitSize,
+                                                boolean bzip)
+      throws IOException {
+    URL testFileUrl = getClass().getClassLoader().getResource(testFile);
+    ArrayList<String> records = readRecords(testFileUrl, splitSize);
+    String[] actuals = readRecordsDirectly(testFileUrl, bzip);
+
+    assertEquals("Wrong number of records", actuals.length, records.size());
+
+    boolean hasLargeRecord = false;
+    for (int i = 0; i < actuals.length; ++i) {
+      assertEquals(actuals[i], records.get(i));
+      if (actuals[i].length() > 2 * splitSize) {
+        hasLargeRecord = true;
+      }
+    }
+
+    assertTrue("Invalid test data. Doesn't have a large enough record",
+               hasLargeRecord);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplits()
+      throws IOException {
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
+                                      10, false);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplitsCompressed()
+      throws IOException {
+    // The file is generated with bz2 block size of 100k. The split size
+    // needs to be larger than that for the CompressedSplitLineReader to
+    // work.
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
+                                      200 * 1000, true);
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
index 26c839614ee..46091850ead 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -101,4 +104,93 @@ public class TestLineRecordReader {
     // character is a linefeed
     testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
   }
+
+  // Use the LineRecordReader to read records from the file
+  public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
+      throws IOException {
+
+    // Set up context
+    File testFile = new File(testFileUrl.getFile());
+    long testFileSize = testFile.length();
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.setInt("io.file.buffer.size", 1);
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+    // Gather the records returned by the record reader
+    ArrayList<String> records = new ArrayList<String>();
+
+    long offset = 0;
+    while (offset < testFileSize) {
+      FileSplit split = new FileSplit(testFilePath, offset, splitSize, null);
+      LineRecordReader reader = new LineRecordReader();
+      reader.initialize(split, context);
+
+      while (reader.nextKeyValue()) {
+        records.add(reader.getCurrentValue().toString());
+      }
+      offset += splitSize;
+    }
+    return records;
+  }
+
+  // Gather the records by just splitting on new lines
+  public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
+      throws IOException {
+    int MAX_DATA_SIZE = 1024 * 1024;
+    byte[] data = new byte[MAX_DATA_SIZE];
+    FileInputStream fis = new FileInputStream(testFileUrl.getFile());
+    int count;
+    if (bzip) {
+      BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
+      count = bzIn.read(data);
+      bzIn.close();
+    } else {
+      count = fis.read(data);
+    }
+    fis.close();
+    assertTrue("Test file data too big for buffer", count < data.length);
+    return new String(data, 0, count, "UTF-8").split("\n");
+  }
+
+  public void checkRecordSpanningMultipleSplits(String testFile,
+                                                int splitSize,
+                                                boolean bzip)
+      throws IOException {
+    URL testFileUrl = getClass().getClassLoader().getResource(testFile);
+    ArrayList<String> records = readRecords(testFileUrl, splitSize);
+    String[] actuals = readRecordsDirectly(testFileUrl, bzip);
+
+    assertEquals("Wrong number of records", actuals.length, records.size());
+
+    boolean hasLargeRecord = false;
+    for (int i = 0; i < actuals.length; ++i) {
+      assertEquals(actuals[i], records.get(i));
+      if (actuals[i].length() > 2 * splitSize) {
+        hasLargeRecord = true;
+      }
+    }
+
+    assertTrue("Invalid test data. Doesn't have a large enough record",
+               hasLargeRecord);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplits()
+      throws IOException {
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
+                                      10,
+                                      false);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplitsCompressed()
+      throws IOException {
+    // The file is generated with bz2 block size of 100k. The split size
+    // needs to be larger than that for the CompressedSplitLineReader to
+    // work.
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
+                                      200 * 1000,
+                                      true);
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt
new file mode 100644
index 00000000000..86dc5c1c236
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt
@@ -0,0 +1,4 @@
+Use with small split size,
+like 32.
+And then we give it a really really long line, which will surely span multiple splits,
+to see how it handles.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt.bz2 b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt.bz2
new file mode 100644
index 0000000000000000000000000000000000000000..bcd13f5a87b126c54dfa940c3dfd00768131b06a
GIT binary patch
literal 99
zcmV-p0G$6qT4*sbL0KkKS#Z;6{{R35U4(%UNCI#W2mlgd+>itS00iG>
z)EiYjAn_m?G-Z7#)`yxrC>nlA2tYMK00