From 96e728394980cfcb76c5173ff6eb0bfa4cb1f7fb Mon Sep 17 00:00:00 2001 From: chenglei Date: Fri, 17 Sep 2021 22:15:23 +0800 Subject: [PATCH] HubSpot Backport: HBASE-26281 DBB got from BucketCache would be freed unexpectedly before RPC completed (#3680) Signed-off-by: Duo Zhang --- .../hbase/io/hfile/bucket/BucketCache.java | 390 ++++++++------ .../hbase/io/hfile/bucket/BucketEntry.java | 26 +- ...tAvoidCellReferencesIntoShippedBlocks.java | 25 +- .../io/hfile/bucket/TestBucketCache.java | 30 +- .../hfile/bucket/TestBucketCacheRefCnt.java | 491 +++++++++++++++++- .../hfile/bucket/TestBucketWriterThread.java | 7 +- .../hfile/bucket/TestByteBufferIOEngine.java | 3 +- .../hbase/io/hfile/bucket/TestRAMCache.java | 2 +- 8 files changed, 767 insertions(+), 207 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 29a9db4c668..866a5fcf092 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -49,6 +49,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import java.util.function.Function; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -419,7 +420,7 @@ public class BucketCache implements BlockCache, HeapSize { boolean wait) { if (cacheEnabled) { if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { - if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) { + if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) { BucketEntry bucketEntry = backingMap.get(cacheKey); if (bucketEntry != null && bucketEntry.isRpcRef()) { // avoid replace when there are RPC refs for the bucket entry in bucket cache @@ -433,7 +434,11 @@ public class BucketCache implements BlockCache, HeapSize { } } - private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, + protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { + return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock); + } + + protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) { if (!cacheEnabled) { return; @@ -441,8 +446,7 @@ public class BucketCache implements BlockCache, HeapSize { LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); // Stuff the entry into the RAM cache so it can get drained to the persistent store RAMQueueEntry re = - new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory, - createRecycler(cacheKey)); + new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); /** * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same * key in ramCache, the heap size of bucket cache need to update if replacing entry from @@ -540,13 +544,25 @@ public class BucketCache implements BlockCache, HeapSize { return null; } + /** + * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap} + */ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { - bucketAllocator.freeBlock(bucketEntry.offset()); - realCacheSize.add(-1 * bucketEntry.getLength()); + bucketEntry.markAsEvicted(); blocksByHFile.remove(cacheKey); if (decrementBlockNumber) { this.blockNumber.decrement(); } + cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary()); + } + + /** + * Free the {{@link BucketEntry} actually,which could only be invoked when the + * {@link BucketEntry#refCnt} becoming 0. + */ + void freeBucketEntry(BucketEntry bucketEntry) { + bucketAllocator.freeBlock(bucketEntry.offset()); + realCacheSize.add(-1 * bucketEntry.getLength()); } /** @@ -554,10 +570,10 @@ public class BucketCache implements BlockCache, HeapSize { * 1. Close an HFile, and clear all cached blocks.
* 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.
*

- * Firstly, we'll try to remove the block from RAMCache. If it doesn't exist in RAMCache, then try - * to evict from backingMap. Here we only need to free the reference from bucket cache by calling - * {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this block, block can - * only be de-allocated when all of them release the block. + * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap. + * Here we evict the block from backingMap immediately, but only free the reference from bucket + * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this + * block, block can only be de-allocated when all of them release the block. *

* NOTICE: we need to grab the write offset lock firstly before releasing the reference from * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when @@ -567,43 +583,92 @@ public class BucketCache implements BlockCache, HeapSize { */ @Override public boolean evictBlock(BlockCacheKey cacheKey) { + return doEvictBlock(cacheKey, null); + } + + /** + * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and + * {@link BucketCache#ramCache}.
+ * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and + * {@link BucketEntry} could be removed. + * @param cacheKey {@link BlockCacheKey} to evict. + * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. + * @return true to indicate whether we've evicted successfully or not. + */ + private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry) { if (!cacheEnabled) { return false; } - boolean existed = removeFromRamCache(cacheKey); - BucketEntry be = backingMap.get(cacheKey); - if (be == null) { - if (existed) { + boolean existedInRamCache = removeFromRamCache(cacheKey); + if (bucketEntry == null) { + bucketEntry = backingMap.get(cacheKey); + } + final BucketEntry bucketEntryToUse = bucketEntry; + + if (bucketEntryToUse == null) { + if (existedInRamCache) { cacheStats.evicted(0, cacheKey.isPrimary()); } - return existed; + return existedInRamCache; } else { - return be.withWriteLock(offsetLock, be::markAsEvicted); + return bucketEntryToUse.withWriteLock(offsetLock, () -> { + if (backingMap.remove(cacheKey, bucketEntryToUse)) { + blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache); + return true; + } + return false; + }); } } - private Recycler createRecycler(BlockCacheKey cacheKey) { + /** + *

+   * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as
+   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
+   * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf}
+   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
+   * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
+   *   it is the return value of current {@link BucketCache#createRecycler} method.
+   *
+   * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
+   *   it is {@link ByteBuffAllocator#putbackBuffer}.
+   * 
+ */ + protected Recycler createRecycler(final BucketEntry bucketEntry) { return () -> { - if (!cacheEnabled) { - return; - } - boolean existed = removeFromRamCache(cacheKey); - BucketEntry be = backingMap.get(cacheKey); - if (be == null && existed) { - cacheStats.evicted(0, cacheKey.isPrimary()); - } else if (be != null) { - be.withWriteLock(offsetLock, () -> { - if (backingMap.remove(cacheKey, be)) { - blockEvicted(cacheKey, be, !existed); - cacheStats.evicted(be.getCachedTime(), cacheKey.isPrimary()); - } - return null; - }); - } + freeBucketEntry(bucketEntry); + return; }; } - private boolean removeFromRamCache(BlockCacheKey cacheKey) { + /** + * NOTE: This method is only for test. + */ + public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) { + BucketEntry bucketEntry = backingMap.get(blockCacheKey); + if (bucketEntry == null) { + return false; + } + return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry); + } + + /** + * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if + * {@link BucketEntry#isRpcRef} is false.
+ * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and + * {@link BucketEntry} could be removed. + * @param blockCacheKey {@link BlockCacheKey} to evict. + * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict. + * @return true to indicate whether we've evicted successfully or not. + */ + boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) { + if (!bucketEntry.isRpcRef()) { + return doEvictBlock(blockCacheKey, bucketEntry); + } + return false; + } + + protected boolean removeFromRamCache(BlockCacheKey cacheKey) { return ramCache.remove(cacheKey, re -> { if (re != null) { this.blockNumber.decrement(); @@ -709,7 +774,7 @@ public class BucketCache implements BlockCache, HeapSize { bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded); for (Map.Entry entry : backingMap.entrySet()) { if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) { - entry.getValue().withWriteLock(offsetLock, entry.getValue()::markStaleAsEvicted); + evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue()); } } } @@ -898,134 +963,134 @@ public class BucketCache implements BlockCache, HeapSize { } LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); } + } - /** - * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing - * cache with a new block for the same cache key. there's a corner case: one thread cache a - * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another - * new block with the same cache key do the same thing for the same cache key, so if not evict - * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone - * but the bucketAllocator do not free its memory. - * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey - * cacheKey, Cacheable newBlock) - * @param key Block cache key - * @param bucketEntry Bucket entry to put into backingMap. - */ - private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { - BucketEntry previousEntry = backingMap.put(key, bucketEntry); - if (previousEntry != null && previousEntry != bucketEntry) { - previousEntry.withWriteLock(offsetLock, () -> { - blockEvicted(key, previousEntry, false); + /** + * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing + * cache with a new block for the same cache key. there's a corner case: one thread cache a block + * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block + * with the same cache key do the same thing for the same cache key, so if not evict the previous + * bucket entry, then memory leak happen because the previous bucketEntry is gone but the + * bucketAllocator do not free its memory. + * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey + * cacheKey, Cacheable newBlock) + * @param key Block cache key + * @param bucketEntry Bucket entry to put into backingMap. + */ + protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { + BucketEntry previousEntry = backingMap.put(key, bucketEntry); + if (previousEntry != null && previousEntry != bucketEntry) { + previousEntry.withWriteLock(offsetLock, () -> { + blockEvicted(key, previousEntry, false); + return null; + }); + } + } + + /** + * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that + * are passed in even if failure being sure to remove from ramCache else we'll never undo the + * references and we'll OOME. + * @param entries Presumes list passed in here will be processed by this invocation only. No + * interference expected. + */ + void doDrain(final List entries) throws InterruptedException { + if (entries.isEmpty()) { + return; + } + // This method is a little hard to follow. We run through the passed in entries and for each + // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must + // do cleanup making sure we've cleared ramCache of all entries regardless of whether we + // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by + // filling ramCache. We do the clean up by again running through the passed in entries + // doing extra work when we find a non-null bucketEntries corresponding entry. + final int size = entries.size(); + BucketEntry[] bucketEntries = new BucketEntry[size]; + // Index updated inside loop if success or if we can't succeed. We retry if cache is full + // when we go to add an entry by going around the loop again without upping the index. + int index = 0; + while (cacheEnabled && index < size) { + RAMQueueEntry re = null; + try { + re = entries.get(index); + if (re == null) { + LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); + index++; + continue; + } + BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize, + (entry) -> createRecycler(entry)); + // Successfully added. Up index and add bucketEntry. Clear io exceptions. + bucketEntries[index] = bucketEntry; + if (ioErrorStartTime > 0) { + ioErrorStartTime = -1; + } + index++; + } catch (BucketAllocatorException fle) { + LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle); + // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. + bucketEntries[index] = null; + index++; + } catch (CacheFullException cfe) { + // Cache full when we tried to add. Try freeing space and then retrying (don't up index) + if (!freeInProgress) { + freeSpace("Full!"); + } else { + Thread.sleep(50); + } + } catch (IOException ioex) { + // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. + LOG.error("Failed writing to bucket cache", ioex); + checkIOErrorIsTolerated(); + } + } + + // Make sure data pages are written on media before we update maps. + try { + ioEngine.sync(); + } catch (IOException ioex) { + LOG.error("Failed syncing IO engine", ioex); + checkIOErrorIsTolerated(); + // Since we failed sync, free the blocks in bucket allocator + for (int i = 0; i < entries.size(); ++i) { + if (bucketEntries[i] != null) { + bucketAllocator.freeBlock(bucketEntries[i].offset()); + bucketEntries[i] = null; + } + } + } + + // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if + // success or error. + for (int i = 0; i < size; ++i) { + BlockCacheKey key = entries.get(i).getKey(); + // Only add if non-null entry. + if (bucketEntries[i] != null) { + putIntoBackingMap(key, bucketEntries[i]); + } + // Always remove from ramCache even if we failed adding it to the block cache above. + boolean existed = ramCache.remove(key, re -> { + if (re != null) { + heapSize.add(-1 * re.getData().heapSize()); + } + }); + if (!existed && bucketEntries[i] != null) { + // Block should have already been evicted. Remove it and free space. + final BucketEntry bucketEntry = bucketEntries[i]; + bucketEntry.withWriteLock(offsetLock, () -> { + if (backingMap.remove(key, bucketEntry)) { + blockEvicted(key, bucketEntry, false); + } return null; }); } } - /** - * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. - * Process all that are passed in even if failure being sure to remove from ramCache else we'll - * never undo the references and we'll OOME. - * @param entries Presumes list passed in here will be processed by this invocation only. No - * interference expected. - * @throws InterruptedException - */ - void doDrain(final List entries) throws InterruptedException { - if (entries.isEmpty()) { - return; - } - // This method is a little hard to follow. We run through the passed in entries and for each - // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must - // do cleanup making sure we've cleared ramCache of all entries regardless of whether we - // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by - // filling ramCache. We do the clean up by again running through the passed in entries - // doing extra work when we find a non-null bucketEntries corresponding entry. - final int size = entries.size(); - BucketEntry[] bucketEntries = new BucketEntry[size]; - // Index updated inside loop if success or if we can't succeed. We retry if cache is full - // when we go to add an entry by going around the loop again without upping the index. - int index = 0; - while (cacheEnabled && index < size) { - RAMQueueEntry re = null; - try { - re = entries.get(index); - if (re == null) { - LOG.warn("Couldn't get entry or changed on us; who else is messing with it?"); - index++; - continue; - } - BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize); - // Successfully added. Up index and add bucketEntry. Clear io exceptions. - bucketEntries[index] = bucketEntry; - if (ioErrorStartTime > 0) { - ioErrorStartTime = -1; - } - index++; - } catch (BucketAllocatorException fle) { - LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle); - // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below. - bucketEntries[index] = null; - index++; - } catch (CacheFullException cfe) { - // Cache full when we tried to add. Try freeing space and then retrying (don't up index) - if (!freeInProgress) { - freeSpace("Full!"); - } else { - Thread.sleep(50); - } - } catch (IOException ioex) { - // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem. - LOG.error("Failed writing to bucket cache", ioex); - checkIOErrorIsTolerated(); - } - } - - // Make sure data pages are written on media before we update maps. - try { - ioEngine.sync(); - } catch (IOException ioex) { - LOG.error("Failed syncing IO engine", ioex); - checkIOErrorIsTolerated(); - // Since we failed sync, free the blocks in bucket allocator - for (int i = 0; i < entries.size(); ++i) { - if (bucketEntries[i] != null) { - bucketAllocator.freeBlock(bucketEntries[i].offset()); - bucketEntries[i] = null; - } - } - } - - // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if - // success or error. - for (int i = 0; i < size; ++i) { - BlockCacheKey key = entries.get(i).getKey(); - // Only add if non-null entry. - if (bucketEntries[i] != null) { - putIntoBackingMap(key, bucketEntries[i]); - } - // Always remove from ramCache even if we failed adding it to the block cache above. - boolean existed = ramCache.remove(key, re -> { - if (re != null) { - heapSize.add(-1 * re.getData().heapSize()); - } - }); - if (!existed && bucketEntries[i] != null) { - // Block should have already been evicted. Remove it and free space. - final BucketEntry bucketEntry = bucketEntries[i]; - bucketEntry.withWriteLock(offsetLock, () -> { - if (backingMap.remove(key, bucketEntry)) { - blockEvicted(key, bucketEntry, false); - } - return null; - }); - } - } - - long used = bucketAllocator.getUsedSize(); - if (used > acceptableSize()) { - freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); - } - return; + long used = bucketAllocator.getUsedSize(); + if (used > acceptableSize()) { + freeSpace("Used=" + used + " > acceptable=" + acceptableSize()); } + return; } /** @@ -1315,8 +1380,9 @@ public class BucketCache implements BlockCache, HeapSize { // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free // What to do then? Caching attempt fail? Need some changes in cacheBlock API? while ((entry = queue.pollLast()) != null) { + BlockCacheKey blockCacheKey = entry.getKey(); BucketEntry be = entry.getValue(); - if (be.withWriteLock(offsetLock, be::markStaleAsEvicted)) { + if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) { freedBytes += be.getLength(); } if (freedBytes >= toFree) { @@ -1343,15 +1409,12 @@ public class BucketCache implements BlockCache, HeapSize { private final Cacheable data; private long accessCounter; private boolean inMemory; - private final Recycler recycler; - RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory, - Recycler recycler) { + RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) { this.key = bck; this.data = data; this.accessCounter = accessCounter; this.inMemory = inMemory; - this.recycler = recycler; } public Cacheable getData() { @@ -1374,7 +1437,8 @@ public class BucketCache implements BlockCache, HeapSize { } public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, - final LongAdder realCacheSize) throws IOException { + final LongAdder realCacheSize, Function createRecycler) + throws IOException { int len = data.getSerializedLength(); // This cacheable thing can't be serialized if (len == 0) { @@ -1384,7 +1448,7 @@ public class BucketCache implements BlockCache, HeapSize { boolean succ = false; BucketEntry bucketEntry = null; try { - bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, RefCnt.create(recycler), + bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler, getByteBuffAllocator()); bucketEntry.setDeserializerReference(data.getDeserializer()); if (data instanceof HFileBlock) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java index 2dd77756e58..ca79f690b65 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java @@ -25,8 +25,10 @@ import java.nio.ByteBuffer; import java.util.Comparator; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; import org.apache.hadoop.hbase.io.hfile.BlockPriority; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; @@ -88,16 +90,21 @@ class BucketEntry implements HBaseReferenceCounted { private final long cachedTime = System.nanoTime(); BucketEntry(long offset, int length, long accessCounter, boolean inMemory) { - this(offset, length, accessCounter, inMemory, RefCnt.create(), ByteBuffAllocator.HEAP); + this(offset, length, accessCounter, inMemory, null, ByteBuffAllocator.HEAP); } - BucketEntry(long offset, int length, long accessCounter, boolean inMemory, RefCnt refCnt, + BucketEntry(long offset, int length, long accessCounter, boolean inMemory, + Function createRecycler, ByteBuffAllocator allocator) { setOffset(offset); this.length = length; this.accessCounter = accessCounter; this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI; - this.refCnt = refCnt; + if (createRecycler == null) { + this.refCnt = RefCnt.create(); + } else { + this.refCnt = RefCnt.create(createRecycler.apply(this)); + } this.markedAsEvicted = new AtomicBoolean(false); this.allocator = allocator; } @@ -165,19 +172,6 @@ class BucketEntry implements HBaseReferenceCounted { return false; } - /** - * Mark as evicted only when NO RPC references. Mainly used for eviction when cache size exceed - * the max acceptable size. - * @return true if we deallocate this entry successfully. - */ - boolean markStaleAsEvicted() { - if (!markedAsEvicted.get() && this.refCnt() == 1) { - // The only reference was coming from backingMap, now release the stale entry. - return this.markAsEvicted(); - } - return false; - } - /** * Check whether have some RPC patch referring this block. There're two case:
* 1. If current refCnt is greater than 1, there must be at least one referring RPC path;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java index a478fcb2076..a5756f93bef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; @@ -43,6 +44,8 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CachedBlock; +import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HStore; @@ -372,7 +375,11 @@ public class TestAvoidCellReferencesIntoShippedBlocks { CachedBlock next = iterator.next(); BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); cacheList.add(cacheKey); - cache.evictBlock(cacheKey); + /** + * There is only one Block referenced by rpc,here we evict blocks which have no rpc + * referenced. + */ + evictBlock(cache, cacheKey); } try { Thread.sleep(1); @@ -437,4 +444,20 @@ public class TestAvoidCellReferencesIntoShippedBlocks { table.close(); } } + + /** + * For {@link BucketCache},we only evict Block if there is no rpc referenced. + */ + private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) { + assertTrue(blockCache instanceof CombinedBlockCache); + BlockCache[] blockCaches = blockCache.getBlockCaches(); + for (BlockCache currentBlockCache : blockCaches) { + if (currentBlockCache instanceof BucketCache) { + ((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey); + } else { + currentBlockCache.evictBlock(blockCacheKey); + } + } + + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index ad0f1c565c9..302005520c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -252,8 +252,26 @@ public class TestBucketCache { assertEquals(1, cache.getBlockCount()); lock.writeLock().unlock(); evictThread.join(); - assertEquals(0, cache.getBlockCount()); - assertEquals(cache.getCurrentSize(), 0L); + /** + *
+     * The asserts here before HBASE-21957 are:
+     * assertEquals(1L, cache.getBlockCount());
+     * assertTrue(cache.getCurrentSize() > 0L);
+     * assertTrue("We should have a block!", cache.iterator().hasNext());
+     *
+     * The asserts here after HBASE-21957 are:
+     * assertEquals(0, cache.getBlockCount());
+     * assertEquals(cache.getCurrentSize(), 0L);
+     *
+     * I think the asserts before HBASE-21957 is more reasonable,because
+     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
+     * it had seen, and newly added Block after the {@link BucketEntry}
+     * it had seen should not be evicted.
+     * 
+ */ + assertEquals(1L, cache.getBlockCount()); + assertTrue(cache.getCurrentSize() > 0L); + assertTrue("We should have a block!", cache.iterator().hasNext()); } @Test @@ -506,8 +524,8 @@ public class TestBucketCache { HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP); HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP); - RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE); - RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE); + RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false); + RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false); assertFalse(cache.containsKey(key1)); assertNull(cache.putIfAbsent(key1, re1)); @@ -554,11 +572,11 @@ public class TestBucketCache { BucketAllocator allocator = new BucketAllocator(availableSpace, null); BlockCacheKey key = new BlockCacheKey("dummy", 1L); - RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, ByteBuffAllocator.NONE); + RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true); Assert.assertEquals(0, allocator.getUsedSize()); try { - re.writeToCache(ioEngine, allocator, null); + re.writeToCache(ioEngine, allocator, null, null); Assert.fail(); } catch (Exception e) { } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java index eead815f84b..fd083fd3c89 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java @@ -25,11 +25,17 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.HFileBlock; @@ -37,6 +43,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.ClassRule; @@ -65,6 +72,18 @@ public class TestBucketCacheRefCnt { queueSize, PERSISTENCE_PATH); } + private static MyBucketCache createMyBucketCache(int writerSize, int queueSize) + throws IOException { + return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize, + queueSize, PERSISTENCE_PATH); + } + + private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize) + throws IOException { + return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize, + queueSize, PERSISTENCE_PATH); + } + private static HFileBlock createBlock(int offset, int size) { return createBlock(offset, size, ByteBuffAllocator.HEAP); } @@ -133,8 +152,10 @@ public class TestBucketCacheRefCnt { } } - private void waitUntilFlushedToCache(BlockCacheKey key) throws InterruptedException { - while (!cache.backingMap.containsKey(key) || cache.ramCache.containsKey(key)) { + private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey) + throws InterruptedException { + while (!bucketCache.backingMap.containsKey(blockCacheKey) + || bucketCache.ramCache.containsKey(blockCacheKey)) { Thread.sleep(100); } Thread.sleep(1000); @@ -148,7 +169,7 @@ public class TestBucketCacheRefCnt { HFileBlock blk = createBlock(200, 1020, alloc); BlockCacheKey key = createKey("testHFile-00", 200); cache.cacheBlock(key, blk); - waitUntilFlushedToCache(key); + waitUntilFlushedToCache(cache, key); assertEquals(1, blk.refCnt()); Cacheable block = cache.getBlock(key, false, false, false); @@ -180,17 +201,18 @@ public class TestBucketCacheRefCnt { assertFalse(block.release()); assertEquals(1, block.refCnt()); - newBlock = cache.getBlock(key, false, false, false); - assertEquals(2, block.refCnt()); - assertEquals(2, newBlock.refCnt()); + /** + * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache}, + * so {@link BucketCache#getBlock} return null. + */ + Cacheable newestBlock = cache.getBlock(key, false, false, false); + assertNull(newestBlock); + assertEquals(1, block.refCnt()); assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc); // Release the block - assertFalse(block.release()); - assertEquals(1, block.refCnt()); - - // Release the newBlock; - assertTrue(newBlock.release()); + assertTrue(block.release()); + assertEquals(0, block.refCnt()); assertEquals(0, newBlock.refCnt()); } finally { cache.shutdown(); @@ -247,7 +269,7 @@ public class TestBucketCacheRefCnt { HFileBlock blk = createBlock(200, 1020); BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200); cache.cacheBlock(key, blk); - waitUntilFlushedToCache(key); + waitUntilFlushedToCache(cache, key); assertEquals(1, blk.refCnt()); assertNotNull(cache.backingMap.get(key)); assertEquals(1, cache.backingMap.get(key).refCnt()); @@ -260,7 +282,7 @@ public class TestBucketCacheRefCnt { assertEquals(2, be1.refCnt()); // We've some RPC reference, so it won't have any effect. - assertFalse(be1.markStaleAsEvicted()); + assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1)); assertEquals(2, block1.refCnt()); assertEquals(2, cache.backingMap.get(key).refCnt()); @@ -270,7 +292,7 @@ public class TestBucketCacheRefCnt { assertEquals(1, cache.backingMap.get(key).refCnt()); // Mark the stale as evicted again, it'll do the de-allocation. - assertTrue(be1.markStaleAsEvicted()); + assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1)); assertEquals(0, block1.refCnt()); assertNull(cache.backingMap.get(key)); assertEquals(0, cache.size()); @@ -278,4 +300,445 @@ public class TestBucketCacheRefCnt { cache.shutdown(); } } + + /** + *
+   * This test is for HBASE-26281,
+   * test two threads for replacing Block and getting Block execute concurrently.
+   * The threads sequence is:
+   * 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1.
+   * 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied
+   *    {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
+   *    replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal}
+   * 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
+   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
+   * 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap},
+   *    the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used
+   *    by Thread2 and the content of Block1 would be overwritten after it is freed, which may
+   *    cause a serious error.
+   * 
+ * @throws Exception + */ + @Test + public void testReplacingBlockAndGettingBlockConcurrently() throws Exception { + ByteBuffAllocator byteBuffAllocator = + ByteBuffAllocator.create(HBaseConfiguration.create(), true); + final MyBucketCache myBucketCache = createMyBucketCache(1, 1000); + try { + HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator); + final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200); + myBucketCache.cacheBlock(blockCacheKey, hfileBlock); + waitUntilFlushedToCache(myBucketCache, blockCacheKey); + assertEquals(1, hfileBlock.refCnt()); + + assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey)); + final AtomicReference exceptionRef = new AtomicReference(); + Thread cacheBlockThread = new Thread(() -> { + try { + HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator); + myBucketCache.cacheBlock(blockCacheKey, newHFileBlock); + waitUntilFlushedToCache(myBucketCache, blockCacheKey); + + } catch (Throwable exception) { + exceptionRef.set(exception); + } + }); + cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME); + cacheBlockThread.start(); + + String oldThreadName = Thread.currentThread().getName(); + HFileBlock gotHFileBlock = null; + try { + + Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME); + + gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false)); + assertTrue(gotHFileBlock.equals(hfileBlock)); + assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator); + assertEquals(2, gotHFileBlock.refCnt()); + /** + * Release the second cyclicBarrier.await in + * {@link MyBucketCache#cacheBlockWithWaitInternal} + */ + myBucketCache.cyclicBarrier.await(); + + } finally { + Thread.currentThread().setName(oldThreadName); + } + + cacheBlockThread.join(); + assertTrue(exceptionRef.get() == null); + assertEquals(1, gotHFileBlock.refCnt()); + assertTrue(gotHFileBlock.equals(hfileBlock)); + assertTrue(myBucketCache.overwiteByteBuff == null); + assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0); + + gotHFileBlock.release(); + assertEquals(0, gotHFileBlock.refCnt()); + assertTrue(myBucketCache.overwiteByteBuff != null); + assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1); + assertTrue(myBucketCache.replaceCounter.get() == 1); + assertTrue(myBucketCache.blockEvictCounter.get() == 1); + } finally { + myBucketCache.shutdown(); + } + + } + + /** + *
+   * This test also is for HBASE-26281,
+   * test three threads for evicting Block,caching Block and getting Block
+   * execute concurrently.
+   * 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap},
+   *    the {@link RefCnt} of Block1 is 1.
+   * 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
+   *    but stopping after {@link BucketCache#removeFromRamCache}.
+   * 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
+   *    which returned Block1, the {@link RefCnt} of Block1 is 2.
+   * 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove}
+   *    returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1
+   *    directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3.
+   * 
+ */ + @Test + public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception { + ByteBuffAllocator byteBuffAllocator = + ByteBuffAllocator.create(HBaseConfiguration.create(), true); + final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000); + try { + final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator); + final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200); + final AtomicReference cacheBlockThreadExceptionRef = + new AtomicReference(); + Thread cacheBlockThread = new Thread(() -> { + try { + myBucketCache2.cacheBlock(blockCacheKey, hfileBlock); + /** + * Wait for Caching Block completed. + */ + myBucketCache2.writeThreadDoneCyclicBarrier.await(); + } catch (Throwable exception) { + cacheBlockThreadExceptionRef.set(exception); + } + }); + cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME); + cacheBlockThread.start(); + + final AtomicReference evictBlockThreadExceptionRef = + new AtomicReference(); + Thread evictBlockThread = new Thread(() -> { + try { + myBucketCache2.evictBlock(blockCacheKey); + } catch (Throwable exception) { + evictBlockThreadExceptionRef.set(exception); + } + }); + evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME); + evictBlockThread.start(); + + String oldThreadName = Thread.currentThread().getName(); + HFileBlock gotHFileBlock = null; + try { + Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME); + gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false)); + assertTrue(gotHFileBlock.equals(hfileBlock)); + assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator); + assertEquals(2, gotHFileBlock.refCnt()); + try { + /** + * Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for + * {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread} + * could continue. + */ + myBucketCache2.putCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + + } finally { + Thread.currentThread().setName(oldThreadName); + } + + cacheBlockThread.join(); + evictBlockThread.join(); + assertTrue(cacheBlockThreadExceptionRef.get() == null); + assertTrue(evictBlockThreadExceptionRef.get() == null); + + assertTrue(gotHFileBlock.equals(hfileBlock)); + assertEquals(1, gotHFileBlock.refCnt()); + assertTrue(myBucketCache2.overwiteByteBuff == null); + assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0); + + gotHFileBlock.release(); + assertEquals(0, gotHFileBlock.refCnt()); + assertTrue(myBucketCache2.overwiteByteBuff != null); + assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1); + assertTrue(myBucketCache2.blockEvictCounter.get() == 1); + } finally { + myBucketCache2.shutdown(); + } + + } + + static class MyBucketCache extends BucketCache { + private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread"; + private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread"; + + private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); + private final AtomicInteger replaceCounter = new AtomicInteger(0); + private final AtomicInteger blockEvictCounter = new AtomicInteger(0); + private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0); + private ByteBuff overwiteByteBuff = null; + + public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, + int writerThreadNum, int writerQLen, String persistencePath) throws IOException { + super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, + persistencePath); + } + + /** + * Simulate the Block could be replaced. + */ + @Override + protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) { + replaceCounter.incrementAndGet(); + return true; + } + + @Override + public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, + boolean updateCacheMetrics) { + if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) { + /** + * Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal}, + * so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef} + * checking. + */ + try { + cyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics); + return result; + } + + @Override + protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, + boolean inMemory, boolean wait) { + if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) { + /** + * Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock} + */ + try { + cyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) { + /** + * Wait the cyclicBarrier.await() in + * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for + * {@link MyBucketCache#getBlock} and Assert completed. + */ + try { + cyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); + } + + @Override + void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, + boolean decrementBlockNumber) { + blockEvictCounter.incrementAndGet(); + super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber); + } + + /** + * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the + * {@link BucketEntry} is freed. + */ + @Override + void freeBucketEntry(BucketEntry bucketEntry) { + freeBucketEntryCounter.incrementAndGet(); + super.freeBucketEntry(bucketEntry); + this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry); + try { + this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + static class MyBucketCache2 extends BucketCache { + private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread"; + private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread"; + private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread"; + + private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2); + private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2); + private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2); + /** + * This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and + * {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed. + */ + private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3); + private final AtomicInteger blockEvictCounter = new AtomicInteger(0); + private final AtomicInteger removeRamCounter = new AtomicInteger(0); + private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0); + private ByteBuff overwiteByteBuff = null; + + public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, + int writerThreadNum, int writerQLen, String persistencePath) throws IOException { + super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, + persistencePath); + } + + @Override + protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { + super.putIntoBackingMap(key, bucketEntry); + /** + * The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before + * {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME} + */ + try { + evictCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + + /** + * Wait the cyclicBarrier.await() in + * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for + * {@link MyBucketCache#getBlock} and Assert completed. + */ + try { + putCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + @Override + void doDrain(List entries) throws InterruptedException { + super.doDrain(entries); + if (entries.size() > 0) { + /** + * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and + * {@link #EVICT_BLOCK_THREAD_NAME}. + */ + try { + writeThreadDoneCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + } + + @Override + public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, + boolean updateCacheMetrics) { + if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) { + /** + * Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after + * {@link BucketCache#removeFromRamCache}. + */ + try { + getCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics); + return result; + } + + @Override + protected boolean removeFromRamCache(BlockCacheKey cacheKey) { + boolean firstTime = false; + if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) { + int count = this.removeRamCounter.incrementAndGet(); + firstTime = (count == 1); + if (firstTime) { + /** + * The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after + * {@link BucketCache#putIntoBackingMap}. + */ + try { + evictCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + } + boolean result = super.removeFromRamCache(cacheKey); + if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) { + if (firstTime) { + /** + * Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}. + */ + try { + getCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + /** + * Wait for Caching Block completed, after Caching Block completed, evictBlock could + * continue. + */ + try { + writeThreadDoneCyclicBarrier.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + } + + return result; + } + + @Override + void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, + boolean decrementBlockNumber) { + /** + * This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create + * only one {@link BucketCache.WriterThread}. + */ + assertTrue(Thread.currentThread() == this.writerThreads[0]); + + blockEvictCounter.incrementAndGet(); + super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber); + } + + /** + * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the + * {@link BucketEntry} is freed. + */ + @Override + void freeBucketEntry(BucketEntry bucketEntry) { + freeBucketEntryCounter.incrementAndGet(); + super.freeBucketEntry(bucketEntry); + this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry); + try { + this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) { + int byteSize = bucketEntry.getLength(); + byte[] data = new byte[byteSize]; + Arrays.fill(data, (byte) 0xff); + return ByteBuff.wrap(ByteBuffer.wrap(data)); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java index b631cf96eac..0ba7dea4808 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java @@ -73,7 +73,6 @@ public class TestBucketWriterThread { /** * Set up variables and get BucketCache and WriterThread into state where tests can manually * control the running of WriterThread and BucketCache is empty. - * @throws Exception */ @Before public void setUp() throws Exception { @@ -141,7 +140,7 @@ public class TestBucketWriterThread { RAMQueueEntry rqe = q.remove(); RAMQueueEntry spiedRqe = Mockito.spy(rqe); Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe). - writeToCache(Mockito.any(), Mockito.any(), Mockito.any()); + writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); this.q.add(spiedRqe); doDrainOfOneEntry(bc, wt, q); // Cache disabled when ioes w/o ever healing. @@ -163,7 +162,7 @@ public class TestBucketWriterThread { BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class); Mockito.doThrow(cfe). doReturn(mockedBucketEntry). - when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any()); + when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); this.q.add(spiedRqe); doDrainOfOneEntry(bc, wt, q); } @@ -172,7 +171,7 @@ public class TestBucketWriterThread { final BlockingQueue q) throws InterruptedException { List rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1)); - wt.doDrain(rqes); + bc.doDrain(rqes); assertTrue(q.isEmpty()); assertTrue(bc.ramCache.isEmpty()); assertEquals(0, bc.heapSize()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java index d1b8f9a87e8..97a5283e470 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Assert; @@ -50,7 +49,7 @@ public class TestByteBufferIOEngine { private long off; MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) { - super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator); + super(offset & 0xFF00, length, 0, false, null, allocator); this.off = offset; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java index eb9dca91311..4b0801f858b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java @@ -91,7 +91,7 @@ public class TestRAMCache { MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1, ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1, new HFileContextBuilder().build(), ByteBuffAllocator.HEAP); - RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, ByteBuffAllocator.NONE); + RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false); Assert.assertNull(cache.putIfAbsent(key, re)); Assert.assertEquals(cache.putIfAbsent(key, re), re);