MAPREDUCE-6558. multibyte delimiters with compressed input files generate duplicate records. Contributed by Wilfred Spiegelenburg

(cherry picked from commit 9227dfc25f)
This commit is contained in:
Jason Lowe 2016-05-13 14:33:48 +00:00
parent a84850b4ca
commit ff8caccc87
4 changed files with 63 additions and 0 deletions

View File

@@ -165,4 +165,9 @@ public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
public boolean needAdditionalRecordAfterSplit() { public boolean needAdditionalRecordAfterSplit() {
return !finished && needAdditionalRecord; return !finished && needAdditionalRecord;
} }
@Override
protected void unsetNeedAdditionalRecordAfterSplit() {
  // Clear the pending-extra-record flag: after this call,
  // needAdditionalRecordAfterSplit() (which returns
  // !finished && needAdditionalRecord) will report false, so the reader
  // will not emit a duplicate record past the split boundary.
  this.needAdditionalRecord = false;
}
} }

View File

@@ -654,4 +654,33 @@ public void testUncompressedInputDefaultDelimiterPosValue()
assertFalse(reader.next(key, value)); assertFalse(reader.next(key, value));
assertEquals(12, reader.getPos()); assertEquals(12, reader.getPos());
} }
@Test
public void testBzipWithMultibyteDelimiter() throws IOException {
  // Reads a bzip2-compressed input whose records end with a multi-byte
  // delimiter, using a split boundary that falls inside the compressed
  // stream, and verifies no records are duplicated or lost.
  // The first split length must sit strictly between the bzip2 headers
  // plus block marker (bytes 0..9 of the test file — any smaller value
  // passes trivially because no records are read) and the start of the
  // last compressed block (byte 833 — any larger value passes trivially
  // because the second split is empty).
  final String resourceName = "compressedMultibyteDelimiter.txt.bz2";
  final int splitOneLength = 100;
  URL resourceUrl = getClass().getClassLoader().getResource(resourceName);
  assertNotNull("Cannot find " + resourceName, resourceUrl);
  File inputFile = new File(resourceUrl.getFile());
  long inputFileSize = inputFile.length();
  Path inputPath = new Path(inputFile.getAbsolutePath());
  // Guard against a meaningless configuration of the split point.
  assertTrue("Split size is smaller than header length",
      splitOneLength > 9);
  assertTrue("Split size is larger than compressed file size " +
      inputPath, inputFileSize > splitOneLength);
  Configuration conf = new Configuration();
  conf.setInt(org.apache.hadoop.mapreduce.lib.input.
      LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
  conf.set("textinputformat.record.delimiter", "<E-LINE>\r\r\n");
  testSplitRecordsForFile(conf, splitOneLength, inputFileSize,
      inputPath);
}
} }

View File

@@ -617,4 +617,33 @@ public void testUncompressedInputDefaultDelimiterPosValue()
// Key should be 12 right after "123456789\r\r\n" // Key should be 12 right after "123456789\r\r\n"
assertEquals(12, key.get()); assertEquals(12, key.get());
} }
@Test
public void testBzipWithMultibyteDelimiter() throws IOException {
  // Exercises record reading over a bzip2 input with a multi-byte record
  // delimiter, with the first split ending inside the compressed data;
  // checks that splitting does not duplicate or drop records.
  // Valid split-point range: it must exceed the headers plus block marker
  // (bytes 0..9 of the test file; smaller values read no records and pass
  // trivially) and stay below the last block start (byte 833; larger
  // values leave the second split empty and also pass trivially).
  final String resourceName = "compressedMultibyteDelimiter.txt.bz2";
  final int splitOneLength = 100;
  URL resourceUrl = getClass().getClassLoader().getResource(resourceName);
  assertNotNull("Cannot find " + resourceName, resourceUrl);
  File inputFile = new File(resourceUrl.getFile());
  long inputFileSize = inputFile.length();
  Path inputPath = new Path(inputFile.getAbsolutePath());
  // Sanity-check that the split point is in the meaningful range.
  assertTrue("Split size is smaller than header length",
      splitOneLength > 9);
  assertTrue("Split size is larger than compressed file size " +
      inputPath, inputFileSize > splitOneLength);
  Configuration conf = new Configuration();
  conf.setInt(org.apache.hadoop.mapreduce.lib.input.
      LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
  conf.set("textinputformat.record.delimiter", "<E-LINE>\r\r\n");
  testSplitRecordsForFile(conf, splitOneLength, inputFileSize,
      inputPath);
}
} }