HBASE-23107 Avoid temp byte array creation when doing cacheDataOnWrite (#678)
Signed-off-by: huzheng <openinx@gmail.com> Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
820a416f04
commit
0043dfebc5
|
@ -22,6 +22,7 @@ import java.nio.BufferOverflowException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.util.ByteBufferUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -112,6 +113,10 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferWri
|
|||
return Arrays.copyOf(buf, pos);
|
||||
}
|
||||
|
||||
public void toByteBuff(ByteBuff buff) {
|
||||
buff.put(buf, 0, pos);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the underlying array where the data gets accumulated
|
||||
*/
|
||||
|
|
|
@ -842,6 +842,8 @@ public class HFileBlock implements Cacheable {
|
|||
/** Meta data that holds information about the hfileblock**/
|
||||
private HFileContext fileContext;
|
||||
|
||||
private final ByteBuffAllocator allocator;
|
||||
|
||||
@Override
|
||||
public void beforeShipped() {
|
||||
if (getEncodingState() != null) {
|
||||
|
@ -856,12 +858,19 @@ public class HFileBlock implements Cacheable {
|
|||
/**
|
||||
* @param dataBlockEncoder data block encoding algorithm to use
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
|
||||
this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
|
||||
}
|
||||
|
||||
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
|
||||
ByteBuffAllocator allocator) {
|
||||
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
|
||||
throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
|
||||
" Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
|
||||
fileContext.getBytesPerChecksum());
|
||||
}
|
||||
this.allocator = allocator;
|
||||
this.dataBlockEncoder = dataBlockEncoder != null?
|
||||
dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
|
||||
this.dataBlockEncodingCtx = this.dataBlockEncoder.
|
||||
|
@ -1012,6 +1021,18 @@ public class HFileBlock implements Cacheable {
|
|||
Bytes.putInt(dest, offset, onDiskDataSize);
|
||||
}
|
||||
|
||||
private void putHeader(ByteBuff buff, int onDiskSize,
|
||||
int uncompressedSize, int onDiskDataSize) {
|
||||
buff.rewind();
|
||||
blockType.write(buff);
|
||||
buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
|
||||
buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
|
||||
buff.putLong(prevOffset);
|
||||
buff.put(fileContext.getChecksumType().getCode());
|
||||
buff.putInt(fileContext.getBytesPerChecksum());
|
||||
buff.putInt(onDiskDataSize);
|
||||
}
|
||||
|
||||
private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
|
||||
int uncompressedSize, int onDiskDataSize) {
|
||||
putHeader(dest.getBuffer(),0, onDiskSize, uncompressedSize, onDiskDataSize);
|
||||
|
@ -1170,19 +1191,19 @@ public class HFileBlock implements Cacheable {
|
|||
* cache. Can be called in the "writing" state or the "block ready" state.
|
||||
* Returns only the header and data, does not include checksum data.
|
||||
*
|
||||
* @return Returns a copy of uncompressed block bytes for caching on write
|
||||
* @return Returns an uncompressed block ByteBuff for caching on write
|
||||
*/
|
||||
@VisibleForTesting
|
||||
ByteBuffer cloneUncompressedBufferWithHeader() {
|
||||
ByteBuff cloneUncompressedBufferWithHeader() {
|
||||
expectState(State.BLOCK_READY);
|
||||
byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray();
|
||||
ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
|
||||
baosInMemory.toByteBuff(bytebuff);
|
||||
int numBytes = (int) ChecksumUtil.numBytes(
|
||||
onDiskBlockBytesWithHeader.size(),
|
||||
fileContext.getBytesPerChecksum());
|
||||
putHeader(uncompressedBlockBytesWithHeader, 0,
|
||||
onDiskBlockBytesWithHeader.size() + numBytes,
|
||||
baosInMemory.size(), onDiskBlockBytesWithHeader.size());
|
||||
return ByteBuffer.wrap(uncompressedBlockBytesWithHeader);
|
||||
putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes,
|
||||
baosInMemory.size(), onDiskBlockBytesWithHeader.size());
|
||||
bytebuff.rewind();
|
||||
return bytebuff;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1191,9 +1212,12 @@ public class HFileBlock implements Cacheable {
|
|||
* include checksum data.
|
||||
* @return Returns a copy of block bytes for caching on write
|
||||
*/
|
||||
private ByteBuffer cloneOnDiskBufferWithHeader() {
|
||||
private ByteBuff cloneOnDiskBufferWithHeader() {
|
||||
expectState(State.BLOCK_READY);
|
||||
return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray());
|
||||
ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
|
||||
onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
|
||||
bytebuff.rewind();
|
||||
return bytebuff;
|
||||
}
|
||||
|
||||
private void expectState(State expectedState) {
|
||||
|
@ -1246,24 +1270,24 @@ public class HFileBlock implements Cacheable {
|
|||
.build();
|
||||
// Build the HFileBlock.
|
||||
HFileBlockBuilder builder = new HFileBlockBuilder();
|
||||
ByteBuffer buffer;
|
||||
ByteBuff buff;
|
||||
if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
|
||||
buffer = cloneOnDiskBufferWithHeader();
|
||||
buff = cloneOnDiskBufferWithHeader();
|
||||
} else {
|
||||
buffer = cloneUncompressedBufferWithHeader();
|
||||
buff = cloneUncompressedBufferWithHeader();
|
||||
}
|
||||
return builder.withBlockType(blockType)
|
||||
.withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
|
||||
.withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
|
||||
.withPrevBlockOffset(prevOffset)
|
||||
.withByteBuff(ByteBuff.wrap(buffer))
|
||||
.withByteBuff(buff)
|
||||
.withFillHeader(FILL_HEADER)
|
||||
.withOffset(startOffset)
|
||||
.withNextBlockOnDiskSize(UNSET)
|
||||
.withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
|
||||
.withHFileContext(newContext)
|
||||
.withByteBuffAllocator(cacheConf.getByteBuffAllocator())
|
||||
.withShared(!buffer.hasArray())
|
||||
.withShared(!buff.hasArray())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -294,9 +294,8 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
if (blockWriter != null) {
|
||||
throw new IllegalStateException("finishInit called twice");
|
||||
}
|
||||
|
||||
blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
|
||||
|
||||
blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext,
|
||||
cacheConf.getByteBuffAllocator());
|
||||
// Data block index writer
|
||||
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
|
||||
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
|
||||
|
@ -546,8 +545,13 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
private void doCacheOnWrite(long offset) {
|
||||
cacheConf.getBlockCache().ifPresent(cache -> {
|
||||
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
|
||||
cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
|
||||
cacheFormatBlock);
|
||||
try {
|
||||
cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
|
||||
cacheFormatBlock);
|
||||
} finally {
|
||||
// refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
|
||||
cacheFormatBlock.release();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -452,26 +452,21 @@ public class TestHFileBlock {
|
|||
HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
|
||||
long totalSize = 0;
|
||||
final List<Integer> encodedSizes = new ArrayList<>();
|
||||
final List<ByteBuffer> encodedBlocks = new ArrayList<>();
|
||||
final List<ByteBuff> encodedBlocks = new ArrayList<>();
|
||||
for (int blockId = 0; blockId < numBlocks; ++blockId) {
|
||||
hbw.startWriting(BlockType.DATA);
|
||||
writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
|
||||
hbw.writeHeaderAndData(os);
|
||||
int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
|
||||
byte[] encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader().array();
|
||||
final int encodedSize = encodedResultWithHeader.length - headerLen;
|
||||
ByteBuff encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader();
|
||||
final int encodedSize = encodedResultWithHeader.limit() - headerLen;
|
||||
if (encoding != DataBlockEncoding.NONE) {
|
||||
// We need to account for the two-byte encoding algorithm ID that
|
||||
// comes after the 24-byte block header but before encoded KVs.
|
||||
headerLen += DataBlockEncoding.ID_SIZE;
|
||||
}
|
||||
byte[] encodedDataSection =
|
||||
new byte[encodedResultWithHeader.length - headerLen];
|
||||
System.arraycopy(encodedResultWithHeader, headerLen,
|
||||
encodedDataSection, 0, encodedDataSection.length);
|
||||
final ByteBuffer encodedBuf =
|
||||
ByteBuffer.wrap(encodedDataSection);
|
||||
encodedSizes.add(encodedSize);
|
||||
ByteBuff encodedBuf = encodedResultWithHeader.position(headerLen).slice();
|
||||
encodedBlocks.add(encodedBuf);
|
||||
totalSize += hbw.getOnDiskSizeWithHeader();
|
||||
}
|
||||
|
@ -521,12 +516,11 @@ public class TestHFileBlock {
|
|||
actualBuffer = actualBuffer.slice();
|
||||
}
|
||||
|
||||
ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
|
||||
expectedBuffer.rewind();
|
||||
ByteBuff expectedBuff = encodedBlocks.get(blockId);
|
||||
expectedBuff.rewind();
|
||||
|
||||
// test if content matches, produce nice message
|
||||
assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer, algo, encoding,
|
||||
pread);
|
||||
assertBuffersEqual(expectedBuff, actualBuffer, algo, encoding, pread);
|
||||
|
||||
// test serialized blocks
|
||||
for (boolean reuseBuffer : new boolean[] { false, true }) {
|
||||
|
@ -882,8 +876,10 @@ public class TestHFileBlock {
|
|||
hbw.writeHeaderAndData(os);
|
||||
totalSize += hbw.getOnDiskSizeWithHeader();
|
||||
|
||||
if (cacheOnWrite)
|
||||
expectedContents.add(hbw.cloneUncompressedBufferWithHeader());
|
||||
if (cacheOnWrite) {
|
||||
ByteBuff buff = hbw.cloneUncompressedBufferWithHeader();
|
||||
expectedContents.add(buff.asSubByteBuffer(buff.capacity()));
|
||||
}
|
||||
|
||||
if (detailedLogging) {
|
||||
LOG.info("Written block #" + i + " of type " + bt
|
||||
|
|
Loading…
Reference in New Issue