HBASE-26295 BucketCache could not free BucketEntry which restored fro… (#3699)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
1152a61b5b
commit
121bdea230
|
@ -645,7 +645,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
* it is {@link ByteBuffAllocator#putbackBuffer}.
|
||||
* </pre>
|
||||
*/
|
||||
protected Recycler createRecycler(final BucketEntry bucketEntry) {
|
||||
private Recycler createRecycler(final BucketEntry bucketEntry) {
|
||||
return () -> {
|
||||
freeBucketEntry(bucketEntry);
|
||||
return;
|
||||
|
@ -1028,7 +1028,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
continue;
|
||||
}
|
||||
BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
|
||||
(entry) -> createRecycler(entry));
|
||||
this::createRecycler);
|
||||
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
|
||||
bucketEntries[index] = bucketEntry;
|
||||
if (ioErrorStartTime > 0) {
|
||||
|
@ -1232,7 +1232,8 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
LOG.info("Persistent file is old format, it does not support verifying file integrity!");
|
||||
}
|
||||
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
|
||||
backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap());
|
||||
backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
|
||||
this::createRecycler);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -66,19 +66,27 @@ class BucketEntry implements HBaseReferenceCounted {
|
|||
private BlockPriority priority;
|
||||
|
||||
/**
|
||||
* The RefCnt means how many paths are referring the {@link BucketEntry}, each RPC reading path is
|
||||
* considering as one path, the {@link BucketCache#backingMap} reference is also considered a
|
||||
* path. NOTICE that if two read RPC path hit the same {@link BucketEntry}, then the HFileBlocks
|
||||
* the two RPC referred will share the same refCnt instance with the BucketEntry. so the refCnt
|
||||
* will increase or decrease as the following: <br>
|
||||
* 1. when writerThread flush the block into IOEngine and add the bucketEntry into backingMap, the
|
||||
* refCnt ++; <br>
|
||||
* 2. If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--; it
|
||||
* usually happen when HFile is closing or someone call the clearBucketCache by force. <br>
|
||||
* 3. The read RPC path start to refer the block which is backend by the memory area in
|
||||
* bucketEntry, then refCnt ++ ; <br>
|
||||
* 4. The read RPC patch shipped the response, and release the block. then refCnt--; <br>
|
||||
* Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area.
|
||||
* <pre>
|
||||
* The RefCnt means how many paths are referring the {@link BucketEntry}, there are two cases:
|
||||
* 1.If the {@link IOEngine#usesSharedMemory()} is false(eg.{@link FileIOEngine}),the refCnt is
|
||||
* always 1 until this {@link BucketEntry} is evicted from {@link BucketCache#backingMap}.Even
|
||||
* if the corresponding {@link HFileBlock} is referenced by RPC reading, the refCnt should not
|
||||
* increase.
|
||||
*
|
||||
* 2.If the {@link IOEngine#usesSharedMemory()} is true(eg.{@link ByteBufferIOEngine}),each RPC
|
||||
* reading path is considering as one path, the {@link BucketCache#backingMap} reference is
|
||||
* also considered a path. NOTICE that if two read RPC path hit the same {@link BucketEntry},
|
||||
* then the {@link HFileBlock}s the two RPC referred will share the same refCnt instance with
|
||||
* the {@link BucketEntry},so the refCnt will increase or decrease as the following:
|
||||
* (1) when writerThread flush the block into IOEngine and add the bucketEntry into backingMap,
|
||||
* the refCnt ++;
|
||||
* (2) If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--;
|
||||
* it usually happen when HFile is closing or someone call the clearBucketCache by force.
|
||||
* (3) The read RPC path start to refer the block which is backend by the memory area in
|
||||
* bucketEntry, then refCnt ++ ;
|
||||
* (4) The read RPC patch shipped the response, and release the block. then refCnt--;
|
||||
* Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area.
|
||||
* </pre>
|
||||
*/
|
||||
private final RefCnt refCnt;
|
||||
final AtomicBoolean markedAsEvicted;
|
||||
|
@ -89,22 +97,22 @@ class BucketEntry implements HBaseReferenceCounted {
|
|||
*/
|
||||
private final long cachedTime = System.nanoTime();
|
||||
|
||||
BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
|
||||
this(offset, length, accessCounter, inMemory, null, ByteBuffAllocator.HEAP);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param createRecycler used to free this {@link BucketEntry} when {@link BucketEntry#refCnt}
|
||||
* becoming 0. NOTICE that {@link ByteBuffAllocator#NONE} could only be used for test.
|
||||
*/
|
||||
BucketEntry(long offset, int length, long accessCounter, boolean inMemory,
|
||||
Function<BucketEntry, Recycler> createRecycler,
|
||||
ByteBuffAllocator allocator) {
|
||||
if (createRecycler == null) {
|
||||
throw new IllegalArgumentException("createRecycler could not be null!");
|
||||
}
|
||||
setOffset(offset);
|
||||
this.length = length;
|
||||
this.accessCounter = accessCounter;
|
||||
this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
|
||||
if (createRecycler == null) {
|
||||
this.refCnt = RefCnt.create();
|
||||
} else {
|
||||
this.refCnt = RefCnt.create(createRecycler.apply(this));
|
||||
}
|
||||
this.refCnt = RefCnt.create(createRecycler.apply(this));
|
||||
|
||||
this.markedAsEvicted = new AtomicBoolean(false);
|
||||
this.allocator = allocator;
|
||||
}
|
||||
|
@ -173,13 +181,19 @@ class BucketEntry implements HBaseReferenceCounted {
|
|||
}
|
||||
|
||||
/**
|
||||
* Check whether have some RPC patch referring this block. There're two case: <br>
|
||||
* Check whether have some RPC patch referring this block.<br/>
|
||||
* For {@link IOEngine#usesSharedMemory()} is true(eg.{@link ByteBufferIOEngine}), there're two
|
||||
* case: <br>
|
||||
* 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br>
|
||||
* 2. If current refCnt is equal to 1 and the markedAtEvicted is true, the it means backingMap has
|
||||
* released its reference, the remaining reference can only be from RPC path. <br>
|
||||
* We use this check to decide whether we can free the block area: when cached size exceed the
|
||||
* acceptable size, our eviction policy will choose those stale blocks without any RPC reference
|
||||
* and the RPC referred block will be excluded.
|
||||
* and the RPC referred block will be excluded. <br/>
|
||||
* <br/>
|
||||
* For {@link IOEngine#usesSharedMemory()} is false(eg.{@link FileIOEngine}),
|
||||
* {@link BucketEntry#refCnt} is always 1 until it is evicted from {@link BucketCache#backingMap},
|
||||
* so {@link BucketEntry#isRpcRef()} is always return false.
|
||||
* @return true to indicate there're some RPC referring the block.
|
||||
*/
|
||||
boolean isRpcRef() {
|
||||
|
|
|
@ -23,7 +23,10 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
|
|||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockType;
|
||||
|
@ -127,7 +130,8 @@ final class BucketProtoUtils {
|
|||
}
|
||||
|
||||
static ConcurrentHashMap<BlockCacheKey, BucketEntry> fromPB(
|
||||
Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap)
|
||||
Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap,
|
||||
Function<BucketEntry, Recycler> createRecycler)
|
||||
throws IOException {
|
||||
ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
|
||||
for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
|
||||
|
@ -135,11 +139,14 @@ final class BucketProtoUtils {
|
|||
BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
|
||||
protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
|
||||
BucketCacheProtos.BucketEntry protoValue = entry.getValue();
|
||||
// TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator
|
||||
// which created by RpcServer elegantly.
|
||||
BucketEntry value = new BucketEntry(
|
||||
protoValue.getOffset(),
|
||||
protoValue.getLength(),
|
||||
protoValue.getAccessCounter(),
|
||||
protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory);
|
||||
protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler,
|
||||
ByteBuffAllocator.HEAP);
|
||||
// This is the deserializer that we stored
|
||||
int oldIndex = protoValue.getDeserialiserIndex();
|
||||
String deserializerClass = deserializers.get(oldIndex);
|
||||
|
|
|
@ -554,7 +554,10 @@ public class TestBucketCache {
|
|||
// This number is picked because it produces negative output if the values isn't ensured to be
|
||||
// positive. See HBASE-18757 for more information.
|
||||
long testValue = 549888460800L;
|
||||
BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true);
|
||||
BucketEntry bucketEntry =
|
||||
new BucketEntry(testValue, 10, 10L, true, (entry) -> {
|
||||
return ByteBuffAllocator.NONE;
|
||||
}, ByteBuffAllocator.HEAP);
|
||||
assertEquals(testValue, bucketEntry.offset());
|
||||
}
|
||||
|
||||
|
@ -689,4 +692,57 @@ public class TestBucketCache {
|
|||
}
|
||||
Assert.assertEquals(0, allocator.getUsedSize());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
|
||||
* could not be freed even if corresponding {@link HFileBlock} is evicted from
|
||||
* {@link BucketCache}.
|
||||
*/
|
||||
@Test
|
||||
public void testFreeBucketEntryRestoredFromFile() throws Exception {
|
||||
try {
|
||||
final Path dataTestDir = createAndGetTestDir();
|
||||
|
||||
String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
|
||||
String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
|
||||
|
||||
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
|
||||
long usedByteSize = bucketCache.getAllocator().getUsedSize();
|
||||
assertEquals(0, usedByteSize);
|
||||
|
||||
HFileBlockPair[] hfileBlockPairs =
|
||||
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
|
||||
// Add blocks
|
||||
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
|
||||
bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
|
||||
}
|
||||
|
||||
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
|
||||
cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
|
||||
hfileBlockPair.getBlock());
|
||||
}
|
||||
usedByteSize = bucketCache.getAllocator().getUsedSize();
|
||||
assertNotEquals(0, usedByteSize);
|
||||
// persist cache to file
|
||||
bucketCache.shutdown();
|
||||
assertTrue(new File(persistencePath).exists());
|
||||
|
||||
// restore cache from file
|
||||
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
|
||||
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
|
||||
assertFalse(new File(persistencePath).exists());
|
||||
assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
|
||||
|
||||
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
|
||||
BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
|
||||
bucketCache.evictBlock(blockCacheKey);
|
||||
}
|
||||
assertEquals(0, bucketCache.getAllocator().getUsedSize());
|
||||
assertEquals(0, bucketCache.backingMap.size());
|
||||
} finally {
|
||||
HBASE_TESTING_UTILITY.cleanupTestDir();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -49,7 +49,9 @@ public class TestByteBufferIOEngine {
|
|||
private long off;
|
||||
|
||||
MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
|
||||
super(offset & 0xFF00, length, 0, false, null, allocator);
|
||||
super(offset & 0xFF00, length, 0, false, (entry) -> {
|
||||
return ByteBuffAllocator.NONE;
|
||||
}, allocator);
|
||||
this.off = offset;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue