MAPREDUCE-4974. Optimising the LineRecordReader initialize() method (Gelesh via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1468232 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 2013-04-15 21:33:47 +00:00
parent 108e0e0953
commit 947e97f354
2 changed files with 12 additions and 15 deletions
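In short, initialize() used to build a CompressionCodecFactory, cache the resulting codec in a field, and have the per-record helpers go through an isCompressedInput() method that compared that field against null. After this patch the factory and codec are method-local, and only a boolean isCompressedInput flag (plus the pooled decompressor) outlives initialize(). Below is a minimal sketch of the resulting pattern, assuming the Hadoop 2.x mapreduce APIs and eliding the compressed/uncompressed stream wiring; names follow the diff below, but this is not the committed code verbatim.

// Sketch only: illustrates the codec-check pattern introduced by this patch,
// not the full LineRecordReader implementation.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

class CodecCheckSketch {
  private boolean isCompressedInput;   // replaces the cached CompressionCodec field
  private Decompressor decompressor;
  private FSDataInputStream fileIn;
  private long start, end;

  void initialize(FileSplit split, Configuration job) throws IOException {
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);

    // The factory and codec are now method-local; nothing codec-related is
    // retained beyond the boolean flag and the pooled decompressor.
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (codec != null) {
      isCompressedInput = true;
      decompressor = CodecPool.getDecompressor(codec);
      // ... wrap fileIn with the (splittable) codec's input stream as before ...
    }
    // ... uncompressed path unchanged ...
  }

  private int maxBytesToConsume(long pos) {
    // Plain field read instead of the removed isCompressedInput() helper.
    return isCompressedInput
        ? Integer.MAX_VALUE
        : (int) Math.min(Integer.MAX_VALUE, end - pos);
  }
}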


@@ -200,6 +200,9 @@ Release 2.0.5-beta - UNRELEASED
   OPTIMIZATIONS
 
+    MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
+    (Gelesh via bobby)
+
   BUG FIXES
 
     MAPREDUCE-4671. AM does not tell the RM about container requests which are


@@ -52,7 +52,6 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
   public static final String MAX_LINE_LENGTH =
     "mapreduce.input.linerecordreader.line.maxlength";
 
-  private CompressionCodecFactory compressionCodecs = null;
   private long start;
   private long pos;
   private long end;
@@ -60,9 +59,9 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
   private FSDataInputStream fileIn;
   private Seekable filePosition;
   private int maxLineLength;
-  private LongWritable key = null;
-  private Text value = null;
-  private CompressionCodec codec;
+  private LongWritable key;
+  private Text value;
+  private boolean isCompressedInput;
   private Decompressor decompressor;
   private byte[] recordDelimiterBytes;
@@ -81,13 +80,14 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     start = split.getStart();
     end = start + split.getLength();
     final Path file = split.getPath();
-    compressionCodecs = new CompressionCodecFactory(job);
-    codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
     final FileSystem fs = file.getFileSystem(job);
     fileIn = fs.open(file);
-    if (isCompressedInput()) {
+
+    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
+    if (null!=codec) {
+      isCompressedInput = true;
       decompressor = CodecPool.getDecompressor(codec);
       if (codec instanceof SplittableCompressionCodec) {
         final SplitCompressionInputStream cIn =
@@ -132,19 +132,16 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
       this.pos = start;
     }
 
-  private boolean isCompressedInput() {
-    return (codec != null);
-  }
-
   private int maxBytesToConsume(long pos) {
-    return isCompressedInput()
+    return isCompressedInput
       ? Integer.MAX_VALUE
       : (int) Math.min(Integer.MAX_VALUE, end - pos);
   }
 
   private long getFilePosition() throws IOException {
     long retVal;
-    if (isCompressedInput() && null != filePosition) {
+    if (isCompressedInput && null != filePosition) {
       retVal = filePosition.getPos();
     } else {
       retVal = pos;
@@ -166,9 +163,6 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     while (getFilePosition() <= end) {
       newSize = in.readLine(value, maxLineLength,
           Math.max(maxBytesToConsume(pos), maxLineLength));
-      if (newSize == 0) {
-        break;
-      }
       pos += newSize;
       if (newSize < maxLineLength) {
         break;