MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause duplicate records (wilfreds via rkanter)

(cherry picked from commit 7fd00b3db4)

Conflicts:

	hadoop-mapreduce-project/CHANGES.txt
This commit is contained in:
Jason Lowe 2015-11-30 17:51:37 +00:00
parent f61e3320cb
commit 53dccddfdb
5 changed files with 361 additions and 123 deletions

View File

@ -333,6 +333,10 @@ public class LineReader implements Closeable {
//appending the ambiguous characters (refer case 2.2)
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
ambiguousByteCount = 0;
// since it is now certain that the split did not split a delimiter we
// should not read the next record: clear the flag otherwise duplicate
// records could be generated
unsetNeedAdditionalRecordAfterSplit();
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
@ -380,4 +384,9 @@ public class LineReader implements Closeable {
protected int getBufferSize() {
return bufferSize;
}
protected void unsetNeedAdditionalRecordAfterSplit() {
// needed for custom multi byte line delimiters only
// see MAPREDUCE-6549 for details
}
}

View File

@ -35,6 +35,9 @@ Release 2.6.3 - UNRELEASED
MAPREDUCE-5883. "Total megabyte-seconds" in job counters is slightly
misleading (Nathan Roberts via jlowe)
MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause
duplicate records (wilfreds via rkanter)
Release 2.6.2 - 2015-10-28
INCOMPATIBLE CHANGES

View File

@ -97,4 +97,9 @@ public class UncompressedSplitLineReader extends SplitLineReader {
public boolean needAdditionalRecordAfterSplit() {
return !finished && needAdditionalRecord;
}
@Override
protected void unsetNeedAdditionalRecordAfterSplit() {
needAdditionalRecord = false;
}
}

View File

@ -269,12 +269,72 @@ public class TestLineRecordReader {
@Test
public void testUncompressedInput() throws Exception {
Configuration conf = new Configuration();
String inputData = "abc+++def+++ghi+++"
+ "jkl+++mno+++pqr+++stu+++vw +++xyz";
// single char delimiter, best case
String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz";
Path inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+++");
for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.set("textinputformat.record.delimiter", "+");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter, best case
inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "|+|");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// single char delimiter with empty records
inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with empty records
inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "|+|");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with starting part of the delimiter in the data
inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+-");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with newline as start of the delimiter
inputData = "abc\n+def\n+ghi\n+jkl\n+mno";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "\n+");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with newline in delimiter and in data
inputData = "abc\ndef+\nghi+\njkl\nmno";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+\n");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
@ -298,80 +358,126 @@ public class TestLineRecordReader {
public void testUncompressedInputCustomDelimiterPosValue()
throws Exception {
Configuration conf = new Configuration();
String inputData = "1234567890ab12ab345";
Path inputFile = createInputFile(conf, inputData);
conf.setInt("io.file.buffer.size", 10);
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
String delimiter = "ab";
String inputData = "abcdefghij++kl++mno";
Path inputFile = createInputFile(conf, inputData);
String delimiter = "++";
byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
// the first split must contain two records to make sure that it also pulls
// in the record from the 2nd split
int splitLength = 15;
FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[]) null);
LineRecordReader reader = new LineRecordReader(conf, split,
recordDelimiterBytes);
LongWritable key = new LongWritable();
Text value = new Text();
reader.next(key, value);
// Get first record:"1234567890"
assertEquals(10, value.getLength());
// Position should be 12 right after "1234567890ab"
assertEquals(12, reader.getPos());
reader.next(key, value);
// Get second record:"12"
assertEquals(2, value.getLength());
// Position should be 16 right after "1234567890ab12ab"
assertEquals(16, reader.getPos());
reader.next(key, value);
// Get third record:"345"
assertEquals(3, value.getLength());
// Position should be 19 right after "1234567890ab12ab345"
assertEquals(19, reader.getPos());
// Get first record: "abcdefghij"
assertTrue("Expected record got nothing", reader.next(key, value));
assertEquals("Wrong length for record value", 10, value.getLength());
// Position should be 12 right after "abcdefghij++"
assertEquals("Wrong position after record read", 12, reader.getPos());
// Get second record: "kl"
assertTrue("Expected record got nothing", reader.next(key, value));
assertEquals("Wrong length for record value", 2, value.getLength());
// Position should be 16 right after "abcdefghij++kl++"
assertEquals("Wrong position after record read", 16, reader.getPos());
// Get third record: "mno"
assertTrue("Expected record got nothing", reader.next(key, value));
assertEquals("Wrong length for record value", 3, value.getLength());
// Position should be 19 right after "abcdefghij++kl++mno"
assertEquals("Wrong position after record read", 19, reader.getPos());
assertFalse(reader.next(key, value));
assertEquals(19, reader.getPos());
split = new FileSplit(inputFile, 15, 4, (String[])null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
// No record is in the second split because the second split dropped
assertEquals("Wrong position after record read", 19, reader.getPos());
reader.close();
// No record is in the second split because the second split will drop
// the first record, which was already reported by the first split.
// The position should be 19 right after "1234567890ab12ab345"
assertEquals(19, reader.getPos());
assertFalse(reader.next(key, value));
assertEquals(19, reader.getPos());
inputData = "123456789aab";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 12, (String[])null);
split = new FileSplit(inputFile, splitLength,
inputData.length() - splitLength, (String[]) null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
reader.next(key, value);
// Get first record:"123456789a"
assertEquals(10, value.getLength());
// Position should be 12 right after "123456789aab"
assertEquals(12, reader.getPos());
assertFalse(reader.next(key, value));
assertEquals(12, reader.getPos());
// The position should be 19 right after "abcdefghij++kl++mno" and should
// not change
assertEquals("Wrong position after record read", 19, reader.getPos());
assertFalse("Unexpected record returned", reader.next(key, value));
assertEquals("Wrong position after record read", 19, reader.getPos());
reader.close();
inputData = "123456789a";
// multi char delimiter with starting part of the delimiter in the data
inputData = "abcd+efgh++ijk++mno";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 10, (String[])null);
splitLength = 5;
split = new FileSplit(inputFile, 0, splitLength, (String[]) null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
reader.next(key, value);
// Get first record:"123456789a"
assertEquals(10, value.getLength());
// Position should be 10 right after "123456789a"
assertEquals(10, reader.getPos());
// Get first record: "abcd+efgh"
assertTrue("Expected record got nothing", reader.next(key, value));
assertEquals("Wrong position after record read", 11, reader.getPos());
assertEquals("Wrong length for record value", 9, value.getLength());
// should have jumped over the delimiter, no record
assertFalse("Unexpected record returned", reader.next(key, value));
assertEquals("Wrong position after record read", 11, reader.getPos());
reader.close();
// next split: check for duplicate or dropped records
split = new FileSplit(inputFile, splitLength,
inputData.length() - splitLength, (String[]) null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
// Get second record: "ijk" first in this split
assertTrue("Expected record got nothing", reader.next(key, value));
assertEquals("Wrong position after record read", 16, reader.getPos());
assertEquals("Wrong length for record value", 3, value.getLength());
// Get third record: "mno" second in this split
assertTrue("Expected record got nothing", reader.next(key, value));
assertEquals("Wrong position after record read", 19, reader.getPos());
assertEquals("Wrong length for record value", 3, value.getLength());
// should be at the end of the input
assertFalse(reader.next(key, value));
assertEquals(10, reader.getPos());
assertEquals("Wrong position after record read", 19, reader.getPos());
reader.close();
inputData = "123456789ab";
inputData = "abcd|efgh|+|ij|kl|+|mno|pqr";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 11, (String[])null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
reader.next(key, value);
// Get first record:"123456789"
assertEquals(9, value.getLength());
// Position should be 11 right after "123456789ab"
assertEquals(11, reader.getPos());
assertFalse(reader.next(key, value));
assertEquals(11, reader.getPos());
delimiter = "|+|";
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
// walking over the buffer and split sizes checks for proper processing
// of the ambiguous bytes of the delimiter
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
split = new FileSplit(inputFile, 0, bufferSize, (String[]) null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
// Get first record: "abcd|efgh" always possible
assertTrue("Expected record got nothing", reader.next(key, value));
assertTrue("abcd|efgh".equals(value.toString()));
assertEquals("Wrong position after record read", 9, value.getLength());
// Position should be 12 right after "|+|"
int recordPos = 12;
assertEquals("Wrong position after record read", recordPos,
reader.getPos());
// get the next record: "ij|kl" if the split/buffer allows it
if (reader.next(key, value)) {
// check the record info: "ij|kl"
assertTrue("ij|kl".equals(value.toString()));
// Position should be 20 right after "|+|"
recordPos = 20;
assertEquals("Wrong position after record read", recordPos,
reader.getPos());
}
// get the third record: "mno|pqr" if the split/buffer allows it
if (reader.next(key, value)) {
// check the record info: "mno|pqr"
assertTrue("mno|pqr".equals(value.toString()));
// Position should be 27 at the end of the string now
recordPos = inputData.length();
assertEquals("Wrong position after record read", recordPos,
reader.getPos());
}
// no more records can be read we should still be at the last position
assertFalse("Unexpected record returned", reader.next(key, value));
assertEquals("Wrong position after record read", recordPos,
reader.getPos());
reader.close();
}
}
}
@Test

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.lib.input;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
@ -275,16 +276,76 @@ public class TestLineRecordReader {
@Test
public void testUncompressedInput() throws Exception {
Configuration conf = new Configuration();
String inputData = "abc+++def+++ghi+++"
+ "jkl+++mno+++pqr+++stu+++vw +++xyz";
// single char delimiter, best case
String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz";
Path inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+++");
conf.set("textinputformat.record.delimiter", "+");
for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter, best case
inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "|+|");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// single char delimiter with empty records
inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with empty records
inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "|+|");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with starting part of the delimiter in the data
inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+-");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with newline as start of the delimiter
inputData = "abc\n+def\n+ghi\n+jkl\n+mno";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "\n+");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with newline in delimiter and in data
inputData = "abc\ndef+\nghi+\njkl\nmno";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+\n");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
}
@Test
@ -304,91 +365,145 @@ public class TestLineRecordReader {
public void testUncompressedInputCustomDelimiterPosValue()
throws Exception {
Configuration conf = new Configuration();
String inputData = "1234567890ab12ab345";
Path inputFile = createInputFile(conf, inputData);
conf.setInt("io.file.buffer.size", 10);
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
String delimiter = "ab";
String inputData = "abcdefghij++kl++mno";
Path inputFile = createInputFile(conf, inputData);
String delimiter = "++";
byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
int splitLength = 15;
FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[])null);
TaskAttemptContext context = new TaskAttemptContextImpl(conf,
new TaskAttemptID());
LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
LongWritable key;
Text value;
reader.nextKeyValue();
key = reader.getCurrentKey();
value = reader.getCurrentValue();
// Get first record:"1234567890"
assertEquals(10, value.getLength());
assertEquals(0, key.get());
reader.nextKeyValue();
// Get second record:"12"
assertEquals(2, value.getLength());
// Key should be 12 right after "1234567890ab"
assertEquals(12, key.get());
reader.nextKeyValue();
// Get third record:"345"
assertEquals(3, value.getLength());
// Key should be 16 right after "1234567890ab12ab"
assertEquals(16, key.get());
// Get first record: "abcdefghij"
assertTrue("Expected record got nothing", reader.nextKeyValue());
LongWritable key = reader.getCurrentKey();
Text value = reader.getCurrentValue();
assertEquals("Wrong length for record value", 10, value.getLength());
assertEquals("Wrong position after record read", 0, key.get());
// Get second record: "kl"
assertTrue("Expected record got nothing", reader.nextKeyValue());
assertEquals("Wrong length for record value", 2, value.getLength());
// Key should be 12 right after "abcdefghij++"
assertEquals("Wrong position after record read", 12, key.get());
// Get third record: "mno"
assertTrue("Expected record got nothing", reader.nextKeyValue());
assertEquals("Wrong length for record value", 3, value.getLength());
// Key should be 16 right after "abcdefghij++kl++"
assertEquals("Wrong position after record read", 16, key.get());
assertFalse(reader.nextKeyValue());
// Key should be 19 right after "1234567890ab12ab345"
assertEquals(19, key.get());
split = new FileSplit(inputFile, 15, 4, (String[])null);
// Key should be 19 right after "abcdefghij++kl++mno"
assertEquals("Wrong position after record read", 19, key.get());
// after refresh should be empty
key = reader.getCurrentKey();
assertNull("Unexpected key returned", key);
reader.close();
split = new FileSplit(inputFile, splitLength,
inputData.length() - splitLength, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
// No record is in the second split because the second split dropped
// the first record, which was already reported by the first split.
assertFalse(reader.nextKeyValue());
assertFalse("Unexpected record returned", reader.nextKeyValue());
key = reader.getCurrentKey();
assertNull("Unexpected key returned", key);
reader.close();
inputData = "123456789aab";
// multi char delimiter with starting part of the delimiter in the data
inputData = "abcd+efgh++ijk++mno";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 12, (String[])null);
splitLength = 5;
split = new FileSplit(inputFile, 0, splitLength, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
reader.nextKeyValue();
// Get first record: "abcd+efgh"
assertTrue("Expected record got nothing", reader.nextKeyValue());
key = reader.getCurrentKey();
value = reader.getCurrentValue();
// Get first record:"123456789a"
assertEquals(10, value.getLength());
assertEquals(0, key.get());
assertEquals("Wrong position after record read", 0, key.get());
assertEquals("Wrong length for record value", 9, value.getLength());
// should have jumped over the delimiter, no record
assertFalse(reader.nextKeyValue());
// Key should be 12 right after "123456789aab"
assertEquals(12, key.get());
inputData = "123456789a";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 10, (String[])null);
assertEquals("Wrong position after record read", 11, key.get());
// after refresh should be empty
key = reader.getCurrentKey();
assertNull("Unexpected key returned", key);
reader.close();
// next split: check for duplicate or dropped records
split = new FileSplit(inputFile, splitLength,
inputData.length () - splitLength, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
reader.nextKeyValue();
assertTrue("Expected record got nothing", reader.nextKeyValue());
key = reader.getCurrentKey();
value = reader.getCurrentValue();
// Get first record:"123456789a"
assertEquals(10, value.getLength());
assertEquals(0, key.get());
// Get second record: "ijk" first in this split
assertEquals("Wrong position after record read", 11, key.get());
assertEquals("Wrong length for record value", 3, value.getLength());
// Get third record: "mno" second in this split
assertTrue("Expected record got nothing", reader.nextKeyValue());
assertEquals("Wrong position after record read", 16, key.get());
assertEquals("Wrong length for record value", 3, value.getLength());
// should be at the end of the input
assertFalse(reader.nextKeyValue());
// Key should be 10 right after "123456789a"
assertEquals(10, key.get());
assertEquals("Wrong position after record read", 19, key.get());
reader.close();
inputData = "123456789ab";
inputData = "abcd|efgh|+|ij|kl|+|mno|pqr";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 11, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
reader.nextKeyValue();
key = reader.getCurrentKey();
value = reader.getCurrentValue();
// Get first record:"123456789"
assertEquals(9, value.getLength());
assertEquals(0, key.get());
assertFalse(reader.nextKeyValue());
// Key should be 11 right after "123456789ab"
assertEquals(11, key.get());
delimiter = "|+|";
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
// walking over the buffer and split sizes checks for proper processing
// of the ambiguous bytes of the delimiter
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
// track where we are in the inputdata
int keyPosition = 0;
conf.setInt("io.file.buffer.size", bufferSize);
split = new FileSplit(inputFile, 0, bufferSize, (String[]) null);
reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
// Get the first record: "abcd|efgh" always possible
assertTrue("Expected record got nothing", reader.nextKeyValue());
key = reader.getCurrentKey();
value = reader.getCurrentValue();
assertTrue("abcd|efgh".equals(value.toString()));
// Position should be 0 right at the start
assertEquals("Wrong position after record read", keyPosition,
key.get());
// Position should be 12 right after the first "|+|"
keyPosition = 12;
// get the next record: "ij|kl" if the split/buffer allows it
if (reader.nextKeyValue()) {
// check the record info: "ij|kl"
assertTrue("ij|kl".equals(value.toString()));
assertEquals("Wrong position after record read", keyPosition,
key.get());
// Position should be 20 after the second "|+|"
keyPosition = 20;
}
// get the third record: "mno|pqr" if the split/buffer allows it
if (reader.nextKeyValue()) {
// check the record info: "mno|pqr"
assertTrue("mno|pqr".equals(value.toString()));
assertEquals("Wrong position after record read", keyPosition,
key.get());
// Position should be the end of the input
keyPosition = inputData.length();
}
assertFalse("Unexpected record returned", reader.nextKeyValue());
// no more records can be read we should be at the last position
assertEquals("Wrong position after record read", keyPosition,
key.get());
// after refresh should be empty
key = reader.getCurrentKey();
assertNull("Unexpected key returned", key);
reader.close();
}
}
}
@Test