HBASE-21937 Make Compression#decompress accept ByteBuff as input

huzheng 2019-04-02 20:44:08 +08:00
parent 77cef7490b
commit 7dedb5625a
7 changed files with 90 additions and 67 deletions

Compression.java

@@ -25,7 +25,8 @@ import java.io.OutputStream;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hbase.io.util.BlockIOUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -438,45 +439,29 @@ public final class Compression {
   }
 
   /**
-   * Decompresses data from the given stream using the configured compression
-   * algorithm. It will throw an exception if the dest buffer does not have
-   * enough space to hold the decompressed data.
-   *
-   * @param dest
-   *          the output bytes buffer
-   * @param destOffset
-   *          start writing position of the output buffer
-   * @param bufferedBoundedStream
-   *          a stream to read compressed data from, bounded to the exact amount
+   * Decompresses data from the given stream using the configured compression algorithm. It will
+   * throw an exception if the dest buffer does not have enough space to hold the decompressed data.
+   * @param dest the output buffer
+   * @param bufferedBoundedStream a stream to read compressed data from, bounded to the exact amount
    *          of compressed data
-   * @param compressedSize
-   *          compressed data size, header not included
-   * @param uncompressedSize
-   *          uncompressed data size, header not included
-   * @param compressAlgo
-   *          compression algorithm used
-   * @throws IOException
+   * @param uncompressedSize uncompressed data size, header not included
+   * @param compressAlgo compression algorithm used
+   * @throws IOException if any IO error happen
    */
-  public static void decompress(byte[] dest, int destOffset,
-      InputStream bufferedBoundedStream, int compressedSize,
-      int uncompressedSize, Compression.Algorithm compressAlgo)
-      throws IOException {
-    if (dest.length - destOffset < uncompressedSize) {
-      throw new IllegalArgumentException(
-          "Output buffer does not have enough space to hold "
-          + uncompressedSize + " decompressed bytes, available: "
-          + (dest.length - destOffset));
+  public static void decompress(ByteBuff dest, InputStream bufferedBoundedStream,
+      int uncompressedSize, Compression.Algorithm compressAlgo) throws IOException {
+    if (dest.remaining() < uncompressedSize) {
+      throw new IllegalArgumentException("Output buffer does not have enough space to hold "
+          + uncompressedSize + " decompressed bytes, available: " + dest.remaining());
     }
     Decompressor decompressor = null;
     try {
       decompressor = compressAlgo.getDecompressor();
-      InputStream is = compressAlgo.createDecompressionStream(
-          bufferedBoundedStream, decompressor, 0);
-      IOUtils.readFully(is, dest, destOffset, uncompressedSize);
-      is.close();
+      try (InputStream is =
+          compressAlgo.createDecompressionStream(bufferedBoundedStream, decompressor, 0)) {
+        BlockIOUtils.readFullyWithHeapBuffer(is, dest, uncompressedSize);
+      }
     } finally {
       if (decompressor != null) {
         compressAlgo.returnDecompressor(decompressor);
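For reference, a minimal caller sketch of the new decompress signature (not part of this commit; the example class, stream, and sizes are invented for illustration). It shows the point of the change: the destination may now be any ByteBuff, including one backed by an off-heap direct ByteBuffer.

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class DecompressExample {
  // Hypothetical helper: decompress a GZ-compressed block body into a direct buffer.
  static ByteBuff decompressBlock(InputStream boundedCompressedIn, int uncompressedSize)
      throws IOException {
    // The destination no longer has to be a heap byte[]; any ByteBuff works.
    ByteBuff dest = new SingleByteBuff(ByteBuffer.allocateDirect(uncompressedSize));
    Compression.decompress(dest, boundedCompressedIn, uncompressedSize,
        Compression.Algorithm.GZ);
    return dest;
  }
}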

HFileBlockDefaultDecodingContext.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.io.crypto.Cipher;
 import org.apache.hadoop.hbase.io.crypto.Decryptor;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.util.BlockIOUtils;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -87,14 +88,12 @@ public class HFileBlockDefaultDecodingContext implements
       }
 
       Compression.Algorithm compression = fileContext.getCompression();
-      assert blockBufferWithoutHeader.hasArray();
       if (compression != Compression.Algorithm.NONE) {
-        Compression.decompress(blockBufferWithoutHeader.array(),
-          blockBufferWithoutHeader.arrayOffset(), dataInputStream, onDiskSizeWithoutHeader,
+        Compression.decompress(blockBufferWithoutHeader, dataInputStream,
           uncompressedSizeWithoutHeader, compression);
       } else {
-        IOUtils.readFully(dataInputStream, blockBufferWithoutHeader.array(),
-          blockBufferWithoutHeader.arrayOffset(), onDiskSizeWithoutHeader);
+        BlockIOUtils.readFullyWithHeapBuffer(dataInputStream, blockBufferWithoutHeader,
+          onDiskSizeWithoutHeader);
       }
     } finally {
       byteBuffInputStream.close();

BlockIOUtils.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.io.hfile;
+package org.apache.hadoop.hbase.io.util;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -29,9 +29,14 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-class BlockIOUtils {
+public final class BlockIOUtils {
 
-  static boolean isByteBufferReadable(FSDataInputStream is) {
+  // Disallow instantiation
+  private BlockIOUtils() {
+  }
+
+  public static boolean isByteBufferReadable(FSDataInputStream is) {
     InputStream cur = is.getWrappedStream();
     for (;;) {
       if ((cur instanceof FSDataInputStream)) {
@@ -50,7 +55,7 @@
    * @param length bytes to read.
    * @throws IOException exception to throw if any error happen
    */
-  static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
+  public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
     if (!isByteBufferReadable(dis)) {
       // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
       // the destination ByteBuff.
@@ -81,6 +86,32 @@
     }
   }
 
+  /**
+   * Copying bytes from InputStream to {@link ByteBuff} by using an temporary heap byte[] (default
+   * size is 1024 now).
+   * @param in the InputStream to read
+   * @param out the destination {@link ByteBuff}
+   * @param length to read
+   * @throws IOException if any io error encountered.
+   */
+  public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length)
+      throws IOException {
+    byte[] buffer = new byte[1024];
+    if (length < 0) {
+      throw new IllegalArgumentException("Length must not be negative: " + length);
+    }
+    int remain = length, count;
+    while (remain > 0) {
+      count = in.read(buffer, 0, Math.min(remain, buffer.length));
+      if (count < 0) {
+        throw new IOException(
+            "Premature EOF from inputStream, but still need " + remain + " bytes");
+      }
+      out.put(buffer, 0, count);
+      remain -= count;
+    }
+  }
+
   /**
    * Read from an input stream at least <code>necessaryLen</code> and if possible,
    * <code>extraLen</code> also if available. Analogous to
@@ -125,8 +156,8 @@
    * ByteBuffers, otherwise we've not read the extraLen bytes yet.
    * @throws IOException if failed to read the necessary bytes.
    */
-  static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen, int extraLen)
-      throws IOException {
+  public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen,
+      int extraLen) throws IOException {
     if (!isByteBufferReadable(dis)) {
       // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
       // the destination ByteBuff.
@@ -174,7 +205,7 @@
    * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
    * @throws IOException if failed to read the necessary bytes
    */
-  static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
+  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
       int necessaryLen, int extraLen) throws IOException {
     int remain = necessaryLen + extraLen;
     byte[] buf = new byte[remain];
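As a quick illustration of the readFullyWithHeapBuffer helper added above, here is a hedged, self-contained sketch (the example class and sizes are invented): it streams bytes from a plain InputStream into a MultiByteBuff spanning two direct buffers, going through the internal 1024-byte heap array so the destination never needs a backing array.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;

public class ReadFullyWithHeapBufferExample {
  public static void main(String[] args) throws IOException {
    byte[] src = new byte[4096]; // stand-in for a decompression stream's output
    ByteBuff dest = new MultiByteBuff(ByteBuffer.allocateDirect(2048),
        ByteBuffer.allocateDirect(2048));
    // Copies 4096 bytes in chunks of at most 1024; throws IOException on premature EOF.
    BlockIOUtils.readFullyWithHeapBuffer(new ByteArrayInputStream(src), dest, src.length);
  }
}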

HFileBlock.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.util.BlockIOUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -527,14 +528,22 @@
   }
 
   /**
-   * Returns a buffer that does not include the header or checksum.
-   *
+   * Returns a buffer that does not include the header and checksum.
    * @return the buffer with header skipped and checksum omitted.
    */
   public ByteBuff getBufferWithoutHeader() {
+    return this.getBufferWithoutHeader(false);
+  }
+
+  /**
+   * Returns a buffer that does not include the header or checksum.
+   * @param withChecksum to indicate whether include the checksum or not.
+   * @return the buffer with header skipped and checksum omitted.
+   */
+  public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
     ByteBuff dup = getBufferReadOnly();
-    // Now set it up so Buffer spans content only -- no header or no checksums.
-    return dup.position(headerSize()).limit(buf.limit() - totalChecksumBytes()).slice();
+    int delta = withChecksum ? 0 : totalChecksumBytes();
+    return dup.position(headerSize()).limit(buf.limit() - delta).slice();
   }
 
   /**
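The getBufferWithoutHeader(boolean) variant above only controls whether the trailing checksum bytes stay inside the returned slice. Below is a minimal sketch of the same position/limit/slice pattern on a plain SingleByteBuff, with made-up sizes standing in for headerSize() and totalChecksumBytes().

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class SliceWithoutHeaderExample {
  public static void main(String[] args) {
    int headerSize = 33, checksumBytes = 4, dataSize = 128; // illustrative sizes only
    ByteBuff buf =
        new SingleByteBuff(ByteBuffer.allocate(headerSize + dataSize + checksumBytes));
    boolean withChecksum = false;
    // Skip the header; drop the checksum tail unless withChecksum is true.
    int delta = withChecksum ? 0 : checksumBytes;
    ByteBuff sliced = buf.position(headerSize).limit(buf.limit() - delta).slice();
    System.out.println(sliced.remaining()); // 128 here, 132 when withChecksum is true
  }
}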
@@ -608,8 +617,9 @@
     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
     // block's header, so there are two sensible values for buffer capacity.
     int hdrSize = headerSize();
-    if (dup.capacity() != expectedBufLimit && dup.capacity() != expectedBufLimit + hdrSize) {
-      throw new AssertionError("Invalid buffer capacity: " + dup.capacity() +
+    dup.rewind();
+    if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) {
+      throw new AssertionError("Invalid buffer capacity: " + dup.remaining() +
           ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
     }
   }
@@ -671,15 +681,15 @@
     HFileBlock unpacked = new HFileBlock(this);
     unpacked.allocateBuffer(); // allocates space for the decompressed block
-    HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
-        reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
+    HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
+        ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
     ByteBuff dup = this.buf.duplicate();
     dup.position(this.headerSize());
     dup = dup.slice();
     ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
-      unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
+      unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
     return unpacked;
   }
@@ -697,7 +707,6 @@
     ByteBuff newBuf = allocator.allocate(capacityNeeded);
 
     // Copy header bytes into newBuf.
-    // newBuf is HBB so no issue in calling array()
     buf.position(0);
     newBuf.put(0, buf, 0, headerSize);

TestLoadAndSwitchEncodeOnDisk.java

@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.TestMiniClusterLoadSequential;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runners.Parameterized.Parameters;
@@ -74,7 +73,6 @@ public class TestLoadAndSwitchEncodeOnDisk extends TestMiniClusterLoadSequential
   @Override
   @Test
-  @Ignore("TODO Ignore this UT temporarily, will fix this in the critical HBASE-21937.")
   public void loadTest() throws Exception {
     Admin admin = TEST_UTIL.getAdmin();


@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.io.util.BlockIOUtils;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;

TestHFileBlock.java

@@ -97,8 +97,7 @@ public class TestHFileBlock {
   private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);
 
-  // TODO let uncomment the GZ algorithm in HBASE-21937, because no support BB unpack yet.
-  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, /* GZ */ };
+  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
 
   private static final int NUM_TEST_BLOCKS = 1000;
   private static final int NUM_READER_THREADS = 26;
@@ -623,7 +622,7 @@
       if (detailedLogging) {
         LOG.info("Reading block #" + i + " at offset " + curOffset);
       }
-      HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, true);
+      HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, false);
       if (detailedLogging) {
         LOG.info("Block #" + i + ": " + b);
       }
@@ -638,7 +637,7 @@
       // Now re-load this block knowing the on-disk size. This tests a
       // different branch in the loader.
       HFileBlock b2 =
-          hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, true);
+          hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, false);
       b2.sanityCheck();
       assertEquals(b.getBlockType(), b2.getBlockType());
@@ -667,11 +666,10 @@
         // expectedContents have header + data only
         ByteBuff bufRead = newBlock.getBufferReadOnly();
         ByteBuffer bufExpected = expectedContents.get(i);
-        boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
-            bufRead.arrayOffset(),
-            bufRead.limit() - newBlock.totalChecksumBytes(),
-            bufExpected.array(), bufExpected.arrayOffset(),
-            bufExpected.limit()) == 0;
+        byte[] tmp = new byte[bufRead.limit() - newBlock.totalChecksumBytes()];
+        bufRead.get(tmp, 0, tmp.length);
+        boolean bytesAreCorrect = Bytes.compareTo(tmp, 0, tmp.length, bufExpected.array(),
+            bufExpected.arrayOffset(), bufExpected.limit()) == 0;
 
         String wrongBytesMsg = "";
         if (!bytesAreCorrect) {
@@ -702,6 +700,8 @@
           if (newBlock != b) {
             assertTrue(b.release());
           }
+        } else {
+          assertTrue(b.release());
         }
       }
       assertEquals(curOffset, fs.getFileStatus(path).getLen());