diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 356c38d49cc..b59d19e52b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -645,7 +645,7 @@ public class BucketCache implements BlockCache, HeapSize { * it is {@link ByteBuffAllocator#putbackBuffer}. * */ - protected Recycler createRecycler(final BucketEntry bucketEntry) { + private Recycler createRecycler(final BucketEntry bucketEntry) { return () -> { freeBucketEntry(bucketEntry); return; @@ -1028,7 +1028,7 @@ public class BucketCache implements BlockCache, HeapSize { continue; } BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize, - (entry) -> createRecycler(entry)); + this::createRecycler); // Successfully added. Up index and add bucketEntry. Clear io exceptions. bucketEntries[index] = bucketEntry; if (ioErrorStartTime > 0) { @@ -1232,7 +1232,8 @@ public class BucketCache implements BlockCache, HeapSize { LOG.info("Persistent file is old format, it does not support verifying file integrity!"); } verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); - backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap()); + backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(), + this::createRecycler); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java index ca79f690b65..222cd804112 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java @@ -66,19 +66,27 @@ class BucketEntry implements HBaseReferenceCounted { private BlockPriority priority; /** - * The RefCnt means how many paths are referring the {@link BucketEntry}, each RPC reading path is - * considering as one path, the {@link BucketCache#backingMap} reference is also considered a - * path. NOTICE that if two read RPC path hit the same {@link BucketEntry}, then the HFileBlocks - * the two RPC referred will share the same refCnt instance with the BucketEntry. so the refCnt - * will increase or decrease as the following:
- * 1. when writerThread flush the block into IOEngine and add the bucketEntry into backingMap, the - * refCnt ++;
- * 2. If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--; it - * usually happen when HFile is closing or someone call the clearBucketCache by force.
- * 3. The read RPC path start to refer the block which is backend by the memory area in - * bucketEntry, then refCnt ++ ;
- * 4. The read RPC patch shipped the response, and release the block. then refCnt--;
- * Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area. + *
+   * The RefCnt means how many paths are referring the {@link BucketEntry}, there are two cases:
+   * 1.If the {@link IOEngine#usesSharedMemory()} is false(eg.{@link FileIOEngine}),the refCnt is
+   *   always 1 until this {@link BucketEntry} is evicted from {@link BucketCache#backingMap}.Even
+   *   if the corresponding {@link HFileBlock} is referenced by RPC reading, the refCnt should not
+   *   increase.
+   *
+   * 2.If the {@link IOEngine#usesSharedMemory()} is true(eg.{@link ByteBufferIOEngine}),each RPC
+   *   reading path is considering as one path, the {@link BucketCache#backingMap} reference is
+   *   also considered a path. NOTICE that if two read RPC path hit the same {@link BucketEntry},
+   *   then the {@link HFileBlock}s the two RPC referred will share the same refCnt instance with
+   *   the {@link BucketEntry},so the refCnt will increase or decrease as the following:
+   *   (1) when writerThread flush the block into IOEngine and add the bucketEntry into backingMap,
+   *       the refCnt ++;
+   *   (2) If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--;
+   *       it usually happen when HFile is closing or someone call the clearBucketCache by force.
+   *   (3) The read RPC path start to refer the block which is backend by the memory area in
+   *       bucketEntry, then refCnt ++ ;
+   *   (4) The read RPC patch shipped the response, and release the block. then refCnt--;
+   *    Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area.
+   * 
*/ private final RefCnt refCnt; final AtomicBoolean markedAsEvicted; @@ -89,22 +97,22 @@ class BucketEntry implements HBaseReferenceCounted { */ private final long cachedTime = System.nanoTime(); - BucketEntry(long offset, int length, long accessCounter, boolean inMemory) { - this(offset, length, accessCounter, inMemory, null, ByteBuffAllocator.HEAP); - } - + /** + * @param createRecycler used to free this {@link BucketEntry} when {@link BucketEntry#refCnt} + * becoming 0. NOTICE that {@link ByteBuffAllocator#NONE} could only be used for test. + */ BucketEntry(long offset, int length, long accessCounter, boolean inMemory, Function createRecycler, ByteBuffAllocator allocator) { + if (createRecycler == null) { + throw new IllegalArgumentException("createRecycler could not be null!"); + } setOffset(offset); this.length = length; this.accessCounter = accessCounter; this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI; - if (createRecycler == null) { - this.refCnt = RefCnt.create(); - } else { - this.refCnt = RefCnt.create(createRecycler.apply(this)); - } + this.refCnt = RefCnt.create(createRecycler.apply(this)); + this.markedAsEvicted = new AtomicBoolean(false); this.allocator = allocator; } @@ -173,13 +181,19 @@ class BucketEntry implements HBaseReferenceCounted { } /** - * Check whether have some RPC patch referring this block. There're two case:
+ * Check whether have some RPC patch referring this block.
+ * For {@link IOEngine#usesSharedMemory()} is true(eg.{@link ByteBufferIOEngine}), there're two + * case:
* 1. If current refCnt is greater than 1, there must be at least one referring RPC path;
* 2. If current refCnt is equal to 1 and the markedAtEvicted is true, the it means backingMap has * released its reference, the remaining reference can only be from RPC path.
* We use this check to decide whether we can free the block area: when cached size exceed the * acceptable size, our eviction policy will choose those stale blocks without any RPC reference - * and the RPC referred block will be excluded. + * and the RPC referred block will be excluded.
+ *
+ * For {@link IOEngine#usesSharedMemory()} is false(eg.{@link FileIOEngine}), + * {@link BucketEntry#refCnt} is always 1 until it is evicted from {@link BucketCache#backingMap}, + * so {@link BucketEntry#isRpcRef()} is always return false. * @return true to indicate there're some RPC referring the block. */ boolean isRpcRef() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java index f3d63d4e72f..b2a00f1795e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java @@ -23,7 +23,10 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.BlockPriority; import org.apache.hadoop.hbase.io.hfile.BlockType; @@ -127,7 +130,8 @@ final class BucketProtoUtils { } static ConcurrentHashMap fromPB( - Map deserializers, BucketCacheProtos.BackingMap backingMap) + Map deserializers, BucketCacheProtos.BackingMap backingMap, + Function createRecycler) throws IOException { ConcurrentHashMap result = new ConcurrentHashMap<>(); for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) { @@ -135,11 +139,14 @@ final class BucketProtoUtils { BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(), protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType())); BucketCacheProtos.BucketEntry protoValue = entry.getValue(); + // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator + // which created by RpcServer elegantly. BucketEntry value = new BucketEntry( protoValue.getOffset(), protoValue.getLength(), protoValue.getAccessCounter(), - protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory); + protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler, + ByteBuffAllocator.HEAP); // This is the deserializer that we stored int oldIndex = protoValue.getDeserialiserIndex(); String deserializerClass = deserializers.get(oldIndex); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index bca9d9e50de..22b48dc9163 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -554,7 +554,10 @@ public class TestBucketCache { // This number is picked because it produces negative output if the values isn't ensured to be // positive. See HBASE-18757 for more information. long testValue = 549888460800L; - BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true); + BucketEntry bucketEntry = + new BucketEntry(testValue, 10, 10L, true, (entry) -> { + return ByteBuffAllocator.NONE; + }, ByteBuffAllocator.HEAP); assertEquals(testValue, bucketEntry.offset()); } @@ -689,4 +692,57 @@ public class TestBucketCache { } Assert.assertEquals(0, allocator.getUsedSize()); } + + /** + * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file + * could not be freed even if corresponding {@link HFileBlock} is evicted from + * {@link BucketCache}. + */ + @Test + public void testFreeBucketEntryRestoredFromFile() throws Exception { + try { + final Path dataTestDir = createAndGetTestDir(); + + String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; + String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; + + BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath); + long usedByteSize = bucketCache.getAllocator().getUsedSize(); + assertEquals(0, usedByteSize); + + HFileBlockPair[] hfileBlockPairs = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); + // Add blocks + for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { + bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock()); + } + + for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { + cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(), + hfileBlockPair.getBlock()); + } + usedByteSize = bucketCache.getAllocator().getUsedSize(); + assertNotEquals(0, usedByteSize); + // persist cache to file + bucketCache.shutdown(); + assertTrue(new File(persistencePath).exists()); + + // restore cache from file + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath); + assertFalse(new File(persistencePath).exists()); + assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); + + for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { + BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); + bucketCache.evictBlock(blockCacheKey); + } + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + } finally { + HBASE_TESTING_UTILITY.cleanupTestDir(); + } + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java index 97a5283e470..677d602297c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java @@ -49,7 +49,9 @@ public class TestByteBufferIOEngine { private long off; MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) { - super(offset & 0xFF00, length, 0, false, null, allocator); + super(offset & 0xFF00, length, 0, false, (entry) -> { + return ByteBuffAllocator.NONE; + }, allocator); this.off = offset; }