HBASE-23107 Avoid temp byte array creation when doing cacheDataOnWrite (#678)

Signed-off-by: huzheng <openinx@gmail.com>
Signed-off-by: stack <stack@apache.org>

Author: chenxu14 (2019-10-17 09:44:40 +08:00), committed by huzheng
parent cc76318f76
commit b0b7e5f5b8
4 changed files with 64 additions and 35 deletions


@@ -22,6 +22,7 @@ import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
@@ -112,6 +113,10 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferWriter
return Arrays.copyOf(buf, pos);
}
public void toByteBuff(ByteBuff buff) {
buff.put(buf, 0, pos);
}
/**
* @return the underlying array where the data gets accumulated
*/
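
The new toByteBuff(ByteBuff) copies the accumulated bytes straight into a caller-supplied ByteBuff, so callers no longer need the temporary byte[] that toByteArray() allocates. A minimal usage sketch, not part of the patch, assuming ByteBuffAllocator.HEAP (a pooled allocator behaves the same way):

```java
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;

public class ToByteBuffSketch {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    baos.write(new byte[] { 1, 2, 3, 4 }, 0, 4);

    // Old pattern: an extra heap copy just to hand the bytes on.
    byte[] tempCopy = baos.toByteArray();

    // New pattern: copy straight into an allocator-provided ByteBuff.
    ByteBuff buff = ByteBuffAllocator.HEAP.allocate(baos.size());
    baos.toByteBuff(buff);
    buff.rewind();

    assert buff.remaining() == tempCopy.length;
    buff.release(); // hand the buffer back to the allocator when done
  }
}
```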


@@ -843,6 +843,8 @@ public class HFileBlock implements Cacheable {
/** Meta data that holds information about the hfileblock**/
private HFileContext fileContext;
private final ByteBuffAllocator allocator;
@Override
public void beforeShipped() {
if (getEncodingState() != null) {
@@ -857,12 +859,19 @@ public class HFileBlock implements Cacheable {
/**
* @param dataBlockEncoder data block encoding algorithm to use
*/
@VisibleForTesting
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
}
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
ByteBuffAllocator allocator) {
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
" Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
fileContext.getBytesPerChecksum());
}
this.allocator = allocator;
this.dataBlockEncoder = dataBlockEncoder != null?
dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
this.dataBlockEncodingCtx = this.dataBlockEncoder.
@@ -1013,6 +1022,18 @@ public class HFileBlock implements Cacheable {
Bytes.putInt(dest, offset, onDiskDataSize);
}
private void putHeader(ByteBuff buff, int onDiskSize,
int uncompressedSize, int onDiskDataSize) {
buff.rewind();
blockType.write(buff);
buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
buff.putLong(prevOffset);
buff.put(fileContext.getChecksumType().getCode());
buff.putInt(fileContext.getBytesPerChecksum());
buff.putInt(onDiskDataSize);
}
private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
int uncompressedSize, int onDiskDataSize) {
putHeader(dest.getBuffer(),0, onDiskSize, uncompressedSize, onDiskDataSize);
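
The new ByteBuff overload of putHeader writes the same fields in the same order as the existing byte[] overload. As an illustrative tally (not part of the patch; the 8-byte block-type magic is written as a literal), the field widths add up to HConstants.HFILEBLOCK_HEADER_SIZE:

```java
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;

public class HeaderLayoutSketch {
  public static void main(String[] args) {
    int magic = 8;                                        // block type magic, e.g. "DATABLK*"
    int onDiskSizeWithoutHeader = Bytes.SIZEOF_INT;       // buff.putInt(onDiskSize - header)
    int uncompressedSizeWithoutHeader = Bytes.SIZEOF_INT; // buff.putInt(uncompressedSize - header)
    int prevBlockOffset = Bytes.SIZEOF_LONG;              // buff.putLong(prevOffset)
    int checksumType = Bytes.SIZEOF_BYTE;                 // buff.put(checksum type code)
    int bytesPerChecksum = Bytes.SIZEOF_INT;              // buff.putInt(bytesPerChecksum)
    int onDiskDataSizeWithHeader = Bytes.SIZEOF_INT;      // buff.putInt(onDiskDataSize)

    int total = magic + onDiskSizeWithoutHeader + uncompressedSizeWithoutHeader
        + prevBlockOffset + checksumType + bytesPerChecksum + onDiskDataSizeWithHeader;

    // 33 bytes, i.e. HConstants.HFILEBLOCK_HEADER_SIZE.
    System.out.println(total == HConstants.HFILEBLOCK_HEADER_SIZE); // true
  }
}
```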
@@ -1171,19 +1192,19 @@ public class HFileBlock implements Cacheable {
* cache. Can be called in the "writing" state or the "block ready" state.
* Returns only the header and data, does not include checksum data.
*
* @return Returns a copy of uncompressed block bytes for caching on write
* @return Returns an uncompressed block ByteBuff for caching on write
*/
@VisibleForTesting
ByteBuffer cloneUncompressedBufferWithHeader() {
ByteBuff cloneUncompressedBufferWithHeader() {
expectState(State.BLOCK_READY);
byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray();
ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
baosInMemory.toByteBuff(bytebuff);
int numBytes = (int) ChecksumUtil.numBytes(
onDiskBlockBytesWithHeader.size(),
fileContext.getBytesPerChecksum());
putHeader(uncompressedBlockBytesWithHeader, 0,
onDiskBlockBytesWithHeader.size() + numBytes,
baosInMemory.size(), onDiskBlockBytesWithHeader.size());
return ByteBuffer.wrap(uncompressedBlockBytesWithHeader);
putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes,
baosInMemory.size(), onDiskBlockBytesWithHeader.size());
bytebuff.rewind();
return bytebuff;
}
/**
@@ -1192,9 +1213,12 @@ public class HFileBlock implements Cacheable {
* include checksum data.
* @return Returns a copy of block bytes for caching on write
*/
private ByteBuffer cloneOnDiskBufferWithHeader() {
private ByteBuff cloneOnDiskBufferWithHeader() {
expectState(State.BLOCK_READY);
return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray());
ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
bytebuff.rewind();
return bytebuff;
}
private void expectState(State expectedState) {
@@ -1247,24 +1271,24 @@ public class HFileBlock implements Cacheable {
.build();
// Build the HFileBlock.
HFileBlockBuilder builder = new HFileBlockBuilder();
ByteBuffer buffer;
ByteBuff buff;
if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
buffer = cloneOnDiskBufferWithHeader();
buff = cloneOnDiskBufferWithHeader();
} else {
buffer = cloneUncompressedBufferWithHeader();
buff = cloneUncompressedBufferWithHeader();
}
return builder.withBlockType(blockType)
.withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
.withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
.withPrevBlockOffset(prevOffset)
.withByteBuff(ByteBuff.wrap(buffer))
.withByteBuff(buff)
.withFillHeader(FILL_HEADER)
.withOffset(startOffset)
.withNextBlockOnDiskSize(UNSET)
.withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
.withHFileContext(newContext)
.withByteBuffAllocator(cacheConf.getByteBuffAllocator())
.withShared(!buffer.hasArray())
.withShared(!buff.hasArray())
.build();
}
}
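
getBlockForCaching() now hands the cache a ByteBuff drawn from the writer's allocator instead of a ByteBuffer wrapped around a temp array, and withShared(!buff.hasArray()) marks off-heap (pooled) buffers as shared so they stay reference-counted. A hedged sketch of the configuration that drives this path, using the standard CacheConfig keys (the values here are only an example):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

public class CacheOnWriteConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Cache data blocks as they are written; the write path then calls
    // getBlockForCaching() for every finished block.
    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
    // false -> cloneUncompressedBufferWithHeader(), true -> cloneOnDiskBufferWithHeader().
    conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);

    CacheConfig cacheConf = new CacheConfig(conf);
    System.out.println("cacheDataOnWrite=" + cacheConf.shouldCacheDataOnWrite());
  }
}
```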


@@ -294,9 +294,8 @@ public class HFileWriterImpl implements HFile.Writer {
if (blockWriter != null) {
throw new IllegalStateException("finishInit called twice");
}
blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext,
cacheConf.getByteBuffAllocator());
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
@@ -546,8 +545,13 @@ public class HFileWriterImpl implements HFile.Writer {
private void doCacheOnWrite(long offset) {
cacheConf.getBlockCache().ifPresent(cache -> {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
cacheFormatBlock);
try {
cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
cacheFormatBlock);
} finally {
// refCnt will auto-increase when the block is added to the cache, see RAMCache#putIfAbsent
cacheFormatBlock.release();
}
});
}
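
Since RAMCache#putIfAbsent takes its own reference when the block is inserted, the writer now releases its reference in a finally block so the backing ByteBuff can go back to the pool even if caching fails. The general pattern, as a sketch rather than the writer's exact code:

```java
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;

public final class ReleaseAfterCachingSketch {
  static void cacheAndRelease(BlockCache cache, BlockCacheKey key, HFileBlock block) {
    try {
      cache.cacheBlock(key, block); // the cache keeps its own reference
    } finally {
      block.release();              // drop ours, even if cacheBlock throws
    }
  }
}
```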


@@ -452,26 +452,21 @@ public class TestHFileBlock {
HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
long totalSize = 0;
final List<Integer> encodedSizes = new ArrayList<>();
final List<ByteBuffer> encodedBlocks = new ArrayList<>();
final List<ByteBuff> encodedBlocks = new ArrayList<>();
for (int blockId = 0; blockId < numBlocks; ++blockId) {
hbw.startWriting(BlockType.DATA);
writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
hbw.writeHeaderAndData(os);
int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
byte[] encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader().array();
final int encodedSize = encodedResultWithHeader.length - headerLen;
ByteBuff encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader();
final int encodedSize = encodedResultWithHeader.limit() - headerLen;
if (encoding != DataBlockEncoding.NONE) {
// We need to account for the two-byte encoding algorithm ID that
// comes after the 24-byte block header but before encoded KVs.
headerLen += DataBlockEncoding.ID_SIZE;
}
byte[] encodedDataSection =
new byte[encodedResultWithHeader.length - headerLen];
System.arraycopy(encodedResultWithHeader, headerLen,
encodedDataSection, 0, encodedDataSection.length);
final ByteBuffer encodedBuf =
ByteBuffer.wrap(encodedDataSection);
encodedSizes.add(encodedSize);
ByteBuff encodedBuf = encodedResultWithHeader.position(headerLen).slice();
encodedBlocks.add(encodedBuf);
totalSize += hbw.getOnDiskSizeWithHeader();
}
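
In the test, position(headerLen).slice() yields a view over the encoded data section without the System.arraycopy into a fresh array. A small illustrative sketch of that slice behaviour, using a made-up 3-byte header rather than the real block header:

```java
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class SliceSketch {
  public static void main(String[] args) {
    byte[] bytes = new byte[] { 10, 11, 12, 13, 14, 15, 16, 17 };
    ByteBuff whole = new SingleByteBuff(ByteBuffer.wrap(bytes));

    int headerLen = 3; // pretend the first 3 bytes are a header
    ByteBuff dataSection = whole.position(headerLen).slice();

    // The slice starts at the old position and shares the underlying bytes.
    System.out.println(dataSection.remaining()); // 5
    System.out.println(dataSection.get(0));      // 13
  }
}
```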
@@ -521,12 +516,11 @@ public class TestHFileBlock {
actualBuffer = actualBuffer.slice();
}
ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
expectedBuffer.rewind();
ByteBuff expectedBuff = encodedBlocks.get(blockId);
expectedBuff.rewind();
// test if content matches, produce nice message
assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer, algo, encoding,
pread);
assertBuffersEqual(expectedBuff, actualBuffer, algo, encoding, pread);
// test serialized blocks
for (boolean reuseBuffer : new boolean[] { false, true }) {
@@ -882,8 +876,10 @@ public class TestHFileBlock {
hbw.writeHeaderAndData(os);
totalSize += hbw.getOnDiskSizeWithHeader();
if (cacheOnWrite)
expectedContents.add(hbw.cloneUncompressedBufferWithHeader());
if (cacheOnWrite) {
ByteBuff buff = hbw.cloneUncompressedBufferWithHeader();
expectedContents.add(buff.asSubByteBuffer(buff.capacity()));
}
if (detailedLogging) {
LOG.info("Written block #" + i + " of type " + bt