MAPREDUCE-5948. org.apache.hadoop.mapred.LineRecordReader does not handle multibyte record delimiters well. Contributed by Vinayakumar B, Rushabh Shah, and Akira AJISAKA

Jason Lowe 2015-06-22 21:59:20 +00:00
parent 11ac848207
commit 077250d8d7
7 changed files with 286 additions and 14 deletions
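
For context, the failure fixed here appears when a job configures a multi-byte record delimiter through textinputformat.record.delimiter and an input split boundary lands inside that delimiter for uncompressed input. The following is a minimal, hypothetical job driver for that scenario, using the "+++" delimiter exercised by the new tests; the class name and input/output paths are illustrative, and the default identity mapper is assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultibyteDelimiterJob {                        // illustrative name
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Records are separated by the three-byte sequence "+++", as in the tests
    // added by this commit. Before this fix, a split boundary landing inside
    // "+++" could drop or duplicate a record for uncompressed input.
    conf.set("textinputformat.record.delimiter", "+++");

    Job job = Job.getInstance(conf, "multibyte-delimiter-example");
    job.setJarByClass(MultibyteDelimiterJob.class);
    job.setInputFormatClass(TextInputFormat.class);         // backed by LineRecordReader
    job.setOutputKeyClass(LongWritable.class);              // identity mapper output types
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // illustrative paths
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}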

org/apache/hadoop/util/LineReader.java (hadoop-common)

@@ -369,4 +369,12 @@ public class LineReader implements Closeable {
   public int readLine(Text str) throws IOException {
     return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
   }
+
+  protected int getBufferPosn() {
+    return bufferPosn;
+  }
+
+  protected int getBufferSize() {
+    return bufferSize;
+  }
 }

hadoop-mapreduce-project/CHANGES.txt

@@ -498,6 +498,10 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6403. Fix typo in the usage of NNBench.
     (Jagadesh Kiran N via aajisaka)
 
+    MAPREDUCE-5948. org.apache.hadoop.mapred.LineRecordReader does not handle
+    multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira
+    AJISAKA via jlowe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

org/apache/hadoop/mapred/LineRecordReader.java (hadoop-mapreduce-client-core)

@@ -38,6 +38,7 @@ import org.apache.hadoop.io.compress.SplitCompressionInputStream;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
 import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
+import org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
@@ -131,7 +132,8 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
       }
     } else {
       fileIn.seek(start);
-      in = new SplitLineReader(fileIn, job, recordDelimiter);
+      in = new UncompressedSplitLineReader(
+          fileIn, job, recordDelimiter, split.getLength());
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record

org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (hadoop-mapreduce-client-core)

@@ -112,7 +112,8 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
       }
     } else {
       fileIn.seek(start);
-      in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
+      in = new UncompressedSplitLineReader(
+          fileIn, job, this.recordDelimiterBytes, split.getLength());
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record

org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java (hadoop-mapreduce-client-core, new file)

@@ -0,0 +1,125 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Text;

/**
 * SplitLineReader for uncompressed files.
 * This class can split the file correctly even if the delimiter is multi-bytes.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class UncompressedSplitLineReader extends SplitLineReader {
  private boolean needAdditionalRecord = false;
  private long splitLength;
  /** Total bytes read from the input stream. */
  private long totalBytesRead = 0;
  private boolean finished = false;
  private boolean usingCRLF;
  private int unusedBytes = 0;
  private int lastBytesRead = 0;

  public UncompressedSplitLineReader(FSDataInputStream in, Configuration conf,
      byte[] recordDelimiterBytes, long splitLength) throws IOException {
    super(in, conf, recordDelimiterBytes);
    this.splitLength = splitLength;
    usingCRLF = (recordDelimiterBytes == null);
  }

  @Override
  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
      throws IOException {
    int maxBytesToRead = buffer.length;
    if (totalBytesRead < splitLength) {
      maxBytesToRead = Math.min(maxBytesToRead,
          (int)(splitLength - totalBytesRead));
    }
    int bytesRead = in.read(buffer, 0, maxBytesToRead);
    lastBytesRead = bytesRead;

    // If the split ended in the middle of a record delimiter then we need
    // to read one additional record, as the consumer of the next split will
    // not recognize the partial delimiter as a record.
    // However if using the default delimiter and the next character is a
    // linefeed then next split will treat it as a delimiter all by itself
    // and the additional record read should not be performed.
    if (totalBytesRead == splitLength && inDelimiter && bytesRead > 0) {
      if (usingCRLF) {
        needAdditionalRecord = (buffer[0] != '\n');
      } else {
        needAdditionalRecord = true;
      }
    }
    if (bytesRead > 0) {
      totalBytesRead += bytesRead;
    }
    return bytesRead;
  }

  @Override
  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
      throws IOException {
    long bytesRead = 0;
    if (!finished) {
      // only allow at most one more record to be read after the stream
      // reports the split ended
      if (totalBytesRead > splitLength) {
        finished = true;
      }

      bytesRead = totalBytesRead;
      int bytesConsumed = super.readLine(str, maxLineLength, maxBytesToConsume);
      bytesRead = totalBytesRead - bytesRead;

      // No records left.
      if (bytesConsumed == 0 && bytesRead == 0) {
        return 0;
      }

      int bufferSize = getBufferSize();

      // Add the remaining buffer size not used for the last call
      // of fillBuffer method.
      if (lastBytesRead <= 0) {
        bytesRead += bufferSize;
      } else if (bytesRead > 0) {
        bytesRead += bufferSize - lastBytesRead;
      }

      // Adjust the size of the buffer not used for this record.
      // The size is carried over for the next calculation.
      bytesRead += unusedBytes;
      unusedBytes = bufferSize - getBufferPosn();
      bytesRead -= unusedBytes;
    }
    return (int) bytesRead;
  }

  @Override
  public boolean needAdditionalRecordAfterSplit() {
    return !finished && needAdditionalRecord;
  }
}
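
As a rough illustration of how the record readers above now drive this class for uncompressed input, here is a sketch under stated assumptions: the class is annotated @InterfaceAudience.Private and is not a public API, and the local file path, split length, and position bookkeeping below are illustrative only.

import org.apache.commons.io.Charsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader;

public class SplitReadSketch {                              // illustrative name
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path file = new Path("target/data/test.txt");           // illustrative path
    long splitStart = 0;                                     // illustrative first split
    long splitLength = 7;

    FSDataInputStream in = fs.open(file);
    in.seek(splitStart);
    byte[] delimiter = "+++".getBytes(Charsets.UTF_8);       // multi-byte delimiter
    UncompressedSplitLineReader reader =
        new UncompressedSplitLineReader(in, conf, delimiter, splitLength);

    Text record = new Text();
    long pos = splitStart;
    // Keep reading while inside the split, and read one extra record if the
    // split ended in the middle of a delimiter, mirroring LineRecordReader.
    while (pos <= splitStart + splitLength
        || reader.needAdditionalRecordAfterSplit()) {
      int consumed = reader.readLine(record, Integer.MAX_VALUE, Integer.MAX_VALUE);
      if (consumed == 0) {
        break;                                               // end of stream
      }
      pos += consumed;       // rough position tracking for the sketch
      System.out.println(record);
    }
    reader.close();                                          // also closes the stream
  }
}

The readLine override reports progress as bytes drawn from the underlying stream, adjusted for buffered but not yet consumed bytes, which appears to be what keeps the caller's split-position check aligned with splitLength regardless of the io.file.buffer.size setting.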

org/apache/hadoop/mapred/TestLineRecordReader.java (hadoop-mapreduce-client-core, test)

@@ -25,13 +25,17 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -41,6 +45,9 @@ import org.apache.hadoop.io.compress.Decompressor;
 import org.junit.Test;
 
 public class TestLineRecordReader {
+  private static Path workDir = new Path(new Path(System.getProperty(
+      "test.build.data", "target"), "data"), "TestTextInputFormat");
+  private static Path inputDir = new Path(workDir, "input");
 
   private void testSplitRecords(String testFileName, long firstSplitLength)
       throws IOException {
@@ -50,15 +57,27 @@ public class TestLineRecordReader {
     long testFileSize = testFile.length();
     Path testFilePath = new Path(testFile.getAbsolutePath());
     Configuration conf = new Configuration();
+    testSplitRecordsForFile(conf, firstSplitLength, testFileSize, testFilePath);
+  }
+
+  private void testSplitRecordsForFile(Configuration conf,
+      long firstSplitLength, long testFileSize, Path testFilePath)
+      throws IOException {
     conf.setInt(org.apache.hadoop.mapreduce.lib.input.
         LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    assertTrue("unexpected test data at " + testFile,
+    assertTrue("unexpected test data at " + testFilePath,
         testFileSize > firstSplitLength);
 
+    String delimiter = conf.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
+
     // read the data without splitting to count the records
     FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
         (String[])null);
-    LineRecordReader reader = new LineRecordReader(conf, split);
+    LineRecordReader reader = new LineRecordReader(conf, split,
+        recordDelimiterBytes);
     LongWritable key = new LongWritable();
     Text value = new Text();
     int numRecordsNoSplits = 0;
@@ -69,7 +88,7 @@ public class TestLineRecordReader {
 
     // count the records in the first split
     split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
-    reader = new LineRecordReader(conf, split);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
     int numRecordsFirstSplit = 0;
     while (reader.next(key, value)) {
       ++numRecordsFirstSplit;
@@ -79,14 +98,14 @@ public class TestLineRecordReader {
 
     // count the records in the second split
     split = new FileSplit(testFilePath, firstSplitLength,
         testFileSize - firstSplitLength, (String[])null);
-    reader = new LineRecordReader(conf, split);
+    reader = new LineRecordReader(conf, split, recordDelimiterBytes);
     int numRecordsRemainingSplits = 0;
     while (reader.next(key, value)) {
       ++numRecordsRemainingSplits;
     }
     reader.close();
-    assertEquals("Unexpected number of records in bzip2 compressed split",
+    assertEquals("Unexpected number of records in split",
         numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
   }
@@ -290,4 +309,52 @@ public class TestLineRecordReader {
     }
     assertEquals(10, decompressors.size());
   }
+
+  /**
+   * Writes the input test file
+   *
+   * @param conf
+   * @return Path of the file created
+   * @throws IOException
+   */
+  private Path createInputFile(Configuration conf, String data)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path file = new Path(inputDir, "test.txt");
+    Writer writer = new OutputStreamWriter(localFs.create(file));
+    try {
+      writer.write(data);
+    } finally {
+      writer.close();
+    }
+    return file;
+  }
+
+  @Test
+  public void testUncompressedInput() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "abc+++def+++ghi+++"
+        + "jkl+++mno+++pqr+++stu+++vw +++xyz";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+++");
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedInputContainingCRLF() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "a\r\nb\rc\nd\r\n";
+    Path inputFile = createInputFile(conf, inputData);
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
 }

org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java (hadoop-mapreduce-client-core, test)

@@ -25,13 +25,17 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -42,6 +46,9 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.junit.Test;
 
 public class TestLineRecordReader {
+  private static Path workDir = new Path(new Path(System.getProperty(
+      "test.build.data", "target"), "data"), "TestTextInputFormat");
+  private static Path inputDir = new Path(workDir, "input");
 
   private void testSplitRecords(String testFileName, long firstSplitLength)
       throws IOException {
@@ -51,17 +58,28 @@ public class TestLineRecordReader {
     long testFileSize = testFile.length();
     Path testFilePath = new Path(testFile.getAbsolutePath());
     Configuration conf = new Configuration();
+    testSplitRecordsForFile(conf, firstSplitLength, testFileSize, testFilePath);
+  }
+
+  private void testSplitRecordsForFile(Configuration conf,
+      long firstSplitLength, long testFileSize, Path testFilePath)
+      throws IOException {
     conf.setInt(org.apache.hadoop.mapreduce.lib.input.
         LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    assertTrue("unexpected test data at " + testFile,
+    assertTrue("unexpected test data at " + testFilePath,
         testFileSize > firstSplitLength);
 
+    String delimiter = conf.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
+
     TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
 
     // read the data without splitting to count the records
     FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
         (String[])null);
-    LineRecordReader reader = new LineRecordReader();
+    LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsNoSplits = 0;
     while (reader.nextKeyValue()) {
@@ -71,7 +89,7 @@ public class TestLineRecordReader {
 
     // count the records in the first split
     split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
-    reader = new LineRecordReader();
+    reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsFirstSplit = 0;
     while (reader.nextKeyValue()) {
@@ -82,16 +100,15 @@ public class TestLineRecordReader {
 
     // count the records in the second split
     split = new FileSplit(testFilePath, firstSplitLength,
         testFileSize - firstSplitLength, (String[])null);
-    reader = new LineRecordReader();
+    reader = new LineRecordReader(recordDelimiterBytes);
     reader.initialize(split, context);
     int numRecordsRemainingSplits = 0;
     while (reader.nextKeyValue()) {
       ++numRecordsRemainingSplits;
     }
     reader.close();
-    assertEquals("Unexpected number of records in bzip2 compressed split",
-        numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
+    assertEquals("Unexpected number of records in split ", numRecordsNoSplits,
+        numRecordsFirstSplit + numRecordsRemainingSplits);
   }
 
   @Test
@@ -276,4 +293,52 @@ public class TestLineRecordReader {
     }
     assertEquals(10, decompressors.size());
   }
+
+  /**
+   * Writes the input test file
+   *
+   * @param conf
+   * @return Path of the file created
+   * @throws IOException
+   */
+  private Path createInputFile(Configuration conf, String data)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path file = new Path(inputDir, "test.txt");
+    Writer writer = new OutputStreamWriter(localFs.create(file));
+    try {
+      writer.write(data);
+    } finally {
+      writer.close();
+    }
+    return file;
+  }
+
+  @Test
+  public void testUncompressedInput() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "abc+++def+++ghi+++"
+        + "jkl+++mno+++pqr+++stu+++vw +++xyz";
+    Path inputFile = createInputFile(conf, inputData);
+    conf.set("textinputformat.record.delimiter", "+++");
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
+    }
+  }
+
+  @Test
+  public void testUncompressedInputContainingCRLF() throws Exception {
+    Configuration conf = new Configuration();
+    String inputData = "a\r\nb\rc\nd\r\n";
+    Path inputFile = createInputFile(conf, inputData);
+    for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
+      for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
+        conf.setInt("io.file.buffer.size", bufferSize);
+        testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
+      }
    }
  }
 }