HBASE-26281 DBB got from BucketCache would be freed unexpectedly before RPC completed (#3680)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
chenglei 2021-09-17 22:15:23 +08:00 committed by GitHub
parent c60adfd13a
commit 4614c7bc57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 768 additions and 207 deletions

View File

@ -49,6 +49,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
@ -429,7 +431,7 @@ public class BucketCache implements BlockCache, HeapSize {
boolean wait) {
if (cacheEnabled) {
if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry != null && bucketEntry.isRpcRef()) {
// avoid replace when there are RPC refs for the bucket entry in bucket cache
@ -443,7 +445,11 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
}
protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
boolean inMemory, boolean wait) {
if (!cacheEnabled) {
return;
@ -451,8 +457,7 @@ public class BucketCache implements BlockCache, HeapSize {
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
// Stuff the entry into the RAM cache so it can get drained to the persistent store
RAMQueueEntry re =
new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory,
createRecycler(cacheKey));
new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
/**
* Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
* key in ramCache, the heap size of bucket cache need to update if replacing entry from
@ -550,13 +555,25 @@ public class BucketCache implements BlockCache, HeapSize {
return null;
}
/**
* This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
*/
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
bucketAllocator.freeBlock(bucketEntry.offset());
realCacheSize.add(-1 * bucketEntry.getLength());
bucketEntry.markAsEvicted();
blocksByHFile.remove(cacheKey);
if (decrementBlockNumber) {
this.blockNumber.decrement();
}
cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
}
/**
* Free the {{@link BucketEntry} actually,which could only be invoked when the
* {@link BucketEntry#refCnt} becoming 0.
*/
void freeBucketEntry(BucketEntry bucketEntry) {
bucketAllocator.freeBlock(bucketEntry.offset());
realCacheSize.add(-1 * bucketEntry.getLength());
}
/**
@ -564,10 +581,10 @@ public class BucketCache implements BlockCache, HeapSize {
* 1. Close an HFile, and clear all cached blocks. <br>
* 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
* <p>
* Firstly, we'll try to remove the block from RAMCache. If it doesn't exist in RAMCache, then try
* to evict from backingMap. Here we only need to free the reference from bucket cache by calling
* {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this block, block can
* only be de-allocated when all of them release the block.
* Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap.
* Here we evict the block from backingMap immediately, but only free the reference from bucket
* cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this
* block, block can only be de-allocated when all of them release the block.
* <p>
* NOTICE: we need to grab the write offset lock firstly before releasing the reference from
* bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when
@ -577,43 +594,92 @@ public class BucketCache implements BlockCache, HeapSize {
*/
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
return doEvictBlock(cacheKey, null);
}
/**
* Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and
* {@link BucketCache#ramCache}. <br/>
* NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
* {@link BucketEntry} could be removed.
* @param cacheKey {@link BlockCacheKey} to evict.
* @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
* @return true to indicate whether we've evicted successfully or not.
*/
private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry) {
if (!cacheEnabled) {
return false;
}
boolean existed = removeFromRamCache(cacheKey);
BucketEntry be = backingMap.get(cacheKey);
if (be == null) {
if (existed) {
boolean existedInRamCache = removeFromRamCache(cacheKey);
if (bucketEntry == null) {
bucketEntry = backingMap.get(cacheKey);
}
final BucketEntry bucketEntryToUse = bucketEntry;
if (bucketEntryToUse == null) {
if (existedInRamCache) {
cacheStats.evicted(0, cacheKey.isPrimary());
}
return existed;
return existedInRamCache;
} else {
return be.withWriteLock(offsetLock, be::markAsEvicted);
return bucketEntryToUse.withWriteLock(offsetLock, () -> {
if (backingMap.remove(cacheKey, bucketEntryToUse)) {
blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache);
return true;
}
return false;
});
}
}
private Recycler createRecycler(BlockCacheKey cacheKey) {
/**
* <pre>
* Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as
* {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
* NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf}
* from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
* 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
* it is the return value of current {@link BucketCache#createRecycler} method.
*
* 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
* it is {@link ByteBuffAllocator#putbackBuffer}.
* </pre>
*/
protected Recycler createRecycler(final BucketEntry bucketEntry) {
return () -> {
if (!cacheEnabled) {
return;
}
boolean existed = removeFromRamCache(cacheKey);
BucketEntry be = backingMap.get(cacheKey);
if (be == null && existed) {
cacheStats.evicted(0, cacheKey.isPrimary());
} else if (be != null) {
be.withWriteLock(offsetLock, () -> {
if (backingMap.remove(cacheKey, be)) {
blockEvicted(cacheKey, be, !existed);
cacheStats.evicted(be.getCachedTime(), cacheKey.isPrimary());
}
return null;
});
}
freeBucketEntry(bucketEntry);
return;
};
}
private boolean removeFromRamCache(BlockCacheKey cacheKey) {
/**
* NOTE: This method is only for test.
*/
public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
BucketEntry bucketEntry = backingMap.get(blockCacheKey);
if (bucketEntry == null) {
return false;
}
return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
}
/**
* Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
* {@link BucketEntry#isRpcRef} is false. <br/>
* NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
* {@link BucketEntry} could be removed.
* @param blockCacheKey {@link BlockCacheKey} to evict.
* @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
* @return true to indicate whether we've evicted successfully or not.
*/
boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
if (!bucketEntry.isRpcRef()) {
return doEvictBlock(blockCacheKey, bucketEntry);
}
return false;
}
protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
return ramCache.remove(cacheKey, re -> {
if (re != null) {
this.blockNumber.decrement();
@ -717,7 +783,7 @@ public class BucketCache implements BlockCache, HeapSize {
bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
entry.getValue().withWriteLock(offsetLock, entry.getValue()::markStaleAsEvicted);
evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
}
}
}
@ -906,134 +972,134 @@ public class BucketCache implements BlockCache, HeapSize {
}
LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
}
}
/**
* Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
* cache with a new block for the same cache key. there's a corner case: one thread cache a
* block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another
* new block with the same cache key do the same thing for the same cache key, so if not evict
* the previous bucket entry, then memory leak happen because the previous bucketEntry is gone
* but the bucketAllocator do not free its memory.
* @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
* cacheKey, Cacheable newBlock)
* @param key Block cache key
* @param bucketEntry Bucket entry to put into backingMap.
*/
private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
BucketEntry previousEntry = backingMap.put(key, bucketEntry);
if (previousEntry != null && previousEntry != bucketEntry) {
previousEntry.withWriteLock(offsetLock, () -> {
blockEvicted(key, previousEntry, false);
/**
* Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
* cache with a new block for the same cache key. there's a corner case: one thread cache a block
* in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block
* with the same cache key do the same thing for the same cache key, so if not evict the previous
* bucket entry, then memory leak happen because the previous bucketEntry is gone but the
* bucketAllocator do not free its memory.
* @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
* cacheKey, Cacheable newBlock)
* @param key Block cache key
* @param bucketEntry Bucket entry to put into backingMap.
*/
protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
BucketEntry previousEntry = backingMap.put(key, bucketEntry);
if (previousEntry != null && previousEntry != bucketEntry) {
previousEntry.withWriteLock(offsetLock, () -> {
blockEvicted(key, previousEntry, false);
return null;
});
}
}
/**
* Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that
* are passed in even if failure being sure to remove from ramCache else we'll never undo the
* references and we'll OOME.
* @param entries Presumes list passed in here will be processed by this invocation only. No
* interference expected.
*/
void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
if (entries.isEmpty()) {
return;
}
// This method is a little hard to follow. We run through the passed in entries and for each
// successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
// do cleanup making sure we've cleared ramCache of all entries regardless of whether we
// successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
// filling ramCache. We do the clean up by again running through the passed in entries
// doing extra work when we find a non-null bucketEntries corresponding entry.
final int size = entries.size();
BucketEntry[] bucketEntries = new BucketEntry[size];
// Index updated inside loop if success or if we can't succeed. We retry if cache is full
// when we go to add an entry by going around the loop again without upping the index.
int index = 0;
while (cacheEnabled && index < size) {
RAMQueueEntry re = null;
try {
re = entries.get(index);
if (re == null) {
LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
index++;
continue;
}
BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
(entry) -> createRecycler(entry));
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
ioErrorStartTime = -1;
}
index++;
} catch (BucketAllocatorException fle) {
LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
// Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
bucketEntries[index] = null;
index++;
} catch (CacheFullException cfe) {
// Cache full when we tried to add. Try freeing space and then retrying (don't up index)
if (!freeInProgress) {
freeSpace("Full!");
} else {
Thread.sleep(50);
}
} catch (IOException ioex) {
// Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
LOG.error("Failed writing to bucket cache", ioex);
checkIOErrorIsTolerated();
}
}
// Make sure data pages are written on media before we update maps.
try {
ioEngine.sync();
} catch (IOException ioex) {
LOG.error("Failed syncing IO engine", ioex);
checkIOErrorIsTolerated();
// Since we failed sync, free the blocks in bucket allocator
for (int i = 0; i < entries.size(); ++i) {
if (bucketEntries[i] != null) {
bucketAllocator.freeBlock(bucketEntries[i].offset());
bucketEntries[i] = null;
}
}
}
// Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
// success or error.
for (int i = 0; i < size; ++i) {
BlockCacheKey key = entries.get(i).getKey();
// Only add if non-null entry.
if (bucketEntries[i] != null) {
putIntoBackingMap(key, bucketEntries[i]);
}
// Always remove from ramCache even if we failed adding it to the block cache above.
boolean existed = ramCache.remove(key, re -> {
if (re != null) {
heapSize.add(-1 * re.getData().heapSize());
}
});
if (!existed && bucketEntries[i] != null) {
// Block should have already been evicted. Remove it and free space.
final BucketEntry bucketEntry = bucketEntries[i];
bucketEntry.withWriteLock(offsetLock, () -> {
if (backingMap.remove(key, bucketEntry)) {
blockEvicted(key, bucketEntry, false);
}
return null;
});
}
}
/**
* Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
* Process all that are passed in even if failure being sure to remove from ramCache else we'll
* never undo the references and we'll OOME.
* @param entries Presumes list passed in here will be processed by this invocation only. No
* interference expected.
* @throws InterruptedException
*/
void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
if (entries.isEmpty()) {
return;
}
// This method is a little hard to follow. We run through the passed in entries and for each
// successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
// do cleanup making sure we've cleared ramCache of all entries regardless of whether we
// successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
// filling ramCache. We do the clean up by again running through the passed in entries
// doing extra work when we find a non-null bucketEntries corresponding entry.
final int size = entries.size();
BucketEntry[] bucketEntries = new BucketEntry[size];
// Index updated inside loop if success or if we can't succeed. We retry if cache is full
// when we go to add an entry by going around the loop again without upping the index.
int index = 0;
while (cacheEnabled && index < size) {
RAMQueueEntry re = null;
try {
re = entries.get(index);
if (re == null) {
LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
index++;
continue;
}
BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize);
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
ioErrorStartTime = -1;
}
index++;
} catch (BucketAllocatorException fle) {
LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
// Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
bucketEntries[index] = null;
index++;
} catch (CacheFullException cfe) {
// Cache full when we tried to add. Try freeing space and then retrying (don't up index)
if (!freeInProgress) {
freeSpace("Full!");
} else {
Thread.sleep(50);
}
} catch (IOException ioex) {
// Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
LOG.error("Failed writing to bucket cache", ioex);
checkIOErrorIsTolerated();
}
}
// Make sure data pages are written on media before we update maps.
try {
ioEngine.sync();
} catch (IOException ioex) {
LOG.error("Failed syncing IO engine", ioex);
checkIOErrorIsTolerated();
// Since we failed sync, free the blocks in bucket allocator
for (int i = 0; i < entries.size(); ++i) {
if (bucketEntries[i] != null) {
bucketAllocator.freeBlock(bucketEntries[i].offset());
bucketEntries[i] = null;
}
}
}
// Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
// success or error.
for (int i = 0; i < size; ++i) {
BlockCacheKey key = entries.get(i).getKey();
// Only add if non-null entry.
if (bucketEntries[i] != null) {
putIntoBackingMap(key, bucketEntries[i]);
}
// Always remove from ramCache even if we failed adding it to the block cache above.
boolean existed = ramCache.remove(key, re -> {
if (re != null) {
heapSize.add(-1 * re.getData().heapSize());
}
});
if (!existed && bucketEntries[i] != null) {
// Block should have already been evicted. Remove it and free space.
final BucketEntry bucketEntry = bucketEntries[i];
bucketEntry.withWriteLock(offsetLock, () -> {
if (backingMap.remove(key, bucketEntry)) {
blockEvicted(key, bucketEntry, false);
}
return null;
});
}
}
long used = bucketAllocator.getUsedSize();
if (used > acceptableSize()) {
freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
}
return;
long used = bucketAllocator.getUsedSize();
if (used > acceptableSize()) {
freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
}
return;
}
/**
@ -1327,8 +1393,9 @@ public class BucketCache implements BlockCache, HeapSize {
// TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
// What to do then? Caching attempt fail? Need some changes in cacheBlock API?
while ((entry = queue.pollLast()) != null) {
BlockCacheKey blockCacheKey = entry.getKey();
BucketEntry be = entry.getValue();
if (be.withWriteLock(offsetLock, be::markStaleAsEvicted)) {
if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
freedBytes += be.getLength();
}
if (freedBytes >= toFree) {
@ -1355,15 +1422,12 @@ public class BucketCache implements BlockCache, HeapSize {
private final Cacheable data;
private long accessCounter;
private boolean inMemory;
private final Recycler recycler;
RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
Recycler recycler) {
RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) {
this.key = bck;
this.data = data;
this.accessCounter = accessCounter;
this.inMemory = inMemory;
this.recycler = recycler;
}
public Cacheable getData() {
@ -1386,7 +1450,8 @@ public class BucketCache implements BlockCache, HeapSize {
}
public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
final LongAdder realCacheSize) throws IOException {
final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler)
throws IOException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
if (len == 0) {
@ -1396,7 +1461,7 @@ public class BucketCache implements BlockCache, HeapSize {
boolean succ = false;
BucketEntry bucketEntry = null;
try {
bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, RefCnt.create(recycler),
bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler,
getByteBuffAllocator());
bucketEntry.setDeserializerReference(data.getDeserializer());
if (data instanceof HFileBlock) {

View File

@ -25,8 +25,10 @@ import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
@ -88,16 +90,21 @@ class BucketEntry implements HBaseReferenceCounted {
private final long cachedTime = System.nanoTime();
BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
this(offset, length, accessCounter, inMemory, RefCnt.create(), ByteBuffAllocator.HEAP);
this(offset, length, accessCounter, inMemory, null, ByteBuffAllocator.HEAP);
}
BucketEntry(long offset, int length, long accessCounter, boolean inMemory, RefCnt refCnt,
BucketEntry(long offset, int length, long accessCounter, boolean inMemory,
Function<BucketEntry, Recycler> createRecycler,
ByteBuffAllocator allocator) {
setOffset(offset);
this.length = length;
this.accessCounter = accessCounter;
this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
this.refCnt = refCnt;
if (createRecycler == null) {
this.refCnt = RefCnt.create();
} else {
this.refCnt = RefCnt.create(createRecycler.apply(this));
}
this.markedAsEvicted = new AtomicBoolean(false);
this.allocator = allocator;
}
@ -165,19 +172,6 @@ class BucketEntry implements HBaseReferenceCounted {
return false;
}
/**
* Mark as evicted only when NO RPC references. Mainly used for eviction when cache size exceed
* the max acceptable size.
* @return true if we deallocate this entry successfully.
*/
boolean markStaleAsEvicted() {
if (!markedAsEvicted.get() && this.refCnt() == 1) {
// The only reference was coming from backingMap, now release the stale entry.
return this.markAsEvicted();
}
return false;
}
/**
* Check whether have some RPC patch referring this block. There're two case: <br>
* 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br>

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
@ -43,6 +44,8 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
@ -372,7 +375,11 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
CachedBlock next = iterator.next();
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
cacheList.add(cacheKey);
cache.evictBlock(cacheKey);
/**
* There is only one Block referenced by rpc,here we evict blocks which have no rpc
* referenced.
*/
evictBlock(cache, cacheKey);
}
try {
Thread.sleep(1);
@ -434,4 +441,20 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
assertEquals("Count should give all rows ", 10, count);
}
}
/**
* For {@link BucketCache},we only evict Block if there is no rpc referenced.
*/
private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) {
assertTrue(blockCache instanceof CombinedBlockCache);
BlockCache[] blockCaches = blockCache.getBlockCaches();
for (BlockCache currentBlockCache : blockCaches) {
if (currentBlockCache instanceof BucketCache) {
((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey);
} else {
currentBlockCache.evictBlock(blockCacheKey);
}
}
}
}

