HBASE-26295 BucketCache could not free BucketEntry which restored fro… (#3699)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
chenglei 2021-09-29 21:24:12 +08:00 committed by Duo Zhang
parent 43455061d3
commit 3cc539a561
5 changed files with 125 additions and 31 deletions
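The bug in one paragraph: when the cache was restored from its persistence file, each BucketEntry was rebuilt with a null recycler, so when the last reference to an evicted block was released, nothing handed the entry's space back to the BucketAllocator and the allocator's used size could never drop back to zero. A minimal, self-contained sketch of that mechanism (plain Java; Recycler and SimpleRefCnt are illustrative stand-ins, not the real HBase classes or their APIs):

import java.util.concurrent.atomic.AtomicInteger;

public class RecyclerSketch {
  interface Recycler {
    void free();
  }

  /** Stand-in for HBase's RefCnt: runs the recycler when the count hits zero. */
  static final class SimpleRefCnt {
    private final AtomicInteger refs = new AtomicInteger(1);
    private final Recycler recycler;

    SimpleRefCnt(Recycler recycler) {
      this.recycler = recycler;
    }

    void retain() {
      refs.incrementAndGet();
    }

    void release() {
      if (refs.decrementAndGet() == 0 && recycler != null) {
        recycler.free();
      }
    }
  }

  public static void main(String[] args) {
    // Before HBASE-26295: entries restored from the persistence file were
    // built with a null recycler, so releasing the last reference freed nothing.
    SimpleRefCnt restored = new SimpleRefCnt(null);
    restored.release(); // count hits 0, but no space returns to the allocator

    // After the fix: the restore path supplies BucketCache::createRecycler,
    // so the last release hands the entry's space back to the allocator.
    SimpleRefCnt fixed = new SimpleRefCnt(() -> System.out.println("freeBucketEntry(...) called"));
    fixed.release(); // prints: freeBucketEntry(...) called
  }
}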

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

@@ -634,7 +634,7 @@ public class BucketCache implements BlockCache, HeapSize {
    * it is {@link ByteBuffAllocator#putbackBuffer}.
    * </pre>
    */
-  protected Recycler createRecycler(final BucketEntry bucketEntry) {
+  private Recycler createRecycler(final BucketEntry bucketEntry) {
     return () -> {
       freeBucketEntry(bucketEntry);
       return;
@@ -1017,7 +1017,7 @@ public class BucketCache implements BlockCache, HeapSize {
           continue;
         }
         BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
-          (entry) -> createRecycler(entry));
+          this::createRecycler);
         // Successfully added. Up index and add bucketEntry. Clear io exceptions.
         bucketEntries[index] = bucketEntry;
         if (ioErrorStartTime > 0) {
@@ -1217,7 +1217,8 @@ public class BucketCache implements BlockCache, HeapSize {
       LOG.info("Persistent file is old format, it does not support verifying file integrity!");
     }
     verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
-    backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap());
+    backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
+      this::createRecycler);
   }

   /**
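All three hunks converge on one point: the write path (writeToCache) and the file-restore path (BucketProtoUtils.fromPB) now receive the same recycler factory, this::createRecycler, so restored entries free their space on release exactly like freshly written ones. The swap of (entry) -> createRecycler(entry) for this::createRecycler is purely cosmetic; a tiny standalone demo of the equivalence (hypothetical names, not HBase code):

import java.util.function.Function;

public class MethodRefDemo {
  String createRecycler(Integer entry) {
    return "recycler-for-entry-" + entry;
  }

  public static void main(String[] args) {
    MethodRefDemo cache = new MethodRefDemo();
    // The diff replaces the explicit lambda with the equivalent bound method reference.
    Function<Integer, String> viaLambda = (entry) -> cache.createRecycler(entry);
    Function<Integer, String> viaMethodRef = cache::createRecycler;
    System.out.println(viaLambda.apply(1).equals(viaMethodRef.apply(1))); // true
  }
}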

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java

@@ -66,19 +66,27 @@ class BucketEntry implements HBaseReferenceCounted {
   private BlockPriority priority;

   /**
-   * The RefCnt means how many paths are referring the {@link BucketEntry}, each RPC reading path is
-   * considering as one path, the {@link BucketCache#backingMap} reference is also considered a
-   * path. NOTICE that if two read RPC path hit the same {@link BucketEntry}, then the HFileBlocks
-   * the two RPC referred will share the same refCnt instance with the BucketEntry. so the refCnt
-   * will increase or decrease as the following: <br>
-   * 1. when writerThread flush the block into IOEngine and add the bucketEntry into backingMap, the
-   * refCnt ++; <br>
-   * 2. If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--; it
-   * usually happen when HFile is closing or someone call the clearBucketCache by force. <br>
-   * 3. The read RPC path start to refer the block which is backend by the memory area in
-   * bucketEntry, then refCnt ++ ; <br>
-   * 4. The read RPC patch shipped the response, and release the block. then refCnt--; <br>
-   * Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area.
+   * <pre>
+   * The RefCnt means how many paths are referring the {@link BucketEntry}, there are two cases:
+   * 1.If the {@link IOEngine#usesSharedMemory()} is false(eg.{@link FileIOEngine}),the refCnt is
+   *   always 1 until this {@link BucketEntry} is evicted from {@link BucketCache#backingMap}.Even
+   *   if the corresponding {@link HFileBlock} is referenced by RPC reading, the refCnt should not
+   *   increase.
+   *
+   * 2.If the {@link IOEngine#usesSharedMemory()} is true(eg.{@link ByteBufferIOEngine}),each RPC
+   *   reading path is considering as one path, the {@link BucketCache#backingMap} reference is
+   *   also considered a path. NOTICE that if two read RPC path hit the same {@link BucketEntry},
+   *   then the {@link HFileBlock}s the two RPC referred will share the same refCnt instance with
+   *   the {@link BucketEntry},so the refCnt will increase or decrease as the following:
+   *   (1) when writerThread flush the block into IOEngine and add the bucketEntry into backingMap,
+   *       the refCnt ++;
+   *   (2) If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--;
+   *       it usually happen when HFile is closing or someone call the clearBucketCache by force.
+   *   (3) The read RPC path start to refer the block which is backend by the memory area in
+   *       bucketEntry, then refCnt ++ ;
+   *   (4) The read RPC patch shipped the response, and release the block. then refCnt--;
+   * Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area.
+   * </pre>
    */
   private final RefCnt refCnt;
   final AtomicBoolean markedAsEvicted;
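To make the numbered rules above concrete, here is a standalone trace of the shared-memory case, with a plain AtomicInteger standing in for the real RefCnt (illustrative only):

import java.util.concurrent.atomic.AtomicInteger;

/** Traces the shared-memory refCnt transitions, numbered as in the javadoc above. */
public class RefCntTrace {
  public static void main(String[] args) {
    AtomicInteger refCnt = new AtomicInteger(0);
    log("(1) writer thread adds entry to backingMap", refCnt.incrementAndGet());
    log("(3) RPC A starts referring the block", refCnt.incrementAndGet());
    log("(3) RPC B hits the same BucketEntry", refCnt.incrementAndGet());
    log("(4) RPC A ships its response, releases", refCnt.decrementAndGet());
    log("(2) eviction moves entry out of backingMap", refCnt.decrementAndGet());
    int last = refCnt.decrementAndGet();
    log("(4) RPC B ships its response, releases", last);
    if (last == 0) {
      // This is the moment the recycler runs and BucketAllocator frees the area.
      System.out.println("refCnt reached 0 -> block area freed");
    }
  }

  private static void log(String event, int cnt) {
    System.out.printf("%-48s refCnt=%d%n", event, cnt);
  }
}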
@@ -89,22 +97,22 @@ class BucketEntry implements HBaseReferenceCounted {
    */
   private final long cachedTime = System.nanoTime();

-  BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
-    this(offset, length, accessCounter, inMemory, null, ByteBuffAllocator.HEAP);
-  }
-
+  /**
+   * @param createRecycler used to free this {@link BucketEntry} when {@link BucketEntry#refCnt}
+   *          becoming 0. NOTICE that {@link ByteBuffAllocator#NONE} could only be used for test.
+   */
   BucketEntry(long offset, int length, long accessCounter, boolean inMemory,
       Function<BucketEntry, Recycler> createRecycler,
       ByteBuffAllocator allocator) {
+    if (createRecycler == null) {
+      throw new IllegalArgumentException("createRecycler could not be null!");
+    }
     setOffset(offset);
     this.length = length;
     this.accessCounter = accessCounter;
     this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
-    if (createRecycler == null) {
-      this.refCnt = RefCnt.create();
-    } else {
-      this.refCnt = RefCnt.create(createRecycler.apply(this));
-    }
+    this.refCnt = RefCnt.create(createRecycler.apply(this));
     this.markedAsEvicted = new AtomicBoolean(false);
     this.allocator = allocator;
   }
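The constructor change is the enforcement half of the fix: with the null-tolerant branch and the convenience constructor gone, it is impossible to build a BucketEntry whose space can never be reclaimed. A minimal model of that invariant (illustrative types, not the HBase classes):

import java.util.function.Function;

/** Minimal model of the constructor change: the recycler factory is now mandatory. */
final class EntrySketch {
  interface Recycler { void free(); }

  private final Recycler recycler;

  EntrySketch(Function<EntrySketch, Recycler> createRecycler) {
    if (createRecycler == null) {
      // Same idea as the hunk above: fail fast instead of silently creating
      // an entry whose space can never be reclaimed.
      throw new IllegalArgumentException("createRecycler could not be null!");
    }
    this.recycler = createRecycler.apply(this);
  }

  public static void main(String[] args) {
    new EntrySketch(e -> () -> System.out.println("freed")); // ok
    try {
      new EntrySketch(null); // rejected at construction time
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}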
@@ -173,13 +181,19 @@ class BucketEntry implements HBaseReferenceCounted {
   }

   /**
-   * Check whether have some RPC patch referring this block. There're two case: <br>
+   * Check whether have some RPC patch referring this block.<br/>
+   * For {@link IOEngine#usesSharedMemory()} is true(eg.{@link ByteBufferIOEngine}), there're two
+   * case: <br>
    * 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br>
    * 2. If current refCnt is equal to 1 and the markedAtEvicted is true, the it means backingMap has
    * released its reference, the remaining reference can only be from RPC path. <br>
    * We use this check to decide whether we can free the block area: when cached size exceed the
    * acceptable size, our eviction policy will choose those stale blocks without any RPC reference
-   * and the RPC referred block will be excluded.
+   * and the RPC referred block will be excluded. <br/>
+   * <br/>
+   * For {@link IOEngine#usesSharedMemory()} is false(eg.{@link FileIOEngine}),
+   * {@link BucketEntry#refCnt} is always 1 until it is evicted from {@link BucketCache#backingMap},
+   * so {@link BucketEntry#isRpcRef()} is always return false.
    * @return true to indicate there're some RPC referring the block.
    */
   boolean isRpcRef() {
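The hunk ends before the method body, but the two rules in the javadoc pin down what the check must compute. A hedged reconstruction of that logic as a standalone snippet (the real method reads the live RefCnt on the entry):

public class RpcRefCheck {
  // Hedged reconstruction of the check described in the javadoc above.
  static boolean isRpcRef(int refCnt, boolean markedAsEvicted) {
    // refCnt > 1: the backingMap reference plus at least one RPC reference.
    // refCnt == 1 while already evicted: backingMap released its reference,
    // so the single remaining reference can only be an RPC path.
    return refCnt > 1 || (markedAsEvicted && refCnt == 1);
  }

  public static void main(String[] args) {
    System.out.println(isRpcRef(2, false)); // true: one RPC reader plus backingMap
    System.out.println(isRpcRef(1, true));  // true: evicted, last ref must be RPC
    System.out.println(isRpcRef(1, false)); // false: only backingMap holds it
  }
}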

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java

@@ -23,7 +23,10 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;

+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.BlockPriority;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
@@ -127,7 +130,8 @@ final class BucketProtoUtils {
   }

   static ConcurrentHashMap<BlockCacheKey, BucketEntry> fromPB(
-      Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap)
+      Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap,
+      Function<BucketEntry, Recycler> createRecycler)
       throws IOException {
     ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
     for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
@@ -135,11 +139,14 @@ final class BucketProtoUtils {
       BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
         protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
       BucketCacheProtos.BucketEntry protoValue = entry.getValue();
+      // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator
+      // which created by RpcServer elegantly.
       BucketEntry value = new BucketEntry(
         protoValue.getOffset(),
         protoValue.getLength(),
         protoValue.getAccessCounter(),
-        protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory);
+        protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler,
+        ByteBuffAllocator.HEAP);
       // This is the deserializer that we stored
       int oldIndex = protoValue.getDeserialiserIndex();
       String deserializerClass = deserializers.get(oldIndex);
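The TODO in the hunk above concerns buffer pooling, not correctness: ByteBuffAllocator.HEAP hands out one-off heap buffers, while the allocator created by RpcServer pools buffers for reuse and simply is not reachable from this utility class. A toy contrast of the two strategies (illustrative sketch only; Pool is not the ByteBuffAllocator API):

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

public class AllocatorSketch {
  /** One-off heap allocation, the behaviour fromPB settles for. */
  static ByteBuffer heapAllocate(int size) {
    return ByteBuffer.allocate(size); // GC-managed, byte[]-backed buffer
  }

  /** A pooling allocator in the spirit of what RpcServer owns. */
  static final class Pool {
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    ByteBuffer allocate(int size) {
      ByteBuffer b = free.poll();
      if (b != null && b.capacity() >= size) {
        b.clear();
        return b; // reuse a returned buffer instead of re-allocating
      }
      return ByteBuffer.allocateDirect(size);
    }

    void recycle(ByteBuffer b) {
      free.offer(b);
    }
  }

  public static void main(String[] args) {
    Pool pool = new Pool();
    ByteBuffer first = pool.allocate(4096);
    pool.recycle(first);
    System.out.println(pool.allocate(4096) == first);  // true: the pool reuses it
    System.out.println(heapAllocate(4096).hasArray()); // true: plain heap buffer
  }
}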

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java

@@ -113,6 +113,8 @@ public class TestBucketCache {
   String ioEngineName = "offheap";
   String persistencePath = null;

+  private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
+
   private static class MockedBucketCache extends BucketCache {
     public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
@@ -144,6 +146,18 @@ public class TestBucketCache {
     cache.shutdown();
   }

+  /**
+   * Test Utility to create test dir and return name
+   *
+   * @return return name of created dir
+   * @throws IOException throws IOException
+   */
+  private Path createAndGetTestDir() throws IOException {
+    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
+    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
+    return testDir;
+  }
+
   /**
    * Return a random element from {@code a}.
    */
@@ -444,7 +458,10 @@ public class TestBucketCache {
     // This number is picked because it produces negative output if the values isn't ensured to be
     // positive. See HBASE-18757 for more information.
     long testValue = 549888460800L;
-    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true);
+    BucketEntry bucketEntry =
+      new BucketEntry(testValue, 10, 10L, true, (entry) -> {
+        return ByteBuffAllocator.NONE;
+      }, ByteBuffAllocator.HEAP);
     assertEquals(testValue, bucketEntry.offset());
   }
@@ -579,4 +596,57 @@ public class TestBucketCache {
     }
     Assert.assertEquals(0, allocator.getUsedSize());
   }
+
+  /**
+   * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
+   * could not be freed even if corresponding {@link HFileBlock} is evicted from
+   * {@link BucketCache}.
+   */
+  @Test
+  public void testFreeBucketEntryRestoredFromFile() throws Exception {
+    try {
+      final Path dataTestDir = createAndGetTestDir();
+
+      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
+      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
+
+      BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+      long usedByteSize = bucketCache.getAllocator().getUsedSize();
+      assertEquals(0, usedByteSize);
+
+      HFileBlockPair[] hfileBlockPairs =
+        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+      // Add blocks
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
+      }
+
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
+          hfileBlockPair.getBlock());
+      }
+      usedByteSize = bucketCache.getAllocator().getUsedSize();
+      assertNotEquals(0, usedByteSize);
+      // persist cache to file
+      bucketCache.shutdown();
+      assertTrue(new File(persistencePath).exists());
+
+      // restore cache from file
+      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+      assertFalse(new File(persistencePath).exists());
+      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
+
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
+        bucketCache.evictBlock(blockCacheKey);
+      }
+      assertEquals(0, bucketCache.getAllocator().getUsedSize());
+      assertEquals(0, bucketCache.backingMap.size());
+    } finally {
+      HBASE_TESTING_UTILITY.cleanupTestDir();
+    }
+  }
 }

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java

@@ -49,7 +49,9 @@ public class TestByteBufferIOEngine {
     private long off;

     MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
-      super(offset & 0xFF00, length, 0, false, null, allocator);
+      super(offset & 0xFF00, length, 0, false, (entry) -> {
+        return ByteBuffAllocator.NONE;
+      }, allocator);
       this.off = offset;
     }