diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
index 682322dc640..774f95b76ea 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java
@@ -175,6 +175,11 @@ public class LineReader implements Closeable {
     }
   }
 
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    return in.read(buffer);
+  }
+
   /**
    * Read a line terminated by one of CR, LF, or CRLF.
    */
@@ -208,7 +213,7 @@ public class LineReader implements Closeable {
         if (prevCharCR) {
           ++bytesConsumed; //account for CR from previous read
         }
-        bufferLength = in.read(buffer);
+        bufferLength = fillBuffer(in, buffer, prevCharCR);
         if (bufferLength <= 0) {
           break; // EOF
         }
@@ -296,7 +301,7 @@ public class LineReader implements Closeable {
       int startPosn = bufferPosn; // Start from previous end position
       if (bufferPosn >= bufferLength) {
         startPosn = bufferPosn = 0;
-        bufferLength = in.read(buffer);
+        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
         if (bufferLength <= 0) {
           str.append(recordDelimiterBytes, 0, ambiguousByteCount);
           break; // EOF
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 86cbdf7928d..f8194f1f063 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -94,6 +94,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5632. TestRMContainerAllocator#testUpdatedNodes fails (jeagles)
 
+    MAPREDUCE-5656. bzip2 codec can drop records when reading data in splits
+    (jlowe)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index 35755da9913..8a1624cd0e6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.SplitCompressionInputStream;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
+import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -52,7 +54,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
   private long start;
   private long pos;
   private long end;
-  private LineReader in;
+  private SplitLineReader in;
   private FSDataInputStream fileIn;
   private final Seekable filePosition;
   int maxLineLength;
@@ -111,17 +113,18 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
         ((SplittableCompressionCodec)codec).createInputStream(
           fileIn, decompressor, start, end,
           SplittableCompressionCodec.READ_MODE.BYBLOCK);
-        in = new LineReader(cIn, job, recordDelimiter);
+        in = new CompressedSplitLineReader(cIn, job, recordDelimiter);
         start = cIn.getAdjustedStart();
         end = cIn.getAdjustedEnd();
         filePosition = cIn; // take pos from compressed stream
       } else {
-        in = new LineReader(codec.createInputStream(fileIn, decompressor), job, recordDelimiter);
+        in = new SplitLineReader(codec.createInputStream(fileIn,
+            decompressor), job, recordDelimiter);
         filePosition = fileIn;
       }
     } else {
       fileIn.seek(start);
-      in = new LineReader(fileIn, job, recordDelimiter);
+      in = new SplitLineReader(fileIn, job, recordDelimiter);
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
@@ -141,7 +144,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
   public LineRecordReader(InputStream in, long offset, long endOffset,
                           int maxLineLength, byte[] recordDelimiter) {
     this.maxLineLength = maxLineLength;
-    this.in = new LineReader(in, recordDelimiter);
+    this.in = new SplitLineReader(in, recordDelimiter);
     this.start = offset;
     this.pos = offset;
     this.end = endOffset;
@@ -159,7 +162,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
       throws IOException{
     this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
       LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    this.in = new LineReader(in, job, recordDelimiter);
+    this.in = new SplitLineReader(in, job, recordDelimiter);
     this.start = offset;
     this.pos = offset;
     this.end = endOffset;
@@ -200,7 +203,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
 
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
-    while (getFilePosition() <= end) {
+    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
       key.set(pos);
 
       int newSize = in.readLine(value, maxLineLength,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
new file mode 100644
index 00000000000..ef51f5cc678
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+
+/**
+ * Line reader for compressed splits
+ *
+ * Reading records from a compressed split is tricky, as the
+ * LineRecordReader is using the reported compressed input stream
+ * position directly to determine when a split has ended.  In addition the
+ * compressed input stream is usually faking the actual byte position, often
+ * updating it only after the first compressed block after the split is
+ * accessed.
+ *
+ * Depending upon where the last compressed block of the split ends relative
+ * to the record delimiters it can be easy to accidentally drop the last
+ * record or duplicate the last record between this split and the next.
+ *
+ * Split end scenarios:
+ *
+ * 1) Last block of split ends in the middle of a record
+ *      Nothing special that needs to be done here, since the compressed input
+ *      stream will report a position after the split end once the record
+ *      is fully read.  The consumer of the next split will discard the
+ *      partial record at the start of the split normally, and no data is lost
+ *      or duplicated between the splits.
+ *
+ * 2) Last block of split ends in the middle of a delimiter
+ *      The line reader will continue to consume bytes into the next block to
+ *      locate the end of the delimiter.  If a custom delimiter is being used
+ *      then the next record must be read by this split or it will be dropped.
+ *      The consumer of the next split will not recognize the partial
+ *      delimiter at the beginning of its split and will discard it along with
+ *      the next record.
+ *
+ *      However for the default delimiter processing there is a special case
+ *      because CR, LF, and CRLF are all valid record delimiters.  If the
+ *      block ends with a CR then the reader must peek at the next byte to see
+ *      if it is an LF and therefore part of the same record delimiter.
+ *      Peeking at the next byte is an access to the next block and triggers
+ *      the stream to report the end of the split.  There are two cases based
+ *      on the next byte:
+ *
+ *      A) The next byte is LF
+ *           The split needs to end after the current record is returned.  The
+ *           consumer of the next split will discard the first record, which
+ *           is degenerate since LF is itself a delimiter, and start consuming
+ *           records after that byte.  If the current split tries to read
+ *           another record then the record will be duplicated between splits.
+ *
+ *      B) The next byte is not LF
+ *           The current record will be returned but the stream will report
+ *           the split has ended due to the peek into the next block.  If the
+ *           next record is not read then it will be lost, as the consumer of
+ *           the next split will discard it before processing subsequent
+ *           records.  Therefore the next record beyond the reported split end
+ *           must be consumed by this split to avoid data loss.
+ *
+ * 3) Last block of split ends at the beginning of a delimiter
+ *      This is equivalent to case 1, as the reader will consume bytes into
+ *      the next block and trigger the end of the split.  No further records
+ *      should be read as the consumer of the next split will discard the
+ *      (degenerate) record at the beginning of its split.
+ *
+ * 4) Last block of split ends at the end of a delimiter
+ *      Nothing special needs to be done here. The reader will not start
+ *      examining the bytes into the next block until the next record is read,
+ *      so the stream will not report the end of the split just yet.  Once the
+ *      next record is read then the next block will be accessed and the
+ *      stream will indicate the end of the split.  The consumer of the next
+ *      split will correctly discard the first record of its split, and no
+ *      data is lost or duplicated.
+ *
+ *      If the default delimiter is used and the block ends at a CR then this
+ *      is treated as case 2 since the reader does not yet know without
+ *      looking at subsequent bytes whether the delimiter has ended.
+ *
+ * NOTE: It is assumed that compressed input streams *never* return bytes from
+ *       multiple compressed blocks from a single read.  Failure to do so will
+ *       violate the buffering performed by this class, as it will access
+ *       bytes into the next block after the split before returning all of the
+ *       records from the previous block.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CompressedSplitLineReader extends SplitLineReader {
+
+  SplitCompressionInputStream scin;
+  private boolean usingCRLF;
+  private boolean needAdditionalRecord = false;
+  private boolean finished = false;
+
+  public CompressedSplitLineReader(SplitCompressionInputStream in,
+                                   Configuration conf,
+                                   byte[] recordDelimiterBytes)
+                                       throws IOException {
+    super(in, conf, recordDelimiterBytes);
+    scin = in;
+    usingCRLF = (recordDelimiterBytes == null);
+  }
+
+  @Override
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    int bytesRead = in.read(buffer);
+
+    // If the split ended in the middle of a record delimiter then we need
+    // to read one additional record, as the consumer of the next split will
+    // not recognize the partial delimiter as a record.
+    // However if using the default delimiter and the next character is a
+    // linefeed then next split will treat it as a delimiter all by itself
+    // and the additional record read should not be performed.
+    if (inDelimiter && bytesRead > 0) {
+      if (usingCRLF) {
+        needAdditionalRecord = (buffer[0] != '\n');
+      } else {
+        needAdditionalRecord = true;
+      }
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    int bytesRead = 0;
+    if (!finished) {
+      // only allow at most one more record to be read after the stream
+      // reports the split ended
+      if (scin.getPos() > scin.getAdjustedEnd()) {
+        finished = true;
+      }
+
+      bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public boolean needAdditionalRecordAfterSplit() {
+    return !finished && needAdditionalRecord;
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index 8927adf7cdc..0c7635e4f30 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.LineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -55,7 +54,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
   private long start;
   private long pos;
   private long end;
-  private LineReader in;
+  private SplitLineReader in;
   private FSDataInputStream fileIn;
   private Seekable filePosition;
   private int maxLineLength;
@@ -94,33 +93,19 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
         ((SplittableCompressionCodec)codec).createInputStream(
           fileIn, decompressor, start, end,
           SplittableCompressionCodec.READ_MODE.BYBLOCK);
-        if (null == this.recordDelimiterBytes){
-          in = new LineReader(cIn, job);
-        } else {
-          in = new LineReader(cIn, job, this.recordDelimiterBytes);
-        }
-
+        in = new CompressedSplitLineReader(cIn, job,
+            this.recordDelimiterBytes);
         start = cIn.getAdjustedStart();
         end = cIn.getAdjustedEnd();
         filePosition = cIn;
       } else {
-        if (null == this.recordDelimiterBytes) {
-          in = new LineReader(codec.createInputStream(fileIn, decompressor),
-              job);
-        } else {
-          in = new LineReader(codec.createInputStream(fileIn,
-              decompressor), job, this.recordDelimiterBytes);
-        }
+        in = new SplitLineReader(codec.createInputStream(fileIn,
+            decompressor), job, this.recordDelimiterBytes);
         filePosition = fileIn;
       }
     } else {
       fileIn.seek(start);
-      if (null == this.recordDelimiterBytes){
-        in = new LineReader(fileIn, job);
-      } else {
-        in = new LineReader(fileIn, job, this.recordDelimiterBytes);
-      }
-
+      in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
@@ -160,7 +145,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     int newSize = 0;
     // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
-    while (getFilePosition() <= end) {
+    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      pos += newSize;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/SplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/SplitLineReader.java
new file mode 100644
index 00000000000..4497a198a7c
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/SplitLineReader.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SplitLineReader extends org.apache.hadoop.util.LineReader {
+  public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
+    super(in, recordDelimiterBytes);
+  }
+
+  public SplitLineReader(InputStream in, Configuration conf,
+      byte[] recordDelimiterBytes) throws IOException {
+    super(in, conf, recordDelimiterBytes);
+  }
+
+  public boolean needAdditionalRecordAfterSplit() {
+    return false;
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
new file mode 100644
index 00000000000..b8df069fccf
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class TestLineRecordReader {
+
+  private void testSplitRecords(String testFileName, long firstSplitLength)
+      throws IOException {
+    URL testFileUrl = getClass().getClassLoader().getResource(testFileName);
+    assertNotNull("Cannot find " + testFileName, testFileUrl);
+    File testFile = new File(testFileUrl.getFile());
+    long testFileSize = testFile.length();
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    assertTrue("unexpected test data at " + testFile,
+        testFileSize > firstSplitLength);
+
+    // read the data without splitting to count the records
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
+        (String[])null);
+    LineRecordReader reader = new LineRecordReader(conf, split);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    int numRecordsNoSplits = 0;
+    while (reader.next(key, value)) {
+      ++numRecordsNoSplits;
+    }
+    reader.close();
+
+    // count the records in the first split
+    split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
+    reader = new LineRecordReader(conf, split);
+    int numRecordsFirstSplit = 0;
+    while (reader.next(key, value)) {
+      ++numRecordsFirstSplit;
+    }
+    reader.close();
+
+    // count the records in the second split
+    split = new FileSplit(testFilePath, firstSplitLength,
+        testFileSize - firstSplitLength, (String[])null);
+    reader = new LineRecordReader(conf, split);
+    int numRecordsRemainingSplits = 0;
+    while (reader.next(key, value)) {
+      ++numRecordsRemainingSplits;
+    }
+    reader.close();
+
+    assertEquals("Unexpected number of records in bzip2 compressed split",
+        numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
+  }
+
+  @Test
+  public void testBzip2SplitEndsAtCR() throws IOException {
+    // the test data contains a carriage-return at the end of the first
+    // split which ends at compressed offset 136498 and the next
+    // character is not a linefeed
+    testSplitRecords("blockEndingInCR.txt.bz2", 136498);
+  }
+
+  @Test
+  public void testBzip2SplitEndsAtCRThenLF() throws IOException {
+    // the test data contains a carriage-return at the end of the first
+    // split which ends at compressed offset 136498 and the next
+    // character is a linefeed
+    testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
new file mode 100644
index 00000000000..26c839614ee
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.Test;
+
+public class TestLineRecordReader {
+
+  private void testSplitRecords(String testFileName, long firstSplitLength)
+      throws IOException {
+    URL testFileUrl = getClass().getClassLoader().getResource(testFileName);
+    assertNotNull("Cannot find " + testFileName, testFileUrl);
+    File testFile = new File(testFileUrl.getFile());
+    long testFileSize = testFile.length();
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    assertTrue("unexpected test data at " + testFile,
+        testFileSize > firstSplitLength);
+
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+    // read the data without splitting to count the records
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
+        (String[])null);
+    LineRecordReader reader = new LineRecordReader();
+    reader.initialize(split, context);
+    int numRecordsNoSplits = 0;
+    while (reader.nextKeyValue()) {
+      ++numRecordsNoSplits;
+    }
+    reader.close();
+
+    // count the records in the first split
+    split = new FileSplit(testFilePath, 0, firstSplitLength, (String[])null);
+    reader = new LineRecordReader();
+    reader.initialize(split, context);
+    int numRecordsFirstSplit = 0;
+    while (reader.nextKeyValue()) {
+      ++numRecordsFirstSplit;
+    }
+    reader.close();
+
+    // count the records in the second split
+    split = new FileSplit(testFilePath, firstSplitLength,
+        testFileSize - firstSplitLength, (String[])null);
+    reader = new LineRecordReader();
+    reader.initialize(split, context);
+    int numRecordsRemainingSplits = 0;
+    while (reader.nextKeyValue()) {
+      ++numRecordsRemainingSplits;
+    }
+    reader.close();
+
+    assertEquals("Unexpected number of records in bzip2 compressed split",
+        numRecordsNoSplits, numRecordsFirstSplit + numRecordsRemainingSplits);
+  }
+
+  @Test
+  public void testBzip2SplitEndsAtCR() throws IOException {
+    // the test data contains a carriage-return at the end of the first
+    // split which ends at compressed offset 136498 and the next
+    // character is not a linefeed
+    testSplitRecords("blockEndingInCR.txt.bz2", 136498);
+  }
+
+  @Test
+  public void testBzip2SplitEndsAtCRThenLF() throws IOException {
+    // the test data contains a carriage-return at the end of the first
+    // split which ends at compressed offset 136498 and the next
+    // character is a linefeed
+    testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCR.txt.bz2 b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCR.txt.bz2
new file mode 100644
index 00000000000..bcb9701e050
Binary files /dev/null and b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCR.txt.bz2 differ
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCRThenLF.txt.bz2 b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCRThenLF.txt.bz2
new file mode 100644
index 00000000000..80446e3409e
Binary files /dev/null and b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCRThenLF.txt.bz2 differ
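For readers tracing the CR/LF peek logic described in the CompressedSplitLineReader javadoc above, the following standalone sketch (not part of the patch; the class and method names here are illustrative only) mimics the decision its fillBuffer() makes when a split's last block ends with a carriage return: peek at the first byte of the next block and decide whether one extra record must still be consumed by this split.

    // Illustrative sketch only -- not part of the MAPREDUCE-5656 patch.
    // It mirrors the decision CompressedSplitLineReader.fillBuffer() makes
    // when the previous block ended mid-delimiter.
    public class CrPeekSketch {

      /**
       * @param inDelimiter          the previous block ended in the middle of a
       *                             delimiter (for default delimiters: with a CR)
       * @param usingCRLF            true when no custom delimiter is configured
       * @param firstByteOfNextBlock the byte just peeked from the next block
       * @return whether this split must read one record past the reported end
       */
      static boolean needAdditionalRecord(boolean inDelimiter, boolean usingCRLF,
                                          byte firstByteOfNextBlock) {
        if (!inDelimiter) {
          return false;              // scenarios 1 and 4: nothing special to do
        }
        if (usingCRLF) {
          // scenario 2A vs 2B: an LF completes the CRLF, so the next split will
          // discard a degenerate first record and no extra read is needed here
          return firstByteOfNextBlock != '\n';
        }
        // custom delimiter split across blocks: the next split cannot recognize
        // it, so this split must read the following record or it would be lost
        return true;
      }

      public static void main(String[] args) {
        // block ends with CR, next byte is LF  -> no extra record (scenario 2A)
        System.out.println(needAdditionalRecord(true, true, (byte) '\n')); // false
        // block ends with CR, next byte is 'x' -> read one extra record (2B)
        System.out.println(needAdditionalRecord(true, true, (byte) 'x'));  // true
      }
    }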