View File

@ -263,8 +263,26 @@ public class TestBucketCache {
assertEquals(1, cache.getBlockCount());
lock.writeLock().unlock();
evictThread.join();
assertEquals(0, cache.getBlockCount());
assertEquals(cache.getCurrentSize(), 0L);
/**
* <pre>
* The asserts here before HBASE-21957 are:
* assertEquals(1L, cache.getBlockCount());
* assertTrue(cache.getCurrentSize() > 0L);
* assertTrue("We should have a block!", cache.iterator().hasNext());
*
* The asserts here after HBASE-21957 are:
* assertEquals(0, cache.getBlockCount());
* assertEquals(cache.getCurrentSize(), 0L);
*
* I think the asserts before HBASE-21957 is more reasonable,because
* {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
* it had seen, and newly added Block after the {@link BucketEntry}
* it had seen should not be evicted.
* </pre>
*/
assertEquals(1L, cache.getBlockCount());
assertTrue(cache.getCurrentSize() > 0L);
assertTrue("We should have a block!", cache.iterator().hasNext());
}
@Test
@ -613,8 +631,8 @@ public class TestBucketCache {
HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);
RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);
assertFalse(cache.containsKey(key1));
assertNull(cache.putIfAbsent(key1, re1));
@ -661,11 +679,11 @@ public class TestBucketCache {
BucketAllocator allocator = new BucketAllocator(availableSpace, null);
BlockCacheKey key = new BlockCacheKey("dummy", 1L);
RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, ByteBuffAllocator.NONE);
RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);
Assert.assertEquals(0, allocator.getUsedSize());
try {
re.writeToCache(ioEngine, allocator, null);
re.writeToCache(ioEngine, allocator, null, null);
Assert.fail();
} catch (Exception e) {
}

