svn merge -c 1598111 FIXES: MAPREDUCE-5862. Line records longer than 2x split size aren't handled correctly. Contributed by bc Wong

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1598115 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-05-28 19:53:08 +00:00
parent 57c01b1bda
commit 3045fdc7c6
8 changed files with 203 additions and 6 deletions

View File

@ -99,6 +99,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5309. 2.0.4 JobHistoryParser can't parse certain failed job MAPREDUCE-5309. 2.0.4 JobHistoryParser can't parse certain failed job
history files generated by 2.0.3 history server (Rushabh S Shah via jlowe) history files generated by 2.0.3 history server (Rushabh S Shah via jlowe)
MAPREDUCE-5862. Line records longer than 2x split size aren't handled
correctly (bc Wong via jlowe)
Release 2.4.1 - UNRELEASED Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -85,6 +85,15 @@
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
</excludes>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
</project> </project>

View File

@ -184,7 +184,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
private int maxBytesToConsume(long pos) { private int maxBytesToConsume(long pos) {
return isCompressedInput() return isCompressedInput()
? Integer.MAX_VALUE ? Integer.MAX_VALUE
: (int) Math.min(Integer.MAX_VALUE, end - pos); : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
} }
private long getFilePosition() throws IOException { private long getFilePosition() throws IOException {
@ -206,8 +206,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
key.set(pos); key.set(pos);
int newSize = in.readLine(value, maxLineLength, int newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
Math.max(maxBytesToConsume(pos), maxLineLength));
if (newSize == 0) { if (newSize == 0) {
return false; return false;
} }

View File

@ -121,7 +121,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
private int maxBytesToConsume(long pos) { private int maxBytesToConsume(long pos) {
return isCompressedInput return isCompressedInput
? Integer.MAX_VALUE ? Integer.MAX_VALUE
: (int) Math.min(Integer.MAX_VALUE, end - pos); : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
} }
private long getFilePosition() throws IOException { private long getFilePosition() throws IOException {
@ -146,8 +146,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
// We always read one extra line, which lies outside the upper // We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1) // split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
newSize = in.readLine(value, maxLineLength, newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
Math.max(maxBytesToConsume(pos), maxLineLength));
pos += newSize; pos += newSize;
if (newSize < maxLineLength) { if (newSize < maxLineLength) {
break; break;

View File

@ -23,9 +23,12 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
@ -97,4 +100,92 @@ public class TestLineRecordReader {
// character is a linefeed // character is a linefeed
testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498); testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
} }
// Use the LineRecordReader to read records from the file
public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
throws IOException {
// Set up context
File testFile = new File(testFileUrl.getFile());
long testFileSize = testFile.length();
Path testFilePath = new Path(testFile.getAbsolutePath());
Configuration conf = new Configuration();
conf.setInt("io.file.buffer.size", 1);
// Gather the records returned by the record reader
ArrayList<String> records = new ArrayList<String>();
long offset = 0;
LongWritable key = new LongWritable();
Text value = new Text();
while (offset < testFileSize) {
FileSplit split =
new FileSplit(testFilePath, offset, splitSize, (String[]) null);
LineRecordReader reader = new LineRecordReader(conf, split);
while (reader.next(key, value)) {
records.add(value.toString());
}
offset += splitSize;
}
return records;
}
// Gather the records by just splitting on new lines
public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
throws IOException {
int MAX_DATA_SIZE = 1024 * 1024;
byte[] data = new byte[MAX_DATA_SIZE];
FileInputStream fis = new FileInputStream(testFileUrl.getFile());
int count;
if (bzip) {
BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
count = bzIn.read(data);
bzIn.close();
} else {
count = fis.read(data);
}
fis.close();
assertTrue("Test file data too big for buffer", count < data.length);
return new String(data, 0, count, "UTF-8").split("\n");
}
public void checkRecordSpanningMultipleSplits(String testFile,
int splitSize,
boolean bzip)
throws IOException {
URL testFileUrl = getClass().getClassLoader().getResource(testFile);
ArrayList<String> records = readRecords(testFileUrl, splitSize);
String[] actuals = readRecordsDirectly(testFileUrl, bzip);
assertEquals("Wrong number of records", actuals.length, records.size());
boolean hasLargeRecord = false;
for (int i = 0; i < actuals.length; ++i) {
assertEquals(actuals[i], records.get(i));
if (actuals[i].length() > 2 * splitSize) {
hasLargeRecord = true;
}
}
assertTrue("Invalid test data. Doesn't have a large enough record",
hasLargeRecord);
}
@Test
public void testRecordSpanningMultipleSplits()
throws IOException {
checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
10, false);
}
@Test
public void testRecordSpanningMultipleSplitsCompressed()
throws IOException {
// The file is generated with bz2 block size of 100k. The split size
// needs to be larger than that for the CompressedSplitLineReader to
// work.
checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
200 * 1000, true);
}
} }

View File

@ -23,9 +23,12 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
@ -101,4 +104,93 @@ public class TestLineRecordReader {
// character is a linefeed // character is a linefeed
testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498); testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
} }
// Use the LineRecordReader to read records from the file
public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
throws IOException {
// Set up context
File testFile = new File(testFileUrl.getFile());
long testFileSize = testFile.length();
Path testFilePath = new Path(testFile.getAbsolutePath());
Configuration conf = new Configuration();
conf.setInt("io.file.buffer.size", 1);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
// Gather the records returned by the record reader
ArrayList<String> records = new ArrayList<String>();
long offset = 0;
while (offset < testFileSize) {
FileSplit split = new FileSplit(testFilePath, offset, splitSize, null);
LineRecordReader reader = new LineRecordReader();
reader.initialize(split, context);
while (reader.nextKeyValue()) {
records.add(reader.getCurrentValue().toString());
}
offset += splitSize;
}
return records;
}
// Gather the records by just splitting on new lines
public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
throws IOException {
int MAX_DATA_SIZE = 1024 * 1024;
byte[] data = new byte[MAX_DATA_SIZE];
FileInputStream fis = new FileInputStream(testFileUrl.getFile());
int count;
if (bzip) {
BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
count = bzIn.read(data);
bzIn.close();
} else {
count = fis.read(data);
}
fis.close();
assertTrue("Test file data too big for buffer", count < data.length);
return new String(data, 0, count, "UTF-8").split("\n");
}
public void checkRecordSpanningMultipleSplits(String testFile,
int splitSize,
boolean bzip)
throws IOException {
URL testFileUrl = getClass().getClassLoader().getResource(testFile);
ArrayList<String> records = readRecords(testFileUrl, splitSize);
String[] actuals = readRecordsDirectly(testFileUrl, bzip);
assertEquals("Wrong number of records", actuals.length, records.size());
boolean hasLargeRecord = false;
for (int i = 0; i < actuals.length; ++i) {
assertEquals(actuals[i], records.get(i));
if (actuals[i].length() > 2 * splitSize) {
hasLargeRecord = true;
}
}
assertTrue("Invalid test data. Doesn't have a large enough record",
hasLargeRecord);
}
@Test
public void testRecordSpanningMultipleSplits()
throws IOException {
checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
10,
false);
}
@Test
public void testRecordSpanningMultipleSplitsCompressed()
throws IOException {
// The file is generated with bz2 block size of 100k. The split size
// needs to be larger than that for the CompressedSplitLineReader to
// work.
checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
200 * 1000,
true);
}
} }

View File

@ -0,0 +1,4 @@
Use with small split size,
like 32.
And then we give it a really really long line, which will surely span multiple splits,
to see how it handles.