HADOOP-15206. BZip2 drops and duplicates records when input split size is small. Contributed by Aki Tanaka

This commit is contained in:
Jason Lowe 2018-02-16 14:49:00 -06:00
parent 4c2119f04e
commit 0898ff42e9
2 changed files with 37 additions and 1 deletions

View File

@ -362,9 +362,29 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
bufferedIn = new BufferedInputStream(super.in); bufferedIn = new BufferedInputStream(super.in);
this.startingPos = super.getPos(); this.startingPos = super.getPos();
this.readMode = readMode; this.readMode = readMode;
long numSkipped = 0;
if (this.startingPos == 0) { if (this.startingPos == 0) {
// We only strip header if it is start of file // We only strip header if it is start of file
bufferedIn = readStreamHeader(); bufferedIn = readStreamHeader();
} else if (this.readMode == READ_MODE.BYBLOCK &&
this.startingPos <= HEADER_LEN + SUB_HEADER_LEN) {
// When we're in BYBLOCK mode and the start position is >=0
// and < HEADER_LEN + SUB_HEADER_LEN, we should skip to after
// start of the first bz2 block to avoid duplicated records
numSkipped = HEADER_LEN + SUB_HEADER_LEN + 1 - this.startingPos;
long skipBytes = numSkipped;
while (skipBytes > 0) {
long s = bufferedIn.skip(skipBytes);
if (s > 0) {
skipBytes -= s;
} else {
if (bufferedIn.read() == -1) {
break; // end of the split
} else {
skipBytes--;
}
}
}
} }
input = new CBZip2InputStream(bufferedIn, readMode); input = new CBZip2InputStream(bufferedIn, readMode);
if (this.isHeaderStripped) { if (this.isHeaderStripped) {
@ -375,8 +395,16 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
input.updateReportedByteCount(SUB_HEADER_LEN); input.updateReportedByteCount(SUB_HEADER_LEN);
} }
if (numSkipped > 0) {
input.updateReportedByteCount((int) numSkipped);
}
// To avoid dropped records, not advertising a new byte position
// when we are in BYBLOCK mode and the start position is 0
if (!(this.readMode == READ_MODE.BYBLOCK && this.startingPos == 0)) {
this.updatePos(false); this.updatePos(false);
} }
}
private BufferedInputStream readStreamHeader() throws IOException { private BufferedInputStream readStreamHeader() throws IOException {
// We are flexible enough to allow the compressed stream not to // We are flexible enough to allow the compressed stream not to

View File

@ -183,6 +183,14 @@ public class TestTextInputFormat {
// corner case when we have byte alignment and position of stream are same // corner case when we have byte alignment and position of stream are same
verifyPartitions(471507, 218, file, codec, conf); verifyPartitions(471507, 218, file, codec, conf);
verifyPartitions(473608, 110, file, codec, conf); verifyPartitions(473608, 110, file, codec, conf);
// corner case when split size is small and position of stream is before
// the first BZip2 block
verifyPartitions(100, 20, file, codec, conf);
verifyPartitions(100, 25, file, codec, conf);
verifyPartitions(100, 30, file, codec, conf);
verifyPartitions(100, 50, file, codec, conf);
verifyPartitions(100, 100, file, codec, conf);
} }
// Test a corner case when position of stream is right after BZip2 marker // Test a corner case when position of stream is right after BZip2 marker