HADOOP-13270. BZip2CompressionInputStream finds the same compression marker twice in corner case, causing duplicate data blocks. Contributed by Kai Sasaki.
(cherry picked from commit e3ba9ad3f116306910f74645ded91506345b9f6e) (cherry picked from commit d219550e8bf4c24be68a74f7b8032b6a8b8af0fc)
This commit is contained in:
parent
56e29d2711
commit
8295351e54
@ -207,7 +207,12 @@ public SplitCompressionInputStream createInputStream(InputStream seekableIn,
|
||||
// time stream might start without a leading BZ.
|
||||
final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
|
||||
CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
|
||||
long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);
|
||||
long adjStart = 0L;
|
||||
if (start != 0) {
|
||||
// Other than the first of file, the marker size is 6 bytes.
|
||||
adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION
|
||||
- (HEADER_LEN + SUB_HEADER_LEN)));
|
||||
}
|
||||
|
||||
((Seekable)seekableIn).seek(adjStart);
|
||||
SplitCompressionInputStream in =
|
||||
|
@ -175,69 +175,75 @@ public void testSplitableCodecs() throws IOException {
|
||||
for (int length = MAX_LENGTH / 2; length < MAX_LENGTH;
|
||||
length += random.nextInt(MAX_LENGTH / 4)+1) {
|
||||
|
||||
LOG.info("creating; entries = " + length);
|
||||
|
||||
|
||||
// create a file with length entries
|
||||
Writer writer =
|
||||
new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
|
||||
try {
|
||||
for (int i = 0; i < length; i++) {
|
||||
writer.write(Integer.toString(i));
|
||||
writer.write("\n");
|
||||
}
|
||||
} finally {
|
||||
writer.close();
|
||||
}
|
||||
|
||||
// try splitting the file in a variety of sizes
|
||||
TextInputFormat format = new TextInputFormat();
|
||||
format.configure(conf);
|
||||
LongWritable key = new LongWritable();
|
||||
Text value = new Text();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
int numSplits = random.nextInt(MAX_LENGTH/2000)+1;
|
||||
LOG.info("splitting: requesting = " + numSplits);
|
||||
InputSplit[] splits = format.getSplits(conf, numSplits);
|
||||
LOG.info("splitting: got = " + splits.length);
|
||||
|
||||
|
||||
|
||||
// check each split
|
||||
BitSet bits = new BitSet(length);
|
||||
for (int j = 0; j < splits.length; j++) {
|
||||
LOG.debug("split["+j+"]= " + splits[j]);
|
||||
RecordReader<LongWritable, Text> reader =
|
||||
format.getRecordReader(splits[j], conf, reporter);
|
||||
try {
|
||||
int counter = 0;
|
||||
while (reader.next(key, value)) {
|
||||
int v = Integer.parseInt(value.toString());
|
||||
LOG.debug("read " + v);
|
||||
|
||||
if (bits.get(v)) {
|
||||
LOG.warn("conflict with " + v +
|
||||
" in split " + j +
|
||||
" at position "+reader.getPos());
|
||||
}
|
||||
assertFalse("Key in multiple partitions.", bits.get(v));
|
||||
bits.set(v);
|
||||
counter++;
|
||||
}
|
||||
if (counter > 0) {
|
||||
LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
|
||||
} else {
|
||||
LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
assertEquals("Some keys in no partition.", length, bits.cardinality());
|
||||
int numSplits = random.nextInt(MAX_LENGTH / 2000) + 1;
|
||||
verifyPartitions(length, numSplits, file, codec, conf);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// corner case when we have byte alignment and position of stream are same
|
||||
verifyPartitions(471507, 218, file, codec, conf);
|
||||
verifyPartitions(473608, 110, file, codec, conf);
|
||||
}
|
||||
|
||||
private void verifyPartitions(int length, int numSplits, Path file,
|
||||
CompressionCodec codec, JobConf conf) throws IOException {
|
||||
|
||||
LOG.info("creating; entries = " + length);
|
||||
|
||||
|
||||
// create a file with length entries
|
||||
Writer writer =
|
||||
new OutputStreamWriter(codec.createOutputStream(localFs.create(file)));
|
||||
try {
|
||||
for (int i = 0; i < length; i++) {
|
||||
writer.write(Integer.toString(i));
|
||||
writer.write("\n");
|
||||
}
|
||||
} finally {
|
||||
writer.close();
|
||||
}
|
||||
|
||||
// try splitting the file in a variety of sizes
|
||||
TextInputFormat format = new TextInputFormat();
|
||||
format.configure(conf);
|
||||
LongWritable key = new LongWritable();
|
||||
Text value = new Text();
|
||||
LOG.info("splitting: requesting = " + numSplits);
|
||||
InputSplit[] splits = format.getSplits(conf, numSplits);
|
||||
LOG.info("splitting: got = " + splits.length);
|
||||
|
||||
|
||||
// check each split
|
||||
BitSet bits = new BitSet(length);
|
||||
for (int j = 0; j < splits.length; j++) {
|
||||
LOG.debug("split["+j+"]= " + splits[j]);
|
||||
RecordReader<LongWritable, Text> reader =
|
||||
format.getRecordReader(splits[j], conf, Reporter.NULL);
|
||||
try {
|
||||
int counter = 0;
|
||||
while (reader.next(key, value)) {
|
||||
int v = Integer.parseInt(value.toString());
|
||||
LOG.debug("read " + v);
|
||||
if (bits.get(v)) {
|
||||
LOG.warn("conflict with " + v +
|
||||
" in split " + j +
|
||||
" at position "+reader.getPos());
|
||||
}
|
||||
assertFalse("Key in multiple partitions.", bits.get(v));
|
||||
bits.set(v);
|
||||
counter++;
|
||||
}
|
||||
if (counter > 0) {
|
||||
LOG.info("splits["+j+"]="+splits[j]+" count=" + counter);
|
||||
} else {
|
||||
LOG.debug("splits["+j+"]="+splits[j]+" count=" + counter);
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
assertEquals("Some keys in no partition.", length, bits.cardinality());
|
||||
}
|
||||
|
||||
private static LineReader makeStream(String str) throws IOException {
|
||||
|
Loading…
x
Reference in New Issue
Block a user