HBASE-25698 Fixing IllegalReferenceCountException when using TinyLfuBlockCache (#3215)
Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org> Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
beaeb0dfec
commit
0c1d1d26a8
|
@ -158,7 +158,13 @@ public final class TinyLfuBlockCache implements FirstLevelBlockCache {
|
|||
@Override
|
||||
public Cacheable getBlock(BlockCacheKey cacheKey,
|
||||
boolean caching, boolean repeat, boolean updateCacheMetrics) {
|
||||
Cacheable value = cache.getIfPresent(cacheKey);
|
||||
Cacheable value = cache.asMap().computeIfPresent(cacheKey, (blockCacheKey, cacheable) -> {
|
||||
// It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
|
||||
// this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
|
||||
// the block and release, then we're retaining a block with refCnt=0 which is disallowed.
|
||||
cacheable.retain();
|
||||
return cacheable;
|
||||
});
|
||||
if (value == null) {
|
||||
if (repeat) {
|
||||
return null;
|
||||
|
@ -169,9 +175,6 @@ public final class TinyLfuBlockCache implements FirstLevelBlockCache {
|
|||
if (victimCache != null) {
|
||||
value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
|
||||
if ((value != null) && caching) {
|
||||
if ((value instanceof HFileBlock) && ((HFileBlock) value).isSharedMem()) {
|
||||
value = HFileBlock.deepCloneOnHeap((HFileBlock) value);
|
||||
}
|
||||
cacheBlock(cacheKey, value);
|
||||
}
|
||||
}
|
||||
|
@ -196,13 +199,44 @@ public final class TinyLfuBlockCache implements FirstLevelBlockCache {
|
|||
key.getHfileName(), key.getOffset(), value.heapSize(), DEFAULT_MAX_BLOCK_SIZE));
|
||||
}
|
||||
} else {
|
||||
value = asReferencedHeapBlock(value);
|
||||
cache.put(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The block cached in TinyLfuBlockCache will always be an heap block: on the one side, the heap
|
||||
* access will be more faster then off-heap, the small index block or meta block cached in
|
||||
* CombinedBlockCache will benefit a lot. on other side, the TinyLfuBlockCache size is always
|
||||
* calculated based on the total heap size, if caching an off-heap block in TinyLfuBlockCache, the
|
||||
* heap size will be messed up. Here we will clone the block into an heap block if it's an
|
||||
* off-heap block, otherwise just use the original block. The key point is maintain the refCnt of
|
||||
* the block (HBASE-22127): <br>
|
||||
* 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br>
|
||||
* 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
|
||||
* reservoir, if both RPC and TinyLfuBlockCache release the block, then it can be
|
||||
* garbage collected by JVM, so need a retain here.
|
||||
*
|
||||
* @param buf the original block
|
||||
* @return an block with an heap memory backend.
|
||||
*/
|
||||
private Cacheable asReferencedHeapBlock(Cacheable buf) {
|
||||
if (buf instanceof HFileBlock) {
|
||||
HFileBlock blk = ((HFileBlock) buf);
|
||||
if (blk.isSharedMem()) {
|
||||
return HFileBlock.deepCloneOnHeap(blk);
|
||||
}
|
||||
}
|
||||
// The block will be referenced by this TinyLfuBlockCache, so should increase its refCnt here.
|
||||
return buf.retain();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean evictBlock(BlockCacheKey cacheKey) {
|
||||
Cacheable value = cache.asMap().remove(cacheKey);
|
||||
if (value != null) {
|
||||
value.release();
|
||||
}
|
||||
return (value != null);
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
|
|||
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
|
||||
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
|
||||
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
|
||||
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_POLICY_KEY;
|
||||
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
@ -204,10 +205,11 @@ public class TestHFile {
|
|||
lru.shutdown();
|
||||
}
|
||||
|
||||
private BlockCache initCombinedBlockCache() {
|
||||
private BlockCache initCombinedBlockCache(final String l1CachePolicy) {
|
||||
Configuration that = HBaseConfiguration.create(conf);
|
||||
that.setFloat(BUCKET_CACHE_SIZE_KEY, 32); // 32MB for bucket cache.
|
||||
that.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
|
||||
that.set(BLOCKCACHE_POLICY_KEY, l1CachePolicy);
|
||||
BlockCache bc = BlockCacheFactory.createBlockCache(that);
|
||||
Assert.assertNotNull(bc);
|
||||
Assert.assertTrue(bc instanceof CombinedBlockCache);
|
||||
|
@ -224,7 +226,7 @@ public class TestHFile {
|
|||
fillByteBuffAllocator(alloc, bufCount);
|
||||
Path storeFilePath = writeStoreFile();
|
||||
// Open the file reader with CombinedBlockCache
|
||||
BlockCache combined = initCombinedBlockCache();
|
||||
BlockCache combined = initCombinedBlockCache("LRU");
|
||||
conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
|
||||
CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc);
|
||||
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
|
||||
|
@ -854,4 +856,72 @@ public class TestHFile {
|
|||
writer.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test case for CombinedBlockCache with TinyLfu as L1 cache
|
||||
*/
|
||||
@Test
|
||||
public void testReaderWithTinyLfuCombinedBlockCache() throws Exception {
|
||||
testReaderCombinedCache("TinyLfu");
|
||||
}
|
||||
|
||||
/**
|
||||
* Test case for CombinedBlockCache with AdaptiveLRU as L1 cache
|
||||
*/
|
||||
@Test
|
||||
public void testReaderWithAdaptiveLruCombinedBlockCache() throws Exception {
|
||||
testReaderCombinedCache("AdaptiveLRU");
|
||||
}
|
||||
|
||||
/**
|
||||
* Test case for CombinedBlockCache with AdaptiveLRU as L1 cache
|
||||
*/
|
||||
@Test
|
||||
public void testReaderWithLruCombinedBlockCache() throws Exception {
|
||||
testReaderCombinedCache("LRU");
|
||||
}
|
||||
|
||||
private void testReaderCombinedCache(final String l1CachePolicy) throws Exception {
|
||||
int bufCount = 1024;
|
||||
int blockSize = 64 * 1024;
|
||||
ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0);
|
||||
fillByteBuffAllocator(alloc, bufCount);
|
||||
Path storeFilePath = writeStoreFile();
|
||||
// Open the file reader with CombinedBlockCache
|
||||
BlockCache combined = initCombinedBlockCache(l1CachePolicy);
|
||||
conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
|
||||
CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc);
|
||||
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
|
||||
long offset = 0;
|
||||
Cacheable cachedBlock = null;
|
||||
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
|
||||
BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
|
||||
HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
|
||||
offset += block.getOnDiskSizeWithHeader();
|
||||
// Read the cached block.
|
||||
cachedBlock = combined.getBlock(key, false, false, true);
|
||||
try {
|
||||
Assert.assertNotNull(cachedBlock);
|
||||
Assert.assertTrue(cachedBlock instanceof HFileBlock);
|
||||
HFileBlock hfb = (HFileBlock) cachedBlock;
|
||||
// Data block will be cached in BucketCache, so it should be an off-heap block.
|
||||
if (hfb.getBlockType().isData()) {
|
||||
Assert.assertTrue(hfb.isSharedMem());
|
||||
} else if (!l1CachePolicy.equals("TinyLfu")) {
|
||||
Assert.assertFalse(hfb.isSharedMem());
|
||||
}
|
||||
} finally {
|
||||
cachedBlock.release();
|
||||
}
|
||||
block.release(); // return back the ByteBuffer back to allocator.
|
||||
}
|
||||
reader.close();
|
||||
combined.shutdown();
|
||||
if (cachedBlock != null) {
|
||||
Assert.assertEquals(0, cachedBlock.refCnt());
|
||||
}
|
||||
Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
|
||||
alloc.clean();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue