From 0043dfebc5e43705818071c3de062211943829f1 Mon Sep 17 00:00:00 2001
From: chenxu14 <47170471+chenxu14@users.noreply.github.com>
Date: Thu, 17 Oct 2019 09:44:40 +0800
Subject: [PATCH] HBASE-23107 Avoid temp byte array creation when doing
 cacheDataOnWrite (#678)

Signed-off-by: huzheng
Signed-off-by: stack
---
 .../hbase/io/ByteArrayOutputStream.java       |  5 ++
 .../hadoop/hbase/io/hfile/HFileBlock.java     | 54 +++++++++++++------
 .../hbase/io/hfile/HFileWriterImpl.java       | 14 +++--
 .../hadoop/hbase/io/hfile/TestHFileBlock.java | 26 ++++-----
 4 files changed, 64 insertions(+), 35 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
index 38c986a49d7..77dd3b82c42 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
@@ -22,6 +22,7 @@ import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -112,6 +113,10 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferWri
     return Arrays.copyOf(buf, pos);
   }
 
+  public void toByteBuff(ByteBuff buff) {
+    buff.put(buf, 0, pos);
+  }
+
   /**
    * @return the underlying array where the data gets accumulated
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 9b115230515..a723e524a76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -842,6 +842,8 @@ public class HFileBlock implements Cacheable {
     /** Meta data that holds information about the hfileblock**/
     private HFileContext fileContext;
 
+    private final ByteBuffAllocator allocator;
+
     @Override
     public void beforeShipped() {
       if (getEncodingState() != null) {
@@ -856,12 +858,19 @@ public class HFileBlock implements Cacheable {
     /**
      * @param dataBlockEncoder data block encoding algorithm to use
      */
+    @VisibleForTesting
     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
+      this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
+    }
+
+    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
+        ByteBuffAllocator allocator) {
       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
         throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
             + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
             + fileContext.getBytesPerChecksum());
       }
+      this.allocator = allocator;
       this.dataBlockEncoder = dataBlockEncoder != null?
           dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
       this.dataBlockEncodingCtx = this.dataBlockEncoder.
@@ -1012,6 +1021,18 @@ public class HFileBlock implements Cacheable {
       Bytes.putInt(dest, offset, onDiskDataSize);
     }
 
