From d219550e8bf4c24be68a74f7b8032b6a8b8af0fc Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Tue, 14 Jun 2016 10:18:17 +0900 Subject: [PATCH] HADOOP-13270. BZip2CompressionInputStream finds the same compression marker twice in corner case, causing duplicate data blocks. Contributed by Kai Sasaki. (cherry picked from commit e3ba9ad3f116306910f74645ded91506345b9f6e) --- .../apache/hadoop/io/compress/BZip2Codec.java | 7 +- .../hadoop/mapred/TestTextInputFormat.java | 124 +++++++++--------- 2 files changed, 71 insertions(+), 60 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java index 2c5a7bec852..49dd9c184c5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java @@ -207,7 +207,12 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec { // time stream might start without a leading BZ. final long FIRST_BZIP2_BLOCK_MARKER_POSITION = CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn); - long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION); + long adjStart = 0L; + if (start != 0) { + // Other than the first of file, the marker size is 6 bytes. + adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION + - (HEADER_LEN + SUB_HEADER_LEN))); + } ((Seekable)seekableIn).seek(adjStart); SplitCompressionInputStream in = diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java index b833b607738..5106c3843f2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java @@ -175,69 +175,75 @@ public class TestTextInputFormat { for (int length = MAX_LENGTH / 2; length < MAX_LENGTH; length += random.nextInt(MAX_LENGTH / 4)+1) { - LOG.info("creating; entries = " + length); - - - // create a file with length entries - Writer writer = - new OutputStreamWriter(codec.createOutputStream(localFs.create(file))); - try { - for (int i = 0; i < length; i++) { - writer.write(Integer.toString(i)); - writer.write("\n"); - } - } finally { - writer.close(); - } - - // try splitting the file in a variety of sizes - TextInputFormat format = new TextInputFormat(); - format.configure(conf); - LongWritable key = new LongWritable(); - Text value = new Text(); for (int i = 0; i < 3; i++) { - int numSplits = random.nextInt(MAX_LENGTH/2000)+1; - LOG.info("splitting: requesting = " + numSplits); - InputSplit[] splits = format.getSplits(conf, numSplits); - LOG.info("splitting: got = " + splits.length); - - - - // check each split - BitSet bits = new BitSet(length); - for (int j = 0; j < splits.length; j++) { - LOG.debug("split["+j+"]= " + splits[j]); - RecordReader reader = - format.getRecordReader(splits[j], conf, reporter); - try { - int counter = 0; - while (reader.next(key, value)) { - int v = Integer.parseInt(value.toString()); - LOG.debug("read " + v); - - if (bits.get(v)) { - LOG.warn("conflict with " + v + - " in split " + j + - " at position "+reader.getPos()); - } - assertFalse("Key in multiple partitions.", bits.get(v)); - bits.set(v); - counter++; - } - if (counter > 0) { - LOG.info("splits["+j+"]="+splits[j]+" count=" + counter); - } else { - LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter); - } - } finally { - reader.close(); - } - } - assertEquals("Some keys in no partition.", length, bits.cardinality()); + int numSplits = random.nextInt(MAX_LENGTH / 2000) + 1; + verifyPartitions(length, numSplits, file, codec, conf); } - } + // corner case when we have byte alignment and position of stream are same + verifyPartitions(471507, 218, file, codec, conf); + verifyPartitions(473608, 110, file, codec, conf); + } + + private void verifyPartitions(int length, int numSplits, Path file, + CompressionCodec codec, JobConf conf) throws IOException { + + LOG.info("creating; entries = " + length); + + + // create a file with length entries + Writer writer = + new OutputStreamWriter(codec.createOutputStream(localFs.create(file))); + try { + for (int i = 0; i < length; i++) { + writer.write(Integer.toString(i)); + writer.write("\n"); + } + } finally { + writer.close(); + } + + // try splitting the file in a variety of sizes + TextInputFormat format = new TextInputFormat(); + format.configure(conf); + LongWritable key = new LongWritable(); + Text value = new Text(); + LOG.info("splitting: requesting = " + numSplits); + InputSplit[] splits = format.getSplits(conf, numSplits); + LOG.info("splitting: got = " + splits.length); + + + // check each split + BitSet bits = new BitSet(length); + for (int j = 0; j < splits.length; j++) { + LOG.debug("split["+j+"]= " + splits[j]); + RecordReader reader = + format.getRecordReader(splits[j], conf, Reporter.NULL); + try { + int counter = 0; + while (reader.next(key, value)) { + int v = Integer.parseInt(value.toString()); + LOG.debug("read " + v); + if (bits.get(v)) { + LOG.warn("conflict with " + v + + " in split " + j + + " at position "+reader.getPos()); + } + assertFalse("Key in multiple partitions.", bits.get(v)); + bits.set(v); + counter++; + } + if (counter > 0) { + LOG.info("splits["+j+"]="+splits[j]+" count=" + counter); + } else { + LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter); + } + } finally { + reader.close(); + } + } + assertEquals("Some keys in no partition.", length, bits.cardinality()); } private static LineReader makeStream(String str) throws IOException {