MAPREDUCE-2094. LineRecordReader should not seek into non-splittable, compressed streams.

This commit is contained in:
Chris Douglas 2015-05-08 14:24:57 -07:00
parent 1523ed5a76
commit 4f301f92c1
8 changed files with 56 additions and 12 deletions

View File

@ -93,6 +93,7 @@
<exclude>.gitattributes</exclude> <exclude>.gitattributes</exclude>
<exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude> <exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
<exclude>src/test/resources/testBOM.txt</exclude> <exclude>src/test/resources/testBOM.txt</exclude>
<exclude>src/test/resources/TestSafeguardSplittingUnsplittableFiles.txt.gz</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -57,9 +57,12 @@ import com.google.common.collect.Iterables;
* <p><code>FileInputFormat</code> is the base class for all file-based * <p><code>FileInputFormat</code> is the base class for all file-based
* <code>InputFormat</code>s. This provides a generic implementation of * <code>InputFormat</code>s. This provides a generic implementation of
* {@link #getSplits(JobConf, int)}. * {@link #getSplits(JobConf, int)}.
* Subclasses of <code>FileInputFormat</code> can also override the *
* {@link #isSplitable(FileSystem, Path)} method to ensure input-files are * Implementations of <code>FileInputFormat</code> can also override the
* not split-up and are processed as a whole by {@link Mapper}s. * {@link #isSplitable(FileSystem, Path)} method to prevent input files
* from being split-up in certain situations. Implementations that may
* deal with non-splittable files <i>must</i> override this method, since
* the default implementation assumes splitting is always possible.
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
@ -116,9 +119,13 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
} }
/** /**
* Is the given filename splitable? Usually, true, but if the file is * Is the given filename splittable? Usually, true, but if the file is
* stream compressed, it will not be. * stream compressed, it will not be.
* *
* The default implementation in <code>FileInputFormat</code> always returns
* true. Implementations that may deal with non-splittable files <i>must</i>
* override this method.
*
* <code>FileInputFormat</code> implementations can override this and return * <code>FileInputFormat</code> implementations can override this and return
* <code>false</code> to ensure that individual input files are never split-up * <code>false</code> to ensure that individual input files are never split-up
* so that {@link Mapper}s process entire files. * so that {@link Mapper}s process entire files.

View File

@ -118,6 +118,13 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
end = cIn.getAdjustedEnd(); end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream filePosition = cIn; // take pos from compressed stream
} else { } else {
if (start != 0) {
// A non-zero start means this split begins partway through a file
// whose compression codec cannot be split, so the reader cannot
// seek to the split's start position in the compressed stream.
throw new IOException("Cannot seek in " +
codec.getClass().getSimpleName() + " compressed stream");
}
in = new SplitLineReader(codec.createInputStream(fileIn, in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, recordDelimiter); decompressor), job, recordDelimiter);
filePosition = fileIn; filePosition = fileIn;

View File

@ -51,13 +51,16 @@ import com.google.common.collect.Lists;
/** /**
* A base class for file-based {@link InputFormat}s. * A base class for file-based {@link InputFormat}s.
* *
* <p><code>FileInputFormat</code> is the base class for all file-based * <p><code>FileInputFormat</code> is the base class for all file-based
* <code>InputFormat</code>s. This provides a generic implementation of * <code>InputFormat</code>s. This provides a generic implementation of
* {@link #getSplits(JobContext)}. * {@link #getSplits(JobContext)}.
* Subclasses of <code>FileInputFormat</code> can also override the *
* {@link #isSplitable(JobContext, Path)} method to ensure input-files are * Implementations of <code>FileInputFormat</code> can also override the
* not split-up and are processed as a whole by {@link Mapper}s. * {@link #isSplitable(JobContext, Path)} method to prevent input files
* from being split-up in certain situations. Implementations that may
* deal with non-splittable files <i>must</i> override this method, since
* the default implementation assumes splitting is always possible.
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
@ -146,9 +149,13 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
} }
/** /**
* Is the given filename splitable? Usually, true, but if the file is * Is the given filename splittable? Usually, true, but if the file is
* stream compressed, it will not be. * stream compressed, it will not be.
* *
* The default implementation in <code>FileInputFormat</code> always returns
* true. Implementations that may deal with non-splittable files <i>must</i>
* override this method.
*
* <code>FileInputFormat</code> implementations can override this and return * <code>FileInputFormat</code> implementations can override this and return
* <code>false</code> to ensure that individual input files are never split-up * <code>false</code> to ensure that individual input files are never split-up
* so that {@link Mapper}s process entire files. * so that {@link Mapper}s process entire files.

View File

@ -86,7 +86,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) { if (null!=codec) {
isCompressedInput = true; isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec); decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) { if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn = final SplitCompressionInputStream cIn =
@ -99,6 +99,13 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
end = cIn.getAdjustedEnd(); end = cIn.getAdjustedEnd();
filePosition = cIn; filePosition = cIn;
} else { } else {
if (start != 0) {
// A non-zero start means this split covers only part of a file
// whose compression codec cannot be split, so the reader cannot
// seek to the split's start position in the compressed stream.
throw new IOException("Cannot seek in " +
codec.getClass().getSimpleName() + " compressed stream");
}
in = new SplitLineReader(codec.createInputStream(fileIn, in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes); decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn; filePosition = fileIn;

View File

@ -127,6 +127,13 @@ public class TestLineRecordReader {
testSplitRecords("blockEndingInCR.txt.bz2", 136494); testSplitRecords("blockEndingInCR.txt.bz2", 136494);
} }
/**
 * Verifies that an {@link IOException} is raised when the ".gz" test
 * resource is read through {@code testSplitRecords} with a split size of 2.
 *
 * @throws IOException expected; the test passes only if it is thrown
 */
@Test(expected=IOException.class)
public void testSafeguardSplittingUnsplittableFiles() throws IOException {
// The LineRecordReader must fail when asked to read a split of a file
// that was compressed using an unsplittable file format.
// NOTE(review): the split size of 2 presumably yields at least one split
// with a non-zero start offset -- confirm against testSplitRecords.
testSplitRecords("TestSafeguardSplittingUnsplittableFiles.txt.gz", 2);
}
// Use the LineRecordReader to read records from the file // Use the LineRecordReader to read records from the file
public ArrayList<String> readRecords(URL testFileUrl, int splitSize) public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
throws IOException { throws IOException {

View File

@ -110,6 +110,13 @@ public class TestLineRecordReader {
testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498); testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
} }
/**
 * Verifies that an {@link IOException} is raised when the ".gz" test
 * resource is read through {@code testSplitRecords} with a split size of 2.
 *
 * @throws IOException expected; the test passes only if it is thrown
 */
@Test(expected=IOException.class)
public void testSafeguardSplittingUnsplittableFiles() throws IOException {
// The LineRecordReader must fail when asked to read a split of a file
// that was compressed using an unsplittable file format.
// NOTE(review): the split size of 2 presumably yields at least one split
// with a non-zero start offset -- confirm against testSplitRecords.
testSplitRecords("TestSafeguardSplittingUnsplittableFiles.txt.gz", 2);
}
// Use the LineRecordReader to read records from the file // Use the LineRecordReader to read records from the file
public ArrayList<String> readRecords(URL testFileUrl, int splitSize) public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
throws IOException { throws IOException {