MAPREDUCE-2094. LineRecordReader should not seek into non-splittable, compressed streams.

This commit is contained in:
Chris Douglas 2015-05-08 14:24:57 -07:00
parent ec2748dedb
commit 2edcf931d7
8 changed files with 56 additions and 12 deletions

View File

@ -93,6 +93,7 @@
<exclude>.gitattributes</exclude>
<exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
<exclude>src/test/resources/testBOM.txt</exclude>
<exclude>src/test/resources/TestSafeguardSplittingUnsplittableFiles.txt.gz</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -57,9 +57,12 @@ import com.google.common.collect.Iterables;
* <p><code>FileInputFormat</code> is the base class for all file-based
* <code>InputFormat</code>s. This provides a generic implementation of
* {@link #getSplits(JobConf, int)}.
* Subclasses of <code>FileInputFormat</code> can also override the
* {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
* not split-up and are processed as a whole by {@link Mapper}s.
*
* Implementations of <code>FileInputFormat</code> can also override the
* {@link #isSplitable(FileSystem, Path)} method to prevent input files
* from being split-up in certain situations. Implementations that may
* deal with non-splittable files <i>must</i> override this method, since
* the default implementation assumes splitting is always possible.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@ -116,9 +119,13 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
}
/**
* Is the given filename splitable? Usually, true, but if the file is
* Is the given filename splittable? Usually, true, but if the file is
* stream compressed, it will not be.
*
*
* The default implementation in <code>FileInputFormat</code> always returns
* true. Implementations that may deal with non-splittable files <i>must</i>
* override this method.
*
* <code>FileInputFormat</code> implementations can override this and return
* <code>false</code> to ensure that individual input files are never split-up
* so that {@link Mapper}s process entire files.

View File

@ -118,6 +118,13 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream
} else {
if (start != 0) {
// So we have a split that is part of a file stored using
// a Compression codec that cannot be split.
throw new IOException("Cannot seek in " +
codec.getClass().getSimpleName() + " compressed stream");
}
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, recordDelimiter);
filePosition = fileIn;

View File

@ -51,13 +51,16 @@ import com.google.common.collect.Lists;
/**
* A base class for file-based {@link InputFormat}s.
*
*
* <p><code>FileInputFormat</code> is the base class for all file-based
* <code>InputFormat</code>s. This provides a generic implementation of
* {@link #getSplits(JobContext)}.
* Subclasses of <code>FileInputFormat</code> can also override the
* {@link #isSplitable(JobContext, Path)} method to ensure input-files are
* not split-up and are processed as a whole by {@link Mapper}s.
*
* Implementations of <code>FileInputFormat</code> can also override the
* {@link #isSplitable(JobContext, Path)} method to prevent input files
* from being split-up in certain situations. Implementations that may
* deal with non-splittable files <i>must</i> override this method, since
* the default implementation assumes splitting is always possible.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@ -146,9 +149,13 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
}
/**
* Is the given filename splitable? Usually, true, but if the file is
* Is the given filename splittable? Usually, true, but if the file is
* stream compressed, it will not be.
*
*
* The default implementation in <code>FileInputFormat</code> always returns
* true. Implementations that may deal with non-splittable files <i>must</i>
* override this method.
*
* <code>FileInputFormat</code> implementations can override this and return
* <code>false</code> to ensure that individual input files are never split-up
* so that {@link Mapper}s process entire files.

View File

@ -86,7 +86,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
@ -99,6 +99,13 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
if (start != 0) {
// So we have a split that is only part of a file stored using
// a Compression codec that cannot be split.
throw new IOException("Cannot seek in " +
codec.getClass().getSimpleName() + " compressed stream");
}
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;

View File

@ -127,6 +127,13 @@ public class TestLineRecordReader {
testSplitRecords("blockEndingInCR.txt.bz2", 136494);
}
@Test(expected=IOException.class)
public void testSafeguardSplittingUnsplittableFiles() throws IOException {
// The LineRecordReader must fail when trying to read a file that
// was compressed using an unsplittable file format
testSplitRecords("TestSafeguardSplittingUnsplittableFiles.txt.gz", 2);
}
// Use the LineRecordReader to read records from the file
public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
throws IOException {

View File

@ -110,6 +110,13 @@ public class TestLineRecordReader {
testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
}
@Test(expected=IOException.class)
public void testSafeguardSplittingUnsplittableFiles() throws IOException {
// The LineRecordReader must fail when trying to read a file that
// was compressed using an unsplittable file format
testSplitRecords("TestSafeguardSplittingUnsplittableFiles.txt.gz", 2);
}
// Use the LineRecordReader to read records from the file
public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
throws IOException {