MAPREDUCE-5862. Line records longer than 2x split size aren't handled correctly. Contributed by bc Wong
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1598111 13f79535-47bb-0310-9956-ffa450edef68
parent 001078e067
commit 4bb4de93d6
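Summary of the change: in both the old-API (org.apache.hadoop.mapred) and new-API (org.apache.hadoop.mapreduce.lib.input) LineRecordReader, the maxLineLength floor moves from a single readLine() call site into maxBytesToConsume() itself, so every use of that limit is clamped the same way. The patch also adds tests that verify records spanning multiple splits (plain text and bzip2), the text fixture those tests read, and a rat license-check exclusion for that fixture.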
hadoop-mapreduce-project/CHANGES.txt
@@ -241,6 +241,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5309. 2.0.4 JobHistoryParser can't parse certain failed job
     history files generated by 2.0.3 history server (Rushabh S Shah via jlowe)
 
+    MAPREDUCE-5862. Line records longer than 2x split size aren't handled
+    correctly (bc Wong via jlowe)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
@@ -85,6 +85,15 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>
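The apache-rat-plugin audits files for Apache license headers; the new fixture is raw test data with no header, so it has to be excluded for the audit to pass.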
org/apache/hadoop/mapred/LineRecordReader.java
@@ -184,7 +184,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
   private int maxBytesToConsume(long pos) {
     return isCompressedInput()
       ? Integer.MAX_VALUE
-      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+      : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
   }
 
   private long getFilePosition() throws IOException {
@@ -206,8 +206,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
       key.set(pos);
 
-      int newSize = in.readLine(value, maxLineLength,
-          Math.max(maxBytesToConsume(pos), maxLineLength));
+      int newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
       if (newSize == 0) {
         return false;
       }
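The point of moving the Math.max: maxBytesToConsume() has callers besides the readLine() in next() (for example, skipping the partial first line when a split does not start at offset 0), and those callers previously received the unclamped limit. For a record longer than the bytes remaining in the split, that could stop consumption mid-record. A minimal sketch of the arithmetic, with hypothetical values (illustration only, not Hadoop code):

    public class ConsumeLimitSketch {
      public static void main(String[] args) {
        long end = 64;        // hypothetical: the split ends at byte 64
        long pos = 40;        // the reader is positioned at byte 40
        int maxLineLength = Integer.MAX_VALUE;  // common default: unbounded lines

        // Old limit: only the bytes left in the split (24 here), so a caller
        // relying on it could stop in the middle of a longer record.
        int oldLimit = (int) Math.min(Integer.MAX_VALUE, end - pos);

        // New limit: clamped to at least maxLineLength inside
        // maxBytesToConsume(), so every caller may consume a full record.
        int newLimit = (int) Math.max(
            Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);

        System.out.println("old=" + oldLimit + ", new=" + newLimit);
        // prints: old=24, new=2147483647
      }
    }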
org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -121,7 +121,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
   private int maxBytesToConsume(long pos) {
     return isCompressedInput
       ? Integer.MAX_VALUE
-      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+      : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
   }
 
   private long getFilePosition() throws IOException {
@@ -146,8 +146,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
-      newSize = in.readLine(value, maxLineLength,
-          Math.max(maxBytesToConsume(pos), maxLineLength));
+      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
       pos += newSize;
       if (newSize < maxLineLength) {
         break;
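The new-API reader gets the identical pair of changes; apart from isCompressedInput being a boolean field here rather than a method, the before/after code is the same.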
org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -97,4 +100,92 @@ public class TestLineRecordReader {
     // character is a linefeed
     testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
   }
+
+  // Use the LineRecordReader to read records from the file
+  public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
+      throws IOException {
+
+    // Set up context
+    File testFile = new File(testFileUrl.getFile());
+    long testFileSize = testFile.length();
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.setInt("io.file.buffer.size", 1);
+
+    // Gather the records returned by the record reader
+    ArrayList<String> records = new ArrayList<String>();
+
+    long offset = 0;
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    while (offset < testFileSize) {
+      FileSplit split =
+          new FileSplit(testFilePath, offset, splitSize, (String[]) null);
+      LineRecordReader reader = new LineRecordReader(conf, split);
+
+      while (reader.next(key, value)) {
+        records.add(value.toString());
+      }
+      offset += splitSize;
+    }
+    return records;
+  }
+
+  // Gather the records by just splitting on new lines
+  public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
+      throws IOException {
+    int MAX_DATA_SIZE = 1024 * 1024;
+    byte[] data = new byte[MAX_DATA_SIZE];
+    FileInputStream fis = new FileInputStream(testFileUrl.getFile());
+    int count;
+    if (bzip) {
+      BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
+      count = bzIn.read(data);
+      bzIn.close();
+    } else {
+      count = fis.read(data);
+    }
+    fis.close();
+    assertTrue("Test file data too big for buffer", count < data.length);
+    return new String(data, 0, count, "UTF-8").split("\n");
+  }
+
+  public void checkRecordSpanningMultipleSplits(String testFile,
+                                                int splitSize,
+                                                boolean bzip)
+      throws IOException {
+    URL testFileUrl = getClass().getClassLoader().getResource(testFile);
+    ArrayList<String> records = readRecords(testFileUrl, splitSize);
+    String[] actuals = readRecordsDirectly(testFileUrl, bzip);
+
+    assertEquals("Wrong number of records", actuals.length, records.size());
+
+    boolean hasLargeRecord = false;
+    for (int i = 0; i < actuals.length; ++i) {
+      assertEquals(actuals[i], records.get(i));
+      if (actuals[i].length() > 2 * splitSize) {
+        hasLargeRecord = true;
+      }
+    }
+
+    assertTrue("Invalid test data. Doesn't have a large enough record",
+               hasLargeRecord);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplits()
+      throws IOException {
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
+        10, false);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplitsCompressed()
+      throws IOException {
+    // The file is generated with bz2 block size of 100k. The split size
+    // needs to be larger than that for the CompressedSplitLineReader to
+    // work.
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
+        200 * 1000, true);
+  }
 }
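The test strategy: read the fixture twice, once through LineRecordReader over consecutive fixed-size splits and once by splitting the raw (decompressed) bytes on '\n' as ground truth, then assert the two record sequences match. The hasLargeRecord assertion guards the fixture itself: at least one record must exceed 2x the split size, the exact case being fixed. io.file.buffer.size is set to 1, presumably to force constant buffer refills across split boundaries. To run just this test class, a standard Surefire invocation should work (assuming a normal Maven setup in the module):

    mvn test -Dtest=TestLineRecordReader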
org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -101,4 +104,93 @@ public class TestLineRecordReader {
     // character is a linefeed
     testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
   }
+
+  // Use the LineRecordReader to read records from the file
+  public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
+      throws IOException {
+
+    // Set up context
+    File testFile = new File(testFileUrl.getFile());
+    long testFileSize = testFile.length();
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.setInt("io.file.buffer.size", 1);
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+    // Gather the records returned by the record reader
+    ArrayList<String> records = new ArrayList<String>();
+
+    long offset = 0;
+    while (offset < testFileSize) {
+      FileSplit split = new FileSplit(testFilePath, offset, splitSize, null);
+      LineRecordReader reader = new LineRecordReader();
+      reader.initialize(split, context);
+
+      while (reader.nextKeyValue()) {
+        records.add(reader.getCurrentValue().toString());
+      }
+      offset += splitSize;
+    }
+    return records;
+  }
+
+  // Gather the records by just splitting on new lines
+  public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
+      throws IOException {
+    int MAX_DATA_SIZE = 1024 * 1024;
+    byte[] data = new byte[MAX_DATA_SIZE];
+    FileInputStream fis = new FileInputStream(testFileUrl.getFile());
+    int count;
+    if (bzip) {
+      BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
+      count = bzIn.read(data);
+      bzIn.close();
+    } else {
+      count = fis.read(data);
+    }
+    fis.close();
+    assertTrue("Test file data too big for buffer", count < data.length);
+    return new String(data, 0, count, "UTF-8").split("\n");
+  }
+
+  public void checkRecordSpanningMultipleSplits(String testFile,
+                                                int splitSize,
+                                                boolean bzip)
+      throws IOException {
+    URL testFileUrl = getClass().getClassLoader().getResource(testFile);
+    ArrayList<String> records = readRecords(testFileUrl, splitSize);
+    String[] actuals = readRecordsDirectly(testFileUrl, bzip);
+
+    assertEquals("Wrong number of records", actuals.length, records.size());
+
+    boolean hasLargeRecord = false;
+    for (int i = 0; i < actuals.length; ++i) {
+      assertEquals(actuals[i], records.get(i));
+      if (actuals[i].length() > 2 * splitSize) {
+        hasLargeRecord = true;
+      }
+    }
+
+    assertTrue("Invalid test data. Doesn't have a large enough record",
+               hasLargeRecord);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplits()
+      throws IOException {
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
+                                      10,
+                                      false);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplitsCompressed()
+      throws IOException {
+    // The file is generated with bz2 block size of 100k. The split size
+    // needs to be larger than that for the CompressedSplitLineReader to
+    // work.
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
+                                      200 * 1000,
+                                      true);
+  }
 }
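Same comparison logic as the old-API test; the differences are mechanical: the reader is built with the no-arg constructor plus initialize(split, context), records come from nextKeyValue()/getCurrentValue(), and a TaskAttemptContextImpl supplies the context.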
src/test/resources/recordSpanningMultipleSplits.txt (new file)
@@ -0,0 +1,4 @@
+Use with small split size,
+like 32.
+And then we give it a really really long line, which will surely span multiple splits,
+to see how it handles.
src/test/resources/recordSpanningMultipleSplits.txt.bz2 (new binary file, not shown)
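The third line of the plain-text fixture is the deliberately long record: at the 10-byte split size used by testRecordSpanningMultipleSplits it spans several splits, the case the old limit computation mishandled. The .bz2 file appears to be the compressed variant read by testRecordSpanningMultipleSplitsCompressed; per the comment in that test it was generated with a 100k bzip2 block size, hence the 200k split size.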