From a432925f74b93d05b4dfdd1831bfbabbf4466a80 Mon Sep 17 00:00:00 2001
From: Ashutosh Gupta
Date: Wed, 6 Jul 2022 05:30:14 +0100
Subject: [PATCH] HADOOP-18321. Fix when to read an additional record from a
 BZip2 text file split (#4521)

* HADOOP-18321. Fix when to read an additional record from a BZip2 text file split

Co-authored-by: Ashutosh Gupta and Reviewed by Akira Ajisaka.
---
 .../apache/hadoop/io/compress/BZip2Codec.java |  13 +-
 .../io/compress/bzip2/CBZip2InputStream.java  |  16 +-
 .../io/compress/bzip2/CBZip2OutputStream.java |   9 +-
 .../compress/bzip2/BZip2TextFileWriter.java   | 114 ++++++
 .../hadoop/io/compress/bzip2/BZip2Utils.java  |  67 ++++
 .../bzip2/TestBZip2TextFileWriter.java        |  91 +++++
 .../lib/input/CompressedSplitLineReader.java  |  12 +-
 .../hadoop/mapred/LineRecordReaderHelper.java |  54 +++
 .../mapred/TestLineRecordReaderBZip2.java     |  31 ++
 .../lib/input/BaseLineRecordReaderHelper.java |  58 +++
 .../input/BaseTestLineRecordReaderBZip2.java  | 375 ++++++++++++++++++
 .../lib/input/LineRecordReaderHelper.java     |  62 +++
 .../lib/input/TestLineRecordReaderBZip2.java  |  29 ++
 13 files changed, 921 insertions(+), 10 deletions(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2TextFileWriter.java
 create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2Utils.java
 create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBZip2TextFileWriter.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/LineRecordReaderHelper.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReaderBZip2.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseLineRecordReaderHelper.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReaderHelper.java
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReaderBZip2.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index 7640f7ed7a6..1564ae90855 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 
@@ -255,10 +256,7 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
 
   private void writeStreamHeader() throws IOException {
     if (super.out != null) {
-      // The compressed bzip2 stream should start with the
-      // identifying characters BZ. Caller of CBZip2OutputStream
-      // i.e. this class must write these characters.
-      out.write(HEADER.getBytes(StandardCharsets.UTF_8));
+      writeHeader(out);
     }
   }
 
@@ -547,4 +545,11 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
 
   }// end of BZip2CompressionInputStream
 
+  @VisibleForTesting
+  public static void writeHeader(OutputStream out) throws IOException {
+    // The compressed bzip2 stream should start with the
+    // identifying characters BZ. Caller of CBZip2OutputStream
+    // i.e. this class must write these characters.
+    out.write(HEADER.getBytes(StandardCharsets.UTF_8));
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
index 187fe481588..61e88d80d8c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
@@ -27,6 +27,7 @@ import java.io.BufferedInputStream;
 import java.io.InputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE;
 
 /**
@@ -312,13 +313,24 @@
       }
     } else if (readMode == READ_MODE.BYBLOCK) {
       this.currentState = STATE.NO_PROCESS_STATE;
-      skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER,DELIMITER_BIT_LENGTH);
+      skipResult = skipToNextBlockMarker();
       if(!skipDecompression){
         changeStateToProcessABlock();
       }
     }
   }
 
+  /**
+   * Skips bytes in the stream until the start marker of a block is reached
+   * or end of stream is reached. Used for testing purposes to identify the
+   * start offsets of blocks.
+   */
+  @VisibleForTesting
+  boolean skipToNextBlockMarker() throws IOException {
+    return skipToNextMarker(
+        CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
+  }
+
   /**
    * Returns the number of bytes between the current stream position
    * and the immediate next BZip2 block marker.
@@ -428,7 +440,7 @@ public class CBZip2InputStream extends InputStream implements BZip2Constants {
       //report 'end of block' or 'end of stream'
       result = b;
 
-      skipResult = this.skipToNextMarker(CBZip2InputStream.BLOCK_DELIMITER, DELIMITER_BIT_LENGTH);
+      skipResult = skipToNextBlockMarker();
       changeStateToProcessABlock();
     }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
index 39c3638b0f4..50bdddb8136 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
@@ -27,6 +27,7 @@ package org.apache.hadoop.io.compress.bzip2;
 import java.io.OutputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.io.IOUtils;
 
 /**
@@ -781,8 +782,7 @@ public class CBZip2OutputStream extends OutputStream implements BZip2Constants {
       inUse[i] = false;
     }
 
-    /* 20 is just a paranoia constant */
-    this.allowableBlockSize = (this.blockSize100k * BZip2Constants.baseBlockSize) - 20;
+    this.allowableBlockSize = getAllowableBlockSize(this.blockSize100k);
   }
 
   private void endBlock() throws IOException {
@@ -2093,4 +2093,9 @@ public class CBZip2OutputStream extends OutputStream implements BZip2Constants {
 
   }
 
+  @VisibleForTesting
+  static int getAllowableBlockSize(int blockSize100k) {
+    /* 20 is just a paranoia constant */
+    return (blockSize100k * BZip2Constants.baseBlockSize) - 20;
+  }
 }
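Extracting getAllowableBlockSize keeps the block-capacity arithmetic in one place so the test writer added below can size its writes exactly. A rough worked example of that arithmetic (assuming BZip2Constants.baseBlockSize is 100000, the standard bzip2 base block size, and a minimum block-size multiplier of 1):

    // Illustrative sketch only, not part of the patch.
    int allowable = CBZip2OutputStream.getAllowableBlockSize(1);
    // allowable == 1 * 100000 - 20 == 99980 run-length encoded bytes
    int blockSize = allowable + 1;  // 99981: one more byte forces a new block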
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2TextFileWriter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2TextFileWriter.java
new file mode 100644
index 00000000000..e40f631b603
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2TextFileWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.io.compress.bzip2;
+
+import static org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream.MIN_BLOCKSIZE;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+
+/**
+ * A writer that simplifies creating BZip2 compressed text data for testing
+ * purposes.
+ */
+public final class BZip2TextFileWriter implements Closeable {
+
+  // Use the minimum block size to reduce the amount of data that needs to be
+  // written to CBZip2OutputStream before a new block is created.
+  private static final int BLOCK_SIZE_100K = MIN_BLOCKSIZE;
+
+  /**
+   * The number of bytes of run-length encoded data that needs to be written
+   * to this writer in order for the next byte written to start a new BZip2
+   * block.
+   */
+  public static final int BLOCK_SIZE =
+      // The + 1 is needed because of how CBZip2OutputStream checks whether the
+      // last offset written is less than the allowable block size. Because the
+      // last offset is one less than the number of bytes written to the block,
+      // we need to write an extra byte to trigger writing a new block.
+      CBZip2OutputStream.getAllowableBlockSize(BLOCK_SIZE_100K) + 1;
+
+  private final CBZip2OutputStream out;
+
+  public BZip2TextFileWriter(Path path, Configuration conf) throws IOException {
+    this(path.getFileSystem(conf).create(path));
+  }
+
+  public BZip2TextFileWriter(OutputStream rawOut) throws IOException {
+    try {
+      BZip2Codec.writeHeader(rawOut);
+      out = new CBZip2OutputStream(rawOut, BLOCK_SIZE_100K);
+    } catch (Throwable e) {
+      rawOut.close();
+      throw e;
+    }
+  }
+
+  public void writeManyRecords(int totalSize, int numRecords, byte[] delimiter)
+      throws IOException {
+    checkArgument(numRecords > 0);
+    checkArgument(delimiter.length > 0);
+
+    int minRecordSize = totalSize / numRecords;
+    checkArgument(minRecordSize >= delimiter.length);
+
+    int lastRecordExtraSize = totalSize % numRecords;
+
+    for (int i = 0; i < numRecords - 1; i++) {
+      writeRecord(minRecordSize, delimiter);
+    }
+    writeRecord(minRecordSize + lastRecordExtraSize, delimiter);
+  }
+
+  public void writeRecord(int totalSize, byte[] delimiter) throws IOException {
+    checkArgument(delimiter.length > 0);
+    checkArgument(totalSize >= delimiter.length);
+
+    int contentSize = totalSize - delimiter.length;
+    for (int i = 0; i < contentSize; i++) {
+      // Alternate between characters so that internals of CBZip2OutputStream
+      // cannot condense the written bytes using run-length encoding. This
+      // allows the caller to use #BLOCK_SIZE in order to know whether the next
+      // write will end just before the end of the current block, or exceed it,
+      // and by how much.
+      out.write(i % 2 == 0 ? 'a' : 'b');
+    }
+    write(delimiter);
+  }
+
+  public void write(String bytes) throws IOException {
+    write(bytes.getBytes(StandardCharsets.UTF_8));
+  }
+
+  public void write(byte[] bytes) throws IOException {
+    out.write(bytes);
+  }
+
+  @Override
+  public void close() throws IOException {
+    out.close();
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2Utils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2Utils.java
new file mode 100644
index 00000000000..717b282797e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/BZip2Utils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.bzip2;
+
+import static org.apache.hadoop.io.compress.SplittableCompressionCodec.READ_MODE.BYBLOCK;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public final class BZip2Utils {
+
+  private BZip2Utils() {
+  }
+
+  /**
+   * Returns the start offsets of blocks that follow the first block in the
+   * BZip2 compressed file at the given path. The first offset corresponds to
+   * the first byte containing the BZip2 block marker of the second block. The
+   * i-th offset corresponds to the block marker of the (i + 1)-th block.
+   */
+  public static List<Long> getNextBlockMarkerOffsets(
+      Path path, Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    try (InputStream fileIn = fs.open(path)) {
+      return getNextBlockMarkerOffsets(fileIn);
+    }
+  }
+
+  /**
+   * Returns the start offsets of blocks that follow the first block in the
+   * BZip2 compressed input stream. The first offset corresponds to
+   * the first byte containing the BZip2 block marker of the second block. The
+   * i-th offset corresponds to the block marker of the (i + 1)-th block.
+   */
+  public static List<Long> getNextBlockMarkerOffsets(InputStream rawIn)
+      throws IOException {
+    try (CBZip2InputStream in = new CBZip2InputStream(rawIn, BYBLOCK)) {
+      ArrayList<Long> offsets = new ArrayList<>();
+      while (in.skipToNextBlockMarker()) {
+        offsets.add(in.getProcessedByteCount());
+      }
+      return offsets;
+    }
+  }
+}
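BZip2Utils gives tests a direct way to locate where the second and subsequent compressed blocks begin. A minimal usage sketch (the file path and configuration here are hypothetical):

    Configuration conf = new Configuration();
    Path file = new Path("/tmp/test.txt.bz2");
    // The first entry is the byte offset where the second block's marker begins.
    List<Long> offsets = BZip2Utils.getNextBlockMarkerOffsets(file, conf);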
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBZip2TextFileWriter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBZip2TextFileWriter.java
new file mode 100644
index 00000000000..9b36f899b6d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBZip2TextFileWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.bzip2;
+
+import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public final class TestBZip2TextFileWriter {
+
+  private static final byte[] DELIMITER = new byte[] {'\0'};
+
+  private ByteArrayOutputStream rawOut;
+  private BZip2TextFileWriter writer;
+
+  @Before
+  public void setUp() throws Exception {
+    rawOut = new ByteArrayOutputStream();
+    writer = new BZip2TextFileWriter(rawOut);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    rawOut = null;
+    writer.close();
+  }
+
+  @Test
+  public void writingSingleBlockSizeOfData() throws Exception {
+    writer.writeRecord(BLOCK_SIZE, DELIMITER);
+    writer.close();
+
+    List<Long> nextBlocks = getNextBlockMarkerOffsets();
+    assertEquals(0, nextBlocks.size());
+  }
+
+  @Test
+  public void justExceedingBeyondBlockSize() throws Exception {
+    writer.writeRecord(BLOCK_SIZE + 1, DELIMITER);
+    writer.close();
+
+    List<Long> nextBlocks = getNextBlockMarkerOffsets();
+    assertEquals(1, nextBlocks.size());
+  }
+
+  @Test
+  public void writingTwoBlockSizesOfData() throws Exception {
+    writer.writeRecord(2 * BLOCK_SIZE, DELIMITER);
+    writer.close();
+
+    List<Long> nextBlocks = getNextBlockMarkerOffsets();
+    assertEquals(1, nextBlocks.size());
+  }
+
+  @Test
+  public void justExceedingBeyondTwoBlocks() throws Exception {
+    writer.writeRecord(2 * BLOCK_SIZE + 1, DELIMITER);
+    writer.close();
+
+    List<Long> nextBlocks = getNextBlockMarkerOffsets();
+    assertEquals(2, nextBlocks.size());
+  }
+
+  private List<Long> getNextBlockMarkerOffsets() throws IOException {
+    ByteArrayInputStream in = new ByteArrayInputStream(rawOut.toByteArray());
+    return BZip2Utils.getNextBlockMarkerOffsets(in);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
index 9d0e949a10b..74b959cf47c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
@@ -127,6 +127,8 @@ public class CompressedSplitLineReader extends SplitLineReader {
   @Override
   protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
       throws IOException {
+    boolean alreadyReadAfterSplit = didReadAfterSplit();
+
     int bytesRead = in.read(buffer);
 
     // If the split ended in the middle of a record delimiter then we need
@@ -135,7 +137,9 @@ public class CompressedSplitLineReader extends SplitLineReader {
     // However if using the default delimiter and the next character is a
     // linefeed then next split will treat it as a delimiter all by itself
    // and the additional record read should not be performed.
-    if (inDelimiter && bytesRead > 0) {
+    boolean justReadAfterSplit = !alreadyReadAfterSplit && didReadAfterSplit();
+
+    if (justReadAfterSplit && inDelimiter && bytesRead > 0) {
       if (usingCRLF) {
         needAdditionalRecord = (buffer[0] != '\n');
       } else {
@@ -152,7 +156,7 @@ public class CompressedSplitLineReader extends SplitLineReader {
     if (!finished) {
       // only allow at most one more record to be read after the stream
      // reports the split ended
-      if (scin.getPos() > scin.getAdjustedEnd()) {
+      if (didReadAfterSplit()) {
        finished = true;
      }
 
@@ -170,4 +174,8 @@ public class CompressedSplitLineReader extends SplitLineReader {
   protected void unsetNeedAdditionalRecordAfterSplit() {
     needAdditionalRecord = false;
   }
+
+  private boolean didReadAfterSplit() throws IOException {
+    return scin.getPos() > scin.getAdjustedEnd();
+  }
 }
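This file carries the behavioral fix named in the commit subject. Previously, any buffer refill that happened while the reader was positioned inside a delimiter claimed the additional record, even when the reader had already consumed bytes past the split end on an earlier refill. Now only the refill that first crosses the codec-adjusted split end may claim it. A simplified model of the new decision, ignoring the CRLF special case (the names here are illustrative, not the reader's actual fields):

    // posBefore/posAfter: stream positions before and after the refill;
    // splitEnd: the codec-adjusted end of this split (scin.getAdjustedEnd()).
    boolean alreadyPast = posBefore > splitEnd;
    boolean justCrossed = !alreadyPast && posAfter > splitEnd;
    // Claim the next split's first record only on the refill that crossed
    // the split boundary while mid-delimiter.
    boolean needAdditionalRecord = justCrossed && inDelimiter && bytesRead > 0;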
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/LineRecordReaderHelper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/LineRecordReaderHelper.java
new file mode 100644
index 00000000000..fccc01ad7e6
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/LineRecordReaderHelper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.BaseLineRecordReaderHelper;
+
+public final class LineRecordReaderHelper extends
+    BaseLineRecordReaderHelper {
+
+  public LineRecordReaderHelper(Path filePath, Configuration conf) {
+    super(filePath, conf);
+  }
+
+  @Override
+  public long countRecords(long start, long length) throws IOException {
+    try (LineRecordReader reader = newReader(start, length)) {
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+
+      long numRecords = 0L;
+      while (reader.next(key, value)) {
+        numRecords++;
+      }
+      return numRecords;
+    }
+  }
+
+  private LineRecordReader newReader(long start, long length)
+      throws IOException {
+    FileSplit split =
+        new FileSplit(getFilePath(), start, length, (String[]) null);
+    return new LineRecordReader(getConf(), split, getRecordDelimiterBytes());
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReaderBZip2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReaderBZip2.java
new file mode 100644
index 00000000000..1588e629c8c
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReaderBZip2.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.BaseLineRecordReaderHelper;
+import org.apache.hadoop.mapreduce.lib.input.BaseTestLineRecordReaderBZip2;
+
+public final class TestLineRecordReaderBZip2 extends
+    BaseTestLineRecordReaderBZip2 {
+
+  @Override
+  protected BaseLineRecordReaderHelper newReader(Path file) {
+    return new LineRecordReaderHelper(file, getConf());
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseLineRecordReaderHelper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseLineRecordReaderHelper.java
new file mode 100644
index 00000000000..d1e21e6e258
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseLineRecordReaderHelper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public abstract class BaseLineRecordReaderHelper {
+
+  private final Configuration conf;
+  private final Path filePath;
+  private final byte[] recordDelimiterBytes;
+
+  public BaseLineRecordReaderHelper(Path filePath, Configuration conf) {
+    this.conf = conf;
+    this.filePath = filePath;
+
+    conf.setInt(LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+
+    String delimiter = conf.get("textinputformat.record.delimiter");
+    this.recordDelimiterBytes =
+        null != delimiter ? delimiter.getBytes(StandardCharsets.UTF_8) : null;
+  }
+
+  public abstract long countRecords(long start, long length) throws IOException;
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public Path getFilePath() {
+    return filePath;
+  }
+
+  public byte[] getRecordDelimiterBytes() {
+    return recordDelimiterBytes;
+  }
+}
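The base helper picks up the standard custom-delimiter setting, so subclasses exercise both the default CR/LF handling and custom delimiters through one code path. For example, a test can switch delimiters before constructing a helper; this mirrors what BaseTestLineRecordReaderBZip2#setDelimiter does further below:

    Configuration conf = new Configuration();
    // "end" is an arbitrary example delimiter, matching the custom-delimiter
    // tests in this patch.
    conf.set("textinputformat.record.delimiter", "end");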
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java
new file mode 100644
index 00000000000..cd400365efa
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/BaseTestLineRecordReaderBZip2.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter.BLOCK_SIZE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.bzip2.BZip2Utils;
+import org.apache.hadoop.io.compress.bzip2.BZip2TextFileWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public abstract class BaseTestLineRecordReaderBZip2 {
+
+  // LF stands for line feed
+  private static final byte[] LF = new byte[] {'\n'};
+  // CR stands for carriage return
+  private static final byte[] CR = new byte[] {'\r'};
+  private static final byte[] CR_LF = new byte[] {'\r', '\n'};
+
+  private Configuration conf;
+  private FileSystem fs;
+  private Path tempFile;
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public Path getTempFile() {
+    return tempFile;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+
+    Path workDir = new Path(
+        System.getProperty("test.build.data", "target"),
+        "data/" + getClass().getSimpleName());
+
+    fs = workDir.getFileSystem(conf);
+
+    Path inputDir = new Path(workDir, "input");
+    tempFile = new Path(inputDir, "test.txt.bz2");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.delete(tempFile, /* recursive */ false);
+  }
+
+  @Test
+  public void firstBlockEndsWithLF() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE, 1000, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+  }
+
+  @Test
+  public void firstBlockEndsWithLFSecondBlockStartsWithLF() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE, 1000, LF);
+      // Write 254 empty rows terminating at LF; as those records will get
+      // rolled into the first block record due to run-length encoding, the
+      // 255th LF character will trigger a run to be written to the block. We
+      // only need 254 LF characters since the last byte written by the prior
+      // writeManyRecords call is already an LF.
+      writer.writeManyRecords(254, 254, LF);
+
+      // This LF character should be the first byte of the second block, but
+      // if splitting at blocks, the first split will read this record as the
+      // additional record.
+      writer.writeRecord(1, LF);
+
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1255, 2});
+  }
+
+  @Test
+  public void firstBlockEndsWithLFSecondBlockStartsWithCR() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE, 1000, LF);
+      writer.writeRecord(1, CR);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+  }
+
+  @Test
+  public void firstBlockEndsWithCRLF() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE, 1000, CR_LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+  }
+
+  @Test
+  public void lastRecordContentSpanAcrossBlocks()
+      throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
+      writer.writeRecord(100, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
+
+  @Test
+  public void lastRecordOfBlockHasItsLFInNextBlock() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
+      // The LF character is the first byte of the second block
+      writer.writeRecord(51, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
+
+  @Test
+  public void lastRecordOfFirstBlockHasItsCRLFInSecondBlock() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
+      // Both CR + LF characters are the first two bytes of second block
+      writer.writeRecord(52, CR_LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
+
+  @Test
+  public void lastRecordOfFirstBlockHasItsCRLFPartlyInSecondBlock()
+      throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, LF);
+      // The CR character is the last byte of the first block and the LF is
+      // the first byte of the second block
+      writer.writeRecord(51, CR_LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
+
+  @Test
+  public void lastByteInFirstBlockIsCRFirstByteInSecondBlockIsNotLF()
+      throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE, 1000, CR);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+      writer.writeRecord(10, LF);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+  }
+
+  @Test
+  public void usingCRDelimiterWithSmallestBufferSize() throws Exception {
+    // Forces calling LineReader#fillBuffer for every byte read
+    conf.set(IO_FILE_BUFFER_SIZE_KEY, "1");
+
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, CR);
+      writer.writeRecord(100, CR);
+      writer.writeRecord(10, CR);
+      writer.writeRecord(10, CR);
+      writer.writeRecord(10, CR);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
+
+  @Test
+  public void delimitedByCRSpanningThreeBlocks() throws Exception {
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeRecord(3 * BLOCK_SIZE, CR);
+      writer.writeRecord(3 * BLOCK_SIZE, CR);
+      writer.writeRecord(3 * BLOCK_SIZE, CR);
+    }
+    assertRecordCountsPerSplit(tempFile,
+        new long[] {1, 0, 1, 0, 0, 1, 0, 0, 0});
+  }
+
+  @Test
+  public void customDelimiterLastThreeBytesInBlockAreDelimiter()
+      throws Exception {
+    byte[] delimiter = new byte[] {'e', 'n', 'd'};
+    setDelimiter(delimiter);
+
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE, 1000, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+  }
+
+  @Test
+  public void customDelimiterDelimiterSpansAcrossBlocks()
+      throws Exception {
+    byte[] delimiter = new byte[] {'e', 'n', 'd'};
+    setDelimiter(delimiter);
+
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, delimiter);
+      writer.writeRecord(52, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1001, 2});
+  }
+
+  @Test
+  public void customDelimiterLastRecordDelimiterStartsAtNextBlockStart()
+      throws Exception {
+    byte[] delimiter = new byte[] {'e', 'n', 'd'};
+    setDelimiter(delimiter);
+
+    try (BZip2TextFileWriter writer = new BZip2TextFileWriter(tempFile, conf)) {
+      writer.writeManyRecords(BLOCK_SIZE - 50, 999, delimiter);
+      writer.writeRecord(53, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+      writer.writeRecord(10, delimiter);
+    }
+    assertRecordCountsPerSplit(tempFile, new long[] {1000, 3});
+  }
"end". The first byte of the next block also matches the + // second byte of the delimiter "n"; however the next character "c" does + // not match the last character of the delimiter. Thus an additional + // record should not be read for the split that reads the first block. + // The split that reads the second block will just discard + // "nchanting tale coming to an end". + writer.write("an enchanting tale coming to an end"); + writer.writeRecord(10, delimiter); + writer.writeRecord(10, delimiter); + writer.writeRecord(10, delimiter); + } + assertRecordCountsPerSplit(tempFile, new long[] {1000, 3}); + } + + protected abstract BaseLineRecordReaderHelper newReader(Path file); + + private void assertRecordCountsPerSplit( + Path path, long[] countsIfSplitAtBlocks) throws IOException { + RecordCountAssert countAssert = + new RecordCountAssert(path, countsIfSplitAtBlocks); + countAssert.assertSingleSplit(); + countAssert.assertSplittingAtBlocks(); + countAssert.assertSplittingJustAfterSecondBlockStarts(); + } + + private class RecordCountAssert { + + private final BaseLineRecordReaderHelper reader; + private final long numBlocks; + private final long[] countsIfSplitAtBlocks; + private final long fileSize; + private final long totalRecords; + private final List nextBlockOffsets; + + RecordCountAssert( + Path path, long[] countsIfSplitAtBlocks) throws IOException { + this.reader = newReader(path); + this.countsIfSplitAtBlocks = countsIfSplitAtBlocks; + this.fileSize = getFileSize(path); + this.totalRecords = Arrays.stream(countsIfSplitAtBlocks).sum(); + this.numBlocks = countsIfSplitAtBlocks.length; + this.nextBlockOffsets = BZip2Utils.getNextBlockMarkerOffsets(path, conf); + + assertEquals(numBlocks, nextBlockOffsets.size() + 1); + } + + private void assertSingleSplit() throws IOException { + assertEquals(totalRecords, reader.countRecords(0, fileSize)); + } + + private void assertSplittingAtBlocks() throws IOException { + for (int i = 0; i < numBlocks; i++) { + long start = i == 0 ? 0 : nextBlockOffsets.get(i - 1); + long end = i == numBlocks - 1 ? 
+
+    private void assertSplittingJustAfterSecondBlockStarts()
+        throws IOException {
+      if (numBlocks <= 1) {
+        return;
+      }
+      long recordsInFirstTwoBlocks =
+          countsIfSplitAtBlocks[0] + countsIfSplitAtBlocks[1];
+      long remainingRecords = totalRecords - recordsInFirstTwoBlocks;
+
+      long firstSplitSize = nextBlockOffsets.get(0) + 1;
+      assertEquals(
+          recordsInFirstTwoBlocks,
+          reader.countRecords(0, firstSplitSize));
+      assertEquals(
+          remainingRecords,
+          reader.countRecords(firstSplitSize, fileSize - firstSplitSize));
+    }
+  }
+
+  private long getFileSize(Path path) throws IOException {
+    return fs.getFileStatus(path).getLen();
+  }
+
+  private void setDelimiter(byte[] delimiter) {
+    conf.set("textinputformat.record.delimiter",
+        new String(delimiter, StandardCharsets.UTF_8));
+  }
+}
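Each scenario above is verified three ways: the whole file as a single split, splits cut exactly at the block-marker offsets, and a first split that ends one byte after the second block starts. The last case is what catches the original bug, since a split that already begins inside the second block must not claim that block's records again. Schematically, for a two-block file with marker offset m (illustrative, not test code):

    // split A = [0, m + 1)        -> all records of blocks 0 and 1
    // split B = [m + 1, fileSize) -> only the records after those
    // Together the two splits must sum to the single-split count.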
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReaderHelper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReaderHelper.java
new file mode 100644
index 00000000000..56fc7eb4e86
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReaderHelper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+public final class LineRecordReaderHelper extends
+    BaseLineRecordReaderHelper {
+
+  public LineRecordReaderHelper(Path filePath, Configuration conf) {
+    super(filePath, conf);
+  }
+
+  @Override
+  public long countRecords(long start, long length) throws IOException {
+    try (LineRecordReader reader = newReader(start, length)) {
+      long numRecords = 0L;
+      while (reader.nextKeyValue()) {
+        numRecords++;
+      }
+      return numRecords;
+    }
+  }
+
+  private LineRecordReader newReader(long start, long length)
+      throws IOException {
+    FileSplit split = new FileSplit(getFilePath(), start, length, null);
+
+    TaskAttemptContext context =
+        new TaskAttemptContextImpl(getConf(), new TaskAttemptID());
+
+    LineRecordReader reader = new LineRecordReader(getRecordDelimiterBytes());
+    try {
+      reader.initialize(split, context);
+      return reader;
+    } catch (Throwable e) {
+      reader.close();
+      throw e;
+    }
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReaderBZip2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReaderBZip2.java
new file mode 100644
index 00000000000..ec01ef332fa
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReaderBZip2.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import org.apache.hadoop.fs.Path;
+
+public final class TestLineRecordReaderBZip2
+    extends BaseTestLineRecordReaderBZip2 {
+
+  @Override
+  protected BaseLineRecordReaderHelper newReader(Path file) {
+    return new LineRecordReaderHelper(file, getConf());
+  }
+}
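To exercise the new coverage locally, the usual surefire single-test invocation should work from the corresponding module directories (hadoop-common for the writer tests, hadoop-mapreduce-client-core for the reader tests), for example:

    mvn test -Dtest=TestBZip2TextFileWriter
    mvn test -Dtest=TestLineRecordReaderBZip2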