HBASE-22491 Separate the heap HFileBlock and offheap HFileBlock because the heap block won't need refCnt and save into prevBlocks list before shipping (#268)

openinx 2019-06-13 14:34:34 +08:00 committed by huzheng
parent 97c1158421
commit d64c3a2611
19 changed files with 686 additions and 285 deletions


@ -297,6 +297,12 @@ public class ByteBuffAllocator {
}
}
}
this.usedBufCount.set(0);
this.maxPoolSizeInfoLevelLogged = false;
this.poolAllocationBytes.reset();
this.heapAllocationBytes.reset();
this.lastPoolAllocationBytes = 0;
this.lastHeapAllocationBytes = 0;
}
/**


@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
/**
* The {@link ByteBuffAllocator} won't allocate a pooled heap {@link ByteBuff} now; at the same
* time, an off-heap {@link ByteBuff} allocated by the allocator must be a pooled one. That is to
* say, an exclusive memory HFileBlock must be a heap block and a shared memory HFileBlock must
* be an off-heap block.
* <p>
* The exclusive memory HFileBlock does nothing in its retain or release methods, because its
* memory is garbage collected by the JVM: even if its reference count dropped to zero, there
* would be nothing for us to de-allocate.
* <p>
* @see org.apache.hadoop.hbase.io.hfile.SharedMemHFileBlock
*/
@InterfaceAudience.Private
public class ExclusiveMemHFileBlock extends HFileBlock {
ExclusiveMemHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
HFileContext fileContext) {
super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, buf,
fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
ByteBuffAllocator.HEAP);
}
@Override
public int refCnt() {
return 0;
}
@Override
public ExclusiveMemHFileBlock retain() {
// do nothing
return this;
}
@Override
public boolean release() {
// do nothing
return false;
}
@Override
public boolean isSharedMem() {
return false;
}
}
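
Because retain() and release() are no-ops here, callers can drive both block flavors through a single code path. A minimal sketch of that calling pattern, using a hypothetical serveBlockToRpc() helper that is not part of this patch:

static void serveBlockToRpc(HFileBlock block) {
  block.retain(); // no-op for ExclusiveMemHFileBlock; refCnt++ for a shared block
  try {
    // ... write the block out to the client ...
  } finally {
    // For an exclusive block this returns false and frees nothing (the GC owns the
    // memory); for a shared block it decrements refCnt and recycles the pooled
    // buffer once the count reaches zero.
    block.release();
  }
}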


@ -285,7 +285,7 @@ public class HFileBlock implements Cacheable {
boolean usesChecksum = buf.get() == (byte) 1;
long offset = buf.getLong();
int nextBlockOnDiskSize = buf.getInt();
return new HFileBlock(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
}
@Override
@ -300,28 +300,6 @@ public class HFileBlock implements Cacheable {
CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
}
/**
* Copy constructor. Creates a shallow copy of {@code that}'s buffer.
*/
private HFileBlock(HFileBlock that) {
this(that, false);
}
/**
* Copy constructor. Creates a shallow/deep copy of {@code that}'s buffer as per the boolean
* param.
*/
private HFileBlock(HFileBlock that, boolean bufCopy) {
init(that.blockType, that.onDiskSizeWithoutHeader, that.uncompressedSizeWithoutHeader,
that.prevBlockOffset, that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize,
that.fileContext, that.allocator);
if (bufCopy) {
this.buf = ByteBuff.wrap(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
} else {
this.buf = that.buf.duplicate();
}
}
/**
* Creates a new {@link HFile} block from the given fields. This constructor
* is used only while writing blocks and caching,
@ -336,7 +314,7 @@ public class HFileBlock implements Cacheable {
* @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
* @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
* @param prevBlockOffset see {@link #prevBlockOffset}
* @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
* @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
* @param fillHeader when true, write the first 4 header fields into passed buffer.
* @param offset the file offset the block was read from
* @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
@ -344,12 +322,19 @@ public class HFileBlock implements Cacheable {
*/
@VisibleForTesting
public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
HFileContext fileContext, ByteBuffAllocator allocator) {
init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
this.buf = new SingleByteBuff(b);
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
ByteBuffAllocator allocator) {
this.blockType = blockType;
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
this.prevBlockOffset = prevBlockOffset;
this.offset = offset;
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
this.nextBlockOnDiskSize = nextBlockOnDiskSize;
this.fileContext = fileContext;
this.allocator = allocator;
this.buf = buf;
if (fillHeader) {
overwriteHeader();
}
@ -363,7 +348,7 @@ public class HFileBlock implements Cacheable {
* to that point.
* @param buf Has header, content, and trailing checksums if present.
*/
HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
throws IOException {
buf.rewind();
@ -374,15 +359,15 @@ public class HFileBlock implements Cacheable {
final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
// This method is called when we deserialize a block from cache and when we read a block in
// from the fs. fileContext is null when deserialized from cache so we need to make up one.
HFileContextBuilder fileContextBuilder = fileContext != null?
new HFileContextBuilder(fileContext): new HFileContextBuilder();
HFileContextBuilder fileContextBuilder =
fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
int onDiskDataSizeWithHeader;
if (usesHBaseChecksum) {
byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
// Use the checksum type and bytes per checksum from header, not from filecontext.
// Use the checksum type and bytes per checksum from header, not from fileContext.
fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
} else {
@ -393,29 +378,19 @@ public class HFileBlock implements Cacheable {
}
fileContext = fileContextBuilder.build();
assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
this.offset = offset;
this.buf = buf;
this.buf.rewind();
}
/**
* Called from constructors.
*/
private void init(BlockType blockType, int onDiskSizeWithoutHeader,
int uncompressedSizeWithoutHeader, long prevBlockOffset, long offset,
int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, HFileContext fileContext,
ByteBuffAllocator allocator) {
this.blockType = blockType;
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
this.prevBlockOffset = prevBlockOffset;
this.offset = offset;
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
this.nextBlockOnDiskSize = nextBlockOnDiskSize;
this.fileContext = fileContext;
this.allocator = allocator;
return new HFileBlockBuilder()
.withBlockType(blockType)
.withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
.withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
.withPrevBlockOffset(prevBlockOffset)
.withOffset(offset)
.withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
.withNextBlockOnDiskSize(nextBlockOnDiskSize)
.withHFileContext(fileContext)
.withByteBuffAllocator(allocator)
.withByteBuff(buf.rewind())
.withShared(!buf.hasArray())
.build();
}
/**
@ -639,7 +614,7 @@ public class HFileBlock implements Cacheable {
.append("(").append(onDiskSizeWithoutHeader)
.append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
}
String dataBegin = null;
String dataBegin;
if (buf.hasArray()) {
dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
@ -673,7 +648,7 @@ public class HFileBlock implements Cacheable {
return this;
}
HFileBlock unpacked = new HFileBlock(this);
HFileBlock unpacked = shallowClone(this);
unpacked.allocateBuffer(); // allocates space for the decompressed block
boolean succ = false;
try {
@ -761,10 +736,16 @@ public class HFileBlock implements Cacheable {
}
/**
* @return true to indicate the block is allocated from JVM heap, otherwise from off-heap.
* Will be overridden by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Returns
* true by default.
*/
boolean isOnHeap() {
return buf.hasArray();
public boolean isSharedMem() {
if (this instanceof SharedMemHFileBlock) {
return true;
} else if (this instanceof ExclusiveMemHFileBlock) {
return false;
}
return true;
}
/**
@ -1039,8 +1020,7 @@ public class HFileBlock implements Cacheable {
+ offset);
}
startOffset = offset;
finishBlockAndWriteHeaderAndData((DataOutputStream) out);
finishBlockAndWriteHeaderAndData(out);
}
/**
@ -1251,13 +1231,27 @@ public class HFileBlock implements Cacheable {
.withIncludesMvcc(fileContext.isIncludesMvcc())
.withIncludesTags(fileContext.isIncludesTags())
.build();
return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
getUncompressedSizeWithoutHeader(), prevOffset,
cacheConf.shouldCacheCompressed(blockType.getCategory()) ? cloneOnDiskBufferWithHeader()
: cloneUncompressedBufferWithHeader(),
FILL_HEADER, startOffset, UNSET,
onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext,
cacheConf.getByteBuffAllocator());
// Build the HFileBlock.
HFileBlockBuilder builder = new HFileBlockBuilder();
ByteBuffer buffer;
if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
buffer = cloneOnDiskBufferWithHeader();
} else {
buffer = cloneUncompressedBufferWithHeader();
}
return builder.withBlockType(blockType)
.withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
.withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
.withPrevBlockOffset(prevOffset)
.withByteBuff(ByteBuff.wrap(buffer))
.withFillHeader(FILL_HEADER)
.withOffset(startOffset)
.withNextBlockOnDiskSize(UNSET)
.withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
.withHFileContext(newContext)
.withByteBuffAllocator(cacheConf.getByteBuffAllocator())
.withShared(!buffer.hasArray())
.build();
}
}
@ -1781,8 +1775,8 @@ public class HFileBlock implements Cacheable {
// The onDiskBlock will become the headerAndDataBuffer for this block.
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
// contains the header of next block, so no need to set next block's header in it.
HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, offset,
nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
// Run check on uncompressed sizings.
if (!fileContext.isCompressedOrEncrypted()) {
hFileBlock.sanityCheckUncompressed();
@ -1947,7 +1941,7 @@ public class HFileBlock implements Cacheable {
if (comparison == null) {
return false;
}
if (comparison.getClass() != this.getClass()) {
if (!(comparison instanceof HFileBlock)) {
return false;
}
@ -2084,7 +2078,27 @@ public class HFileBlock implements Cacheable {
" onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
}
public HFileBlock deepCloneOnHeap() {
return new HFileBlock(this, true);
private static HFileBlockBuilder createBuilder(HFileBlock blk) {
return new HFileBlockBuilder()
.withBlockType(blk.blockType)
.withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
.withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
.withPrevBlockOffset(blk.prevBlockOffset)
.withByteBuff(blk.buf.duplicate()) // Duplicate the buffer.
.withOffset(blk.offset)
.withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
.withNextBlockOnDiskSize(blk.nextBlockOnDiskSize)
.withHFileContext(blk.fileContext)
.withByteBuffAllocator(blk.allocator)
.withShared(blk.isSharedMem());
}
static HFileBlock shallowClone(HFileBlock blk) {
return createBuilder(blk).build();
}
static HFileBlock deepCloneOnHeap(HFileBlock blk) {
ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
}
}
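
A short sketch of the two clone flavors added above, assuming blk is a block just read from an HFile:

HFileBlock shallow = HFileBlock.shallowClone(blk); // duplicates the ByteBuff; shares the memory
HFileBlock onHeap = HFileBlock.deepCloneOnHeap(blk); // copies every byte into a heap array
// deepCloneOnHeap builds with withShared(false), so the result is an
// ExclusiveMemHFileBlock that a purely on-heap cache can hold safely even
// after the source block's pooled buffer has been released.
assert !onHeap.isSharedMem();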


@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.hfile.HFileBlock.UNSET;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class HFileBlockBuilder {
private BlockType blockType;
private int onDiskSizeWithoutHeader;
private int onDiskDataSizeWithHeader;
private int uncompressedSizeWithoutHeader;
private long prevBlockOffset;
private ByteBuff buf;
private boolean fillHeader = false;
private long offset = UNSET;
private int nextBlockOnDiskSize = UNSET;
private HFileContext fileContext;
private ByteBuffAllocator allocator = HEAP;
private boolean isShared;
public HFileBlockBuilder withBlockType(BlockType blockType) {
this.blockType = blockType;
return this;
}
public HFileBlockBuilder withOnDiskSizeWithoutHeader(int onDiskSizeWithoutHeader) {
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
return this;
}
public HFileBlockBuilder withOnDiskDataSizeWithHeader(int onDiskDataSizeWithHeader) {
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
return this;
}
public HFileBlockBuilder withUncompressedSizeWithoutHeader(int uncompressedSizeWithoutHeader) {
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
return this;
}
public HFileBlockBuilder withPrevBlockOffset(long prevBlockOffset) {
this.prevBlockOffset = prevBlockOffset;
return this;
}
public HFileBlockBuilder withByteBuff(ByteBuff buf) {
this.buf = buf;
return this;
}
public HFileBlockBuilder withFillHeader(boolean fillHeader) {
this.fillHeader = fillHeader;
return this;
}
public HFileBlockBuilder withOffset(long offset) {
this.offset = offset;
return this;
}
public HFileBlockBuilder withNextBlockOnDiskSize(int nextBlockOnDiskSize) {
this.nextBlockOnDiskSize = nextBlockOnDiskSize;
return this;
}
public HFileBlockBuilder withHFileContext(HFileContext fileContext) {
this.fileContext = fileContext;
return this;
}
public HFileBlockBuilder withByteBuffAllocator(ByteBuffAllocator allocator) {
this.allocator = allocator;
return this;
}
public HFileBlockBuilder withShared(boolean isShared) {
this.isShared = isShared;
return this;
}
public HFileBlock build() {
if (isShared) {
return new SharedMemHFileBlock(blockType, onDiskSizeWithoutHeader,
uncompressedSizeWithoutHeader, prevBlockOffset, buf, fillHeader, offset,
nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext, allocator);
} else {
return new ExclusiveMemHFileBlock(blockType, onDiskSizeWithoutHeader,
uncompressedSizeWithoutHeader, prevBlockOffset, buf, fillHeader, offset,
nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext);
}
}
}
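
For illustration, a hedged usage sketch of the builder, mirroring the createFromBuff() call earlier in this patch; the allocator, offset, and size variables are assumed from the surrounding reader code. The withShared(!buf.hasArray()) call is what routes an off-heap (pooled) buffer to SharedMemHFileBlock and an array-backed one to ExclusiveMemHFileBlock:

ByteBuff buf = allocator.allocate(onDiskSizeWithHeader); // pooled off-heap, or heap fallback
HFileBlock block = new HFileBlockBuilder()
    .withBlockType(BlockType.DATA)
    .withOffset(offset)
    .withByteBuff(buf.rewind())
    .withHFileContext(fileContext)
    .withByteBuffAllocator(allocator)
    .withShared(!buf.hasArray()) // off-heap buffer => shared memory block
    .build();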


@ -523,15 +523,15 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
if (block != null && curBlock != null && block.getOffset() == curBlock.getOffset()) {
return;
}
if (this.curBlock != null) {
if (this.curBlock != null && this.curBlock.isSharedMem()) {
prevBlocks.add(this.curBlock);
}
this.curBlock = block;
}
void reset() {
// We don't have to keep ref to EXCLUSIVE type of block
if (this.curBlock != null) {
// We don't have to keep ref to heap block
if (this.curBlock != null && this.curBlock.isSharedMem()) {
this.prevBlocks.add(this.curBlock);
}
this.curBlock = null;
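
The payoff comes in the Shipper path: since only shared blocks ever enter prevBlocks, the release loop never touches heap blocks, which the garbage collector reclaims on its own. A simplified sketch of the assumed shape of the shipped() handling (the actual method body is not shown in this diff):

@Override
public void shipped() throws IOException {
  // Every tracked block is a SharedMemHFileBlock; releasing it returns the
  // pooled off-heap buffer to the ByteBuffAllocator once refCnt hits zero.
  for (HFileBlock b : prevBlocks) {
    b.release();
  }
  prevBlocks.clear();
}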


@ -372,8 +372,8 @@ public class LruBlockCache implements FirstLevelBlockCache {
private Cacheable asReferencedHeapBlock(Cacheable buf) {
if (buf instanceof HFileBlock) {
HFileBlock blk = ((HFileBlock) buf);
if (!blk.isOnHeap()) {
return blk.deepCloneOnHeap();
if (blk.isSharedMem()) {
return HFileBlock.deepCloneOnHeap(blk);
}
}
// The block will be referenced by this LRUBlockCache, so should increase its refCnt here.


@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
/**
* The {@link ByteBuffAllocator} won't allocate a pooled heap {@link ByteBuff} now; at the same
* time, an off-heap {@link ByteBuff} allocated by the allocator must be a pooled one. That is to
* say, an exclusive memory HFileBlock must be a heap block and a shared memory HFileBlock must
* be an off-heap block.
* @see org.apache.hadoop.hbase.io.hfile.ExclusiveMemHFileBlock
**/
@InterfaceAudience.Private
public class SharedMemHFileBlock extends HFileBlock {
SharedMemHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
ByteBuffAllocator alloc) {
super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, buf,
fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext, alloc);
}
@Override
public boolean isSharedMem() {
return true;
}
}


@ -171,8 +171,8 @@ public final class TinyLfuBlockCache implements FirstLevelBlockCache {
if (victimCache != null) {
value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
if ((value != null) && caching) {
if ((value instanceof HFileBlock) && !((HFileBlock) value).isOnHeap()) {
value = ((HFileBlock) value).deepCloneOnHeap();
if ((value instanceof HFileBlock) && ((HFileBlock) value).isSharedMem()) {
value = HFileBlock.deepCloneOnHeap((HFileBlock) value);
}
cacheBlock(cacheKey, value);
}


@ -43,10 +43,12 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.ExclusiveMemHFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.LruCachedBlock;
import org.apache.hadoop.hbase.io.hfile.SharedMemHFileBlock;
import org.apache.hadoop.hbase.regionserver.CSLMImmutableSegment;
import org.apache.hadoop.hbase.regionserver.CellArrayImmutableSegment;
import org.apache.hadoop.hbase.regionserver.CellArrayMap;
@ -529,6 +531,14 @@ public class TestHeapSize {
actual = HFileBlock.FIXED_OVERHEAD;
expected = ClassSize.estimateBase(HFileBlock.class, false);
assertEquals(expected, actual);
actual = ExclusiveMemHFileBlock.FIXED_OVERHEAD;
expected = ClassSize.estimateBase(ExclusiveMemHFileBlock.class, false);
assertEquals(expected, actual);
actual = SharedMemHFileBlock.FIXED_OVERHEAD;
expected = ClassSize.estimateBase(SharedMemHFileBlock.class, false);
assertEquals(expected, actual);
}
@Test


@ -148,7 +148,6 @@ public class CacheTestUtils {
if (buf != null) {
assertEquals(block.block, buf);
}
}
// Re-add some duplicate blocks. Hope nothing breaks.
@ -307,10 +306,11 @@ public class CacheTestUtils {
.withBytesPerCheckSum(0)
.withChecksumType(ChecksumType.NULL)
.build();
HFileBlock generated = new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader,
uncompressedSizeWithoutHeader, prevBlockOffset, cachedBuffer, HFileBlock.DONT_FILL_HEADER,
blockSize, onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
ByteBuffAllocator.HEAP);
HFileBlock generated =
new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
prevBlockOffset, ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize,
onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
ByteBuffAllocator.HEAP);
String strKey;
/* No conflicting keys */


@ -101,7 +101,7 @@ public class TestChecksum {
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
meta, ByteBuffAllocator.HEAP);
HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
assertTrue(b.isOnHeap());
assertTrue(!b.isSharedMem());
assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
}
@ -148,7 +148,7 @@ public class TestChecksum {
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
meta, ByteBuffAllocator.HEAP);
HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
assertTrue(b.isOnHeap());
assertTrue(!b.isSharedMem());
// verify SingleByteBuff checksum.
verifySBBCheckSum(b.getBufferReadOnly());


@ -169,7 +169,7 @@ public class TestHFile {
Cacheable cachedBlock = lru.getBlock(key, false, false, true);
Assert.assertNotNull(cachedBlock);
Assert.assertTrue(cachedBlock instanceof HFileBlock);
Assert.assertTrue(((HFileBlock) cachedBlock).isOnHeap());
Assert.assertFalse(((HFileBlock) cachedBlock).isSharedMem());
// Should never allocate an off-heap block from the allocator, because we've ensured it's an LRU cache.
Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
block.release(); // return the ByteBuffer back to the allocator.
@ -217,10 +217,10 @@ public class TestHFile {
HFileBlock hfb = (HFileBlock) cachedBlock;
// Data block will be cached in BucketCache, so it should be an off-heap block.
if (hfb.getBlockType().isData()) {
Assert.assertFalse(hfb.isOnHeap());
Assert.assertTrue(hfb.isSharedMem());
} else {
// Non-data block will be cached in LRUBlockCache, so it must be an on-heap block.
Assert.assertTrue(hfb.isOnHeap());
Assert.assertFalse(hfb.isSharedMem());
}
} finally {
cachedBlock.release();


@ -340,6 +340,14 @@ public class TestHFileBlock {
testReaderV2Internals();
}
private void assertRelease(HFileBlock blk) {
if (blk instanceof ExclusiveMemHFileBlock) {
assertFalse(blk.release());
} else {
assertTrue(blk.release());
}
}
protected void testReaderV2Internals() throws IOException {
if (includesTag) {
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
@ -403,10 +411,10 @@ public class TestHFileBlock {
+ "'.\nMessage is expected to start with: '" + expectedPrefix
+ "'", ex.getMessage().startsWith(expectedPrefix));
}
assertTrue(b.release());
assertRelease(b);
is.close();
}
assertTrue(expected.release());
assertRelease(expected);
}
}
}
@ -534,7 +542,7 @@ public class TestHFileBlock {
deserialized.unpack(meta, hbr));
}
}
assertTrue(blockUnpacked.release());
assertRelease(blockUnpacked);
if (blockFromHFile != blockUnpacked) {
blockFromHFile.release();
}
@ -651,7 +659,7 @@ public class TestHFileBlock {
assertEquals(b.getOnDiskDataSizeWithHeader(),
b2.getOnDiskDataSizeWithHeader());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
assertTrue(b2.release());
assertRelease(b2);
curOffset += b.getOnDiskSizeWithHeader();
@ -694,12 +702,12 @@ public class TestHFileBlock {
}
}
assertTrue(wrongBytesMsg, bytesAreCorrect);
assertTrue(newBlock.release());
assertRelease(newBlock);
if (newBlock != b) {
assertTrue(b.release());
assertRelease(b);
}
} else {
assertTrue(b.release());
assertRelease(b);
}
}
assertEquals(curOffset, fs.getFileStatus(path).getLen());
@ -750,9 +758,9 @@ public class TestHFileBlock {
long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false);
if (useHeapAllocator) {
assertTrue(b.isOnHeap());
assertTrue(!b.isSharedMem());
} else {
assertTrue(!b.getBlockType().isData() || !b.isOnHeap());
assertTrue(!b.getBlockType().isData() || b.isSharedMem());
}
assertEquals(types.get(blockId), b.getBlockType());
assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
@ -913,14 +921,13 @@ public class TestHFileBlock {
.withCompression(Algorithm.NONE)
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
.withChecksumType(ChecksumType.NULL).build();
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
-1, 0, -1, meta, HEAP);
long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
new MultiByteBuff(buf).getClass(), true)
+ HConstants.HFILEBLOCK_HEADER_SIZE + size);
long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
long hfileBlockExpectedSize =
ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
HFileBlock.FILL_HEADER, -1, 0, -1, meta, HEAP);
long byteBufferExpectedSize =
ClassSize.align(ClassSize.estimateBase(new MultiByteBuff(buf).getClass(), true)
+ HConstants.HFILEBLOCK_HEADER_SIZE + size);
long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
long hfileBlockExpectedSize = ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
assertEquals("Block data size: " + size + ", byte buffer expected " +
"size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
@ -936,10 +943,10 @@ public class TestHFileBlock {
byte[] byteArr = new byte[length];
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
HFileContext meta = new HFileContextBuilder().build();
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
ByteBuffer buff1 = ByteBuffer.allocate(length);
ByteBuffer buff2 = ByteBuffer.allocate(length);
blockWithNextBlockMetadata.serialize(buff1, true);


@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
@ -132,8 +133,8 @@ public class TestHFileDataBlockEncoder {
.withBlockSize(0)
.withChecksumType(ChecksumType.NULL)
.build();
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
0, 0, -1, hfileContext, ByteBuffAllocator.HEAP);
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
HFileBlock.FILL_HEADER, 0, 0, -1, hfileContext, ByteBuffAllocator.HEAP);
HFileBlock cacheBlock = createBlockOnDisk(kvs, block, useTags);
assertEquals(headerSize, cacheBlock.getDummyHeaderForVersion().length);
}
@ -198,9 +199,8 @@ public class TestHFileDataBlockEncoder {
.withBlockSize(0)
.withChecksumType(ChecksumType.NULL)
.build();
HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, 0,
0, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock b = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
HFileBlock.FILL_HEADER, 0, 0, -1, meta, ByteBuffAllocator.HEAP);
return b;
}
@ -221,9 +221,9 @@ public class TestHFileDataBlockEncoder {
blockEncoder.endBlockEncoding(context, dos, baos.getBuffer(), BlockType.DATA);
byte[] encodedBytes = baos.toByteArray();
size = encodedBytes.length - block.getDummyHeaderForVersion().length;
return new HFileBlock(context.getBlockType(), size, size, -1, ByteBuffer.wrap(encodedBytes),
HFileBlock.FILL_HEADER, 0, block.getOnDiskDataSizeWithHeader(), -1,
block.getHFileContext(), ByteBuffAllocator.HEAP);
return new HFileBlock(context.getBlockType(), size, size, -1,
ByteBuff.wrap(ByteBuffer.wrap(encodedBytes)), HFileBlock.FILL_HEADER, 0,
block.getOnDiskDataSizeWithHeader(), -1, block.getHFileContext(), ByteBuffAllocator.HEAP);
}
private void writeBlock(List<Cell> kvs, HFileContext fileContext, boolean useTags)


@ -18,6 +18,11 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MAX_CHUNK_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY;
import static org.junit.Assert.assertEquals;
@ -34,17 +39,24 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.HFileScannerImpl;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.TestBucketCache;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -55,30 +67,74 @@ public class TestHFileScannerImplReferenceCount {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHFileScannerImplReferenceCount.class);
@Rule
public TestName CASE = new TestName();
private static final Logger LOG =
LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final byte[] FAMILY = Bytes.toBytes("f");
private static final byte[] QUALIFIER = Bytes.toBytes("q");
private static final byte[] SUFFIX = randLongBytes();
private static final int CELL_COUNT = 1000;
private static byte[] randLongBytes() {
Random rand = new Random();
byte[] keys = new byte[300];
byte[] keys = new byte[30];
rand.nextBytes(keys);
return keys;
}
// It's a deep copy of the configuration of UTIL; DON'T use a shallow copy.
private Configuration conf;
private Path workDir;
private FileSystem fs;
private Path hfilePath;
private Cell firstCell = null;
private Cell secondCell = null;
private ByteBuffAllocator allocator;
@BeforeClass
public static void setUp() {
public static void setUpBeforeClass() {
Configuration conf = UTIL.getConfiguration();
// Set the max chunk size and min entries key to be very small for index block, so that we can
// create an index block tree with level >= 2.
conf.setInt(MAX_CHUNK_SIZE_KEY, 10);
conf.setInt(MIN_INDEX_NUM_ENTRIES_KEY, 2);
// Create a bucket cache with 32MB.
conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
conf.setInt(BUFFER_SIZE_KEY, 1024);
conf.setInt(MAX_BUFFER_COUNT_KEY, 32 * 1024);
// All allocated ByteBuff are pooled ByteBuff.
conf.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
}
@Before
public void setUp() throws IOException {
this.firstCell = null;
this.secondCell = null;
this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
this.conf = new Configuration(UTIL.getConfiguration());
String caseName = CASE.getMethodName();
this.workDir = UTIL.getDataTestDir(caseName);
this.fs = this.workDir.getFileSystem(conf);
this.hfilePath = new Path(this.workDir, caseName + System.currentTimeMillis());
LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName);
}
@After
public void tearDown() throws IOException {
this.allocator.clean();
this.fs.delete(this.workDir, true);
}
private void waitBucketCacheFlushed(BlockCache cache) throws InterruptedException {
Assert.assertTrue(cache instanceof CombinedBlockCache);
BlockCache[] blockCaches = cache.getBlockCaches();
Assert.assertEquals(blockCaches.length, 2);
Assert.assertTrue(blockCaches[1] instanceof BucketCache);
TestBucketCache.waitUntilAllFlushedToBucket((BucketCache) blockCaches[1]);
}
private void writeHFile(Configuration conf, FileSystem fs, Path hfilePath, Algorithm compression,
@ -107,176 +163,192 @@ public class TestHFileScannerImplReferenceCount {
}
}
/**
* A careful UT for validating the reference count mechanism: if you want to change this UT,
* please read the design doc in HBASE-21879 first and make sure you understand the refCnt design.
*/
private void testReleaseBlock(Algorithm compression, DataBlockEncoding encoding)
throws Exception {
Configuration conf = new Configuration(UTIL.getConfiguration());
Path dir = UTIL.getDataTestDir("testReleasingBlock");
FileSystem fs = dir.getFileSystem(conf);
try {
String hfileName = "testReleaseBlock_hfile_0_" + System.currentTimeMillis();
Path hfilePath = new Path(dir, hfileName);
int cellCount = 1000;
LOG.info("Start to write {} cells into hfile: {}", cellCount, hfilePath);
writeHFile(conf, fs, hfilePath, compression, encoding, cellCount);
writeHFile(conf, fs, hfilePath, compression, encoding, CELL_COUNT);
HFileBlock curBlock, prevBlock;
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
Assert.assertNotNull(defaultBC);
Assert.assertTrue(cacheConfig.isCombinedBlockCache());
HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
Assert.assertTrue(reader instanceof HFileReaderImpl);
// We've built an HFile tree with index = 16.
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
Assert.assertNotNull(defaultBC);
HFile.Reader reader =
HFile.createReader(fs, hfilePath, new CacheConfig(conf, defaultBC), true, conf);
Assert.assertTrue(reader instanceof HFileReaderImpl);
// We've built an HFile tree with index = 16.
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
HFileBlock block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
waitBucketCacheFlushed(defaultBC);
Assert.assertTrue(block1.getBlockType().isData());
Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
HFileScanner scanner = reader.getScanner(true, true, false);
BlockWithScanInfo scanInfo = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE);
BlockWithScanInfo scanInfo2 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE);
HFileBlock block = scanInfo.getHFileBlock();
HFileBlock block2 = scanInfo2.getHFileBlock();
// One refCnt for blockCache and the other refCnt for RPC path.
Assert.assertEquals(block.refCnt(), 2);
Assert.assertEquals(block2.refCnt(), 2);
Assert.assertFalse(block == block2);
HFileBlock block2 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
waitBucketCacheFlushed(defaultBC);
Assert.assertTrue(block2.getBlockType().isData());
Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
// Only one refCnt for RPC path.
Assert.assertEquals(block1.refCnt(), 1);
Assert.assertEquals(block2.refCnt(), 1);
Assert.assertFalse(block1 == block2);
scanner.seekTo(firstCell);
Assert.assertEquals(block.refCnt(), 3);
scanner.seekTo(firstCell);
curBlock = scanner.curBlock;
Assert.assertEquals(curBlock.refCnt(), 2);
// Seek to the block again; the curBlock won't change and won't read from BlockCache, so
// refCnt should be unchanged.
scanner.seekTo(firstCell);
Assert.assertEquals(block.refCnt(), 3);
// Seek to the block again; the curBlock won't change and won't read from BlockCache, so
// refCnt should be unchanged.
scanner.seekTo(firstCell);
Assert.assertTrue(curBlock == scanner.curBlock);
Assert.assertEquals(curBlock.refCnt(), 2);
prevBlock = curBlock;
scanner.seekTo(secondCell);
Assert.assertEquals(block.refCnt(), 3);
Assert.assertEquals(block2.refCnt(), 3);
scanner.seekTo(secondCell);
curBlock = scanner.curBlock;
Assert.assertEquals(prevBlock.refCnt(), 2);
Assert.assertEquals(curBlock.refCnt(), 2);
// After shipped, the block will be released, but block2 is still referenced by the curBlock.
scanner.shipped();
Assert.assertEquals(block.refCnt(), 2);
Assert.assertEquals(block2.refCnt(), 3);
// After shipped, the prevBlock will be released, but curBlock is still referenced by the
// scanner.
scanner.shipped();
Assert.assertEquals(prevBlock.refCnt(), 1);
Assert.assertEquals(curBlock.refCnt(), 2);
// Try to ship again, though there is nothing to ship to the client.
scanner.shipped();
Assert.assertEquals(block.refCnt(), 2);
Assert.assertEquals(block2.refCnt(), 3);
// Try to ship again, though there is nothing to ship to the client.
scanner.shipped();
Assert.assertEquals(prevBlock.refCnt(), 1);
Assert.assertEquals(curBlock.refCnt(), 2);
// The curBlock(block2) will also be released.
scanner.close();
Assert.assertEquals(block2.refCnt(), 2);
// The curBlock will also be released.
scanner.close();
Assert.assertEquals(curBlock.refCnt(), 1);
// Finish the block & block2 RPC path
block.release();
block2.release();
Assert.assertEquals(block.refCnt(), 1);
Assert.assertEquals(block2.refCnt(), 1);
// Finish the block & block2 RPC path
Assert.assertTrue(block1.release());
Assert.assertTrue(block2.release());
// Evict the LRUBlockCache
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfileName) >= 2);
Assert.assertEquals(block.refCnt(), 0);
Assert.assertEquals(block2.refCnt(), 0);
// Evict the LRUBlockCache
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
Assert.assertEquals(prevBlock.refCnt(), 0);
Assert.assertEquals(curBlock.refCnt(), 0);
int count = 0;
Assert.assertTrue(scanner.seekTo());
++count;
while (scanner.next()) {
count++;
}
assertEquals(cellCount, count);
} finally {
fs.delete(dir, true);
int count = 0;
Assert.assertTrue(scanner.seekTo());
++count;
while (scanner.next()) {
count++;
}
assertEquals(CELL_COUNT, count);
}
/**
* See HBASE-22480
*/
@Test
public void testSeekBefore() throws IOException {
Configuration conf = new Configuration(UTIL.getConfiguration());
Path dir = UTIL.getDataTestDir("testSeekBefore");
FileSystem fs = dir.getFileSystem(conf);
try {
String hfileName = "testSeekBefore_hfile_0_" + System.currentTimeMillis();
Path hfilePath = new Path(dir, hfileName);
int cellCount = 1000;
LOG.info("Start to write {} cells into hfile: {}", cellCount, hfilePath);
writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, cellCount);
public void testSeekBefore() throws Exception {
HFileBlock curBlock, prevBlock;
writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
Assert.assertNotNull(defaultBC);
Assert.assertTrue(cacheConfig.isCombinedBlockCache());
HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
Assert.assertTrue(reader instanceof HFileReaderImpl);
// We've built an HFile tree with index = 16.
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
Assert.assertNotNull(defaultBC);
HFile.Reader reader =
HFile.createReader(fs, hfilePath, new CacheConfig(conf, defaultBC), true, conf);
Assert.assertTrue(reader instanceof HFileReaderImpl);
// We've built an HFile tree with index = 16.
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
HFileBlock block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
Assert.assertTrue(block1.getBlockType().isData());
Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
HFileBlock block2 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
Assert.assertTrue(block2.getBlockType().isData());
Assert.assertFalse(block2 instanceof ExclusiveMemHFileBlock);
// Wait until flushed to IOEngine;
waitBucketCacheFlushed(defaultBC);
// One RPC reference path.
Assert.assertEquals(block1.refCnt(), 1);
Assert.assertEquals(block2.refCnt(), 1);
HFileScanner scanner = reader.getScanner(true, true, false);
HFileBlock block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
HFileBlock block2 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
Assert.assertEquals(block1.refCnt(), 2);
Assert.assertEquals(block2.refCnt(), 2);
// Let the curBlock refer to block2.
scanner.seekTo(secondCell);
curBlock = scanner.curBlock;
Assert.assertFalse(curBlock == block2);
Assert.assertEquals(1, block2.refCnt());
Assert.assertEquals(2, curBlock.refCnt());
prevBlock = scanner.curBlock;
// Let the curBlock refer to block2.
scanner.seekTo(secondCell);
Assert.assertTrue(((HFileScannerImpl) scanner).curBlock == block2);
Assert.assertEquals(3, block2.refCnt());
// Release the block1, no other reference.
Assert.assertTrue(block1.release());
Assert.assertEquals(0, block1.refCnt());
// Release the block2, no other reference.
Assert.assertTrue(block2.release());
Assert.assertEquals(0, block2.refCnt());
// Release the block1, only one reference: blockCache.
Assert.assertFalse(block1.release());
Assert.assertEquals(1, block1.refCnt());
// Release the block2, so the remaining references are: 1. scanner; 2. blockCache.
Assert.assertFalse(block2.release());
Assert.assertEquals(2, block2.refCnt());
// Do the seekBefore: the newBlock will be the previous block of curBlock.
Assert.assertTrue(scanner.seekBefore(secondCell));
Assert.assertEquals(scanner.prevBlocks.size(), 1);
Assert.assertTrue(scanner.prevBlocks.get(0) == prevBlock);
curBlock = scanner.curBlock;
// The curBlock is read from the IOEngine, so it is a different block.
Assert.assertFalse(curBlock == block1);
// Two references for curBlock: 1. scanner; 2. blockCache.
Assert.assertEquals(2, curBlock.refCnt());
// Reference count of prevBlock must be unchanged because we haven't shipped.
Assert.assertEquals(2, prevBlock.refCnt());
// Do the seekBefore: the newBlock will be the previous block of curBlock.
Assert.assertTrue(scanner.seekBefore(secondCell));
Assert.assertTrue(((HFileScannerImpl) scanner).curBlock == block1);
// Two references for block1: 1. scanner; 2. blockCache.
Assert.assertEquals(2, block1.refCnt());
// Reference count of block2 must be unchanged because we haven't shipped.
Assert.assertEquals(2, block2.refCnt());
// Do the shipped
scanner.shipped();
Assert.assertEquals(scanner.prevBlocks.size(), 0);
Assert.assertNotNull(scanner.curBlock);
Assert.assertEquals(2, curBlock.refCnt());
Assert.assertEquals(1, prevBlock.refCnt());
// Do the shipped
scanner.shipped();
Assert.assertEquals(2, block1.refCnt());
Assert.assertEquals(1, block2.refCnt());
// Do the close
scanner.close();
Assert.assertNull(scanner.curBlock);
Assert.assertEquals(1, curBlock.refCnt());
Assert.assertEquals(1, prevBlock.refCnt());
// Do the close
scanner.close();
Assert.assertEquals(1, block1.refCnt());
Assert.assertEquals(1, block2.refCnt());
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
Assert.assertEquals(0, curBlock.refCnt());
Assert.assertEquals(0, prevBlock.refCnt());
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfileName) >= 2);
Assert.assertEquals(0, block1.refCnt());
Assert.assertEquals(0, block2.refCnt());
// Reload the block1 again.
block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
// Wait until flushed to IOEngine;
waitBucketCacheFlushed(defaultBC);
Assert.assertTrue(block1.getBlockType().isData());
Assert.assertFalse(block1 instanceof ExclusiveMemHFileBlock);
Assert.assertTrue(block1.release());
Assert.assertEquals(0, block1.refCnt());
// Re-seek to the beginning.
Assert.assertTrue(scanner.seekTo());
curBlock = scanner.curBlock;
Assert.assertFalse(curBlock == block1);
Assert.assertEquals(2, curBlock.refCnt());
// Return false because firstCell <= c[0]
Assert.assertFalse(scanner.seekBefore(firstCell));
// The block1 shouldn't be released because we haven't done the shipped or close yet.
Assert.assertEquals(2, curBlock.refCnt());
// Reload the block1 again.
block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
Assert.assertFalse(block1.release());
Assert.assertEquals(1, block1.refCnt());
// Re-seek to the beginning.
Assert.assertTrue(scanner.seekTo());
Assert.assertTrue(((HFileScannerImpl) scanner).curBlock == block1);
Assert.assertEquals(2, block1.refCnt());
// Return false because firstCell <= c[0]
Assert.assertFalse(scanner.seekBefore(firstCell));
// The block1 shouldn't be released because we haven't done the shipped or close yet.
Assert.assertEquals(2, block1.refCnt());
scanner.close();
Assert.assertEquals(1, block1.refCnt());
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfileName) >= 1);
Assert.assertEquals(0, block1.refCnt());
} finally {
fs.delete(dir, true);
}
scanner.close();
Assert.assertEquals(1, curBlock.refCnt());
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
Assert.assertEquals(0, curBlock.refCnt());
}
@Test
@ -298,4 +370,56 @@ public class TestHFileScannerImplReferenceCount {
public void testDataBlockEncodingAndCompression() throws Exception {
testReleaseBlock(Algorithm.GZ, DataBlockEncoding.ROW_INDEX_V1);
}
@Test
public void testWithLruBlockCache() throws Exception {
HFileBlock curBlock;
writeHFile(conf, fs, hfilePath, Algorithm.NONE, DataBlockEncoding.NONE, CELL_COUNT);
// Set LruBlockCache
conf.set(BUCKET_CACHE_IOENGINE_KEY, "");
BlockCache defaultBC = BlockCacheFactory.createBlockCache(conf);
CacheConfig cacheConfig = new CacheConfig(conf, null, defaultBC, allocator);
Assert.assertNotNull(defaultBC);
Assert.assertFalse(cacheConfig.isCombinedBlockCache()); // Must be LruBlockCache.
HFile.Reader reader = HFile.createReader(fs, hfilePath, cacheConfig, true, conf);
Assert.assertTrue(reader instanceof HFileReaderImpl);
// We've built an HFile tree with index = 16.
Assert.assertEquals(16, reader.getTrailer().getNumDataIndexLevels());
HFileScannerImpl scanner = (HFileScannerImpl) reader.getScanner(true, true, false);
HFileBlock block1 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(firstCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
Assert.assertTrue(block1.getBlockType().isData());
Assert.assertTrue(block1 instanceof ExclusiveMemHFileBlock);
HFileBlock block2 = reader.getDataBlockIndexReader()
.loadDataBlockWithScanInfo(secondCell, null, true, true, false, DataBlockEncoding.NONE)
.getHFileBlock();
Assert.assertTrue(block2.getBlockType().isData());
Assert.assertTrue(block2 instanceof ExclusiveMemHFileBlock);
// One RPC reference path.
Assert.assertEquals(block1.refCnt(), 0);
Assert.assertEquals(block2.refCnt(), 0);
scanner.seekTo(firstCell);
curBlock = scanner.curBlock;
Assert.assertTrue(curBlock == block1);
Assert.assertEquals(curBlock.refCnt(), 0);
Assert.assertTrue(scanner.prevBlocks.isEmpty());
// Switch to next block
scanner.seekTo(secondCell);
curBlock = scanner.curBlock;
Assert.assertTrue(curBlock == block2);
Assert.assertEquals(curBlock.refCnt(), 0);
Assert.assertEquals(curBlock.retain().refCnt(), 0);
// Only pooled HFileBlocks will be kept in prevBlocks; an ExclusiveMemHFileBlock will never be
// kept in prevBlocks.
Assert.assertTrue(scanner.prevBlocks.isEmpty());
// close the scanner
scanner.close();
Assert.assertNull(scanner.curBlock);
Assert.assertTrue(scanner.prevBlocks.isEmpty());
}
}


@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.EvictionThread;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassSize;
@ -820,10 +821,10 @@ public class TestLruBlockCache {
byte[] byteArr = new byte[length];
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
HFileContext meta = new HFileContextBuilder().build();
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
HFileBlock.FILL_HEADER, -1, -1, -1, meta, HEAP);
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, HEAP);
LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false,
(int)Math.ceil(1.2*maxSize/blockSize),
@ -964,7 +965,8 @@ public class TestLruBlockCache {
HFileContext meta = new HFileContextBuilder().build();
BlockCacheKey key = new BlockCacheKey("key1", 0);
HFileBlock blk = new HFileBlock(BlockType.DATA, size, size, -1,
ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
ByteBuff.wrap(ByteBuffer.wrap(byteArr, 0, size)), HFileBlock.FILL_HEADER, -1, 52, -1, meta,
HEAP);
AtomicBoolean err1 = new AtomicBoolean(false);
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000 && !err1.get(); i++) {


@ -203,7 +203,7 @@ public class TestBucketCache {
CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
}
private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
throws InterruptedException {
while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
Thread.sleep(100);
@ -211,6 +211,13 @@ public class TestBucketCache {
Thread.sleep(1000);
}
public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
while (!cache.ramCache.isEmpty()) {
Thread.sleep(100);
}
Thread.sleep(1000);
}
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
// threads will flush it to the bucket and put reference entry in backingMap.
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
@ -430,10 +437,10 @@ public class TestBucketCache {
ByteBuffer buf1 = ByteBuffer.allocate(size), buf2 = ByteBuffer.allocate(size);
HFileContext meta = new HFileContextBuilder().build();
ByteBuffAllocator allocator = ByteBuffAllocator.HEAP;
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf1,
HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf2,
HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
ByteBuff.wrap(buf1), HFileBlock.FILL_HEADER, -1, 52, -1, meta, allocator);
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
ByteBuff.wrap(buf2), HFileBlock.FILL_HEADER, -1, -1, -1, meta, allocator);
BlockCacheKey key = new BlockCacheKey("testCacheBlockNextBlockMetadataMissing", 0);
ByteBuffer actualBuffer = ByteBuffer.allocate(length);
@ -492,10 +499,10 @@ public class TestBucketCache {
RAMCache cache = new RAMCache();
BlockCacheKey key1 = new BlockCacheKey("file-1", 1);
BlockCacheKey key2 = new BlockCacheKey("file-2", 2);
HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
-1, 52, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
-1, -1, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock blk1 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);
@ -527,8 +534,8 @@ public class TestBucketCache {
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
ByteBuffer buf = ByteBuffer.allocate(length);
HFileContext meta = new HFileContextBuilder().build();
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf, HFileBlock.FILL_HEADER,
offset, 52, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
HFileBlock.FILL_HEADER, offset, 52, -1, meta, ByteBuffAllocator.HEAP);
// initialize an mocked ioengine.
IOEngine ioEngine = Mockito.mock(IOEngine.class);


@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.ClassRule;
@ -69,7 +70,7 @@ public class TestBucketCacheRefCnt {
}
private static HFileBlock createBlock(int offset, int size, ByteBuffAllocator alloc) {
return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuffer.allocate(size),
return new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(ByteBuffer.allocate(size)),
HFileBlock.FILL_HEADER, offset, 52, size, CONTEXT, alloc);
}


@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Assert;
@ -57,9 +58,9 @@ public class TestRAMCache {
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
HFileContext fileContext, ByteBuffAllocator allocator) {
super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, b,
fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
allocator);
super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset,
ByteBuff.wrap(b), fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader,
fileContext, allocator);
}
public void setLatch(CountDownLatch latch) {