View File

@ -25,11 +25,17 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
@ -37,6 +43,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
@ -65,6 +72,18 @@ public class TestBucketCacheRefCnt {
queueSize, PERSISTENCE_PATH);
}
private static MyBucketCache createMyBucketCache(int writerSize, int queueSize)
throws IOException {
return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
queueSize, PERSISTENCE_PATH);
}
private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize)
throws IOException {
return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
queueSize, PERSISTENCE_PATH);
}
private static HFileBlock createBlock(int offset, int size) {
return createBlock(offset, size, ByteBuffAllocator.HEAP);
}
@ -133,8 +152,10 @@ public class TestBucketCacheRefCnt {
}
}
private void waitUntilFlushedToCache(BlockCacheKey key) throws InterruptedException {
while (!cache.backingMap.containsKey(key) || cache.ramCache.containsKey(key)) {
private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey)
throws InterruptedException {
while (!bucketCache.backingMap.containsKey(blockCacheKey)
|| bucketCache.ramCache.containsKey(blockCacheKey)) {
Thread.sleep(100);
}
Thread.sleep(1000);
@ -148,7 +169,7 @@ public class TestBucketCacheRefCnt {
HFileBlock blk = createBlock(200, 1020, alloc);
BlockCacheKey key = createKey("testHFile-00", 200);
cache.cacheBlock(key, blk);
waitUntilFlushedToCache(key);
waitUntilFlushedToCache(cache, key);
assertEquals(1, blk.refCnt());
Cacheable block = cache.getBlock(key, false, false, false);
@ -180,17 +201,18 @@ public class TestBucketCacheRefCnt {
assertFalse(block.release());
assertEquals(1, block.refCnt());
newBlock = cache.getBlock(key, false, false, false);
assertEquals(2, block.refCnt());
assertEquals(2, newBlock.refCnt());
/**
* The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache},
* so {@link BucketCache#getBlock} return null.
*/
Cacheable newestBlock = cache.getBlock(key, false, false, false);
assertNull(newestBlock);
assertEquals(1, block.refCnt());
assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
// Release the block
assertFalse(block.release());
assertEquals(1, block.refCnt());
// Release the newBlock;
assertTrue(newBlock.release());
assertTrue(block.release());
assertEquals(0, block.refCnt());
assertEquals(0, newBlock.refCnt());
} finally {
cache.shutdown();
@ -247,7 +269,7 @@ public class TestBucketCacheRefCnt {
HFileBlock blk = createBlock(200, 1020);
BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
cache.cacheBlock(key, blk);
waitUntilFlushedToCache(key);
waitUntilFlushedToCache(cache, key);
assertEquals(1, blk.refCnt());
assertNotNull(cache.backingMap.get(key));
assertEquals(1, cache.backingMap.get(key).refCnt());
@ -260,7 +282,7 @@ public class TestBucketCacheRefCnt {
assertEquals(2, be1.refCnt());
// We've some RPC reference, so it won't have any effect.
assertFalse(be1.markStaleAsEvicted());
assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
assertEquals(2, block1.refCnt());
assertEquals(2, cache.backingMap.get(key).refCnt());
@ -270,7 +292,7 @@ public class TestBucketCacheRefCnt {
assertEquals(1, cache.backingMap.get(key).refCnt());
// Mark the stale as evicted again, it'll do the de-allocation.
assertTrue(be1.markStaleAsEvicted());
assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
assertEquals(0, block1.refCnt());
assertNull(cache.backingMap.get(key));
assertEquals(0, cache.size());
@ -278,4 +300,445 @@ public class TestBucketCacheRefCnt {
cache.shutdown();
}
}
/**
* <pre>
* This test is for HBASE-26281,
* test two threads for replacing Block and getting Block execute concurrently.
* The threads sequence is:
* 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1.
* 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied
* {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
* replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal}
* 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
* which returned Block1, the {@link RefCnt} of Block1 is 2.
* 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap},
* the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used
* by Thread2 and the content of Block1 would be overwritten after it is freed, which may
* cause a serious error.
* </pre>
* @throws Exception
*/
@Test
public void testReplacingBlockAndGettingBlockConcurrently() throws Exception {
ByteBuffAllocator byteBuffAllocator =
ByteBuffAllocator.create(HBaseConfiguration.create(), true);
final MyBucketCache myBucketCache = createMyBucketCache(1, 1000);
try {
HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200);
myBucketCache.cacheBlock(blockCacheKey, hfileBlock);
waitUntilFlushedToCache(myBucketCache, blockCacheKey);
assertEquals(1, hfileBlock.refCnt());
assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey));
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
Thread cacheBlockThread = new Thread(() -> {
try {
HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator);
myBucketCache.cacheBlock(blockCacheKey, newHFileBlock);
waitUntilFlushedToCache(myBucketCache, blockCacheKey);
} catch (Throwable exception) {
exceptionRef.set(exception);
}
});
cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME);
cacheBlockThread.start();
String oldThreadName = Thread.currentThread().getName();
HFileBlock gotHFileBlock = null;
try {
Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME);
gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false));
assertTrue(gotHFileBlock.equals(hfileBlock));
assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
assertEquals(2, gotHFileBlock.refCnt());
/**
* Release the second cyclicBarrier.await in
* {@link MyBucketCache#cacheBlockWithWaitInternal}
*/
myBucketCache.cyclicBarrier.await();
} finally {
Thread.currentThread().setName(oldThreadName);
}
cacheBlockThread.join();
assertTrue(exceptionRef.get() == null);
assertEquals(1, gotHFileBlock.refCnt());
assertTrue(gotHFileBlock.equals(hfileBlock));
assertTrue(myBucketCache.overwiteByteBuff == null);
assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0);
gotHFileBlock.release();
assertEquals(0, gotHFileBlock.refCnt());
assertTrue(myBucketCache.overwiteByteBuff != null);
assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1);
assertTrue(myBucketCache.replaceCounter.get() == 1);
assertTrue(myBucketCache.blockEvictCounter.get() == 1);
} finally {
myBucketCache.shutdown();
}
}
/**
* <pre>
* This test also is for HBASE-26281,
* test three threads for evicting Block,caching Block and getting Block
* execute concurrently.
* 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap},
* the {@link RefCnt} of Block1 is 1.
* 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
* but stopping after {@link BucketCache#removeFromRamCache}.
* 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
* which returned Block1, the {@link RefCnt} of Block1 is 2.
* 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove}
* returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1
* directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3.
* </pre>
*/
@Test
public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception {
ByteBuffAllocator byteBuffAllocator =
ByteBuffAllocator.create(HBaseConfiguration.create(), true);
final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000);
try {
final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200);
final AtomicReference<Throwable> cacheBlockThreadExceptionRef =
new AtomicReference<Throwable>();
Thread cacheBlockThread = new Thread(() -> {
try {
myBucketCache2.cacheBlock(blockCacheKey, hfileBlock);
/**
* Wait for Caching Block completed.
*/
myBucketCache2.writeThreadDoneCyclicBarrier.await();
} catch (Throwable exception) {
cacheBlockThreadExceptionRef.set(exception);
}
});
cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME);
cacheBlockThread.start();
final AtomicReference<Throwable> evictBlockThreadExceptionRef =
new AtomicReference<Throwable>();
Thread evictBlockThread = new Thread(() -> {
try {
myBucketCache2.evictBlock(blockCacheKey);
} catch (Throwable exception) {
evictBlockThreadExceptionRef.set(exception);
}
});
evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME);
evictBlockThread.start();
String oldThreadName = Thread.currentThread().getName();
HFileBlock gotHFileBlock = null;
try {
Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME);
gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false));
assertTrue(gotHFileBlock.equals(hfileBlock));
assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
assertEquals(2, gotHFileBlock.refCnt());
try {
/**
* Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for
* {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread}
* could continue.
*/
myBucketCache2.putCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
} finally {
Thread.currentThread().setName(oldThreadName);
}
cacheBlockThread.join();
evictBlockThread.join();
assertTrue(cacheBlockThreadExceptionRef.get() == null);
assertTrue(evictBlockThreadExceptionRef.get() == null);
assertTrue(gotHFileBlock.equals(hfileBlock));
assertEquals(1, gotHFileBlock.refCnt());
assertTrue(myBucketCache2.overwiteByteBuff == null);
assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0);
gotHFileBlock.release();
assertEquals(0, gotHFileBlock.refCnt());
assertTrue(myBucketCache2.overwiteByteBuff != null);
assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1);
assertTrue(myBucketCache2.blockEvictCounter.get() == 1);
} finally {
myBucketCache2.shutdown();
}
}
static class MyBucketCache extends BucketCache {
private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
private final AtomicInteger replaceCounter = new AtomicInteger(0);
private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
private ByteBuff overwiteByteBuff = null;
public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
persistencePath);
}
/**
* Simulate the Block could be replaced.
*/
@Override
protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
replaceCounter.incrementAndGet();
return true;
}
@Override
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
/**
* Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal},
* so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef}
* checking.
*/
try {
cyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
return result;
}
@Override
protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
boolean inMemory, boolean wait) {
if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
/**
* Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock}
*/
try {
cyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
/**
* Wait the cyclicBarrier.await() in
* {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for
* {@link MyBucketCache#getBlock} and Assert completed.
*/
try {
cyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
}
@Override
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
boolean decrementBlockNumber) {
blockEvictCounter.incrementAndGet();
super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber);
}
/**
* Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
* {@link BucketEntry} is freed.
*/
@Override
void freeBucketEntry(BucketEntry bucketEntry) {
freeBucketEntryCounter.incrementAndGet();
super.freeBucketEntry(bucketEntry);
this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
try {
this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
static class MyBucketCache2 extends BucketCache {
private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread";
private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2);
private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2);
private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2);
/**
* This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and
* {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed.
*/
private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3);
private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
private final AtomicInteger removeRamCounter = new AtomicInteger(0);
private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
private ByteBuff overwiteByteBuff = null;
public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
persistencePath);
}
@Override
protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
super.putIntoBackingMap(key, bucketEntry);
/**
* The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before
* {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME}
*/
try {
evictCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
/**
* Wait the cyclicBarrier.await() in
* {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for
* {@link MyBucketCache#getBlock} and Assert completed.
*/
try {
putCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
@Override
void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
super.doDrain(entries);
if (entries.size() > 0) {
/**
* Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
* {@link #EVICT_BLOCK_THREAD_NAME}.
*/
try {
writeThreadDoneCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
@Override
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
/**
* Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after
* {@link BucketCache#removeFromRamCache}.
*/
try {
getCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
return result;
}
@Override
protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
boolean firstTime = false;
if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
int count = this.removeRamCounter.incrementAndGet();
firstTime = (count == 1);
if (firstTime) {
/**
* The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after
* {@link BucketCache#putIntoBackingMap}.
*/
try {
evictCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
boolean result = super.removeFromRamCache(cacheKey);
if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
if (firstTime) {
/**
* Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}.
*/
try {
getCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
/**
* Wait for Caching Block completed, after Caching Block completed, evictBlock could
* continue.
*/
try {
writeThreadDoneCyclicBarrier.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
return result;
}
@Override
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
boolean decrementBlockNumber) {
/**
* This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create
* only one {@link BucketCache.WriterThread}.
*/
assertTrue(Thread.currentThread() == this.writerThreads[0]);
blockEvictCounter.incrementAndGet();
super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber);
}
/**
* Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
* {@link BucketEntry} is freed.
*/
@Override
void freeBucketEntry(BucketEntry bucketEntry) {
freeBucketEntryCounter.incrementAndGet();
super.freeBucketEntry(bucketEntry);
this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
try {
this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) {
int byteSize = bucketEntry.getLength();
byte[] data = new byte[byteSize];
Arrays.fill(data, (byte) 0xff);
return ByteBuff.wrap(ByteBuffer.wrap(data));
}
}

View File

@ -73,7 +73,6 @@ public class TestBucketWriterThread {
/**
* Set up variables and get BucketCache and WriterThread into state where tests can manually
* control the running of WriterThread and BucketCache is empty.
* @throws Exception
*/
@Before
public void setUp() throws Exception {
@ -141,7 +140,7 @@ public class TestBucketWriterThread {
RAMQueueEntry rqe = q.remove();
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
// Cache disabled when ioes w/o ever healing.
@ -163,7 +162,7 @@ public class TestBucketWriterThread {
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
Mockito.doThrow(cfe).
doReturn(mockedBucketEntry).
when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
}
@ -172,7 +171,7 @@ public class TestBucketWriterThread {
final BlockingQueue<RAMQueueEntry> q)
throws InterruptedException {
List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
wt.doDrain(rqes);
bc.doDrain(rqes);
assertTrue(q.isEmpty());
assertTrue(bc.ramCache.isEmpty());
assertEquals(0, bc.heapSize());

View File

@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
@ -50,7 +49,7 @@ public class TestByteBufferIOEngine {
private long off;
MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator);
super(offset & 0xFF00, length, 0, false, null, allocator);
this.off = offset;
}

View File

@ -91,7 +91,7 @@ public class TestRAMCache {
MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, ByteBuffAllocator.NONE);
RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false);
Assert.assertNull(cache.putIfAbsent(key, re));
Assert.assertEquals(cache.putIfAbsent(key, re), re);