From 347997c9596a4f7c2e32e39c2e30e9152ba0e7a3 Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Tue, 31 Oct 2017 09:30:13 -0500
Subject: [PATCH] HADOOP-14919. BZip2 drops records when reading data in
splits. Contributed by Jason Lowe
(cherry picked from commit 2fae63aa60c43b62bd908a9499562fe528603185)
---
.../apache/hadoop/io/compress/BZip2Codec.java | 39 +---------
.../io/compress/bzip2/CBZip2InputStream.java | 32 +++++---
.../hadoop/mapred/TestTextInputFormat.java | 76 +++++++++++++++++++
3 files changed, 98 insertions(+), 49 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index 331606eb449..db781187315 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -204,43 +204,8 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
Seekable.class.getName());
}
- //find the position of first BZip2 start up marker
- ((Seekable)seekableIn).seek(0);
-
- // BZip2 start of block markers are of 6 bytes. But the very first block
- // also has "BZh9", making it 10 bytes. This is the common case. But at
- // time stream might start without a leading BZ.
- final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
- CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
- long adjStart = 0L;
- if (start != 0) {
- // Other than the first of file, the marker size is 6 bytes.
- adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION
- - (HEADER_LEN + SUB_HEADER_LEN)));
- }
-
- ((Seekable)seekableIn).seek(adjStart);
- SplitCompressionInputStream in =
- new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);
-
-
- // The following if clause handles the following case:
- // Assume the following scenario in BZip2 compressed stream where
- // . represent compressed data.
- // .....[48 bit Block].....[48 bit Block].....[48 bit Block]...
- // ........................[47 bits][1 bit].....[48 bit Block]...
- // ................................^[Assume a Byte alignment here]
- // ........................................^^[current position of stream]
- // .....................^^[We go back 10 Bytes in stream and find a Block marker]
- // ........................................^^[We align at wrong position!]
- // ...........................................................^^[While this pos is correct]
-
- if (in.getPos() < start) {
- ((Seekable)seekableIn).seek(start);
- in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
- }
-
- return in;
+ ((Seekable)seekableIn).seek(start);
+ return new BZip2CompressionInputStream(seekableIn, start, end, readMode);
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
index 57fc07b617a..edb37fee848 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
@@ -52,20 +52,20 @@ import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE;
* This Ant code was enhanced so that it can de-compress blocks of bzip2 data.
* Current position in the stream is an important statistic for Hadoop. For
* example in LineRecordReader, we solely depend on the current position in the
- * stream to know about the progess. The notion of position becomes complicated
+ * stream to know about the progress. The notion of position becomes complicated
* for compressed files. The Hadoop splitting is done in terms of compressed
* file. But a compressed file deflates to a large amount of data. So we have
* handled this problem in the following way.
*
* On object creation time, we find the next block start delimiter. Once such a
* marker is found, the stream stops there (we discard any read compressed data
- * in this process) and the position is updated (i.e. the caller of this class
- * will find out the stream location). At this point we are ready for actual
- * reading (i.e. decompression) of data.
+ * in this process) and the position is reported as the beginning of the block
+ * start delimiter. At this point we are ready for actual reading
+ * (i.e. decompression) of data.
*
* The subsequent read calls give out data. The position is updated when the
* caller of this class has read off the current block + 1 bytes. In between the
- * block reading, position is not updated. (We can only update the postion on
+ * block reading, position is not updated. (We can only update the position on
* block boundaries).
*
*
@@ -204,11 +204,12 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
* in the stream. It can find bit patterns of length <= 63 bits. Specifically
* this method is used in CBZip2InputStream to find the end of block (EOB)
* delimiter in the stream, starting from the current position of the stream.
- * If marker is found, the stream position will be right after marker at the
- * end of this call.
+ * If marker is found, the stream position will be at the byte containing
+ * the starting bit of the marker.
*
* @param marker The bit pattern to be found in the stream
* @param markerBitLength No of bits in the marker
+ * @return true if the marker was found, false otherwise
*
* @throws IOException
* @throws IllegalArgumentException if marketBitLength is greater than 63
@@ -224,23 +225,33 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
long bytes = 0;
bytes = this.bsR(markerBitLength);
if (bytes == -1) {
+ this.reportedBytesReadFromCompressedStream =
+ this.bytesReadFromCompressedStream;
return false;
}
while (true) {
if (bytes == marker) {
+ // Report the byte position where the marker starts
+ long markerBytesRead = (markerBitLength + this.bsLive + 7) / 8;
+ this.reportedBytesReadFromCompressedStream =
+ this.bytesReadFromCompressedStream - markerBytesRead;
return true;
-
} else {
bytes = bytes << 1;
bytes = bytes & ((1L << markerBitLength) - 1);
int oneBit = (int) this.bsR(1);
if (oneBit != -1) {
bytes = bytes | oneBit;
- } else
+ } else {
+ this.reportedBytesReadFromCompressedStream =
+ this.bytesReadFromCompressedStream;
return false;
+ }
}
}
} catch (IOException ex) {
+ this.reportedBytesReadFromCompressedStream =
+ this.bytesReadFromCompressedStream;
return false;
}
}
@@ -302,7 +313,6 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
} else if (readMode == READ_MODE.BYBLOCK) {
this.currentState = STATE.NO_PROCESS_STATE;
skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH);
- this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream;
if(!skipDecompression){
changeStateToProcessABlock();
}
@@ -419,8 +429,6 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
result = b;
skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
- //Exactly when we are about to start a new block, we advertise the stream position.
- this.reportedBytesReadFromCompressedStream = this.bytesReadFromCompressedStream;
changeStateToProcessABlock();
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
index 5106c3843f2..220d4a9f5db 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
@@ -186,6 +186,82 @@ public class TestTextInputFormat {
verifyPartitions(473608, 110, file, codec, conf);
}
+ // Test the corner case where the stream position is right after a BZip2 marker
+ @Test (timeout=900000)
+ public void testSplitableCodecs2() throws IOException {
+ JobConf conf = new JobConf(defaultConf);
+ // Instantiate the BZip2 codec reflectively from its class name
+ CompressionCodec codec = null;
+ try {
+ codec = (CompressionCodec)
+ ReflectionUtils.newInstance(conf.getClassByName("org.apache.hadoop.io.compress.BZip2Codec"), conf);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Illegal codec!");
+ }
+ Path file = new Path(workDir, "test"+codec.getDefaultExtension());
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+ localFs.delete(workDir, true);
+ FileInputFormat.setInputPaths(conf, workDir);
+
+ int length = 250000;
+ LOG.info("creating; entries = " + length);
+ // Write the integers 0..length-1, one per line, through the codec
+ Writer writer =
+ new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
+ try {
+ for (int i = 0; i < length; i++) {
+ writer.write(Integer.toString(i));
+ writer.write("\n");
+ }
+ } finally {
+ writer.close();
+ }
+
+ // Sweep the minimum split size over positions around a block boundary
+ // where the block does not start on a byte boundary.
+ for (long splitpos = 203418; splitpos < 203430; ++splitpos) {
+ TextInputFormat format = new TextInputFormat();
+ format.configure(conf);
+ LOG.info("setting block size of the input file to " + splitpos);
+ conf.setLong("mapreduce.input.fileinputformat.split.minsize", splitpos);
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ InputSplit[] splits = format.getSplits(conf, 2);
+ LOG.info("splitting: got = " + splits.length);
+
+ // Read every split; each value must be seen exactly once across all splits
+ BitSet bits = new BitSet(length);
+ for (int j = 0; j < splits.length; j++) {
+ LOG.debug("split[" + j + "]= " + splits[j]);
+ RecordReader reader =
+ format.getRecordReader(splits[j], conf, Reporter.NULL);
+ try {
+ int counter = 0;
+ while (reader.next(key, value)) {
+ int v = Integer.parseInt(value.toString());
+ LOG.debug("read " + v);
+ if (bits.get(v)) {
+ LOG.warn("conflict with " + v + " in split " + j +
+ " at position " + reader.getPos());
+ }
+ assertFalse("Key in multiple partitions.", bits.get(v));
+ bits.set(v);
+ counter++;
+ }
+ if (counter > 0) {
+ LOG.info("splits[" + j + "]=" + splits[j] + " count=" + counter);
+ } else {
+ LOG.debug("splits[" + j + "]=" + splits[j] + " count=" + counter);
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ assertEquals("Some keys in no partition.", length, bits.cardinality());
+ }
+ }
+
private void verifyPartitions(int length, int numSplits, Path file,
CompressionCodec codec, JobConf conf) throws IOException {