diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index c24494fb6bf..45f46a31a64 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -49,6 +49,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
+import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -419,7 +420,7 @@ public class BucketCache implements BlockCache, HeapSize {
boolean wait) {
if (cacheEnabled) {
if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
- if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
+ if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry != null && bucketEntry.isRpcRef()) {
// avoid replace when there are RPC refs for the bucket entry in bucket cache
@@ -433,7 +434,11 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
- private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
+ protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
+ return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
+ }
+
+ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
boolean inMemory, boolean wait) {
if (!cacheEnabled) {
return;
@@ -441,8 +446,7 @@ public class BucketCache implements BlockCache, HeapSize {
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
// Stuff the entry into the RAM cache so it can get drained to the persistent store
RAMQueueEntry re =
- new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory,
- createRecycler(cacheKey));
+ new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
/**
* Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
* key in ramCache, the heap size of bucket cache need to update if replacing entry from
@@ -540,13 +544,25 @@ public class BucketCache implements BlockCache, HeapSize {
return null;
}
+ /**
+ * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
+ */
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
- bucketAllocator.freeBlock(bucketEntry.offset());
- realCacheSize.add(-1 * bucketEntry.getLength());
+ bucketEntry.markAsEvicted();
blocksByHFile.remove(cacheKey);
if (decrementBlockNumber) {
this.blockNumber.decrement();
}
+ cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
+ }
+
+ /**
+ * Free the {{@link BucketEntry} actually,which could only be invoked when the
+ * {@link BucketEntry#refCnt} becoming 0.
+ */
+ void freeBucketEntry(BucketEntry bucketEntry) {
+ bucketAllocator.freeBlock(bucketEntry.offset());
+ realCacheSize.add(-1 * bucketEntry.getLength());
}
/**
@@ -554,10 +570,10 @@ public class BucketCache implements BlockCache, HeapSize {
* 1. Close an HFile, and clear all cached blocks.
* 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.
*
- * Firstly, we'll try to remove the block from RAMCache. If it doesn't exist in RAMCache, then try
- * to evict from backingMap. Here we only need to free the reference from bucket cache by calling
- * {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this block, block can
- * only be de-allocated when all of them release the block.
+ * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap.
+ * Here we evict the block from backingMap immediately, but only free the reference from bucket
+ * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this
+ * block, block can only be de-allocated when all of them release the block.
*
* NOTICE: we need to grab the write offset lock firstly before releasing the reference from
* bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when
@@ -567,43 +583,92 @@ public class BucketCache implements BlockCache, HeapSize {
*/
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
+ return doEvictBlock(cacheKey, null);
+ }
+
+ /**
+ * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and
+ * {@link BucketCache#ramCache}.
+ * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
+ * {@link BucketEntry} could be removed.
+ * @param cacheKey {@link BlockCacheKey} to evict.
+ * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
+ * @return true to indicate whether we've evicted successfully or not.
+ */
+ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry) {
if (!cacheEnabled) {
return false;
}
- boolean existed = removeFromRamCache(cacheKey);
- BucketEntry be = backingMap.get(cacheKey);
- if (be == null) {
- if (existed) {
+ boolean existedInRamCache = removeFromRamCache(cacheKey);
+ if (bucketEntry == null) {
+ bucketEntry = backingMap.get(cacheKey);
+ }
+ final BucketEntry bucketEntryToUse = bucketEntry;
+
+ if (bucketEntryToUse == null) {
+ if (existedInRamCache) {
cacheStats.evicted(0, cacheKey.isPrimary());
}
- return existed;
+ return existedInRamCache;
} else {
- return be.withWriteLock(offsetLock, be::markAsEvicted);
+ return bucketEntryToUse.withWriteLock(offsetLock, () -> {
+ if (backingMap.remove(cacheKey, bucketEntryToUse)) {
+ blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache);
+ return true;
+ }
+ return false;
+ });
}
}
- private Recycler createRecycler(BlockCacheKey cacheKey) {
+ /**
+ *
+ * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as
+ * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
+ * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf}
+ * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
+ * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
+ * it is the return value of current {@link BucketCache#createRecycler} method.
+ *
+ * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
+ * it is {@link ByteBuffAllocator#putbackBuffer}.
+ *
+ */
+ protected Recycler createRecycler(final BucketEntry bucketEntry) {
return () -> {
- if (!cacheEnabled) {
- return;
- }
- boolean existed = removeFromRamCache(cacheKey);
- BucketEntry be = backingMap.get(cacheKey);
- if (be == null && existed) {
- cacheStats.evicted(0, cacheKey.isPrimary());
- } else if (be != null) {
- be.withWriteLock(offsetLock, () -> {
- if (backingMap.remove(cacheKey, be)) {
- blockEvicted(cacheKey, be, !existed);
- cacheStats.evicted(be.getCachedTime(), cacheKey.isPrimary());
- }
- return null;
- });
- }
+ freeBucketEntry(bucketEntry);
+ return;
};
}
- private boolean removeFromRamCache(BlockCacheKey cacheKey) {
+ /**
+ * NOTE: This method is only for test.
+ */
+ public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
+ BucketEntry bucketEntry = backingMap.get(blockCacheKey);
+ if (bucketEntry == null) {
+ return false;
+ }
+ return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
+ }
+
+ /**
+ * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
+ * {@link BucketEntry#isRpcRef} is false.
+ * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
+ * {@link BucketEntry} could be removed.
+ * @param blockCacheKey {@link BlockCacheKey} to evict.
+ * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
+ * @return true to indicate whether we've evicted successfully or not.
+ */
+ boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
+ if (!bucketEntry.isRpcRef()) {
+ return doEvictBlock(blockCacheKey, bucketEntry);
+ }
+ return false;
+ }
+
+ protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
return ramCache.remove(cacheKey, re -> {
if (re != null) {
this.blockNumber.decrement();
@@ -707,7 +772,7 @@ public class BucketCache implements BlockCache, HeapSize {
bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
for (Map.Entry entry : backingMap.entrySet()) {
if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
- entry.getValue().withWriteLock(offsetLock, entry.getValue()::markStaleAsEvicted);
+ evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
}
}
}
@@ -896,134 +961,134 @@ public class BucketCache implements BlockCache, HeapSize {
}
LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
}
+ }
- /**
- * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
- * cache with a new block for the same cache key. there's a corner case: one thread cache a
- * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another
- * new block with the same cache key do the same thing for the same cache key, so if not evict
- * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone
- * but the bucketAllocator do not free its memory.
- * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
- * cacheKey, Cacheable newBlock)
- * @param key Block cache key
- * @param bucketEntry Bucket entry to put into backingMap.
- */
- private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
- BucketEntry previousEntry = backingMap.put(key, bucketEntry);
- if (previousEntry != null && previousEntry != bucketEntry) {
- previousEntry.withWriteLock(offsetLock, () -> {
- blockEvicted(key, previousEntry, false);
+ /**
+ * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
+ * cache with a new block for the same cache key. there's a corner case: one thread cache a block
+ * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block
+ * with the same cache key do the same thing for the same cache key, so if not evict the previous
+ * bucket entry, then memory leak happen because the previous bucketEntry is gone but the
+ * bucketAllocator do not free its memory.
+ * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
+ * cacheKey, Cacheable newBlock)
+ * @param key Block cache key
+ * @param bucketEntry Bucket entry to put into backingMap.
+ */
+ protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
+ BucketEntry previousEntry = backingMap.put(key, bucketEntry);
+ if (previousEntry != null && previousEntry != bucketEntry) {
+ previousEntry.withWriteLock(offsetLock, () -> {
+ blockEvicted(key, previousEntry, false);
+ return null;
+ });
+ }
+ }
+
+ /**
+ * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that
+ * are passed in even if failure being sure to remove from ramCache else we'll never undo the
+ * references and we'll OOME.
+ * @param entries Presumes list passed in here will be processed by this invocation only. No
+ * interference expected.
+ */
+ void doDrain(final List entries) throws InterruptedException {
+ if (entries.isEmpty()) {
+ return;
+ }
+ // This method is a little hard to follow. We run through the passed in entries and for each
+ // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
+ // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
+ // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
+ // filling ramCache. We do the clean up by again running through the passed in entries
+ // doing extra work when we find a non-null bucketEntries corresponding entry.
+ final int size = entries.size();
+ BucketEntry[] bucketEntries = new BucketEntry[size];
+ // Index updated inside loop if success or if we can't succeed. We retry if cache is full
+ // when we go to add an entry by going around the loop again without upping the index.
+ int index = 0;
+ while (cacheEnabled && index < size) {
+ RAMQueueEntry re = null;
+ try {
+ re = entries.get(index);
+ if (re == null) {
+ LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
+ index++;
+ continue;
+ }
+ BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
+ (entry) -> createRecycler(entry));
+ // Successfully added. Up index and add bucketEntry. Clear io exceptions.
+ bucketEntries[index] = bucketEntry;
+ if (ioErrorStartTime > 0) {
+ ioErrorStartTime = -1;
+ }
+ index++;
+ } catch (BucketAllocatorException fle) {
+ LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
+ // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
+ bucketEntries[index] = null;
+ index++;
+ } catch (CacheFullException cfe) {
+ // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
+ if (!freeInProgress) {
+ freeSpace("Full!");
+ } else {
+ Thread.sleep(50);
+ }
+ } catch (IOException ioex) {
+ // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
+ LOG.error("Failed writing to bucket cache", ioex);
+ checkIOErrorIsTolerated();
+ }
+ }
+
+ // Make sure data pages are written on media before we update maps.
+ try {
+ ioEngine.sync();
+ } catch (IOException ioex) {
+ LOG.error("Failed syncing IO engine", ioex);
+ checkIOErrorIsTolerated();
+ // Since we failed sync, free the blocks in bucket allocator
+ for (int i = 0; i < entries.size(); ++i) {
+ if (bucketEntries[i] != null) {
+ bucketAllocator.freeBlock(bucketEntries[i].offset());
+ bucketEntries[i] = null;
+ }
+ }
+ }
+
+ // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
+ // success or error.
+ for (int i = 0; i < size; ++i) {
+ BlockCacheKey key = entries.get(i).getKey();
+ // Only add if non-null entry.
+ if (bucketEntries[i] != null) {
+ putIntoBackingMap(key, bucketEntries[i]);
+ }
+ // Always remove from ramCache even if we failed adding it to the block cache above.
+ boolean existed = ramCache.remove(key, re -> {
+ if (re != null) {
+ heapSize.add(-1 * re.getData().heapSize());
+ }
+ });
+ if (!existed && bucketEntries[i] != null) {
+ // Block should have already been evicted. Remove it and free space.
+ final BucketEntry bucketEntry = bucketEntries[i];
+ bucketEntry.withWriteLock(offsetLock, () -> {
+ if (backingMap.remove(key, bucketEntry)) {
+ blockEvicted(key, bucketEntry, false);
+ }
return null;
});
}
}
- /**
- * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
- * Process all that are passed in even if failure being sure to remove from ramCache else we'll
- * never undo the references and we'll OOME.
- * @param entries Presumes list passed in here will be processed by this invocation only. No
- * interference expected.
- * @throws InterruptedException
- */
- void doDrain(final List entries) throws InterruptedException {
- if (entries.isEmpty()) {
- return;
- }
- // This method is a little hard to follow. We run through the passed in entries and for each
- // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
- // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
- // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
- // filling ramCache. We do the clean up by again running through the passed in entries
- // doing extra work when we find a non-null bucketEntries corresponding entry.
- final int size = entries.size();
- BucketEntry[] bucketEntries = new BucketEntry[size];
- // Index updated inside loop if success or if we can't succeed. We retry if cache is full
- // when we go to add an entry by going around the loop again without upping the index.
- int index = 0;
- while (cacheEnabled && index < size) {
- RAMQueueEntry re = null;
- try {
- re = entries.get(index);
- if (re == null) {
- LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
- index++;
- continue;
- }
- BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize);
- // Successfully added. Up index and add bucketEntry. Clear io exceptions.
- bucketEntries[index] = bucketEntry;
- if (ioErrorStartTime > 0) {
- ioErrorStartTime = -1;
- }
- index++;
- } catch (BucketAllocatorException fle) {
- LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
- // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
- bucketEntries[index] = null;
- index++;
- } catch (CacheFullException cfe) {
- // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
- if (!freeInProgress) {
- freeSpace("Full!");
- } else {
- Thread.sleep(50);
- }
- } catch (IOException ioex) {
- // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
- LOG.error("Failed writing to bucket cache", ioex);
- checkIOErrorIsTolerated();
- }
- }
-
- // Make sure data pages are written on media before we update maps.
- try {
- ioEngine.sync();
- } catch (IOException ioex) {
- LOG.error("Failed syncing IO engine", ioex);
- checkIOErrorIsTolerated();
- // Since we failed sync, free the blocks in bucket allocator
- for (int i = 0; i < entries.size(); ++i) {
- if (bucketEntries[i] != null) {
- bucketAllocator.freeBlock(bucketEntries[i].offset());
- bucketEntries[i] = null;
- }
- }
- }
-
- // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
- // success or error.
- for (int i = 0; i < size; ++i) {
- BlockCacheKey key = entries.get(i).getKey();
- // Only add if non-null entry.
- if (bucketEntries[i] != null) {
- putIntoBackingMap(key, bucketEntries[i]);
- }
- // Always remove from ramCache even if we failed adding it to the block cache above.
- boolean existed = ramCache.remove(key, re -> {
- if (re != null) {
- heapSize.add(-1 * re.getData().heapSize());
- }
- });
- if (!existed && bucketEntries[i] != null) {
- // Block should have already been evicted. Remove it and free space.
- final BucketEntry bucketEntry = bucketEntries[i];
- bucketEntry.withWriteLock(offsetLock, () -> {
- if (backingMap.remove(key, bucketEntry)) {
- blockEvicted(key, bucketEntry, false);
- }
- return null;
- });
- }
- }
-
- long used = bucketAllocator.getUsedSize();
- if (used > acceptableSize()) {
- freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
- }
- return;
+ long used = bucketAllocator.getUsedSize();
+ if (used > acceptableSize()) {
+ freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
}
+ return;
}
/**
@@ -1313,8 +1378,9 @@ public class BucketCache implements BlockCache, HeapSize {
// TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
// What to do then? Caching attempt fail? Need some changes in cacheBlock API?
while ((entry = queue.pollLast()) != null) {
+ BlockCacheKey blockCacheKey = entry.getKey();
BucketEntry be = entry.getValue();
- if (be.withWriteLock(offsetLock, be::markStaleAsEvicted)) {
+ if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
freedBytes += be.getLength();
}
if (freedBytes >= toFree) {
@@ -1341,15 +1407,12 @@ public class BucketCache implements BlockCache, HeapSize {
private final Cacheable data;
private long accessCounter;
private boolean inMemory;
- private final Recycler recycler;
- RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
- Recycler recycler) {
+ RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) {
this.key = bck;
this.data = data;
this.accessCounter = accessCounter;
this.inMemory = inMemory;
- this.recycler = recycler;
}
public Cacheable getData() {
@@ -1372,7 +1435,8 @@ public class BucketCache implements BlockCache, HeapSize {
}
public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
- final LongAdder realCacheSize) throws IOException {
+ final LongAdder realCacheSize, Function createRecycler)
+ throws IOException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
if (len == 0) {
@@ -1382,7 +1446,7 @@ public class BucketCache implements BlockCache, HeapSize {
boolean succ = false;
BucketEntry bucketEntry = null;
try {
- bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, RefCnt.create(recycler),
+ bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler,
getByteBuffAllocator());
bucketEntry.setDeserializerReference(data.getDeserializer());
if (data instanceof HFileBlock) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
index 2dd77756e58..ca79f690b65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
@@ -25,8 +25,10 @@ import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
@@ -88,16 +90,21 @@ class BucketEntry implements HBaseReferenceCounted {
private final long cachedTime = System.nanoTime();
BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
- this(offset, length, accessCounter, inMemory, RefCnt.create(), ByteBuffAllocator.HEAP);
+ this(offset, length, accessCounter, inMemory, null, ByteBuffAllocator.HEAP);
}
- BucketEntry(long offset, int length, long accessCounter, boolean inMemory, RefCnt refCnt,
+ BucketEntry(long offset, int length, long accessCounter, boolean inMemory,
+ Function createRecycler,
ByteBuffAllocator allocator) {
setOffset(offset);
this.length = length;
this.accessCounter = accessCounter;
this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
- this.refCnt = refCnt;
+ if (createRecycler == null) {
+ this.refCnt = RefCnt.create();
+ } else {
+ this.refCnt = RefCnt.create(createRecycler.apply(this));
+ }
this.markedAsEvicted = new AtomicBoolean(false);
this.allocator = allocator;
}
@@ -165,19 +172,6 @@ class BucketEntry implements HBaseReferenceCounted {
return false;
}
- /**
- * Mark as evicted only when NO RPC references. Mainly used for eviction when cache size exceed
- * the max acceptable size.
- * @return true if we deallocate this entry successfully.
- */
- boolean markStaleAsEvicted() {
- if (!markedAsEvicted.get() && this.refCnt() == 1) {
- // The only reference was coming from backingMap, now release the stale entry.
- return this.markAsEvicted();
- }
- return false;
- }
-
/**
* Check whether have some RPC patch referring this block. There're two case:
* 1. If current refCnt is greater than 1, there must be at least one referring RPC path;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
index a478fcb2076..a5756f93bef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
@@ -43,6 +44,8 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
+import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
@@ -372,7 +375,11 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
cacheList.add(cacheKey);
- cache.evictBlock(cacheKey);
+ /**
+ * There is only one Block referenced by rpc,here we evict blocks which have no rpc
+ * referenced.
+ */
+ evictBlock(cache, cacheKey);
}
try {
Thread.sleep(1);
@@ -437,4 +444,20 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
table.close();
}
}
+
+ /**
+ * For {@link BucketCache},we only evict Block if there is no rpc referenced.
+ */
+ private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) {
+ assertTrue(blockCache instanceof CombinedBlockCache);
+ BlockCache[] blockCaches = blockCache.getBlockCaches();
+ for (BlockCache currentBlockCache : blockCaches) {
+ if (currentBlockCache instanceof BucketCache) {
+ ((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey);
+ } else {
+ currentBlockCache.evictBlock(blockCacheKey);
+ }
+ }
+
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 5f2d1360c70..1b10fbd5226 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -249,8 +249,26 @@ public class TestBucketCache {
assertEquals(1, cache.getBlockCount());
lock.writeLock().unlock();
evictThread.join();
- assertEquals(0, cache.getBlockCount());
- assertEquals(cache.getCurrentSize(), 0L);
+ /**
+ *
+ * The asserts here before HBASE-21957 are:
+ * assertEquals(1L, cache.getBlockCount());
+ * assertTrue(cache.getCurrentSize() > 0L);
+ * assertTrue("We should have a block!", cache.iterator().hasNext());
+ *
+ * The asserts here after HBASE-21957 are:
+ * assertEquals(0, cache.getBlockCount());
+ * assertEquals(cache.getCurrentSize(), 0L);
+ *
+ * I think the asserts before HBASE-21957 is more reasonable,because
+ * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
+ * it had seen, and newly added Block after the {@link BucketEntry}
+ * it had seen should not be evicted.
+ *
+ */
+ assertEquals(1L, cache.getBlockCount());
+ assertTrue(cache.getCurrentSize() > 0L);
+ assertTrue("We should have a block!", cache.iterator().hasNext());
}
@Test
@@ -503,8 +521,8 @@ public class TestBucketCache {
HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
- RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
- RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);
+ RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
+ RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);
assertFalse(cache.containsKey(key1));
assertNull(cache.putIfAbsent(key1, re1));
@@ -551,11 +569,11 @@ public class TestBucketCache {
BucketAllocator allocator = new BucketAllocator(availableSpace, null);
BlockCacheKey key = new BlockCacheKey("dummy", 1L);
- RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, ByteBuffAllocator.NONE);
+ RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);
Assert.assertEquals(0, allocator.getUsedSize());
try {
- re.writeToCache(ioEngine, allocator, null);
+ re.writeToCache(ioEngine, allocator, null, null);
Assert.fail();
} catch (Exception e) {
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
index eead815f84b..fd083fd3c89 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
@@ -25,11 +25,17 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
@@ -37,6 +43,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
@@ -65,6 +72,18 @@ public class TestBucketCacheRefCnt {
queueSize, PERSISTENCE_PATH);
}
+ private static MyBucketCache createMyBucketCache(int writerSize, int queueSize)
+ throws IOException {
+ return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
+ queueSize, PERSISTENCE_PATH);
+ }
+
+ private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize)
+ throws IOException {
+ return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
+ queueSize, PERSISTENCE_PATH);
+ }
+
private static HFileBlock createBlock(int offset, int size) {
return createBlock(offset, size, ByteBuffAllocator.HEAP);
}
@@ -133,8 +152,10 @@ public class TestBucketCacheRefCnt {
}
}
- private void waitUntilFlushedToCache(BlockCacheKey key) throws InterruptedException {
- while (!cache.backingMap.containsKey(key) || cache.ramCache.containsKey(key)) {
+ private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey)
+ throws InterruptedException {
+ while (!bucketCache.backingMap.containsKey(blockCacheKey)
+ || bucketCache.ramCache.containsKey(blockCacheKey)) {
Thread.sleep(100);
}
Thread.sleep(1000);
@@ -148,7 +169,7 @@ public class TestBucketCacheRefCnt {
HFileBlock blk = createBlock(200, 1020, alloc);
BlockCacheKey key = createKey("testHFile-00", 200);
cache.cacheBlock(key, blk);
- waitUntilFlushedToCache(key);
+ waitUntilFlushedToCache(cache, key);
assertEquals(1, blk.refCnt());
Cacheable block = cache.getBlock(key, false, false, false);
@@ -180,17 +201,18 @@ public class TestBucketCacheRefCnt {
assertFalse(block.release());
assertEquals(1, block.refCnt());
- newBlock = cache.getBlock(key, false, false, false);
- assertEquals(2, block.refCnt());
- assertEquals(2, newBlock.refCnt());
+ /**
+ * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache},
+ * so {@link BucketCache#getBlock} return null.
+ */
+ Cacheable newestBlock = cache.getBlock(key, false, false, false);
+ assertNull(newestBlock);
+ assertEquals(1, block.refCnt());
assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
// Release the block
- assertFalse(block.release());
- assertEquals(1, block.refCnt());
-
- // Release the newBlock;
- assertTrue(newBlock.release());
+ assertTrue(block.release());
+ assertEquals(0, block.refCnt());
assertEquals(0, newBlock.refCnt());
} finally {
cache.shutdown();
@@ -247,7 +269,7 @@ public class TestBucketCacheRefCnt {
HFileBlock blk = createBlock(200, 1020);
BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
cache.cacheBlock(key, blk);
- waitUntilFlushedToCache(key);
+ waitUntilFlushedToCache(cache, key);
assertEquals(1, blk.refCnt());
assertNotNull(cache.backingMap.get(key));
assertEquals(1, cache.backingMap.get(key).refCnt());
@@ -260,7 +282,7 @@ public class TestBucketCacheRefCnt {
assertEquals(2, be1.refCnt());
// We've some RPC reference, so it won't have any effect.
- assertFalse(be1.markStaleAsEvicted());
+ assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
assertEquals(2, block1.refCnt());
assertEquals(2, cache.backingMap.get(key).refCnt());
@@ -270,7 +292,7 @@ public class TestBucketCacheRefCnt {
assertEquals(1, cache.backingMap.get(key).refCnt());
// Mark the stale as evicted again, it'll do the de-allocation.
- assertTrue(be1.markStaleAsEvicted());
+ assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
assertEquals(0, block1.refCnt());
assertNull(cache.backingMap.get(key));
assertEquals(0, cache.size());
@@ -278,4 +300,445 @@ public class TestBucketCacheRefCnt {
cache.shutdown();
}
}
+
+ /**
+ *
+ * This test is for HBASE-26281,
+ * test two threads for replacing Block and getting Block execute concurrently.
+ * The threads sequence is:
+ * 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1.
+ * 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied
+ * {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
+ * replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal}
+ * 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
+ * which returned Block1, the {@link RefCnt} of Block1 is 2.
+ * 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap},
+ * the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used
+ * by Thread2 and the content of Block1 would be overwritten after it is freed, which may
+ * cause a serious error.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReplacingBlockAndGettingBlockConcurrently() throws Exception {
+ ByteBuffAllocator byteBuffAllocator =
+ ByteBuffAllocator.create(HBaseConfiguration.create(), true);
+ final MyBucketCache myBucketCache = createMyBucketCache(1, 1000);
+ try {
+ HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
+ final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200);
+ myBucketCache.cacheBlock(blockCacheKey, hfileBlock);
+ waitUntilFlushedToCache(myBucketCache, blockCacheKey);
+ assertEquals(1, hfileBlock.refCnt());
+
+ assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey));
+ final AtomicReference exceptionRef = new AtomicReference();
+ Thread cacheBlockThread = new Thread(() -> {
+ try {
+ HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator);
+ myBucketCache.cacheBlock(blockCacheKey, newHFileBlock);
+ waitUntilFlushedToCache(myBucketCache, blockCacheKey);
+
+ } catch (Throwable exception) {
+ exceptionRef.set(exception);
+ }
+ });
+ cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME);
+ cacheBlockThread.start();
+
+ String oldThreadName = Thread.currentThread().getName();
+ HFileBlock gotHFileBlock = null;
+ try {
+
+ Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME);
+
+ gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false));
+ assertTrue(gotHFileBlock.equals(hfileBlock));
+ assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
+ assertEquals(2, gotHFileBlock.refCnt());
+ /**
+ * Release the second cyclicBarrier.await in
+ * {@link MyBucketCache#cacheBlockWithWaitInternal}
+ */
+ myBucketCache.cyclicBarrier.await();
+
+ } finally {
+ Thread.currentThread().setName(oldThreadName);
+ }
+
+ cacheBlockThread.join();
+ assertTrue(exceptionRef.get() == null);
+ assertEquals(1, gotHFileBlock.refCnt());
+ assertTrue(gotHFileBlock.equals(hfileBlock));
+ assertTrue(myBucketCache.overwiteByteBuff == null);
+ assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0);
+
+ gotHFileBlock.release();
+ assertEquals(0, gotHFileBlock.refCnt());
+ assertTrue(myBucketCache.overwiteByteBuff != null);
+ assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1);
+ assertTrue(myBucketCache.replaceCounter.get() == 1);
+ assertTrue(myBucketCache.blockEvictCounter.get() == 1);
+ } finally {
+ myBucketCache.shutdown();
+ }
+
+ }
+
+ /**
+ *
+ * This test also is for HBASE-26281,
+ * test three threads for evicting Block,caching Block and getting Block
+ * execute concurrently.
+ * 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap},
+ * the {@link RefCnt} of Block1 is 1.
+ * 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
+ * but stopping after {@link BucketCache#removeFromRamCache}.
+ * 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
+ * which returned Block1, the {@link RefCnt} of Block1 is 2.
+ * 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove}
+ * returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1
+ * directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3.
+ *
+ */
+ @Test
+ public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception {
+ ByteBuffAllocator byteBuffAllocator =
+ ByteBuffAllocator.create(HBaseConfiguration.create(), true);
+ final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000);
+ try {
+ final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
+ final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200);
+ final AtomicReference cacheBlockThreadExceptionRef =
+ new AtomicReference();
+ Thread cacheBlockThread = new Thread(() -> {
+ try {
+ myBucketCache2.cacheBlock(blockCacheKey, hfileBlock);
+ /**
+ * Wait for Caching Block completed.
+ */
+ myBucketCache2.writeThreadDoneCyclicBarrier.await();
+ } catch (Throwable exception) {
+ cacheBlockThreadExceptionRef.set(exception);
+ }
+ });
+ cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME);
+ cacheBlockThread.start();
+
+ final AtomicReference evictBlockThreadExceptionRef =
+ new AtomicReference();
+ Thread evictBlockThread = new Thread(() -> {
+ try {
+ myBucketCache2.evictBlock(blockCacheKey);
+ } catch (Throwable exception) {
+ evictBlockThreadExceptionRef.set(exception);
+ }
+ });
+ evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME);
+ evictBlockThread.start();
+
+ String oldThreadName = Thread.currentThread().getName();
+ HFileBlock gotHFileBlock = null;
+ try {
+ Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME);
+ gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false));
+ assertTrue(gotHFileBlock.equals(hfileBlock));
+ assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
+ assertEquals(2, gotHFileBlock.refCnt());
+ try {
+ /**
+ * Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for
+ * {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread}
+ * could continue.
+ */
+ myBucketCache2.putCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+
+ } finally {
+ Thread.currentThread().setName(oldThreadName);
+ }
+
+ cacheBlockThread.join();
+ evictBlockThread.join();
+ assertTrue(cacheBlockThreadExceptionRef.get() == null);
+ assertTrue(evictBlockThreadExceptionRef.get() == null);
+
+ assertTrue(gotHFileBlock.equals(hfileBlock));
+ assertEquals(1, gotHFileBlock.refCnt());
+ assertTrue(myBucketCache2.overwiteByteBuff == null);
+ assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0);
+
+ gotHFileBlock.release();
+ assertEquals(0, gotHFileBlock.refCnt());
+ assertTrue(myBucketCache2.overwiteByteBuff != null);
+ assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1);
+ assertTrue(myBucketCache2.blockEvictCounter.get() == 1);
+ } finally {
+ myBucketCache2.shutdown();
+ }
+
+ }
+
+ static class MyBucketCache extends BucketCache {
+ private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
+ private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
+
+ private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+ private final AtomicInteger replaceCounter = new AtomicInteger(0);
+ private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
+ private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
+ private ByteBuff overwiteByteBuff = null;
+
+ public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
+ int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
+ super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
+ persistencePath);
+ }
+
+ /**
+ * Simulate the Block could be replaced.
+ */
+ @Override
+ protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
+ replaceCounter.incrementAndGet();
+ return true;
+ }
+
+ @Override
+ public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
+ boolean updateCacheMetrics) {
+ if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
+ /**
+ * Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal},
+ * so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef}
+ * checking.
+ */
+ try {
+ cyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
+ return result;
+ }
+
+ @Override
+ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
+ boolean inMemory, boolean wait) {
+ if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
+ /**
+ * Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock}
+ */
+ try {
+ cyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
+ /**
+ * Wait the cyclicBarrier.await() in
+ * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for
+ * {@link MyBucketCache#getBlock} and Assert completed.
+ */
+ try {
+ cyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
+ }
+
+ @Override
+ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
+ boolean decrementBlockNumber) {
+ blockEvictCounter.incrementAndGet();
+ super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber);
+ }
+
+ /**
+ * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
+ * {@link BucketEntry} is freed.
+ */
+ @Override
+ void freeBucketEntry(BucketEntry bucketEntry) {
+ freeBucketEntryCounter.incrementAndGet();
+ super.freeBucketEntry(bucketEntry);
+ this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
+ try {
+ this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ static class MyBucketCache2 extends BucketCache {
+ private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
+ private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
+ private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread";
+
+ private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2);
+ private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2);
+ private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2);
+ /**
+ * This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and
+ * {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed.
+ */
+ private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3);
+ private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
+ private final AtomicInteger removeRamCounter = new AtomicInteger(0);
+ private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
+ private ByteBuff overwiteByteBuff = null;
+
+ public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
+ int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
+ super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
+ persistencePath);
+ }
+
+ @Override
+ protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
+ super.putIntoBackingMap(key, bucketEntry);
+ /**
+ * The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before
+ * {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME}
+ */
+ try {
+ evictCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+
+ /**
+ * Wait the cyclicBarrier.await() in
+ * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for
+ * {@link MyBucketCache#getBlock} and Assert completed.
+ */
+ try {
+ putCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ void doDrain(List entries) throws InterruptedException {
+ super.doDrain(entries);
+ if (entries.size() > 0) {
+ /**
+ * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
+ * {@link #EVICT_BLOCK_THREAD_NAME}.
+ */
+ try {
+ writeThreadDoneCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ @Override
+ public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
+ boolean updateCacheMetrics) {
+ if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
+ /**
+ * Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after
+ * {@link BucketCache#removeFromRamCache}.
+ */
+ try {
+ getCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
+ return result;
+ }
+
+ @Override
+ protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
+ boolean firstTime = false;
+ if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
+ int count = this.removeRamCounter.incrementAndGet();
+ firstTime = (count == 1);
+ if (firstTime) {
+ /**
+ * The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after
+ * {@link BucketCache#putIntoBackingMap}.
+ */
+ try {
+ evictCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ boolean result = super.removeFromRamCache(cacheKey);
+ if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
+ if (firstTime) {
+ /**
+ * Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}.
+ */
+ try {
+ getCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ /**
+ * Wait for Caching Block completed, after Caching Block completed, evictBlock could
+ * continue.
+ */
+ try {
+ writeThreadDoneCyclicBarrier.await();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
+ boolean decrementBlockNumber) {
+ /**
+ * This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create
+ * only one {@link BucketCache.WriterThread}.
+ */
+ assertTrue(Thread.currentThread() == this.writerThreads[0]);
+
+ blockEvictCounter.incrementAndGet();
+ super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber);
+ }
+
+ /**
+ * Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
+ * {@link BucketEntry} is freed.
+ */
+ @Override
+ void freeBucketEntry(BucketEntry bucketEntry) {
+ freeBucketEntryCounter.incrementAndGet();
+ super.freeBucketEntry(bucketEntry);
+ this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
+ try {
+ this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) {
+ int byteSize = bucketEntry.getLength();
+ byte[] data = new byte[byteSize];
+ Arrays.fill(data, (byte) 0xff);
+ return ByteBuff.wrap(ByteBuffer.wrap(data));
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index b631cf96eac..0ba7dea4808 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -73,7 +73,6 @@ public class TestBucketWriterThread {
/**
* Set up variables and get BucketCache and WriterThread into state where tests can manually
* control the running of WriterThread and BucketCache is empty.
- * @throws Exception
*/
@Before
public void setUp() throws Exception {
@@ -141,7 +140,7 @@ public class TestBucketWriterThread {
RAMQueueEntry rqe = q.remove();
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
- writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
+ writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
// Cache disabled when ioes w/o ever healing.
@@ -163,7 +162,7 @@ public class TestBucketWriterThread {
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
Mockito.doThrow(cfe).
doReturn(mockedBucketEntry).
- when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
+ when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
}
@@ -172,7 +171,7 @@ public class TestBucketWriterThread {
final BlockingQueue q)
throws InterruptedException {
List rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
- wt.doDrain(rqes);
+ bc.doDrain(rqes);
assertTrue(q.isEmpty());
assertTrue(bc.ramCache.isEmpty());
assertEquals(0, bc.heapSize());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index d1b8f9a87e8..97a5283e470 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
@@ -50,7 +49,7 @@ public class TestByteBufferIOEngine {
private long off;
MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
- super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator);
+ super(offset & 0xFF00, length, 0, false, null, allocator);
this.off = offset;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
index eb9dca91311..4b0801f858b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
@@ -91,7 +91,7 @@ public class TestRAMCache {
MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
- RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, ByteBuffAllocator.NONE);
+ RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false);
Assert.assertNull(cache.putIfAbsent(key, re));
Assert.assertEquals(cache.putIfAbsent(key, re), re);