From 39a6a357ff106d41c2fd4672196dc4621eab17c2 Mon Sep 17 00:00:00 2001 From: huzheng Date: Tue, 2 Apr 2019 20:44:08 +0800 Subject: [PATCH] HBASE-21937 Make the Compression#decompress can accept ByteBuff as input --- .../hadoop/hbase/io/compress/Compression.java | 51 +++++++------------ .../HFileBlockDefaultDecodingContext.java | 11 ++-- .../hadoop/hbase/io/util}/BlockIOUtils.java | 45 +++++++++++++--- .../hadoop/hbase/io/hfile/HFileBlock.java | 29 +++++++---- .../TestLoadAndSwitchEncodeOnDisk.java | 2 - .../hbase/io/hfile/TestBlockIOUtils.java | 1 + .../hadoop/hbase/io/hfile/TestHFileBlock.java | 18 +++---- 7 files changed, 90 insertions(+), 67 deletions(-) rename {hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile => hbase-common/src/main/java/org/apache/hadoop/hbase/io/util}/BlockIOUtils.java (86%) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java index d258ba2927d..3004973fd4a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java @@ -25,7 +25,8 @@ import java.io.OutputStream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.hbase.io.util.BlockIOUtils; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionInputStream; @@ -438,45 +439,29 @@ public final class Compression { } /** - * Decompresses data from the given stream using the configured compression - * algorithm. It will throw an exception if the dest buffer does not have - * enough space to hold the decompressed data. - * - * @param dest - * the output bytes buffer - * @param destOffset - * start writing position of the output buffer - * @param bufferedBoundedStream - * a stream to read compressed data from, bounded to the exact amount + * Decompresses data from the given stream using the configured compression algorithm. It will + * throw an exception if the dest buffer does not have enough space to hold the decompressed data. 
+ * @param dest the output buffer + * @param bufferedBoundedStream a stream to read compressed data from, bounded to the exact amount * of compressed data - * @param compressedSize - * compressed data size, header not included - * @param uncompressedSize - * uncompressed data size, header not included - * @param compressAlgo - * compression algorithm used - * @throws IOException + * @param uncompressedSize uncompressed data size, header not included + * @param compressAlgo compression algorithm used + * @throws IOException if any IO error happen */ - public static void decompress(byte[] dest, int destOffset, - InputStream bufferedBoundedStream, int compressedSize, - int uncompressedSize, Compression.Algorithm compressAlgo) - throws IOException { - - if (dest.length - destOffset < uncompressedSize) { - throw new IllegalArgumentException( - "Output buffer does not have enough space to hold " - + uncompressedSize + " decompressed bytes, available: " - + (dest.length - destOffset)); + public static void decompress(ByteBuff dest, InputStream bufferedBoundedStream, + int uncompressedSize, Compression.Algorithm compressAlgo) throws IOException { + if (dest.remaining() < uncompressedSize) { + throw new IllegalArgumentException("Output buffer does not have enough space to hold " + + uncompressedSize + " decompressed bytes, available: " + dest.remaining()); } Decompressor decompressor = null; try { decompressor = compressAlgo.getDecompressor(); - InputStream is = compressAlgo.createDecompressionStream( - bufferedBoundedStream, decompressor, 0); - - IOUtils.readFully(is, dest, destOffset, uncompressedSize); - is.close(); + try (InputStream is = + compressAlgo.createDecompressionStream(bufferedBoundedStream, decompressor, 0)) { + BlockIOUtils.readFullyWithHeapBuffer(is, dest, uncompressedSize); + } } finally { if (decompressor != null) { compressAlgo.returnDecompressor(decompressor); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java index d5bf58cb2a3..97d0e6bd66a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.io.crypto.Cipher; import org.apache.hadoop.hbase.io.crypto.Decryptor; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.util.BlockIOUtils; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; @@ -87,14 +88,12 @@ public class HFileBlockDefaultDecodingContext implements } Compression.Algorithm compression = fileContext.getCompression(); - assert blockBufferWithoutHeader.hasArray(); if (compression != Compression.Algorithm.NONE) { - Compression.decompress(blockBufferWithoutHeader.array(), - blockBufferWithoutHeader.arrayOffset(), dataInputStream, onDiskSizeWithoutHeader, - uncompressedSizeWithoutHeader, compression); + Compression.decompress(blockBufferWithoutHeader, dataInputStream, + uncompressedSizeWithoutHeader, compression); } else { - IOUtils.readFully(dataInputStream, blockBufferWithoutHeader.array(), - blockBufferWithoutHeader.arrayOffset(), onDiskSizeWithoutHeader); + BlockIOUtils.readFullyWithHeapBuffer(dataInputStream, 
blockBufferWithoutHeader, + onDiskSizeWithoutHeader); } } finally { byteBuffInputStream.close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java similarity index 86% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java rename to hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java index dbd5b2e9b76..a98a47879c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.hbase.io.hfile; +package org.apache.hadoop.hbase.io.util; import java.io.IOException; import java.io.InputStream; @@ -29,9 +29,14 @@ import org.apache.hadoop.io.IOUtils; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -class BlockIOUtils { +public final class BlockIOUtils { - static boolean isByteBufferReadable(FSDataInputStream is) { + // Disallow instantiation + private BlockIOUtils() { + + } + + public static boolean isByteBufferReadable(FSDataInputStream is) { InputStream cur = is.getWrappedStream(); for (;;) { if ((cur instanceof FSDataInputStream)) { @@ -50,7 +55,7 @@ class BlockIOUtils { * @param length bytes to read. * @throws IOException exception to throw if any error happen */ - static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException { + public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException { if (!isByteBufferReadable(dis)) { // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to // the destination ByteBuff. @@ -81,6 +86,32 @@ class BlockIOUtils { } } + /** + * Copying bytes from InputStream to {@link ByteBuff} by using an temporary heap byte[] (default + * size is 1024 now). + * @param in the InputStream to read + * @param out the destination {@link ByteBuff} + * @param length to read + * @throws IOException if any io error encountered. + */ + public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length) + throws IOException { + byte[] buffer = new byte[1024]; + if (length < 0) { + throw new IllegalArgumentException("Length must not be negative: " + length); + } + int remain = length, count; + while (remain > 0) { + count = in.read(buffer, 0, Math.min(remain, buffer.length)); + if (count < 0) { + throw new IOException( + "Premature EOF from inputStream, but still need " + remain + " bytes"); + } + out.put(buffer, 0, count); + remain -= count; + } + } + /** * Read from an input stream at least necessaryLen and if possible, * extraLen also if available. Analogous to @@ -125,8 +156,8 @@ class BlockIOUtils { * ByteBuffers, otherwise we've not read the extraLen bytes yet. * @throws IOException if failed to read the necessary bytes. */ - static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen, int extraLen) - throws IOException { + public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen, + int extraLen) throws IOException { if (!isByteBufferReadable(dis)) { // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to // the destination ByteBuff. 
@@ -174,7 +205,7 @@ class BlockIOUtils { * @return true if and only if extraLen is > 0 and reading those extra bytes was successful * @throws IOException if failed to read the necessary bytes */ - static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position, + public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position, int necessaryLen, int extraLen) throws IOException { int remain = necessaryLen + extraLen; byte[] buf = new byte[remain]; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index b54cc0cbbee..c44b22d83cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.util.BlockIOUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -528,14 +529,22 @@ public class HFileBlock implements Cacheable { } /** - * Returns a buffer that does not include the header or checksum. - * + * Returns a buffer that does not include the header and checksum. * @return the buffer with header skipped and checksum omitted. */ public ByteBuff getBufferWithoutHeader() { + return this.getBufferWithoutHeader(false); + } + + /** + * Returns a buffer that does not include the header or checksum. + * @param withChecksum to indicate whether include the checksum or not. + * @return the buffer with header skipped and checksum omitted. + */ + public ByteBuff getBufferWithoutHeader(boolean withChecksum) { ByteBuff dup = getBufferReadOnly(); - // Now set it up so Buffer spans content only -- no header or no checksums. - return dup.position(headerSize()).limit(buf.limit() - totalChecksumBytes()).slice(); + int delta = withChecksum ? 0 : totalChecksumBytes(); + return dup.position(headerSize()).limit(buf.limit() - delta).slice(); } /** @@ -609,8 +618,9 @@ public class HFileBlock implements Cacheable { // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next // block's header, so there are two sensible values for buffer capacity. int hdrSize = headerSize(); - if (dup.capacity() != expectedBufLimit && dup.capacity() != expectedBufLimit + hdrSize) { - throw new AssertionError("Invalid buffer capacity: " + dup.capacity() + + dup.rewind(); + if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) { + throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize)); } } @@ -672,15 +682,15 @@ public class HFileBlock implements Cacheable { HFileBlock unpacked = new HFileBlock(this); unpacked.allocateBuffer(); // allocates space for the decompressed block - HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ? - reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); + HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA + ? 
reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); ByteBuff dup = this.buf.duplicate(); dup.position(this.headerSize()); dup = dup.slice(); ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), - unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup); + unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup); return unpacked; } @@ -698,7 +708,6 @@ public class HFileBlock implements Cacheable { ByteBuff newBuf = allocator.allocate(capacityNeeded); // Copy header bytes into newBuf. - // newBuf is HBB so no issue in calling array() buf.position(0); newBuf.put(0, buf, 0, headerSize); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java index c8ce9135042..43fa0e3ecb8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.TestMiniClusterLoadSequential; import org.apache.hadoop.hbase.util.Threads; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runners.Parameterized.Parameters; @@ -74,7 +73,6 @@ public class TestLoadAndSwitchEncodeOnDisk extends TestMiniClusterLoadSequential @Override @Test - @Ignore("TODO Ignore this UT temporarily, will fix this in the critical HBASE-21937.") public void loadTest() throws Exception { Admin admin = TEST_UTIL.getAdmin(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java index 60180e6ff56..a386f49cc78 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.io.util.BlockIOUtils; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.MultiByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 2733ca21ed8..af42a24f827 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -97,8 +97,7 @@ public class TestHFileBlock { private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class); - // TODO let uncomment the GZ algorithm in HBASE-21937, because no support BB unpack yet. 
- static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, /* GZ */ }; + static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ }; private static final int NUM_TEST_BLOCKS = 1000; private static final int NUM_READER_THREADS = 26; @@ -623,7 +622,7 @@ public class TestHFileBlock { if (detailedLogging) { LOG.info("Reading block #" + i + " at offset " + curOffset); } - HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, true); + HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, false); if (detailedLogging) { LOG.info("Block #" + i + ": " + b); } @@ -638,7 +637,7 @@ public class TestHFileBlock { // Now re-load this block knowing the on-disk size. This tests a // different branch in the loader. HFileBlock b2 = - hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, true); + hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, false); b2.sanityCheck(); assertEquals(b.getBlockType(), b2.getBlockType()); @@ -667,11 +666,10 @@ public class TestHFileBlock { // expectedContents have header + data only ByteBuff bufRead = newBlock.getBufferReadOnly(); ByteBuffer bufExpected = expectedContents.get(i); - boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(), - bufRead.arrayOffset(), - bufRead.limit() - newBlock.totalChecksumBytes(), - bufExpected.array(), bufExpected.arrayOffset(), - bufExpected.limit()) == 0; + byte[] tmp = new byte[bufRead.limit() - newBlock.totalChecksumBytes()]; + bufRead.get(tmp, 0, tmp.length); + boolean bytesAreCorrect = Bytes.compareTo(tmp, 0, tmp.length, bufExpected.array(), + bufExpected.arrayOffset(), bufExpected.limit()) == 0; String wrongBytesMsg = ""; if (!bytesAreCorrect) { @@ -702,6 +700,8 @@ public class TestHFileBlock { if (newBlock != b) { assertTrue(b.release()); } + } else { + assertTrue(b.release()); } } assertEquals(curOffset, fs.getFileStatus(path).getLen());
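
Reviewer note (not part of the patch): the sketch below shows how a caller might drive the new ByteBuff-taking Compression#decompress introduced above. Only the decompress(ByteBuff, InputStream, int, Algorithm) call reflects the patch; the sample payload, the GZ choice, the round-trip compression step and the direct destination buffer are illustrative assumptions.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.io.compress.Compressor;

public class DecompressRoundTrip {
  public static void main(String[] args) throws Exception {
    Compression.Algorithm algo = Compression.Algorithm.GZ;
    byte[] plain = new byte[8192];                       // sample uncompressed payload

    // Compress the payload the same way the block writer would.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    Compressor compressor = algo.getCompressor();        // may be null without native codecs
    try (OutputStream os = algo.createCompressionStream(baos, compressor, 0)) {
      os.write(plain);
    } finally {
      if (compressor != null) {
        algo.returnCompressor(compressor);
      }
    }

    // New API: decompress straight into a ByteBuff, which may be off-heap.
    ByteBuff dest = new SingleByteBuff(ByteBuffer.allocateDirect(plain.length));
    Compression.decompress(dest, new ByteArrayInputStream(baos.toByteArray()),
        plain.length, algo);
    dest.rewind();                                       // position back to 0 for reading
  }
}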
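
Reviewer note (not part of the patch): a minimal sketch of the new BlockIOUtils.readFullyWithHeapBuffer() helper. The 4 KB payload and the two direct 2 KB fragments are made-up values; the point is that the copy goes through a temporary heap byte[], so the destination ByteBuff does not need to be heap-backed.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class ReadFullyWithHeapBufferExample {
  public static void main(String[] args) throws Exception {
    byte[] src = new byte[4096];                         // stand-in for on-disk block bytes
    ByteBuff dest = new MultiByteBuff(ByteBuffer.allocateDirect(2048),
        ByteBuffer.allocateDirect(2048));
    BlockIOUtils.readFullyWithHeapBuffer(new ByteArrayInputStream(src), dest, src.length);
    // The destination position has advanced by exactly the bytes copied.
    assert dest.position() == src.length;

    // A short stream makes the helper fail fast instead of returning a partial block.
    try {
      BlockIOUtils.readFullyWithHeapBuffer(new ByteArrayInputStream(new byte[10]),
          new SingleByteBuff(ByteBuffer.allocate(64)), 64);
    } catch (IOException expected) {
      // "Premature EOF from inputStream, but still need 54 bytes"
    }
  }
}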
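
Reviewer note (not part of the patch): the slicing performed by the new getBufferWithoutHeader(boolean withChecksum) overload, reproduced on a synthetic buffer so it can be run without constructing an HFileBlock. The header, data and checksum sizes below are stand-in numbers.

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class BufferWithoutHeaderSlicing {
  public static void main(String[] args) {
    int headerSize = 33, dataSize = 100, checksumSize = 4;
    ByteBuff block = new SingleByteBuff(
        ByteBuffer.allocate(headerSize + dataSize + checksumSize));

    // withChecksum == false: skip the header and drop the trailing checksum bytes.
    ByteBuff noChecksum = block.duplicate().position(headerSize)
        .limit(block.limit() - checksumSize).slice();
    // withChecksum == true (what unpack() now asks for): keep the checksum region,
    // i.e. leave the limit untouched and only skip the header.
    ByteBuff withChecksum = block.duplicate().position(headerSize).slice();

    assert noChecksum.limit() == dataSize;
    assert withChecksum.limit() == dataSize + checksumSize;
  }
}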