HBASE-25698 Fixing IllegalReferenceCountException when using TinyLfuBlockCache (#3215)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org>
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Viraj Jasani 2021-06-21 11:50:33 +05:30
parent 029dc81b39
commit 41a68325c2
No known key found for this signature in database
GPG Key ID: 08E70F70AB71C5A1
2 changed files with 112 additions and 8 deletions

View File

@ -158,7 +158,13 @@ public final class TinyLfuBlockCache implements FirstLevelBlockCache {
@Override
public Cacheable getBlock(BlockCacheKey cacheKey,
boolean caching, boolean repeat, boolean updateCacheMetrics) {
Cacheable value = cache.getIfPresent(cacheKey);
Cacheable value = cache.asMap().computeIfPresent(cacheKey, (blockCacheKey, cacheable) -> {
// It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
// this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
// the block and release, then we're retaining a block with refCnt=0 which is disallowed.
cacheable.retain();
return cacheable;
});
if (value == null) {
if (repeat) {
return null;
@ -169,9 +175,6 @@ public final class TinyLfuBlockCache implements FirstLevelBlockCache {
if (victimCache != null) {
value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
if ((value != null) && caching) {
if ((value instanceof HFileBlock) && ((HFileBlock) value).isSharedMem()) {
value = HFileBlock.deepCloneOnHeap((HFileBlock) value);
}
cacheBlock(cacheKey, value);
}
}
@ -196,13 +199,44 @@ public final class TinyLfuBlockCache implements FirstLevelBlockCache {
key.getHfileName(), key.getOffset(), value.heapSize(), DEFAULT_MAX_BLOCK_SIZE));
}
} else {
value = asReferencedHeapBlock(value);
cache.put(key, value);
}
}
/**
* The block cached in TinyLfuBlockCache will always be an heap block: on the one side, the heap
* access will be more faster then off-heap, the small index block or meta block cached in
* CombinedBlockCache will benefit a lot. on other side, the TinyLfuBlockCache size is always
* calculated based on the total heap size, if caching an off-heap block in TinyLfuBlockCache, the
* heap size will be messed up. Here we will clone the block into an heap block if it's an
* off-heap block, otherwise just use the original block. The key point is maintain the refCnt of
* the block (HBASE-22127): <br>
* 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br>
* 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
* reservoir, if both RPC and TinyLfuBlockCache release the block, then it can be
* garbage collected by JVM, so need a retain here.
*
* @param buf the original block
* @return an block with an heap memory backend.
*/
private Cacheable asReferencedHeapBlock(Cacheable buf) {
if (buf instanceof HFileBlock) {
HFileBlock blk = ((HFileBlock) buf);
if (blk.isSharedMem()) {
return HFileBlock.deepCloneOnHeap(blk);
}
}
// The block will be referenced by this TinyLfuBlockCache, so should increase its refCnt here.
return buf.retain();
}
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
Cacheable value = cache.asMap().remove(cacheKey);
if (value != null) {
value.release();
}
return (value != null);
}

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BLOCKCACHE_POLICY_KEY;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -205,10 +206,11 @@ public class TestHFile {
lru.shutdown();
}
private BlockCache initCombinedBlockCache() {
private BlockCache initCombinedBlockCache(final String l1CachePolicy) {
Configuration that = HBaseConfiguration.create(conf);
that.setFloat(BUCKET_CACHE_SIZE_KEY, 32); // 32MB for bucket cache.
that.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
that.set(BLOCKCACHE_POLICY_KEY, l1CachePolicy);
BlockCache bc = BlockCacheFactory.createBlockCache(that);
Assert.assertNotNull(bc);
Assert.assertTrue(bc instanceof CombinedBlockCache);
@ -225,7 +227,7 @@ public class TestHFile {
fillByteBuffAllocator(alloc, bufCount);
Path storeFilePath = writeStoreFile();
// Open the file reader with CombinedBlockCache
BlockCache combined = initCombinedBlockCache();
BlockCache combined = initCombinedBlockCache("LRU");
conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc);
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
@ -881,4 +883,72 @@ public class TestHFile {
writer.close();
}
}
/**
* Test case for CombinedBlockCache with TinyLfu as L1 cache
*/
@Test
public void testReaderWithTinyLfuCombinedBlockCache() throws Exception {
testReaderCombinedCache("TinyLfu");
}
/**
* Test case for CombinedBlockCache with AdaptiveLRU as L1 cache
*/
@Test
public void testReaderWithAdaptiveLruCombinedBlockCache() throws Exception {
testReaderCombinedCache("AdaptiveLRU");
}
/**
* Test case for CombinedBlockCache with AdaptiveLRU as L1 cache
*/
@Test
public void testReaderWithLruCombinedBlockCache() throws Exception {
testReaderCombinedCache("LRU");
}
private void testReaderCombinedCache(final String l1CachePolicy) throws Exception {
int bufCount = 1024;
int blockSize = 64 * 1024;
ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0);
fillByteBuffAllocator(alloc, bufCount);
Path storeFilePath = writeStoreFile();
// Open the file reader with CombinedBlockCache
BlockCache combined = initCombinedBlockCache(l1CachePolicy);
conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc);
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
long offset = 0;
Cacheable cachedBlock = null;
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
offset += block.getOnDiskSizeWithHeader();
// Read the cached block.
cachedBlock = combined.getBlock(key, false, false, true);
try {
Assert.assertNotNull(cachedBlock);
Assert.assertTrue(cachedBlock instanceof HFileBlock);
HFileBlock hfb = (HFileBlock) cachedBlock;
// Data block will be cached in BucketCache, so it should be an off-heap block.
if (hfb.getBlockType().isData()) {
Assert.assertTrue(hfb.isSharedMem());
} else if (!l1CachePolicy.equals("TinyLfu")) {
Assert.assertFalse(hfb.isSharedMem());
}
} finally {
cachedBlock.release();
}
block.release(); // return back the ByteBuffer back to allocator.
}
reader.close();
combined.shutdown();
if (cachedBlock != null) {
Assert.assertEquals(0, cachedBlock.refCnt());
}
Assert.assertEquals(bufCount, alloc.getFreeBufferCount());
alloc.clean();
}
}