+    private void putHeader(ByteBuff buff, int onDiskSize,
+        int uncompressedSize, int onDiskDataSize) {
+      buff.rewind();
+      blockType.write(buff);
+      buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
+      buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
+      buff.putLong(prevOffset);
+      buff.put(fileContext.getChecksumType().getCode());
+      buff.putInt(fileContext.getBytesPerChecksum());
+      buff.putInt(onDiskDataSize);
+    }
+
     private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
         int uncompressedSize, int onDiskDataSize) {
       putHeader(dest.getBuffer(),0, onDiskSize, uncompressedSize, onDiskDataSize);
@@ -1170,19 +1191,19 @@ public class HFileBlock implements Cacheable {
      * cache. Can be called in the "writing" state or the "block ready" state.
      * Returns only the header and data, does not include checksum data.
      *
-     * @return Returns a copy of uncompressed block bytes for caching on write
+     * @return Returns an uncompressed block ByteBuff for caching on write
      */
-    @VisibleForTesting
-    ByteBuffer cloneUncompressedBufferWithHeader() {
+    ByteBuff cloneUncompressedBufferWithHeader() {
       expectState(State.BLOCK_READY);
-      byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray();
+      ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
+      baosInMemory.toByteBuff(bytebuff);
       int numBytes = (int) ChecksumUtil.numBytes(
           onDiskBlockBytesWithHeader.size(),
           fileContext.getBytesPerChecksum());
-      putHeader(uncompressedBlockBytesWithHeader, 0,
-        onDiskBlockBytesWithHeader.size() + numBytes,
-        baosInMemory.size(), onDiskBlockBytesWithHeader.size());
-      return ByteBuffer.wrap(uncompressedBlockBytesWithHeader);
+      putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes,
+        baosInMemory.size(), onDiskBlockBytesWithHeader.size());
+      bytebuff.rewind();
+      return bytebuff;
     }
 
     /**
@@ -1191,9 +1212,12 @@ public class HFileBlock implements Cacheable {
      * include checksum data.
      * @return Returns a copy of block bytes for caching on write
      */
-    private ByteBuffer cloneOnDiskBufferWithHeader() {
+    private ByteBuff cloneOnDiskBufferWithHeader() {
       expectState(State.BLOCK_READY);
-      return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray());
+      ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
+      onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
+      bytebuff.rewind();
+      return bytebuff;
     }
 
     private void expectState(State expectedState) {
@@ -1246,24 +1270,24 @@ public class HFileBlock implements Cacheable {
           .build();
       // Build the HFileBlock.
       HFileBlockBuilder builder = new HFileBlockBuilder();
-      ByteBuffer buffer;
+      ByteBuff buff;
       if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
-        buffer = cloneOnDiskBufferWithHeader();
+        buff = cloneOnDiskBufferWithHeader();
       } else {
-        buffer = cloneUncompressedBufferWithHeader();
+        buff = cloneUncompressedBufferWithHeader();
       }
       return builder.withBlockType(blockType)
           .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
           .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
           .withPrevBlockOffset(prevOffset)
-          .withByteBuff(ByteBuff.wrap(buffer))
+          .withByteBuff(buff)
          .withFillHeader(FILL_HEADER)
           .withOffset(startOffset)
           .withNextBlockOnDiskSize(UNSET)
           .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
           .withHFileContext(newContext)
           .withByteBuffAllocator(cacheConf.getByteBuffAllocator())
-          .withShared(!buffer.hasArray())
+          .withShared(!buff.hasArray())
           .build();
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index fa5f1f16cb8..93cca8bd362 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -294,9 +294,8 @@ public class HFileWriterImpl implements HFile.Writer {
     if (blockWriter != null) {
       throw new IllegalStateException("finishInit called twice");
     }
-
-    blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
-
+    blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext,
+        cacheConf.getByteBuffAllocator());
     // Data block index writer
     boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
     dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
@@ -546,8 +545,13 @@ public class HFileWriterImpl implements HFile.Writer {
   private void doCacheOnWrite(long offset) {
     cacheConf.getBlockCache().ifPresent(cache -> {
       HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
-      cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
-        cacheFormatBlock);
+      try {
+        cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
+          cacheFormatBlock);
+      } finally {
+        // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
+        cacheFormatBlock.release();
+      }
     });
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index d0e98fd71a9..006415cb467 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -452,26 +452,21 @@ public class TestHFileBlock {
         HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
         long totalSize = 0;
         final List<Integer> encodedSizes = new ArrayList<>();
-        final List<ByteBuffer> encodedBlocks = new ArrayList<>();
+        final List<ByteBuff> encodedBlocks = new ArrayList<>();
         for (int blockId = 0; blockId < numBlocks; ++blockId) {
           hbw.startWriting(BlockType.DATA);
           writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
           hbw.writeHeaderAndData(os);
           int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
-          byte[] encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader().array();
-          final int encodedSize = encodedResultWithHeader.length - headerLen;
+          ByteBuff encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader();
+          final int encodedSize = encodedResultWithHeader.limit() - headerLen;
           if (encoding != DataBlockEncoding.NONE) {
             // We need to account for the two-byte encoding algorithm ID that
             // comes after the 24-byte block header but before encoded KVs.
             headerLen += DataBlockEncoding.ID_SIZE;
           }
-          byte[] encodedDataSection =
-              new byte[encodedResultWithHeader.length - headerLen];
-          System.arraycopy(encodedResultWithHeader, headerLen,
-              encodedDataSection, 0, encodedDataSection.length);
-          final ByteBuffer encodedBuf =
-              ByteBuffer.wrap(encodedDataSection);
           encodedSizes.add(encodedSize);
+          ByteBuff encodedBuf = encodedResultWithHeader.position(headerLen).slice();
           encodedBlocks.add(encodedBuf);
           totalSize += hbw.getOnDiskSizeWithHeader();
         }
@@ -521,12 +516,11 @@ public class TestHFileBlock {
             actualBuffer = actualBuffer.slice();
           }
 
-          ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
-          expectedBuffer.rewind();
+          ByteBuff expectedBuff = encodedBlocks.get(blockId);
+          expectedBuff.rewind();
 
           // test if content matches, produce nice message
-          assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer, algo, encoding,
-            pread);
+          assertBuffersEqual(expectedBuff, actualBuffer, algo, encoding, pread);
 
           // test serialized blocks
           for (boolean reuseBuffer : new boolean[] { false, true }) {
@@ -882,8 +876,10 @@ public class TestHFileBlock {
       hbw.writeHeaderAndData(os);
       totalSize += hbw.getOnDiskSizeWithHeader();
 
-      if (cacheOnWrite)
-        expectedContents.add(hbw.cloneUncompressedBufferWithHeader());
+      if (cacheOnWrite) {
+        ByteBuff buff = hbw.cloneUncompressedBufferWithHeader();
+        expectedContents.add(buff.asSubByteBuffer(buff.capacity()));
+      }
 
       if (detailedLogging) {
         LOG.info("Written block #" + i + " of type " + bt