MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause duplicate records (wilfreds via rkanter)
(cherry picked from commit 7fd00b3db4
)
Conflicts:
hadoop-mapreduce-project/CHANGES.txt
This commit is contained in:
parent
0c48c17ae9
commit
e1b6a5413f
|
@ -333,6 +333,10 @@ public class LineReader implements Closeable {
|
|||
//appending the ambiguous characters (refer case 2.2)
|
||||
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
|
||||
ambiguousByteCount = 0;
|
||||
// since it is now certain that the split did not split a delimiter we
|
||||
// should not read the next record: clear the flag otherwise duplicate
|
||||
// records could be generated
|
||||
unsetNeedAdditionalRecordAfterSplit();
|
||||
}
|
||||
if (appendLength > 0) {
|
||||
str.append(buffer, startPosn, appendLength);
|
||||
|
@ -380,4 +384,9 @@ public class LineReader implements Closeable {
|
|||
protected int getBufferSize() {
|
||||
return bufferSize;
|
||||
}
|
||||
|
||||
protected void unsetNeedAdditionalRecordAfterSplit() {
|
||||
// needed for custom multi byte line delimiters only
|
||||
// see MAPREDUCE-6549 for details
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,9 @@ Release 2.7.3 - UNRELEASED
|
|||
MAPREDUCE-5883. "Total megabyte-seconds" in job counters is slightly
|
||||
misleading (Nathan Roberts via jlowe)
|
||||
|
||||
MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause
|
||||
duplicate records (wilfreds via rkanter)
|
||||
|
||||
Release 2.7.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -97,4 +97,9 @@ public class UncompressedSplitLineReader extends SplitLineReader {
|
|||
public boolean needAdditionalRecordAfterSplit() {
|
||||
return !finished && needAdditionalRecord;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void unsetNeedAdditionalRecordAfterSplit() {
|
||||
needAdditionalRecord = false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -327,12 +327,72 @@ public class TestLineRecordReader {
|
|||
@Test
|
||||
public void testUncompressedInput() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
String inputData = "abc+++def+++ghi+++"
|
||||
+ "jkl+++mno+++pqr+++stu+++vw +++xyz";
|
||||
// single char delimiter, best case
|
||||
String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz";
|
||||
Path inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "+++");
|
||||
for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.set("textinputformat.record.delimiter", "+");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
// multi char delimiter, best case
|
||||
inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "|+|");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
// single char delimiter with empty records
|
||||
inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "+");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
// multi char delimiter with empty records
|
||||
inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "|+|");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
// multi char delimiter with starting part of the delimiter in the data
|
||||
inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "+-");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
// multi char delimiter with newline as start of the delimiter
|
||||
inputData = "abc\n+def\n+ghi\n+jkl\n+mno";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "\n+");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
// multi char delimiter with newline in delimiter and in data
|
||||
inputData = "abc\ndef+\nghi+\njkl\nmno";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "+\n");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
|
@ -356,80 +416,126 @@ public class TestLineRecordReader {
|
|||
public void testUncompressedInputCustomDelimiterPosValue()
|
||||
throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
String inputData = "1234567890ab12ab345";
|
||||
Path inputFile = createInputFile(conf, inputData);
|
||||
conf.setInt("io.file.buffer.size", 10);
|
||||
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
|
||||
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
|
||||
String delimiter = "ab";
|
||||
String inputData = "abcdefghij++kl++mno";
|
||||
Path inputFile = createInputFile(conf, inputData);
|
||||
String delimiter = "++";
|
||||
byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
|
||||
FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
|
||||
// the first split must contain two records to make sure that it also pulls
|
||||
// in the record from the 2nd split
|
||||
int splitLength = 15;
|
||||
FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[]) null);
|
||||
LineRecordReader reader = new LineRecordReader(conf, split,
|
||||
recordDelimiterBytes);
|
||||
LongWritable key = new LongWritable();
|
||||
Text value = new Text();
|
||||
reader.next(key, value);
|
||||
// Get first record:"1234567890"
|
||||
assertEquals(10, value.getLength());
|
||||
// Position should be 12 right after "1234567890ab"
|
||||
assertEquals(12, reader.getPos());
|
||||
reader.next(key, value);
|
||||
// Get second record:"12"
|
||||
assertEquals(2, value.getLength());
|
||||
// Position should be 16 right after "1234567890ab12ab"
|
||||
assertEquals(16, reader.getPos());
|
||||
reader.next(key, value);
|
||||
// Get third record:"345"
|
||||
assertEquals(3, value.getLength());
|
||||
// Position should be 19 right after "1234567890ab12ab345"
|
||||
assertEquals(19, reader.getPos());
|
||||
// Get first record: "abcdefghij"
|
||||
assertTrue("Expected record got nothing", reader.next(key, value));
|
||||
assertEquals("Wrong length for record value", 10, value.getLength());
|
||||
// Position should be 12 right after "abcdefghij++"
|
||||
assertEquals("Wrong position after record read", 12, reader.getPos());
|
||||
// Get second record: "kl"
|
||||
assertTrue("Expected record got nothing", reader.next(key, value));
|
||||
assertEquals("Wrong length for record value", 2, value.getLength());
|
||||
// Position should be 16 right after "abcdefghij++kl++"
|
||||
assertEquals("Wrong position after record read", 16, reader.getPos());
|
||||
// Get third record: "mno"
|
||||
assertTrue("Expected record got nothing", reader.next(key, value));
|
||||
assertEquals("Wrong length for record value", 3, value.getLength());
|
||||
// Position should be 19 right after "abcdefghij++kl++mno"
|
||||
assertEquals("Wrong position after record read", 19, reader.getPos());
|
||||
assertFalse(reader.next(key, value));
|
||||
assertEquals(19, reader.getPos());
|
||||
|
||||
split = new FileSplit(inputFile, 15, 4, (String[])null);
|
||||
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
|
||||
// No record is in the second split because the second split dropped
|
||||
assertEquals("Wrong position after record read", 19, reader.getPos());
|
||||
reader.close();
|
||||
// No record is in the second split because the second split will drop
|
||||
// the first record, which was already reported by the first split.
|
||||
// The position should be 19 right after "1234567890ab12ab345"
|
||||
assertEquals(19, reader.getPos());
|
||||
assertFalse(reader.next(key, value));
|
||||
assertEquals(19, reader.getPos());
|
||||
|
||||
inputData = "123456789aab";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
split = new FileSplit(inputFile, 0, 12, (String[])null);
|
||||
split = new FileSplit(inputFile, splitLength,
|
||||
inputData.length() - splitLength, (String[]) null);
|
||||
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
|
||||
reader.next(key, value);
|
||||
// Get first record:"123456789a"
|
||||
assertEquals(10, value.getLength());
|
||||
// Position should be 12 right after "123456789aab"
|
||||
assertEquals(12, reader.getPos());
|
||||
assertFalse(reader.next(key, value));
|
||||
assertEquals(12, reader.getPos());
|
||||
// The position should be 19 right after "abcdefghij++kl++mno" and should
|
||||
// not change
|
||||
assertEquals("Wrong position after record read", 19, reader.getPos());
|
||||
assertFalse("Unexpected record returned", reader.next(key, value));
|
||||
assertEquals("Wrong position after record read", 19, reader.getPos());
|
||||
reader.close();
|
||||
|
||||
inputData = "123456789a";
|
||||
// multi char delimiter with starting part of the delimiter in the data
|
||||
inputData = "abcd+efgh++ijk++mno";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
split = new FileSplit(inputFile, 0, 10, (String[])null);
|
||||
splitLength = 5;
|
||||
split = new FileSplit(inputFile, 0, splitLength, (String[]) null);
|
||||
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
|
||||
reader.next(key, value);
|
||||
// Get first record:"123456789a"
|
||||
assertEquals(10, value.getLength());
|
||||
// Position should be 10 right after "123456789a"
|
||||
assertEquals(10, reader.getPos());
|
||||
// Get first record: "abcd+efgh"
|
||||
assertTrue("Expected record got nothing", reader.next(key, value));
|
||||
assertEquals("Wrong position after record read", 11, reader.getPos());
|
||||
assertEquals("Wrong length for record value", 9, value.getLength());
|
||||
// should have jumped over the delimiter, no record
|
||||
assertFalse("Unexpected record returned", reader.next(key, value));
|
||||
assertEquals("Wrong position after record read", 11, reader.getPos());
|
||||
reader.close();
|
||||
// next split: check for duplicate or dropped records
|
||||
split = new FileSplit(inputFile, splitLength,
|
||||
inputData.length() - splitLength, (String[]) null);
|
||||
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
|
||||
// Get second record: "ijk" first in this split
|
||||
assertTrue("Expected record got nothing", reader.next(key, value));
|
||||
assertEquals("Wrong position after record read", 16, reader.getPos());
|
||||
assertEquals("Wrong length for record value", 3, value.getLength());
|
||||
// Get third record: "mno" second in this split
|
||||
assertTrue("Expected record got nothing", reader.next(key, value));
|
||||
assertEquals("Wrong position after record read", 19, reader.getPos());
|
||||
assertEquals("Wrong length for record value", 3, value.getLength());
|
||||
// should be at the end of the input
|
||||
assertFalse(reader.next(key, value));
|
||||
assertEquals(10, reader.getPos());
|
||||
assertEquals("Wrong position after record read", 19, reader.getPos());
|
||||
reader.close();
|
||||
|
||||
inputData = "123456789ab";
|
||||
inputData = "abcd|efgh|+|ij|kl|+|mno|pqr";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
split = new FileSplit(inputFile, 0, 11, (String[])null);
|
||||
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
|
||||
reader.next(key, value);
|
||||
// Get first record:"123456789"
|
||||
assertEquals(9, value.getLength());
|
||||
// Position should be 11 right after "123456789ab"
|
||||
assertEquals(11, reader.getPos());
|
||||
assertFalse(reader.next(key, value));
|
||||
assertEquals(11, reader.getPos());
|
||||
delimiter = "|+|";
|
||||
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
|
||||
// walking over the buffer and split sizes checks for proper processing
|
||||
// of the ambiguous bytes of the delimiter
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
split = new FileSplit(inputFile, 0, bufferSize, (String[]) null);
|
||||
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
|
||||
// Get first record: "abcd|efgh" always possible
|
||||
assertTrue("Expected record got nothing", reader.next(key, value));
|
||||
assertTrue("abcd|efgh".equals(value.toString()));
|
||||
assertEquals("Wrong position after record read", 9, value.getLength());
|
||||
// Position should be 12 right after "|+|"
|
||||
int recordPos = 12;
|
||||
assertEquals("Wrong position after record read", recordPos,
|
||||
reader.getPos());
|
||||
// get the next record: "ij|kl" if the split/buffer allows it
|
||||
if (reader.next(key, value)) {
|
||||
// check the record info: "ij|kl"
|
||||
assertTrue("ij|kl".equals(value.toString()));
|
||||
// Position should be 20 right after "|+|"
|
||||
recordPos = 20;
|
||||
assertEquals("Wrong position after record read", recordPos,
|
||||
reader.getPos());
|
||||
}
|
||||
// get the third record: "mno|pqr" if the split/buffer allows it
|
||||
if (reader.next(key, value)) {
|
||||
// check the record info: "mno|pqr"
|
||||
assertTrue("mno|pqr".equals(value.toString()));
|
||||
// Position should be 27 at the end of the string now
|
||||
recordPos = inputData.length();
|
||||
assertEquals("Wrong position after record read", recordPos,
|
||||
reader.getPos());
|
||||
}
|
||||
// no more records can be read we should still be at the last position
|
||||
assertFalse("Unexpected record returned", reader.next(key, value));
|
||||
assertEquals("Wrong position after record read", recordPos,
|
||||
reader.getPos());
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.lib.input;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -313,16 +314,76 @@ public class TestLineRecordReader {
|
|||
@Test
|
||||
public void testUncompressedInput() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
String inputData = "abc+++def+++ghi+++"
|
||||
+ "jkl+++mno+++pqr+++stu+++vw +++xyz";
|
||||
// single char delimiter, best case
|
||||
String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz";
|
||||
Path inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "+++");
|
||||
conf.set("textinputformat.record.delimiter", "+");
|
||||
for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
// multi char delimiter, best case
|
||||
inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "|+|");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
// single char delimiter with empty records
|
||||
inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "+");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
// multi char delimiter with empty records
|
||||
inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "|+|");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
// multi char delimiter with starting part of the delimiter in the data
|
||||
inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "+-");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
// multi char delimiter with newline as start of the delimiter
|
||||
inputData = "abc\n+def\n+ghi\n+jkl\n+mno";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "\n+");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
// multi char delimiter with newline in delimiter and in data
|
||||
inputData = "abc\ndef+\nghi+\njkl\nmno";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
conf.set("textinputformat.record.delimiter", "+\n");
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -342,91 +403,145 @@ public class TestLineRecordReader {
|
|||
public void testUncompressedInputCustomDelimiterPosValue()
|
||||
throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
String inputData = "1234567890ab12ab345";
|
||||
Path inputFile = createInputFile(conf, inputData);
|
||||
conf.setInt("io.file.buffer.size", 10);
|
||||
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
|
||||
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
|
||||
String delimiter = "ab";
|
||||
String inputData = "abcdefghij++kl++mno";
|
||||
Path inputFile = createInputFile(conf, inputData);
|
||||
String delimiter = "++";
|
||||
byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
|
||||
FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
|
||||
int splitLength = 15;
|
||||
FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[])null);
|
||||
TaskAttemptContext context = new TaskAttemptContextImpl(conf,
|
||||
new TaskAttemptID());
|
||||
LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
|
||||
reader.initialize(split, context);
|
||||
LongWritable key;
|
||||
Text value;
|
||||
reader.nextKeyValue();
|
||||
key = reader.getCurrentKey();
|
||||
value = reader.getCurrentValue();
|
||||
// Get first record:"1234567890"
|
||||
assertEquals(10, value.getLength());
|
||||
assertEquals(0, key.get());
|
||||
reader.nextKeyValue();
|
||||
// Get second record:"12"
|
||||
assertEquals(2, value.getLength());
|
||||
// Key should be 12 right after "1234567890ab"
|
||||
assertEquals(12, key.get());
|
||||
reader.nextKeyValue();
|
||||
// Get third record:"345"
|
||||
assertEquals(3, value.getLength());
|
||||
// Key should be 16 right after "1234567890ab12ab"
|
||||
assertEquals(16, key.get());
|
||||
// Get first record: "abcdefghij"
|
||||
assertTrue("Expected record got nothing", reader.nextKeyValue());
|
||||
LongWritable key = reader.getCurrentKey();
|
||||
Text value = reader.getCurrentValue();
|
||||
assertEquals("Wrong length for record value", 10, value.getLength());
|
||||
assertEquals("Wrong position after record read", 0, key.get());
|
||||
// Get second record: "kl"
|
||||
assertTrue("Expected record got nothing", reader.nextKeyValue());
|
||||
assertEquals("Wrong length for record value", 2, value.getLength());
|
||||
// Key should be 12 right after "abcdefghij++"
|
||||
assertEquals("Wrong position after record read", 12, key.get());
|
||||
// Get third record: "mno"
|
||||
assertTrue("Expected record got nothing", reader.nextKeyValue());
|
||||
assertEquals("Wrong length for record value", 3, value.getLength());
|
||||
// Key should be 16 right after "abcdefghij++kl++"
|
||||
assertEquals("Wrong position after record read", 16, key.get());
|
||||
assertFalse(reader.nextKeyValue());
|
||||
// Key should be 19 right after "1234567890ab12ab345"
|
||||
assertEquals(19, key.get());
|
||||
|
||||
split = new FileSplit(inputFile, 15, 4, (String[])null);
|
||||
// Key should be 19 right after "abcdefghij++kl++mno"
|
||||
assertEquals("Wrong position after record read", 19, key.get());
|
||||
// after refresh should be empty
|
||||
key = reader.getCurrentKey();
|
||||
assertNull("Unexpected key returned", key);
|
||||
reader.close();
|
||||
split = new FileSplit(inputFile, splitLength,
|
||||
inputData.length() - splitLength, (String[])null);
|
||||
reader = new LineRecordReader(recordDelimiterBytes);
|
||||
reader.initialize(split, context);
|
||||
// No record is in the second split because the second split dropped
|
||||
// the first record, which was already reported by the first split.
|
||||
assertFalse(reader.nextKeyValue());
|
||||
assertFalse("Unexpected record returned", reader.nextKeyValue());
|
||||
key = reader.getCurrentKey();
|
||||
assertNull("Unexpected key returned", key);
|
||||
reader.close();
|
||||
|
||||
inputData = "123456789aab";
|
||||
// multi char delimiter with starting part of the delimiter in the data
|
||||
inputData = "abcd+efgh++ijk++mno";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
split = new FileSplit(inputFile, 0, 12, (String[])null);
|
||||
splitLength = 5;
|
||||
split = new FileSplit(inputFile, 0, splitLength, (String[])null);
|
||||
reader = new LineRecordReader(recordDelimiterBytes);
|
||||
reader.initialize(split, context);
|
||||
reader.nextKeyValue();
|
||||
// Get first record: "abcd+efgh"
|
||||
assertTrue("Expected record got nothing", reader.nextKeyValue());
|
||||
key = reader.getCurrentKey();
|
||||
value = reader.getCurrentValue();
|
||||
// Get first record:"123456789a"
|
||||
assertEquals(10, value.getLength());
|
||||
assertEquals(0, key.get());
|
||||
assertEquals("Wrong position after record read", 0, key.get());
|
||||
assertEquals("Wrong length for record value", 9, value.getLength());
|
||||
// should have jumped over the delimiter, no record
|
||||
assertFalse(reader.nextKeyValue());
|
||||
// Key should be 12 right after "123456789aab"
|
||||
assertEquals(12, key.get());
|
||||
|
||||
inputData = "123456789a";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
split = new FileSplit(inputFile, 0, 10, (String[])null);
|
||||
assertEquals("Wrong position after record read", 11, key.get());
|
||||
// after refresh should be empty
|
||||
key = reader.getCurrentKey();
|
||||
assertNull("Unexpected key returned", key);
|
||||
reader.close();
|
||||
// next split: check for duplicate or dropped records
|
||||
split = new FileSplit(inputFile, splitLength,
|
||||
inputData.length () - splitLength, (String[])null);
|
||||
reader = new LineRecordReader(recordDelimiterBytes);
|
||||
reader.initialize(split, context);
|
||||
reader.nextKeyValue();
|
||||
assertTrue("Expected record got nothing", reader.nextKeyValue());
|
||||
key = reader.getCurrentKey();
|
||||
value = reader.getCurrentValue();
|
||||
// Get first record:"123456789a"
|
||||
assertEquals(10, value.getLength());
|
||||
assertEquals(0, key.get());
|
||||
// Get second record: "ijk" first in this split
|
||||
assertEquals("Wrong position after record read", 11, key.get());
|
||||
assertEquals("Wrong length for record value", 3, value.getLength());
|
||||
// Get third record: "mno" second in this split
|
||||
assertTrue("Expected record got nothing", reader.nextKeyValue());
|
||||
assertEquals("Wrong position after record read", 16, key.get());
|
||||
assertEquals("Wrong length for record value", 3, value.getLength());
|
||||
// should be at the end of the input
|
||||
assertFalse(reader.nextKeyValue());
|
||||
// Key should be 10 right after "123456789a"
|
||||
assertEquals(10, key.get());
|
||||
assertEquals("Wrong position after record read", 19, key.get());
|
||||
reader.close();
|
||||
|
||||
inputData = "123456789ab";
|
||||
inputData = "abcd|efgh|+|ij|kl|+|mno|pqr";
|
||||
inputFile = createInputFile(conf, inputData);
|
||||
split = new FileSplit(inputFile, 0, 11, (String[])null);
|
||||
reader = new LineRecordReader(recordDelimiterBytes);
|
||||
reader.initialize(split, context);
|
||||
reader.nextKeyValue();
|
||||
key = reader.getCurrentKey();
|
||||
value = reader.getCurrentValue();
|
||||
// Get first record:"123456789"
|
||||
assertEquals(9, value.getLength());
|
||||
assertEquals(0, key.get());
|
||||
assertFalse(reader.nextKeyValue());
|
||||
// Key should be 11 right after "123456789ab"
|
||||
assertEquals(11, key.get());
|
||||
delimiter = "|+|";
|
||||
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
|
||||
// walking over the buffer and split sizes checks for proper processing
|
||||
// of the ambiguous bytes of the delimiter
|
||||
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
|
||||
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
|
||||
// track where we are in the inputdata
|
||||
int keyPosition = 0;
|
||||
conf.setInt("io.file.buffer.size", bufferSize);
|
||||
split = new FileSplit(inputFile, 0, bufferSize, (String[]) null);
|
||||
reader = new LineRecordReader(recordDelimiterBytes);
|
||||
reader.initialize(split, context);
|
||||
// Get the first record: "abcd|efgh" always possible
|
||||
assertTrue("Expected record got nothing", reader.nextKeyValue());
|
||||
key = reader.getCurrentKey();
|
||||
value = reader.getCurrentValue();
|
||||
assertTrue("abcd|efgh".equals(value.toString()));
|
||||
// Position should be 0 right at the start
|
||||
assertEquals("Wrong position after record read", keyPosition,
|
||||
key.get());
|
||||
// Position should be 12 right after the first "|+|"
|
||||
keyPosition = 12;
|
||||
// get the next record: "ij|kl" if the split/buffer allows it
|
||||
if (reader.nextKeyValue()) {
|
||||
// check the record info: "ij|kl"
|
||||
assertTrue("ij|kl".equals(value.toString()));
|
||||
assertEquals("Wrong position after record read", keyPosition,
|
||||
key.get());
|
||||
// Position should be 20 after the second "|+|"
|
||||
keyPosition = 20;
|
||||
}
|
||||
// get the third record: "mno|pqr" if the split/buffer allows it
|
||||
if (reader.nextKeyValue()) {
|
||||
// check the record info: "mno|pqr"
|
||||
assertTrue("mno|pqr".equals(value.toString()));
|
||||
assertEquals("Wrong position after record read", keyPosition,
|
||||
key.get());
|
||||
// Position should be the end of the input
|
||||
keyPosition = inputData.length();
|
||||
}
|
||||
assertFalse("Unexpected record returned", reader.nextKeyValue());
|
||||
// no more records can be read we should be at the last position
|
||||
assertEquals("Wrong position after record read", keyPosition,
|
||||
key.get());
|
||||
// after refresh should be empty
|
||||
key = reader.getCurrentKey();
|
||||
assertNull("Unexpected key returned", key);
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue