HBASE-26281 DBB got from BucketCache would be freed unexpectedly before RPC completed (#3680)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
74eff79c27
commit
9414e3b471
|
@ -49,6 +49,7 @@ import java.util.concurrent.locks.Lock;
|
|||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
|
@ -419,7 +420,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
boolean wait) {
|
||||
if (cacheEnabled) {
|
||||
if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
|
||||
if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
|
||||
if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
|
||||
BucketEntry bucketEntry = backingMap.get(cacheKey);
|
||||
if (bucketEntry != null && bucketEntry.isRpcRef()) {
|
||||
// avoid replace when there are RPC refs for the bucket entry in bucket cache
|
||||
|
@ -433,7 +434,11 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
}
|
||||
}
|
||||
|
||||
private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
|
||||
protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
|
||||
return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
|
||||
}
|
||||
|
||||
protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
|
||||
boolean inMemory, boolean wait) {
|
||||
if (!cacheEnabled) {
|
||||
return;
|
||||
|
@ -441,8 +446,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
|
||||
// Stuff the entry into the RAM cache so it can get drained to the persistent store
|
||||
RAMQueueEntry re =
|
||||
new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory,
|
||||
createRecycler(cacheKey));
|
||||
new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
|
||||
/**
|
||||
* Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
|
||||
* key in ramCache, the heap size of bucket cache need to update if replacing entry from
|
||||
|
@ -540,13 +544,25 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
|
||||
*/
|
||||
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
|
||||
bucketAllocator.freeBlock(bucketEntry.offset());
|
||||
realCacheSize.add(-1 * bucketEntry.getLength());
|
||||
bucketEntry.markAsEvicted();
|
||||
blocksByHFile.remove(cacheKey);
|
||||
if (decrementBlockNumber) {
|
||||
this.blockNumber.decrement();
|
||||
}
|
||||
cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
|
||||
}
|
||||
|
||||
/**
|
||||
* Free the {{@link BucketEntry} actually,which could only be invoked when the
|
||||
* {@link BucketEntry#refCnt} becoming 0.
|
||||
*/
|
||||
void freeBucketEntry(BucketEntry bucketEntry) {
|
||||
bucketAllocator.freeBlock(bucketEntry.offset());
|
||||
realCacheSize.add(-1 * bucketEntry.getLength());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -554,10 +570,10 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
* 1. Close an HFile, and clear all cached blocks. <br>
|
||||
* 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
|
||||
* <p>
|
||||
* Firstly, we'll try to remove the block from RAMCache. If it doesn't exist in RAMCache, then try
|
||||
* to evict from backingMap. Here we only need to free the reference from bucket cache by calling
|
||||
* {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this block, block can
|
||||
* only be de-allocated when all of them release the block.
|
||||
* Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap.
|
||||
* Here we evict the block from backingMap immediately, but only free the reference from bucket
|
||||
* cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this
|
||||
* block, block can only be de-allocated when all of them release the block.
|
||||
* <p>
|
||||
* NOTICE: we need to grab the write offset lock firstly before releasing the reference from
|
||||
* bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when
|
||||
|
@ -567,43 +583,92 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
*/
|
||||
@Override
|
||||
public boolean evictBlock(BlockCacheKey cacheKey) {
|
||||
return doEvictBlock(cacheKey, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and
|
||||
* {@link BucketCache#ramCache}. <br/>
|
||||
* NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
|
||||
* {@link BucketEntry} could be removed.
|
||||
* @param cacheKey {@link BlockCacheKey} to evict.
|
||||
* @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
|
||||
* @return true to indicate whether we've evicted successfully or not.
|
||||
*/
|
||||
private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry) {
|
||||
if (!cacheEnabled) {
|
||||
return false;
|
||||
}
|
||||
boolean existed = removeFromRamCache(cacheKey);
|
||||
BucketEntry be = backingMap.get(cacheKey);
|
||||
if (be == null) {
|
||||
if (existed) {
|
||||
boolean existedInRamCache = removeFromRamCache(cacheKey);
|
||||
if (bucketEntry == null) {
|
||||
bucketEntry = backingMap.get(cacheKey);
|
||||
}
|
||||
final BucketEntry bucketEntryToUse = bucketEntry;
|
||||
|
||||
if (bucketEntryToUse == null) {
|
||||
if (existedInRamCache) {
|
||||
cacheStats.evicted(0, cacheKey.isPrimary());
|
||||
}
|
||||
return existed;
|
||||
return existedInRamCache;
|
||||
} else {
|
||||
return be.withWriteLock(offsetLock, be::markAsEvicted);
|
||||
return bucketEntryToUse.withWriteLock(offsetLock, () -> {
|
||||
if (backingMap.remove(cacheKey, bucketEntryToUse)) {
|
||||
blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private Recycler createRecycler(BlockCacheKey cacheKey) {
|
||||
/**
|
||||
* <pre>
|
||||
* Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as
|
||||
* {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
|
||||
* NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf}
|
||||
* from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
|
||||
* 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
|
||||
* it is the return value of current {@link BucketCache#createRecycler} method.
|
||||
*
|
||||
* 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
|
||||
* it is {@link ByteBuffAllocator#putbackBuffer}.
|
||||
* </pre>
|
||||
*/
|
||||
protected Recycler createRecycler(final BucketEntry bucketEntry) {
|
||||
return () -> {
|
||||
if (!cacheEnabled) {
|
||||
return;
|
||||
}
|
||||
boolean existed = removeFromRamCache(cacheKey);
|
||||
BucketEntry be = backingMap.get(cacheKey);
|
||||
if (be == null && existed) {
|
||||
cacheStats.evicted(0, cacheKey.isPrimary());
|
||||
} else if (be != null) {
|
||||
be.withWriteLock(offsetLock, () -> {
|
||||
if (backingMap.remove(cacheKey, be)) {
|
||||
blockEvicted(cacheKey, be, !existed);
|
||||
cacheStats.evicted(be.getCachedTime(), cacheKey.isPrimary());
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
freeBucketEntry(bucketEntry);
|
||||
return;
|
||||
};
|
||||
}
|
||||
|
||||
private boolean removeFromRamCache(BlockCacheKey cacheKey) {
|
||||
/**
|
||||
* NOTE: This method is only for test.
|
||||
*/
|
||||
public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
|
||||
BucketEntry bucketEntry = backingMap.get(blockCacheKey);
|
||||
if (bucketEntry == null) {
|
||||
return false;
|
||||
}
|
||||
return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
|
||||
}
|
||||
|
||||
/**
|
||||
* Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
|
||||
* {@link BucketEntry#isRpcRef} is false. <br/>
|
||||
* NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
|
||||
* {@link BucketEntry} could be removed.
|
||||
* @param blockCacheKey {@link BlockCacheKey} to evict.
|
||||
* @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
|
||||
* @return true to indicate whether we've evicted successfully or not.
|
||||
*/
|
||||
boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
|
||||
if (!bucketEntry.isRpcRef()) {
|
||||
return doEvictBlock(blockCacheKey, bucketEntry);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
|
||||
return ramCache.remove(cacheKey, re -> {
|
||||
if (re != null) {
|
||||
this.blockNumber.decrement();
|
||||
|
@ -707,7 +772,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
|
||||
for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
|
||||
if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
|
||||
entry.getValue().withWriteLock(offsetLock, entry.getValue()::markStaleAsEvicted);
|
||||
evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -896,134 +961,134 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
}
|
||||
LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
|
||||
* cache with a new block for the same cache key. there's a corner case: one thread cache a
|
||||
* block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another
|
||||
* new block with the same cache key do the same thing for the same cache key, so if not evict
|
||||
* the previous bucket entry, then memory leak happen because the previous bucketEntry is gone
|
||||
* but the bucketAllocator do not free its memory.
|
||||
* @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
|
||||
* cacheKey, Cacheable newBlock)
|
||||
* @param key Block cache key
|
||||
* @param bucketEntry Bucket entry to put into backingMap.
|
||||
*/
|
||||
private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
|
||||
BucketEntry previousEntry = backingMap.put(key, bucketEntry);
|
||||
if (previousEntry != null && previousEntry != bucketEntry) {
|
||||
previousEntry.withWriteLock(offsetLock, () -> {
|
||||
blockEvicted(key, previousEntry, false);
|
||||
/**
|
||||
* Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
|
||||
* cache with a new block for the same cache key. there's a corner case: one thread cache a block
|
||||
* in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block
|
||||
* with the same cache key do the same thing for the same cache key, so if not evict the previous
|
||||
* bucket entry, then memory leak happen because the previous bucketEntry is gone but the
|
||||
* bucketAllocator do not free its memory.
|
||||
* @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
|
||||
* cacheKey, Cacheable newBlock)
|
||||
* @param key Block cache key
|
||||
* @param bucketEntry Bucket entry to put into backingMap.
|
||||
*/
|
||||
protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
|
||||
BucketEntry previousEntry = backingMap.put(key, bucketEntry);
|
||||
if (previousEntry != null && previousEntry != bucketEntry) {
|
||||
previousEntry.withWriteLock(offsetLock, () -> {
|
||||
blockEvicted(key, previousEntry, false);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that
|
||||
* are passed in even if failure being sure to remove from ramCache else we'll never undo the
|
||||
* references and we'll OOME.
|
||||
* @param entries Presumes list passed in here will be processed by this invocation only. No
|
||||
* interference expected.
|
||||
*/
|
||||
void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
|
||||
if (entries.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
// This method is a little hard to follow. We run through the passed in entries and for each
|
||||
// successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
|
||||
// do cleanup making sure we've cleared ramCache of all entries regardless of whether we
|
||||
// successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
|
||||
// filling ramCache. We do the clean up by again running through the passed in entries
|
||||
// doing extra work when we find a non-null bucketEntries corresponding entry.
|
||||
final int size = entries.size();
|
||||
BucketEntry[] bucketEntries = new BucketEntry[size];
|
||||
// Index updated inside loop if success or if we can't succeed. We retry if cache is full
|
||||
// when we go to add an entry by going around the loop again without upping the index.
|
||||
int index = 0;
|
||||
while (cacheEnabled && index < size) {
|
||||
RAMQueueEntry re = null;
|
||||
try {
|
||||
re = entries.get(index);
|
||||
if (re == null) {
|
||||
LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
|
||||
index++;
|
||||
continue;
|
||||
}
|
||||
BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
|
||||
(entry) -> createRecycler(entry));
|
||||
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
|
||||
bucketEntries[index] = bucketEntry;
|
||||
if (ioErrorStartTime > 0) {
|
||||
ioErrorStartTime = -1;
|
||||
}
|
||||
index++;
|
||||
} catch (BucketAllocatorException fle) {
|
||||
LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
|
||||
// Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
|
||||
bucketEntries[index] = null;
|
||||
index++;
|
||||
} catch (CacheFullException cfe) {
|
||||
// Cache full when we tried to add. Try freeing space and then retrying (don't up index)
|
||||
if (!freeInProgress) {
|
||||
freeSpace("Full!");
|
||||
} else {
|
||||
Thread.sleep(50);
|
||||
}
|
||||
} catch (IOException ioex) {
|
||||
// Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
|
||||
LOG.error("Failed writing to bucket cache", ioex);
|
||||
checkIOErrorIsTolerated();
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure data pages are written on media before we update maps.
|
||||
try {
|
||||
ioEngine.sync();
|
||||
} catch (IOException ioex) {
|
||||
LOG.error("Failed syncing IO engine", ioex);
|
||||
checkIOErrorIsTolerated();
|
||||
// Since we failed sync, free the blocks in bucket allocator
|
||||
for (int i = 0; i < entries.size(); ++i) {
|
||||
if (bucketEntries[i] != null) {
|
||||
bucketAllocator.freeBlock(bucketEntries[i].offset());
|
||||
bucketEntries[i] = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
|
||||
// success or error.
|
||||
for (int i = 0; i < size; ++i) {
|
||||
BlockCacheKey key = entries.get(i).getKey();
|
||||
// Only add if non-null entry.
|
||||
if (bucketEntries[i] != null) {
|
||||
putIntoBackingMap(key, bucketEntries[i]);
|
||||
}
|
||||
// Always remove from ramCache even if we failed adding it to the block cache above.
|
||||
boolean existed = ramCache.remove(key, re -> {
|
||||
if (re != null) {
|
||||
heapSize.add(-1 * re.getData().heapSize());
|
||||
}
|
||||
});
|
||||
if (!existed && bucketEntries[i] != null) {
|
||||
// Block should have already been evicted. Remove it and free space.
|
||||
final BucketEntry bucketEntry = bucketEntries[i];
|
||||
bucketEntry.withWriteLock(offsetLock, () -> {
|
||||
if (backingMap.remove(key, bucketEntry)) {
|
||||
blockEvicted(key, bucketEntry, false);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
|
||||
* Process all that are passed in even if failure being sure to remove from ramCache else we'll
|
||||
* never undo the references and we'll OOME.
|
||||
* @param entries Presumes list passed in here will be processed by this invocation only. No
|
||||
* interference expected.
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
|
||||
if (entries.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
// This method is a little hard to follow. We run through the passed in entries and for each
|
||||
// successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
|
||||
// do cleanup making sure we've cleared ramCache of all entries regardless of whether we
|
||||
// successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
|
||||
// filling ramCache. We do the clean up by again running through the passed in entries
|
||||
// doing extra work when we find a non-null bucketEntries corresponding entry.
|
||||
final int size = entries.size();
|
||||
BucketEntry[] bucketEntries = new BucketEntry[size];
|
||||
// Index updated inside loop if success or if we can't succeed. We retry if cache is full
|
||||
// when we go to add an entry by going around the loop again without upping the index.
|
||||
int index = 0;
|
||||
while (cacheEnabled && index < size) {
|
||||
RAMQueueEntry re = null;
|
||||
try {
|
||||
re = entries.get(index);
|
||||
if (re == null) {
|
||||
LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
|
||||
index++;
|
||||
continue;
|
||||
}
|
||||
BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize);
|
||||
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
|
||||
bucketEntries[index] = bucketEntry;
|
||||
if (ioErrorStartTime > 0) {
|
||||
ioErrorStartTime = -1;
|
||||
}
|
||||
index++;
|
||||
} catch (BucketAllocatorException fle) {
|
||||
LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
|
||||
// Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
|
||||
bucketEntries[index] = null;
|
||||
index++;
|
||||
} catch (CacheFullException cfe) {
|
||||
// Cache full when we tried to add. Try freeing space and then retrying (don't up index)
|
||||
if (!freeInProgress) {
|
||||
freeSpace("Full!");
|
||||
} else {
|
||||
Thread.sleep(50);
|
||||
}
|
||||
} catch (IOException ioex) {
|
||||
// Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
|
||||
LOG.error("Failed writing to bucket cache", ioex);
|
||||
checkIOErrorIsTolerated();
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure data pages are written on media before we update maps.
|
||||
try {
|
||||
ioEngine.sync();
|
||||
} catch (IOException ioex) {
|
||||
LOG.error("Failed syncing IO engine", ioex);
|
||||
checkIOErrorIsTolerated();
|
||||
// Since we failed sync, free the blocks in bucket allocator
|
||||
for (int i = 0; i < entries.size(); ++i) {
|
||||
if (bucketEntries[i] != null) {
|
||||
bucketAllocator.freeBlock(bucketEntries[i].offset());
|
||||
bucketEntries[i] = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
|
||||
// success or error.
|
||||
for (int i = 0; i < size; ++i) {
|
||||
BlockCacheKey key = entries.get(i).getKey();
|
||||
// Only add if non-null entry.
|
||||
if (bucketEntries[i] != null) {
|
||||
putIntoBackingMap(key, bucketEntries[i]);
|
||||
}
|
||||
// Always remove from ramCache even if we failed adding it to the block cache above.
|
||||
boolean existed = ramCache.remove(key, re -> {
|
||||
if (re != null) {
|
||||
heapSize.add(-1 * re.getData().heapSize());
|
||||
}
|
||||
});
|
||||
if (!existed && bucketEntries[i] != null) {
|
||||
// Block should have already been evicted. Remove it and free space.
|
||||
final BucketEntry bucketEntry = bucketEntries[i];
|
||||
bucketEntry.withWriteLock(offsetLock, () -> {
|
||||
if (backingMap.remove(key, bucketEntry)) {
|
||||
blockEvicted(key, bucketEntry, false);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
long used = bucketAllocator.getUsedSize();
|
||||
if (used > acceptableSize()) {
|
||||
freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
|
||||
}
|
||||
return;
|
||||
long used = bucketAllocator.getUsedSize();
|
||||
if (used > acceptableSize()) {
|
||||
freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1313,8 +1378,9 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
// TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
|
||||
// What to do then? Caching attempt fail? Need some changes in cacheBlock API?
|
||||
while ((entry = queue.pollLast()) != null) {
|
||||
BlockCacheKey blockCacheKey = entry.getKey();
|
||||
BucketEntry be = entry.getValue();
|
||||
if (be.withWriteLock(offsetLock, be::markStaleAsEvicted)) {
|
||||
if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
|
||||
freedBytes += be.getLength();
|
||||
}
|
||||
if (freedBytes >= toFree) {
|
||||
|
@ -1341,15 +1407,12 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
private final Cacheable data;
|
||||
private long accessCounter;
|
||||
private boolean inMemory;
|
||||
private final Recycler recycler;
|
||||
|
||||
RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
|
||||
Recycler recycler) {
|
||||
RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) {
|
||||
this.key = bck;
|
||||
this.data = data;
|
||||
this.accessCounter = accessCounter;
|
||||
this.inMemory = inMemory;
|
||||
this.recycler = recycler;
|
||||
}
|
||||
|
||||
public Cacheable getData() {
|
||||
|
@ -1372,7 +1435,8 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
}
|
||||
|
||||
public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
|
||||
final LongAdder realCacheSize) throws IOException {
|
||||
final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler)
|
||||
throws IOException {
|
||||
int len = data.getSerializedLength();
|
||||
// This cacheable thing can't be serialized
|
||||
if (len == 0) {
|
||||
|
@ -1382,7 +1446,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
|||
boolean succ = false;
|
||||
BucketEntry bucketEntry = null;
|
||||
try {
|
||||
bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, RefCnt.create(recycler),
|
||||
bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler,
|
||||
getByteBuffAllocator());
|
||||
bucketEntry.setDeserializerReference(data.getDeserializer());
|
||||
if (data instanceof HFileBlock) {
|
||||
|
|
|
@ -25,8 +25,10 @@ import java.nio.ByteBuffer;
|
|||
import java.util.Comparator;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
|
||||
import org.apache.hadoop.hbase.io.hfile.Cacheable;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
|
||||
|
@ -88,16 +90,21 @@ class BucketEntry implements HBaseReferenceCounted {
|
|||
private final long cachedTime = System.nanoTime();
|
||||
|
||||
BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
|
||||
this(offset, length, accessCounter, inMemory, RefCnt.create(), ByteBuffAllocator.HEAP);
|
||||
this(offset, length, accessCounter, inMemory, null, ByteBuffAllocator.HEAP);
|
||||
}
|
||||
|
||||
BucketEntry(long offset, int length, long accessCounter, boolean inMemory, RefCnt refCnt,
|
||||
BucketEntry(long offset, int length, long accessCounter, boolean inMemory,
|
||||
Function<BucketEntry, Recycler> createRecycler,
|
||||
ByteBuffAllocator allocator) {
|
||||
setOffset(offset);
|
||||
this.length = length;
|
||||
this.accessCounter = accessCounter;
|
||||
this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
|
||||
this.refCnt = refCnt;
|
||||
if (createRecycler == null) {
|
||||
this.refCnt = RefCnt.create();
|
||||
} else {
|
||||
this.refCnt = RefCnt.create(createRecycler.apply(this));
|
||||
}
|
||||
this.markedAsEvicted = new AtomicBoolean(false);
|
||||
this.allocator = allocator;
|
||||
}
|
||||
|
@ -165,19 +172,6 @@ class BucketEntry implements HBaseReferenceCounted {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark as evicted only when NO RPC references. Mainly used for eviction when cache size exceed
|
||||
* the max acceptable size.
|
||||
* @return true if we deallocate this entry successfully.
|
||||
*/
|
||||
boolean markStaleAsEvicted() {
|
||||
if (!markedAsEvicted.get() && this.refCnt() == 1) {
|
||||
// The only reference was coming from backingMap, now release the stale entry.
|
||||
return this.markAsEvicted();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether have some RPC patch referring this block. There're two case: <br>
|
||||
* 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br>
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -43,6 +44,8 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
|||
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
|
||||
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
|
||||
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
|
@ -372,7 +375,11 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
|
|||
CachedBlock next = iterator.next();
|
||||
BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
|
||||
cacheList.add(cacheKey);
|
||||
cache.evictBlock(cacheKey);
|
||||
/**
|
||||
* There is only one Block referenced by rpc,here we evict blocks which have no rpc
|
||||
* referenced.
|
||||
*/
|
||||
evictBlock(cache, cacheKey);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
|
@ -437,4 +444,20 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
|
|||
table.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For {@link BucketCache},we only evict Block if there is no rpc referenced.
|
||||
*/
|
||||
private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) {
|
||||
assertTrue(blockCache instanceof CombinedBlockCache);
|
||||
BlockCache[] blockCaches = blockCache.getBlockCaches();
|
||||
for (BlockCache currentBlockCache : blockCaches) {
|
||||
if (currentBlockCache instanceof BucketCache) {
|
||||
((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey);
|
||||
} else {
|
||||
currentBlockCache.evictBlock(blockCacheKey);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -249,8 +249,26 @@ public class TestBucketCache {
|
|||
assertEquals(1, cache.getBlockCount());
|
||||
lock.writeLock().unlock();
|
||||
evictThread.join();
|
||||
assertEquals(0, cache.getBlockCount());
|
||||
assertEquals(cache.getCurrentSize(), 0L);
|
||||
/**
|
||||
* <pre>
|
||||
* The asserts here before HBASE-21957 are:
|
||||
* assertEquals(1L, cache.getBlockCount());
|
||||
* assertTrue(cache.getCurrentSize() > 0L);
|
||||
* assertTrue("We should have a block!", cache.iterator().hasNext());
|
||||
*
|
||||
* The asserts here after HBASE-21957 are:
|
||||
* assertEquals(0, cache.getBlockCount());
|
||||
* assertEquals(cache.getCurrentSize(), 0L);
|
||||
*
|
||||
* I think the asserts before HBASE-21957 is more reasonable,because
|
||||
* {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
|
||||
* it had seen, and newly added Block after the {@link BucketEntry}
|
||||
* it had seen should not be evicted.
|
||||
* </pre>
|
||||
*/
|
||||
assertEquals(1L, cache.getBlockCount());
|
||||
assertTrue(cache.getCurrentSize() > 0L);
|
||||
assertTrue("We should have a block!", cache.iterator().hasNext());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -503,8 +521,8 @@ public class TestBucketCache {
|
|||
HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
|
||||
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
|
||||
HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
|
||||
RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
|
||||
RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);
|
||||
RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
|
||||
RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);
|
||||
|
||||
assertFalse(cache.containsKey(key1));
|
||||
assertNull(cache.putIfAbsent(key1, re1));
|
||||
|
@ -551,11 +569,11 @@ public class TestBucketCache {
|
|||
BucketAllocator allocator = new BucketAllocator(availableSpace, null);
|
||||
|
||||
BlockCacheKey key = new BlockCacheKey("dummy", 1L);
|
||||
RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, ByteBuffAllocator.NONE);
|
||||
RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);
|
||||
|
||||
Assert.assertEquals(0, allocator.getUsedSize());
|
||||
try {
|
||||
re.writeToCache(ioEngine, allocator, null);
|
||||
re.writeToCache(ioEngine, allocator, null, null);
|
||||
Assert.fail();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
|
|
@ -25,11 +25,17 @@ import static org.junit.Assert.assertTrue;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockType;
|
||||
import org.apache.hadoop.hbase.io.hfile.Cacheable;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
||||
|
@ -37,6 +43,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.nio.RefCnt;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -65,6 +72,18 @@ public class TestBucketCacheRefCnt {
|
|||
queueSize, PERSISTENCE_PATH);
|
||||
}
|
||||
|
||||
private static MyBucketCache createMyBucketCache(int writerSize, int queueSize)
|
||||
throws IOException {
|
||||
return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
|
||||
queueSize, PERSISTENCE_PATH);
|
||||
}
|
||||
|
||||
private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize)
|
||||
throws IOException {
|
||||
return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
|
||||
queueSize, PERSISTENCE_PATH);
|
||||
}
|
||||
|
||||
private static HFileBlock createBlock(int offset, int size) {
|
||||
return createBlock(offset, size, ByteBuffAllocator.HEAP);
|
||||
}
|
||||
|
@ -133,8 +152,10 @@ public class TestBucketCacheRefCnt {
|
|||
}
|
||||
}
|
||||
|
||||
private void waitUntilFlushedToCache(BlockCacheKey key) throws InterruptedException {
|
||||
while (!cache.backingMap.containsKey(key) || cache.ramCache.containsKey(key)) {
|
||||
private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey)
|
||||
throws InterruptedException {
|
||||
while (!bucketCache.backingMap.containsKey(blockCacheKey)
|
||||
|| bucketCache.ramCache.containsKey(blockCacheKey)) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
|
@ -148,7 +169,7 @@ public class TestBucketCacheRefCnt {
|
|||
HFileBlock blk = createBlock(200, 1020, alloc);
|
||||
BlockCacheKey key = createKey("testHFile-00", 200);
|
||||
cache.cacheBlock(key, blk);
|
||||
waitUntilFlushedToCache(key);
|
||||
waitUntilFlushedToCache(cache, key);
|
||||
assertEquals(1, blk.refCnt());
|
||||
|
||||
Cacheable block = cache.getBlock(key, false, false, false);
|
||||
|
@ -180,17 +201,18 @@ public class TestBucketCacheRefCnt {
|
|||
assertFalse(block.release());
|
||||
assertEquals(1, block.refCnt());
|
||||
|
||||
newBlock = cache.getBlock(key, false, false, false);
|
||||
assertEquals(2, block.refCnt());
|
||||
assertEquals(2, newBlock.refCnt());
|
||||
/**
|
||||
* The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache},
|
||||
* so {@link BucketCache#getBlock} return null.
|
||||
*/
|
||||
Cacheable newestBlock = cache.getBlock(key, false, false, false);
|
||||
assertNull(newestBlock);
|
||||
assertEquals(1, block.refCnt());
|
||||
assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
|
||||
|
||||
// Release the block
|
||||
assertFalse(block.release());
|
||||
assertEquals(1, block.refCnt());
|
||||
|
||||
// Release the newBlock;
|
||||
assertTrue(newBlock.release());
|
||||
assertTrue(block.release());
|
||||
assertEquals(0, block.refCnt());
|
||||
assertEquals(0, newBlock.refCnt());
|
||||
} finally {
|
||||
cache.shutdown();
|
||||
|
@ -247,7 +269,7 @@ public class TestBucketCacheRefCnt {
|
|||
HFileBlock blk = createBlock(200, 1020);
|
||||
BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
|
||||
cache.cacheBlock(key, blk);
|
||||
waitUntilFlushedToCache(key);
|
||||
waitUntilFlushedToCache(cache, key);
|
||||
assertEquals(1, blk.refCnt());
|
||||
assertNotNull(cache.backingMap.get(key));
|
||||
assertEquals(1, cache.backingMap.get(key).refCnt());
|
||||
|
@ -260,7 +282,7 @@ public class TestBucketCacheRefCnt {
|
|||
assertEquals(2, be1.refCnt());
|
||||
|
||||
// We've some RPC reference, so it won't have any effect.
|
||||
assertFalse(be1.markStaleAsEvicted());
|
||||
assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
|
||||
assertEquals(2, block1.refCnt());
|
||||
assertEquals(2, cache.backingMap.get(key).refCnt());
|
||||
|
||||
|
@ -270,7 +292,7 @@ public class TestBucketCacheRefCnt {
|
|||
assertEquals(1, cache.backingMap.get(key).refCnt());
|
||||
|
||||
// Mark the stale as evicted again, it'll do the de-allocation.
|
||||
assertTrue(be1.markStaleAsEvicted());
|
||||
assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
|
||||
assertEquals(0, block1.refCnt());
|
||||
assertNull(cache.backingMap.get(key));
|
||||
assertEquals(0, cache.size());
|
||||
|
@ -278,4 +300,445 @@ public class TestBucketCacheRefCnt {
|
|||
cache.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <pre>
|
||||
* This test is for HBASE-26281,
|
||||
* test two threads for replacing Block and getting Block execute concurrently.
|
||||
* The threads sequence is:
|
||||
* 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1.
|
||||
* 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied
|
||||
* {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
|
||||
* replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal}
|
||||
* 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
|
||||
* which returned Block1, the {@link RefCnt} of Block1 is 2.
|
||||
* 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap},
|
||||
* the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used
|
||||
* by Thread2 and the content of Block1 would be overwritten after it is freed, which may
|
||||
* cause a serious error.
|
||||
* </pre>
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testReplacingBlockAndGettingBlockConcurrently() throws Exception {
|
||||
ByteBuffAllocator byteBuffAllocator =
|
||||
ByteBuffAllocator.create(HBaseConfiguration.create(), true);
|
||||
final MyBucketCache myBucketCache = createMyBucketCache(1, 1000);
|
||||
try {
|
||||
HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
|
||||
final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200);
|
||||
myBucketCache.cacheBlock(blockCacheKey, hfileBlock);
|
||||
waitUntilFlushedToCache(myBucketCache, blockCacheKey);
|
||||
assertEquals(1, hfileBlock.refCnt());
|
||||
|
||||
assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey));
|
||||
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
|
||||
Thread cacheBlockThread = new Thread(() -> {
|
||||
try {
|
||||
HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator);
|
||||
myBucketCache.cacheBlock(blockCacheKey, newHFileBlock);
|
||||
waitUntilFlushedToCache(myBucketCache, blockCacheKey);
|
||||
|
||||
} catch (Throwable exception) {
|
||||
exceptionRef.set(exception);
|
||||
}
|
||||
});
|
||||
cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME);
|
||||
cacheBlockThread.start();
|
||||
|
||||
String oldThreadName = Thread.currentThread().getName();
|
||||
HFileBlock gotHFileBlock = null;
|
||||
try {
|
||||
|
||||
Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME);
|
||||
|
||||
gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false));
|
||||
assertTrue(gotHFileBlock.equals(hfileBlock));
|
||||
assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
|
||||
assertEquals(2, gotHFileBlock.refCnt());
|
||||
/**
|
||||
* Release the second cyclicBarrier.await in
|
||||
* {@link MyBucketCache#cacheBlockWithWaitInternal}
|
||||
*/
|
||||
myBucketCache.cyclicBarrier.await();
|
||||
|
||||
} finally {
|
||||
Thread.currentThread().setName(oldThreadName);
|
||||
}
|
||||
|
||||
cacheBlockThread.join();
|
||||
assertTrue(exceptionRef.get() == null);
|
||||
assertEquals(1, gotHFileBlock.refCnt());
|
||||
assertTrue(gotHFileBlock.equals(hfileBlock));
|
||||
assertTrue(myBucketCache.overwiteByteBuff == null);
|
||||
assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0);
|
||||
|
||||
gotHFileBlock.release();
|
||||
assertEquals(0, gotHFileBlock.refCnt());
|
||||
assertTrue(myBucketCache.overwiteByteBuff != null);
|
||||
assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1);
|
||||
assertTrue(myBucketCache.replaceCounter.get() == 1);
|
||||
assertTrue(myBucketCache.blockEvictCounter.get() == 1);
|
||||
} finally {
|
||||
myBucketCache.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* <pre>
|
||||
* This test also is for HBASE-26281,
|
||||
* test three threads for evicting Block,caching Block and getting Block
|
||||
* execute concurrently.
|
||||
* 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap},
|
||||
* the {@link RefCnt} of Block1 is 1.
|
||||
* 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
|
||||
* but stopping after {@link BucketCache#removeFromRamCache}.
|
||||
* 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
|
||||
* which returned Block1, the {@link RefCnt} of Block1 is 2.
|
||||
* 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove}
|
||||
* returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1
|
||||
* directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3.
|
||||
* </pre>
|
||||
*/
|
||||
@Test
|
||||
public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception {
|
||||
ByteBuffAllocator byteBuffAllocator =
|
||||
ByteBuffAllocator.create(HBaseConfiguration.create(), true);
|
||||
final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000);
|
||||
try {
|
||||
final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
|
||||
final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200);
|
||||
final AtomicReference<Throwable> cacheBlockThreadExceptionRef =
|
||||
new AtomicReference<Throwable>();
|
||||
Thread cacheBlockThread = new Thread(() -> {
|
||||
try {
|
||||
myBucketCache2.cacheBlock(blockCacheKey, hfileBlock);
|
||||
/**
|
||||
* Wait for Caching Block completed.
|
||||
*/
|
||||
myBucketCache2.writeThreadDoneCyclicBarrier.await();
|
||||
} catch (Throwable exception) {
|
||||
cacheBlockThreadExceptionRef.set(exception);
|
||||
}
|
||||
});
|
||||
cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME);
|
||||
cacheBlockThread.start();
|
||||
|
||||
final AtomicReference<Throwable> evictBlockThreadExceptionRef =
|
||||
new AtomicReference<Throwable>();
|
||||
Thread evictBlockThread = new Thread(() -> {
|
||||
try {
|
||||
myBucketCache2.evictBlock(blockCacheKey);
|
||||
} catch (Throwable exception) {
|
||||
evictBlockThreadExceptionRef.set(exception);
|
||||
}
|
||||
});
|
||||
evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME);
|
||||
evictBlockThread.start();
|
||||
|
||||
String oldThreadName = Thread.currentThread().getName();
|
||||
HFileBlock gotHFileBlock = null;
|
||||
try {
|
||||
Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME);
|
||||
gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false));
|
||||
assertTrue(gotHFileBlock.equals(hfileBlock));
|
||||
assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
|
||||
assertEquals(2, gotHFileBlock.refCnt());
|
||||
try {
|
||||
/**
|
||||
* Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for
|
||||
* {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread}
|
||||
* could continue.
|
||||
*/
|
||||
myBucketCache2.putCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
} finally {
|
||||
Thread.currentThread().setName(oldThreadName);
|
||||
}
|
||||
|
||||
cacheBlockThread.join();
|
||||
evictBlockThread.join();
|
||||
assertTrue(cacheBlockThreadExceptionRef.get() == null);
|
||||
assertTrue(evictBlockThreadExceptionRef.get() == null);
|
||||
|
||||
assertTrue(gotHFileBlock.equals(hfileBlock));
|
||||
assertEquals(1, gotHFileBlock.refCnt());
|
||||
assertTrue(myBucketCache2.overwiteByteBuff == null);
|
||||
assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0);
|
||||
|
||||
gotHFileBlock.release();
|
||||
assertEquals(0, gotHFileBlock.refCnt());
|
||||
assertTrue(myBucketCache2.overwiteByteBuff != null);
|
||||
assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1);
|
||||
assertTrue(myBucketCache2.blockEvictCounter.get() == 1);
|
||||
} finally {
|
||||
myBucketCache2.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static class MyBucketCache extends BucketCache {
|
||||
private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
|
||||
private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
|
||||
|
||||
private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
|
||||
private final AtomicInteger replaceCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
|
||||
private ByteBuff overwiteByteBuff = null;
|
||||
|
||||
public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
|
||||
int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
|
||||
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
|
||||
persistencePath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Simulate the Block could be replaced.
|
||||
*/
|
||||
@Override
|
||||
protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
|
||||
replaceCounter.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
|
||||
boolean updateCacheMetrics) {
|
||||
if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
|
||||
/**
|
||||
* Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal},
|
||||
* so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef}
|
||||
* checking.
|
||||
*/
|
||||
try {
|
||||
cyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
|
||||
boolean inMemory, boolean wait) {
|
||||
if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
|
||||
/**
|
||||
* Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock}
|
||||
*/
|
||||
try {
|
||||
cyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
|
||||
/**
|
||||
* Wait the cyclicBarrier.await() in
|
||||
* {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for
|
||||
* {@link MyBucketCache#getBlock} and Assert completed.
|
||||
*/
|
||||
try {
|
||||
cyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
|
||||
}
|
||||
|
||||
@Override
|
||||
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
|
||||
boolean decrementBlockNumber) {
|
||||
blockEvictCounter.incrementAndGet();
|
||||
super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber);
|
||||
}
|
||||
|
||||
/**
|
||||
* Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
|
||||
* {@link BucketEntry} is freed.
|
||||
*/
|
||||
@Override
|
||||
void freeBucketEntry(BucketEntry bucketEntry) {
|
||||
freeBucketEntryCounter.incrementAndGet();
|
||||
super.freeBucketEntry(bucketEntry);
|
||||
this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
|
||||
try {
|
||||
this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static class MyBucketCache2 extends BucketCache {
|
||||
private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
|
||||
private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
|
||||
private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread";
|
||||
|
||||
private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2);
|
||||
private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2);
|
||||
private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2);
|
||||
/**
|
||||
* This is used for {@link BucketCache.WriterThread},{@link #CACHE_BLOCK_THREAD_NAME} and
|
||||
* {@link #EVICT_BLOCK_THREAD_NAME},waiting for caching block completed.
|
||||
*/
|
||||
private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3);
|
||||
private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger removeRamCounter = new AtomicInteger(0);
|
||||
private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
|
||||
private ByteBuff overwiteByteBuff = null;
|
||||
|
||||
public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
|
||||
int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
|
||||
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
|
||||
persistencePath);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
|
||||
super.putIntoBackingMap(key, bucketEntry);
|
||||
/**
|
||||
* The {@link BucketCache.WriterThread} wait for evictCyclicBarrier.await before
|
||||
* {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME}
|
||||
*/
|
||||
try {
|
||||
evictCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait the cyclicBarrier.await() in
|
||||
* {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for
|
||||
* {@link MyBucketCache#getBlock} and Assert completed.
|
||||
*/
|
||||
try {
|
||||
putCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
|
||||
super.doDrain(entries);
|
||||
if (entries.size() > 0) {
|
||||
/**
|
||||
* Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
|
||||
* {@link #EVICT_BLOCK_THREAD_NAME}.
|
||||
*/
|
||||
try {
|
||||
writeThreadDoneCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
|
||||
boolean updateCacheMetrics) {
|
||||
if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
|
||||
/**
|
||||
* Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after
|
||||
* {@link BucketCache#removeFromRamCache}.
|
||||
*/
|
||||
try {
|
||||
getCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
|
||||
boolean firstTime = false;
|
||||
if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
|
||||
int count = this.removeRamCounter.incrementAndGet();
|
||||
firstTime = (count == 1);
|
||||
if (firstTime) {
|
||||
/**
|
||||
* The {@link #EVICT_BLOCK_THREAD_NAME} wait for evictCyclicBarrier.await after
|
||||
* {@link BucketCache#putIntoBackingMap}.
|
||||
*/
|
||||
try {
|
||||
evictCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
boolean result = super.removeFromRamCache(cacheKey);
|
||||
if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
|
||||
if (firstTime) {
|
||||
/**
|
||||
* Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}.
|
||||
*/
|
||||
try {
|
||||
getCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
/**
|
||||
* Wait for Caching Block completed, after Caching Block completed, evictBlock could
|
||||
* continue.
|
||||
*/
|
||||
try {
|
||||
writeThreadDoneCyclicBarrier.await();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
|
||||
boolean decrementBlockNumber) {
|
||||
/**
|
||||
* This is only invoked by {@link BucketCache.WriterThread}. {@link MyMyBucketCache2} create
|
||||
* only one {@link BucketCache.WriterThread}.
|
||||
*/
|
||||
assertTrue(Thread.currentThread() == this.writerThreads[0]);
|
||||
|
||||
blockEvictCounter.incrementAndGet();
|
||||
super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber);
|
||||
}
|
||||
|
||||
/**
|
||||
* Overwrite 0xff to the {@link BucketEntry} content to simulate it would be overwrite after the
|
||||
* {@link BucketEntry} is freed.
|
||||
*/
|
||||
@Override
|
||||
void freeBucketEntry(BucketEntry bucketEntry) {
|
||||
freeBucketEntryCounter.incrementAndGet();
|
||||
super.freeBucketEntry(bucketEntry);
|
||||
this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
|
||||
try {
|
||||
this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) {
|
||||
int byteSize = bucketEntry.getLength();
|
||||
byte[] data = new byte[byteSize];
|
||||
Arrays.fill(data, (byte) 0xff);
|
||||
return ByteBuff.wrap(ByteBuffer.wrap(data));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,7 +73,6 @@ public class TestBucketWriterThread {
|
|||
/**
|
||||
* Set up variables and get BucketCache and WriterThread into state where tests can manually
|
||||
* control the running of WriterThread and BucketCache is empty.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
@ -141,7 +140,7 @@ public class TestBucketWriterThread {
|
|||
RAMQueueEntry rqe = q.remove();
|
||||
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
|
||||
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
|
||||
writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
|
||||
writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
|
||||
this.q.add(spiedRqe);
|
||||
doDrainOfOneEntry(bc, wt, q);
|
||||
// Cache disabled when ioes w/o ever healing.
|
||||
|
@ -163,7 +162,7 @@ public class TestBucketWriterThread {
|
|||
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
|
||||
Mockito.doThrow(cfe).
|
||||
doReturn(mockedBucketEntry).
|
||||
when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
|
||||
when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
|
||||
this.q.add(spiedRqe);
|
||||
doDrainOfOneEntry(bc, wt, q);
|
||||
}
|
||||
|
@ -172,7 +171,7 @@ public class TestBucketWriterThread {
|
|||
final BlockingQueue<RAMQueueEntry> q)
|
||||
throws InterruptedException {
|
||||
List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
|
||||
wt.doDrain(rqes);
|
||||
bc.doDrain(rqes);
|
||||
assertTrue(q.isEmpty());
|
||||
assertTrue(bc.ramCache.isEmpty());
|
||||
assertEquals(0, bc.heapSize());
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
|
|||
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
import org.apache.hadoop.hbase.nio.RefCnt;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Assert;
|
||||
|
@ -50,7 +49,7 @@ public class TestByteBufferIOEngine {
|
|||
private long off;
|
||||
|
||||
MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
|
||||
super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator);
|
||||
super(offset & 0xFF00, length, 0, false, null, allocator);
|
||||
this.off = offset;
|
||||
}
|
||||
|
||||
|
|
|
@ -91,7 +91,7 @@ public class TestRAMCache {
|
|||
MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
|
||||
ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
|
||||
new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
|
||||
RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, ByteBuffAllocator.NONE);
|
||||
RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false);
|
||||
|
||||
Assert.assertNull(cache.putIfAbsent(key, re));
|
||||
Assert.assertEquals(cache.putIfAbsent(key, re), re);
|
||||
|
|
Loading…
Reference in New Issue