MAPREDUCE-6481. LineRecordReader may give incomplete record and wrong position/key information for uncompressed input sometimes. Contributed by Zhihai Xu

This commit is contained in:
Jason Lowe 2015-09-17 14:30:18 +00:00
parent 6c6e734f0b
commit 58d1a02b8d
5 changed files with 316 additions and 35 deletions

View File

@ -303,7 +303,10 @@ private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
startPosn = bufferPosn = 0;
bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
if (bufferLength <= 0) {
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
if (ambiguousByteCount > 0) {
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
bytesConsumed += ambiguousByteCount;
}
break; // EOF
}
}
@ -325,13 +328,13 @@ private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
bytesConsumed += ambiguousByteCount;
if (appendLength >= 0 && ambiguousByteCount > 0) {
//appending the ambiguous characters (refer case 2.2)
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
ambiguousByteCount = 0;
}
if (appendLength > 0) {
if (ambiguousByteCount > 0) {
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
//appending the ambiguous characters (refer case 2.2)
bytesConsumed += ambiguousByteCount;
ambiguousByteCount=0;
}
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}

View File

@ -566,6 +566,10 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6452. NPE when intermediate encrypt enabled for LocalRunner.
(Zhihai Xu)
MAPREDUCE-6481. LineRecordReader may give incomplete record and wrong
position/key information for uncompressed input sometimes. (Zhihai Xu via
jlowe)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -40,8 +40,6 @@ public class UncompressedSplitLineReader extends SplitLineReader {
private long totalBytesRead = 0;
private boolean finished = false;
private boolean usingCRLF;
private int unusedBytes = 0;
private int lastBytesRead = 0;
public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf,
byte[] recordDelimiterBytes, long splitLength) throws IOException {
@ -59,7 +57,6 @@ protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
(int)(splitLength - totalBytesRead));
}
int bytesRead = in.read(buffer, 0, maxBytesToRead);
lastBytesRead = bytesRead;
// If the split ended in the middle of a record delimiter then we need
// to read one additional record, as the consumer of the next split will
@ -83,39 +80,17 @@ protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
@Override
public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
long bytesRead = 0;
int bytesRead = 0;
if (!finished) {
// only allow at most one more record to be read after the stream
// reports the split ended
if (totalBytesRead > splitLength) {
finished = true;
}
bytesRead = totalBytesRead;
int bytesConsumed = super.readLine(str, maxLineLength, maxBytesToConsume);
bytesRead = totalBytesRead - bytesRead;
// No records left.
if (bytesConsumed == 0 && bytesRead == 0) {
return 0;
}
int bufferSize = getBufferSize();
// Add the remaining buffer size not used for the last call
// of fillBuffer method.
if (lastBytesRead <= 0) {
bytesRead += bufferSize;
} else if (bytesRead > 0) {
bytesRead += bufferSize - lastBytesRead;
}
// Adjust the size of the buffer not used for this record.
// The size is carried over for the next calculation.
bytesRead += unusedBytes;
unusedBytes = bufferSize - getBufferPosn();
bytesRead -= unusedBytes;
bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
}
return (int) bytesRead;
return bytesRead;
}
@Override

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapred;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -357,4 +358,141 @@ public void testUncompressedInputContainingCRLF() throws Exception {
}
}
}
@Test
public void testUncompressedInputCustomDelimiterPosValue()
throws Exception {
Configuration conf = new Configuration();
String inputData = "1234567890ab12ab345";
Path inputFile = createInputFile(conf, inputData);
conf.setInt("io.file.buffer.size", 10);
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
String delimiter = "ab";
byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
LineRecordReader reader = new LineRecordReader(conf, split,
recordDelimiterBytes);
LongWritable key = new LongWritable();
Text value = new Text();
reader.next(key, value);
// Get first record:"1234567890"
assertEquals(10, value.getLength());
// Position should be 12 right after "1234567890ab"
assertEquals(12, reader.getPos());
reader.next(key, value);
// Get second record:"12"
assertEquals(2, value.getLength());
// Position should be 16 right after "1234567890ab12ab"
assertEquals(16, reader.getPos());
reader.next(key, value);
// Get third record:"345"
assertEquals(3, value.getLength());
// Position should be 19 right after "1234567890ab12ab345"
assertEquals(19, reader.getPos());
assertFalse(reader.next(key, value));
assertEquals(19, reader.getPos());
split = new FileSplit(inputFile, 15, 4, (String[])null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
// No record is in the second split because the second split dropped
// the first record, which was already reported by the first split.
// The position should be 19 right after "1234567890ab12ab345"
assertEquals(19, reader.getPos());
assertFalse(reader.next(key, value));
assertEquals(19, reader.getPos());
inputData = "123456789aab";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 12, (String[])null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
reader.next(key, value);
// Get first record:"123456789a"
assertEquals(10, value.getLength());
// Position should be 12 right after "123456789aab"
assertEquals(12, reader.getPos());
assertFalse(reader.next(key, value));
assertEquals(12, reader.getPos());
inputData = "123456789a";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 10, (String[])null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
reader.next(key, value);
// Get first record:"123456789a"
assertEquals(10, value.getLength());
// Position should be 10 right after "123456789a"
assertEquals(10, reader.getPos());
assertFalse(reader.next(key, value));
assertEquals(10, reader.getPos());
inputData = "123456789ab";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 11, (String[])null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
reader.next(key, value);
// Get first record:"123456789"
assertEquals(9, value.getLength());
// Position should be 11 right after "123456789ab"
assertEquals(11, reader.getPos());
assertFalse(reader.next(key, value));
assertEquals(11, reader.getPos());
}
@Test
public void testUncompressedInputDefaultDelimiterPosValue()
throws Exception {
Configuration conf = new Configuration();
String inputData = "1234567890\r\n12\r\n345";
Path inputFile = createInputFile(conf, inputData);
conf.setInt("io.file.buffer.size", 10);
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
LineRecordReader reader = new LineRecordReader(conf, split,
null);
LongWritable key = new LongWritable();
Text value = new Text();
reader.next(key, value);
// Get first record:"1234567890"
assertEquals(10, value.getLength());
// Position should be 12 right after "1234567890\r\n"
assertEquals(12, reader.getPos());
reader.next(key, value);
// Get second record:"12"
assertEquals(2, value.getLength());
// Position should be 16 right after "1234567890\r\n12\r\n"
assertEquals(16, reader.getPos());
assertFalse(reader.next(key, value));
split = new FileSplit(inputFile, 15, 4, (String[])null);
reader = new LineRecordReader(conf, split, null);
// The second split dropped the first record "\n"
// The position should be 16 right after "1234567890\r\n12\r\n"
assertEquals(16, reader.getPos());
reader.next(key, value);
// Get third record:"345"
assertEquals(3, value.getLength());
// Position should be 19 right after "1234567890\r\n12\r\n345"
assertEquals(19, reader.getPos());
assertFalse(reader.next(key, value));
assertEquals(19, reader.getPos());
inputData = "123456789\r\r\n";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 12, (String[])null);
reader = new LineRecordReader(conf, split, null);
reader.next(key, value);
// Get first record:"123456789"
assertEquals(9, value.getLength());
// Position should be 10 right after "123456789\r"
assertEquals(10, reader.getPos());
reader.next(key, value);
// Get second record:""
assertEquals(0, value.getLength());
// Position should be 12 right after "123456789\r\r\n"
assertEquals(12, reader.getPos());
assertFalse(reader.next(key, value));
assertEquals(12, reader.getPos());
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.lib.input;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -37,6 +38,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;
@ -341,4 +344,162 @@ public void testUncompressedInputContainingCRLF() throws Exception {
}
}
}
@Test
public void testUncompressedInputCustomDelimiterPosValue()
throws Exception {
Configuration conf = new Configuration();
String inputData = "1234567890ab12ab345";
Path inputFile = createInputFile(conf, inputData);
conf.setInt("io.file.buffer.size", 10);
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
String delimiter = "ab";
byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
TaskAttemptContext context = new TaskAttemptContextImpl(conf,
new TaskAttemptID());
LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
LongWritable key;
Text value;
reader.nextKeyValue();
key = reader.getCurrentKey();
value = reader.getCurrentValue();
// Get first record:"1234567890"
assertEquals(10, value.getLength());
assertEquals(0, key.get());
reader.nextKeyValue();
// Get second record:"12"
assertEquals(2, value.getLength());
// Key should be 12 right after "1234567890ab"
assertEquals(12, key.get());
reader.nextKeyValue();
// Get third record:"345"
assertEquals(3, value.getLength());
// Key should be 16 right after "1234567890ab12ab"
assertEquals(16, key.get());
assertFalse(reader.nextKeyValue());
// Key should be 19 right after "1234567890ab12ab345"
assertEquals(19, key.get());
split = new FileSplit(inputFile, 15, 4, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
// No record is in the second split because the second split dropped
// the first record, which was already reported by the first split.
assertFalse(reader.nextKeyValue());
inputData = "123456789aab";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 12, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
reader.nextKeyValue();
key = reader.getCurrentKey();
value = reader.getCurrentValue();
// Get first record:"123456789a"
assertEquals(10, value.getLength());
assertEquals(0, key.get());
assertFalse(reader.nextKeyValue());
// Key should be 12 right after "123456789aab"
assertEquals(12, key.get());
inputData = "123456789a";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 10, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
reader.nextKeyValue();
key = reader.getCurrentKey();
value = reader.getCurrentValue();
// Get first record:"123456789a"
assertEquals(10, value.getLength());
assertEquals(0, key.get());
assertFalse(reader.nextKeyValue());
// Key should be 10 right after "123456789a"
assertEquals(10, key.get());
inputData = "123456789ab";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 11, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context);
reader.nextKeyValue();
key = reader.getCurrentKey();
value = reader.getCurrentValue();
// Get first record:"123456789"
assertEquals(9, value.getLength());
assertEquals(0, key.get());
assertFalse(reader.nextKeyValue());
// Key should be 11 right after "123456789ab"
assertEquals(11, key.get());
}
@Test
public void testUncompressedInputDefaultDelimiterPosValue()
throws Exception {
Configuration conf = new Configuration();
String inputData = "1234567890\r\n12\r\n345";
Path inputFile = createInputFile(conf, inputData);
conf.setInt("io.file.buffer.size", 10);
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null);
TaskAttemptContext context = new TaskAttemptContextImpl(conf,
new TaskAttemptID());
LineRecordReader reader = new LineRecordReader(null);
reader.initialize(split, context);
LongWritable key;
Text value;
reader.nextKeyValue();
key = reader.getCurrentKey();
value = reader.getCurrentValue();
// Get first record:"1234567890"
assertEquals(10, value.getLength());
assertEquals(0, key.get());
reader.nextKeyValue();
// Get second record:"12"
assertEquals(2, value.getLength());
// Key should be 12 right after "1234567890\r\n"
assertEquals(12, key.get());
assertFalse(reader.nextKeyValue());
// Key should be 16 right after "1234567890\r\n12\r\n"
assertEquals(16, key.get());
split = new FileSplit(inputFile, 15, 4, (String[])null);
reader = new LineRecordReader(null);
reader.initialize(split, context);
// The second split dropped the first record "\n"
reader.nextKeyValue();
key = reader.getCurrentKey();
value = reader.getCurrentValue();
// Get third record:"345"
assertEquals(3, value.getLength());
// Key should be 16 right after "1234567890\r\n12\r\n"
assertEquals(16, key.get());
assertFalse(reader.nextKeyValue());
// Key should be 19 right after "1234567890\r\n12\r\n345"
assertEquals(19, key.get());
inputData = "123456789\r\r\n";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 12, (String[])null);
reader = new LineRecordReader(null);
reader.initialize(split, context);
reader.nextKeyValue();
key = reader.getCurrentKey();
value = reader.getCurrentValue();
// Get first record:"123456789"
assertEquals(9, value.getLength());
assertEquals(0, key.get());
reader.nextKeyValue();
// Get second record:""
assertEquals(0, value.getLength());
// Key should be 10 right after "123456789\r"
assertEquals(10, key.get());
assertFalse(reader.nextKeyValue());
// Key should be 12 right after "123456789\r\r\n"
assertEquals(12, key.get());
}
}