HBASE-26281 DBB got from BucketCache would be freed unexpectedly before RPC completed (#3680)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
commit b21b250570 (parent e8d62139d5)
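Background for the diff below: before this change, eviction and replacement paths in BucketCache could free a bucket (and its direct ByteBuffer, the "DBB" of the title) while an in-flight RPC still held the block, so the buffer could be recycled and overwritten mid-read. The fix routes every physical free through the entry's reference count. A minimal sketch of that invariant, using hypothetical names rather than HBase's actual classes:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: storage behind a cached block may be reclaimed solely by
// whoever drops the LAST reference -- the cache's own reference or an RPC's.
final class RefCountedBlock {
  private final AtomicInteger refCnt = new AtomicInteger(1); // 1 = the cache's own reference
  private final Runnable recycler;                           // frees the backing bucket at zero

  RefCountedBlock(Runnable recycler) {
    this.recycler = recycler;
  }

  void retain() {                 // an RPC takes a reference before reading the buffer
    refCnt.incrementAndGet();
  }

  boolean release() {             // eviction, replacement and RPC completion all call this
    if (refCnt.decrementAndGet() == 0) {
      recycler.run();             // the bucket is recycled only here, never eagerly
      return true;
    }
    return false;
  }
}
```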
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -49,6 +49,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -419,7 +420,7 @@ public class BucketCache implements BlockCache, HeapSize {
       boolean wait) {
     if (cacheEnabled) {
       if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
-        if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
+        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
           BucketEntry bucketEntry = backingMap.get(cacheKey);
           if (bucketEntry != null && bucketEntry.isRpcRef()) {
             // avoid replace when there are RPC refs for the bucket entry in bucket cache
@@ -433,7 +434,11 @@ public class BucketCache implements BlockCache, HeapSize {
     }
   }

-  private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
+  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
+    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
+  }
+
+  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
       boolean inMemory, boolean wait) {
     if (!cacheEnabled) {
       return;
@@ -441,8 +446,7 @@ public class BucketCache implements BlockCache, HeapSize {
     LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
     // Stuff the entry into the RAM cache so it can get drained to the persistent store
     RAMQueueEntry re =
-        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory,
-            createRecycler(cacheKey));
+        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
     /**
      * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
      * key in ramCache, the heap size of bucket cache need to update if replacing entry from
@@ -540,13 +544,25 @@ public class BucketCache implements BlockCache, HeapSize {
     return null;
   }

+  /**
+   * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
+   */
   void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
-    bucketAllocator.freeBlock(bucketEntry.offset());
-    realCacheSize.add(-1 * bucketEntry.getLength());
+    bucketEntry.markAsEvicted();
     blocksByHFile.remove(cacheKey);
     if (decrementBlockNumber) {
       this.blockNumber.decrement();
     }
+    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
+  }
+
+  /**
+   * Free the {{@link BucketEntry} actually,which could only be invoked when the
+   * {@link BucketEntry#refCnt} becoming 0.
+   */
+  void freeBucketEntry(BucketEntry bucketEntry) {
+    bucketAllocator.freeBlock(bucketEntry.offset());
+    realCacheSize.add(-1 * bucketEntry.getLength());
   }

   /**
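The hunk above is the heart of the fix: blockEvicted() no longer frees the allocator slot itself; it only drops the cache's reference via markAsEvicted(), and the physical free now lives in freeBucketEntry(), which is reached through the entry's RefCnt recycler once the count hits zero. A hedged sketch of the resulting hand-off (hypothetical stand-in types, not the real BucketEntry):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class EntrySketch {
  private final AtomicBoolean markedAsEvicted = new AtomicBoolean(false);
  private final AtomicInteger refCnt = new AtomicInteger(1); // cache ref + any RPC refs
  private final Runnable freeBucketEntry;                    // physical free, runs at zero

  EntrySketch(Runnable freeBucketEntry) {
    this.freeBucketEntry = freeBucketEntry;
  }

  // blockEvicted() calls this: release the cache's reference exactly once.
  boolean markAsEvicted() {
    return markedAsEvicted.compareAndSet(false, true) && release();
  }

  // RPC completion also lands here; whoever drops the last reference frees.
  boolean release() {
    if (refCnt.decrementAndGet() == 0) {
      freeBucketEntry.run();
      return true;
    }
    return false;
  }
}
```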
@@ -554,10 +570,10 @@ public class BucketCache implements BlockCache, HeapSize {
    * 1. Close an HFile, and clear all cached blocks. <br>
    * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
    * <p>
-   * Firstly, we'll try to remove the block from RAMCache. If it doesn't exist in RAMCache, then try
-   * to evict from backingMap. Here we only need to free the reference from bucket cache by calling
-   * {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this block, block can
-   * only be de-allocated when all of them release the block.
+   * Firstly, we'll try to remove the block from RAMCache,and then try to evict from backingMap.
+   * Here we evict the block from backingMap immediately, but only free the reference from bucket
+   * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this
+   * block, block can only be de-allocated when all of them release the block.
    * <p>
    * NOTICE: we need to grab the write offset lock firstly before releasing the reference from
    * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when
@@ -567,43 +583,92 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   @Override
   public boolean evictBlock(BlockCacheKey cacheKey) {
+    return doEvictBlock(cacheKey, null);
+  }
+
+  /**
+   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and
+   * {@link BucketCache#ramCache}. <br/>
+   * NOTE:When Evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
+   * {@link BucketEntry} could be removed.
+   * @param cacheKey {@link BlockCacheKey} to evict.
+   * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
+   * @return true to indicate whether we've evicted successfully or not.
+   */
+  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry) {
     if (!cacheEnabled) {
       return false;
     }
-    boolean existed = removeFromRamCache(cacheKey);
-    BucketEntry be = backingMap.get(cacheKey);
-    if (be == null) {
-      if (existed) {
+    boolean existedInRamCache = removeFromRamCache(cacheKey);
+    if (bucketEntry == null) {
+      bucketEntry = backingMap.get(cacheKey);
+    }
+    final BucketEntry bucketEntryToUse = bucketEntry;
+
+    if (bucketEntryToUse == null) {
+      if (existedInRamCache) {
         cacheStats.evicted(0, cacheKey.isPrimary());
       }
-      return existed;
+      return existedInRamCache;
     } else {
-      return be.withWriteLock(offsetLock, be::markAsEvicted);
+      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
+        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
+          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache);
+          return true;
+        }
+        return false;
+      });
     }
   }

-  private Recycler createRecycler(BlockCacheKey cacheKey) {
+  /**
+   * <pre>
+   * Create the {@link Recycler} for {@link BucketEntry#refCnt},which would be used as
+   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
+   * NOTE: for {@link BucketCache#getBlock},the {@link RefCnt#recycler} of {@link HFileBlock#buf}
+   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
+   * 1.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
+   * it is the return value of current {@link BucketCache#createRecycler} method.
+   *
+   * 2.For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
+   * it is {@link ByteBuffAllocator#putbackBuffer}.
+   * </pre>
+   */
+  protected Recycler createRecycler(final BucketEntry bucketEntry) {
     return () -> {
-      if (!cacheEnabled) {
+      freeBucketEntry(bucketEntry);
       return;
-      }
-      boolean existed = removeFromRamCache(cacheKey);
-      BucketEntry be = backingMap.get(cacheKey);
-      if (be == null && existed) {
-        cacheStats.evicted(0, cacheKey.isPrimary());
-      } else if (be != null) {
-        be.withWriteLock(offsetLock, () -> {
-          if (backingMap.remove(cacheKey, be)) {
-            blockEvicted(cacheKey, be, !existed);
-            cacheStats.evicted(be.getCachedTime(), cacheKey.isPrimary());
-          }
-          return null;
-        });
-      }
     };
   }

-  private boolean removeFromRamCache(BlockCacheKey cacheKey) {
+  /**
+   * NOTE: This method is only for test.
+   */
+  public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
+    BucketEntry bucketEntry = backingMap.get(blockCacheKey);
+    if (bucketEntry == null) {
+      return false;
+    }
+    return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
+  }
+
+  /**
+   * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
+   * {@link BucketEntry#isRpcRef} is false. <br/>
+   * NOTE:When evict from {@link BucketCache#backingMap},only the matched {@link BlockCacheKey} and
+   * {@link BucketEntry} could be removed.
+   * @param blockCacheKey {@link BlockCacheKey} to evict.
+   * @param bucketEntry {@link BucketEntry} matched {@link BlockCacheKey} to evict.
+   * @return true to indicate whether we've evicted successfully or not.
+   */
+  boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
+    if (!bucketEntry.isRpcRef()) {
+      return doEvictBlock(blockCacheKey, bucketEntry);
+    }
+    return false;
+  }
+
+  protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
     return ramCache.remove(cacheKey, re -> {
       if (re != null) {
         this.blockNumber.decrement();
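A detail worth noting in doEvictBlock() above: it removes with backingMap.remove(cacheKey, bucketEntryToUse) rather than remove(cacheKey), so an eviction that races with a re-cache of the same key cannot delete the newer entry. That is the standard JDK two-argument compare-and-remove; a self-contained demonstration using only plain JDK types:

```java
import java.util.concurrent.ConcurrentHashMap;

public class CompareAndRemoveDemo {
  public static void main(String[] args) {
    ConcurrentHashMap<String, Object> backingMap = new ConcurrentHashMap<>();
    Object oldEntry = new Object();
    Object newEntry = new Object();
    backingMap.put("block-1", oldEntry);
    backingMap.put("block-1", newEntry);  // another thread re-cached the same key

    // A stale eviction holding oldEntry is a no-op; only the matched pair goes.
    System.out.println(backingMap.remove("block-1", oldEntry)); // false
    System.out.println(backingMap.remove("block-1", newEntry)); // true
  }
}
```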
@@ -707,7 +772,7 @@ public class BucketCache implements BlockCache, HeapSize {
     bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
     for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
       if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
-        entry.getValue().withWriteLock(offsetLock, entry.getValue()::markStaleAsEvicted);
+        evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
       }
     }
   }
@@ -896,134 +961,134 @@ public class BucketCache implements BlockCache, HeapSize {
       }
       LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
     }
+  }

   /**
    * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
-   * cache with a new block for the same cache key. there's a corner case: one thread cache a
-   * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another
-   * new block with the same cache key do the same thing for the same cache key, so if not evict
-   * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone
-   * but the bucketAllocator do not free its memory.
+   * cache with a new block for the same cache key. there's a corner case: one thread cache a block
+   * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block
+   * with the same cache key do the same thing for the same cache key, so if not evict the previous
+   * bucket entry, then memory leak happen because the previous bucketEntry is gone but the
+   * bucketAllocator do not free its memory.
    * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
    *      cacheKey, Cacheable newBlock)
    * @param key Block cache key
    * @param bucketEntry Bucket entry to put into backingMap.
    */
-  private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
+  protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
     BucketEntry previousEntry = backingMap.put(key, bucketEntry);
     if (previousEntry != null && previousEntry != bucketEntry) {
       previousEntry.withWriteLock(offsetLock, () -> {
         blockEvicted(key, previousEntry, false);
+        return null;
+      });
+    }
+  }
+
+  /**
+   * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that
+   * are passed in even if failure being sure to remove from ramCache else we'll never undo the
+   * references and we'll OOME.
+   * @param entries Presumes list passed in here will be processed by this invocation only. No
+   *          interference expected.
+   */
+  void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
+    if (entries.isEmpty()) {
+      return;
+    }
+    // This method is a little hard to follow. We run through the passed in entries and for each
+    // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
+    // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
+    // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
+    // filling ramCache. We do the clean up by again running through the passed in entries
+    // doing extra work when we find a non-null bucketEntries corresponding entry.
+    final int size = entries.size();
+    BucketEntry[] bucketEntries = new BucketEntry[size];
+    // Index updated inside loop if success or if we can't succeed. We retry if cache is full
+    // when we go to add an entry by going around the loop again without upping the index.
+    int index = 0;
+    while (cacheEnabled && index < size) {
+      RAMQueueEntry re = null;
+      try {
+        re = entries.get(index);
+        if (re == null) {
+          LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
+          index++;
+          continue;
+        }
+        BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
+          (entry) -> createRecycler(entry));
+        // Successfully added. Up index and add bucketEntry. Clear io exceptions.
+        bucketEntries[index] = bucketEntry;
+        if (ioErrorStartTime > 0) {
+          ioErrorStartTime = -1;
+        }
+        index++;
+      } catch (BucketAllocatorException fle) {
+        LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
+        // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
+        bucketEntries[index] = null;
+        index++;
+      } catch (CacheFullException cfe) {
+        // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
+        if (!freeInProgress) {
+          freeSpace("Full!");
+        } else {
+          Thread.sleep(50);
+        }
+      } catch (IOException ioex) {
+        // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
+        LOG.error("Failed writing to bucket cache", ioex);
+        checkIOErrorIsTolerated();
+      }
+    }
+
+    // Make sure data pages are written on media before we update maps.
+    try {
+      ioEngine.sync();
+    } catch (IOException ioex) {
+      LOG.error("Failed syncing IO engine", ioex);
+      checkIOErrorIsTolerated();
+      // Since we failed sync, free the blocks in bucket allocator
+      for (int i = 0; i < entries.size(); ++i) {
+        if (bucketEntries[i] != null) {
+          bucketAllocator.freeBlock(bucketEntries[i].offset());
+          bucketEntries[i] = null;
+        }
+      }
+    }
+
+    // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
+    // success or error.
+    for (int i = 0; i < size; ++i) {
+      BlockCacheKey key = entries.get(i).getKey();
+      // Only add if non-null entry.
+      if (bucketEntries[i] != null) {
+        putIntoBackingMap(key, bucketEntries[i]);
+      }
+      // Always remove from ramCache even if we failed adding it to the block cache above.
+      boolean existed = ramCache.remove(key, re -> {
+        if (re != null) {
+          heapSize.add(-1 * re.getData().heapSize());
+        }
+      });
+      if (!existed && bucketEntries[i] != null) {
+        // Block should have already been evicted. Remove it and free space.
+        final BucketEntry bucketEntry = bucketEntries[i];
+        bucketEntry.withWriteLock(offsetLock, () -> {
+          if (backingMap.remove(key, bucketEntry)) {
+            blockEvicted(key, bucketEntry, false);
+          }
         return null;
       });
     }
   }

-  /**
-   * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
-   * Process all that are passed in even if failure being sure to remove from ramCache else we'll
-   * never undo the references and we'll OOME.
-   * @param entries Presumes list passed in here will be processed by this invocation only. No
-   *          interference expected.
-   * @throws InterruptedException
-   */
-  void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
-    if (entries.isEmpty()) {
-      return;
-    }
-    // This method is a little hard to follow. We run through the passed in entries and for each
-    // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
-    // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
-    // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
-    // filling ramCache. We do the clean up by again running through the passed in entries
-    // doing extra work when we find a non-null bucketEntries corresponding entry.
-    final int size = entries.size();
-    BucketEntry[] bucketEntries = new BucketEntry[size];
-    // Index updated inside loop if success or if we can't succeed. We retry if cache is full
-    // when we go to add an entry by going around the loop again without upping the index.
-    int index = 0;
-    while (cacheEnabled && index < size) {
-      RAMQueueEntry re = null;
-      try {
-        re = entries.get(index);
-        if (re == null) {
-          LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");
-          index++;
-          continue;
-        }
-        BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize);
-        // Successfully added. Up index and add bucketEntry. Clear io exceptions.
-        bucketEntries[index] = bucketEntry;
-        if (ioErrorStartTime > 0) {
-          ioErrorStartTime = -1;
-        }
-        index++;
-      } catch (BucketAllocatorException fle) {
-        LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
-        // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
-        bucketEntries[index] = null;
-        index++;
-      } catch (CacheFullException cfe) {
-        // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
-        if (!freeInProgress) {
-          freeSpace("Full!");
-        } else {
-          Thread.sleep(50);
-        }
-      } catch (IOException ioex) {
-        // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
-        LOG.error("Failed writing to bucket cache", ioex);
-        checkIOErrorIsTolerated();
-      }
-    }
-
-    // Make sure data pages are written on media before we update maps.
-    try {
-      ioEngine.sync();
-    } catch (IOException ioex) {
-      LOG.error("Failed syncing IO engine", ioex);
-      checkIOErrorIsTolerated();
-      // Since we failed sync, free the blocks in bucket allocator
-      for (int i = 0; i < entries.size(); ++i) {
-        if (bucketEntries[i] != null) {
-          bucketAllocator.freeBlock(bucketEntries[i].offset());
-          bucketEntries[i] = null;
-        }
-      }
-    }
-
-    // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
-    // success or error.
-    for (int i = 0; i < size; ++i) {
-      BlockCacheKey key = entries.get(i).getKey();
-      // Only add if non-null entry.
-      if (bucketEntries[i] != null) {
-        putIntoBackingMap(key, bucketEntries[i]);
-      }
-      // Always remove from ramCache even if we failed adding it to the block cache above.
-      boolean existed = ramCache.remove(key, re -> {
-        if (re != null) {
-          heapSize.add(-1 * re.getData().heapSize());
-        }
-      });
-      if (!existed && bucketEntries[i] != null) {
-        // Block should have already been evicted. Remove it and free space.
-        final BucketEntry bucketEntry = bucketEntries[i];
-        bucketEntry.withWriteLock(offsetLock, () -> {
-          if (backingMap.remove(key, bucketEntry)) {
-            blockEvicted(key, bucketEntry, false);
-          }
-          return null;
-        });
-      }
-    }
-
-    long used = bucketAllocator.getUsedSize();
-    if (used > acceptableSize()) {
-      freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
-    }
-    return;
+    long used = bucketAllocator.getUsedSize();
+    if (used > acceptableSize()) {
+      freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
     }
+    return;
   }

   /**
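In the relocated doDrain() above, writeToCache() now receives the factory (entry) -> createRecycler(entry) instead of the RAMQueueEntry carrying a recycler keyed by BlockCacheKey. The point is that the recycler is created per BucketEntry, only once the entry exists, so it frees exactly the bucket it guards. A hedged miniature of that shape (hypothetical types, not the real HBase signatures):

```java
import java.util.function.Function;

class RecyclerWiring {
  interface Recycler { void free(); }

  static final class Entry {
    final long offset;
    Entry(long offset) { this.offset = offset; }
  }

  // Mirrors the idea of writeToCache(..., Function<BucketEntry, Recycler>):
  // the factory is applied after allocation, binding the recycler to the entry.
  static Entry writeToCache(long offset, Function<Entry, Recycler> createRecycler) {
    Entry entry = new Entry(offset);
    Recycler recycler = createRecycler.apply(entry); // bound to this exact entry
    // ...in the real code this recycler seeds the entry's RefCnt and frees the
    // allocator slot at `offset` once the reference count reaches zero...
    return entry;
  }

  public static void main(String[] args) {
    writeToCache(42L, entry -> () -> System.out.println("free bucket @" + entry.offset));
  }
}
```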
@@ -1313,8 +1378,9 @@ public class BucketCache implements BlockCache, HeapSize {
       // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
       // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
       while ((entry = queue.pollLast()) != null) {
+        BlockCacheKey blockCacheKey = entry.getKey();
         BucketEntry be = entry.getValue();
-        if (be.withWriteLock(offsetLock, be::markStaleAsEvicted)) {
+        if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
           freedBytes += be.getLength();
         }
         if (freedBytes >= toFree) {
@@ -1341,15 +1407,12 @@ public class BucketCache implements BlockCache, HeapSize {
     private final Cacheable data;
     private long accessCounter;
     private boolean inMemory;
-    private final Recycler recycler;

-    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
-        Recycler recycler) {
+    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) {
       this.key = bck;
       this.data = data;
       this.accessCounter = accessCounter;
       this.inMemory = inMemory;
-      this.recycler = recycler;
     }

     public Cacheable getData() {
@@ -1372,7 +1435,8 @@ public class BucketCache implements BlockCache, HeapSize {
     }

     public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
-        final LongAdder realCacheSize) throws IOException {
+        final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler)
+        throws IOException {
       int len = data.getSerializedLength();
       // This cacheable thing can't be serialized
       if (len == 0) {
@@ -1382,7 +1446,7 @@ public class BucketCache implements BlockCache, HeapSize {
       boolean succ = false;
       BucketEntry bucketEntry = null;
       try {
-        bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, RefCnt.create(recycler),
+        bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler,
           getByteBuffAllocator());
         bucketEntry.setDeserializerReference(data.getDeserializer());
         if (data instanceof HFileBlock) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
@@ -25,8 +25,10 @@ import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;

 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.io.hfile.BlockPriority;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
@@ -88,16 +90,21 @@ class BucketEntry implements HBaseReferenceCounted {
   private final long cachedTime = System.nanoTime();

   BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
-    this(offset, length, accessCounter, inMemory, RefCnt.create(), ByteBuffAllocator.HEAP);
+    this(offset, length, accessCounter, inMemory, null, ByteBuffAllocator.HEAP);
   }

-  BucketEntry(long offset, int length, long accessCounter, boolean inMemory, RefCnt refCnt,
+  BucketEntry(long offset, int length, long accessCounter, boolean inMemory,
+      Function<BucketEntry, Recycler> createRecycler,
       ByteBuffAllocator allocator) {
     setOffset(offset);
     this.length = length;
     this.accessCounter = accessCounter;
     this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
-    this.refCnt = refCnt;
+    if (createRecycler == null) {
+      this.refCnt = RefCnt.create();
+    } else {
+      this.refCnt = RefCnt.create(createRecycler.apply(this));
+    }
     this.markedAsEvicted = new AtomicBoolean(false);
     this.allocator = allocator;
   }
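The constructor change above resolves a chicken-and-egg problem: the recycler must capture the very BucketEntry being constructed, so instead of receiving a pre-built RefCnt, the constructor takes a factory and applies it to `this`. A minimal standalone illustration of the pattern (hypothetical class, not the real BucketEntry):

```java
import java.util.function.Function;

class Node {
  interface Recycler { void run(); }

  private final Recycler recycler;

  Node(Function<Node, Recycler> createRecycler) {
    // `this` escapes here; that is tolerable only because the recycler cannot
    // fire until the last reference is released, well after construction.
    if (createRecycler == null) {
      this.recycler = () -> { };                  // nothing extra to free
    } else {
      this.recycler = createRecycler.apply(this); // bound to the object under construction
    }
  }
}
```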
@@ -165,19 +172,6 @@ class BucketEntry implements HBaseReferenceCounted {
     return false;
   }

-  /**
-   * Mark as evicted only when NO RPC references. Mainly used for eviction when cache size exceed
-   * the max acceptable size.
-   * @return true if we deallocate this entry successfully.
-   */
-  boolean markStaleAsEvicted() {
-    if (!markedAsEvicted.get() && this.refCnt() == 1) {
-      // The only reference was coming from backingMap, now release the stale entry.
-      return this.markAsEvicted();
-    }
-    return false;
-  }
-
   /**
    * Check whether have some RPC patch referring this block. There're two case: <br>
    * 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br>
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -43,6 +44,8 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
+import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -372,7 +375,11 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
       CachedBlock next = iterator.next();
       BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
       cacheList.add(cacheKey);
-      cache.evictBlock(cacheKey);
+      /**
+       * There is only one Block referenced by rpc,here we evict blocks which have no rpc
+       * referenced.
+       */
+      evictBlock(cache, cacheKey);
     }
     try {
       Thread.sleep(1);
@@ -437,4 +444,20 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
       table.close();
     }
   }
+
+  /**
+   * For {@link BucketCache},we only evict Block if there is no rpc referenced.
+   */
+  private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) {
+    assertTrue(blockCache instanceof CombinedBlockCache);
+    BlockCache[] blockCaches = blockCache.getBlockCaches();
+    for (BlockCache currentBlockCache : blockCaches) {
+      if (currentBlockCache instanceof BucketCache) {
+        ((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey);
+      } else {
+        currentBlockCache.evictBlock(blockCacheKey);
+      }
+    }
+
+  }
 }
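The helper above treats the tiers of a CombinedBlockCache differently: only the BucketCache tier gets the RPC-reference-aware eviction, while other tiers evict unconditionally. A toy model of that fan-out, with hypothetical interfaces rather than the HBase BlockCache API:

```java
interface SimpleCache { void evict(String key); }

final class RpcAwareCache implements SimpleCache {
  public void evict(String key) { /* evict only if no RPC holds a reference */ }
}

final class CombinedCache implements SimpleCache {
  private final SimpleCache[] caches;
  CombinedCache(SimpleCache... caches) { this.caches = caches; }
  public void evict(String key) {
    for (SimpleCache c : caches) {
      c.evict(key); // each tier applies its own safety rule
    }
  }
}
```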
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -249,8 +249,26 @@ public class TestBucketCache {
     assertEquals(1, cache.getBlockCount());
     lock.writeLock().unlock();
     evictThread.join();
-    assertEquals(0, cache.getBlockCount());
-    assertEquals(cache.getCurrentSize(), 0L);
+    /**
+     * <pre>
+     * The asserts here before HBASE-21957 are:
+     * assertEquals(1L, cache.getBlockCount());
+     * assertTrue(cache.getCurrentSize() > 0L);
+     * assertTrue("We should have a block!", cache.iterator().hasNext());
+     *
+     * The asserts here after HBASE-21957 are:
+     * assertEquals(0, cache.getBlockCount());
+     * assertEquals(cache.getCurrentSize(), 0L);
+     *
+     * I think the asserts before HBASE-21957 is more reasonable,because
+     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
+     * it had seen, and newly added Block after the {@link BucketEntry}
+     * it had seen should not be evicted.
+     * </pre>
+     */
+    assertEquals(1L, cache.getBlockCount());
+    assertTrue(cache.getCurrentSize() > 0L);
+    assertTrue("We should have a block!", cache.iterator().hasNext());
   }

   @Test
@@ -503,8 +521,8 @@ public class TestBucketCache {
       HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
     HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
       HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
-    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
-    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);
+    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
+    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);

     assertFalse(cache.containsKey(key1));
     assertNull(cache.putIfAbsent(key1, re1));
@@ -551,11 +569,11 @@ public class TestBucketCache {
     BucketAllocator allocator = new BucketAllocator(availableSpace, null);

     BlockCacheKey key = new BlockCacheKey("dummy", 1L);
-    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, ByteBuffAllocator.NONE);
+    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);

     Assert.assertEquals(0, allocator.getUsedSize());
     try {
-      re.writeToCache(ioEngine, allocator, null);
+      re.writeToCache(ioEngine, allocator, null, null);
       Assert.fail();
     } catch (Exception e) {
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
@@ -25,11 +25,17 @@ import static org.junit.Assert.assertTrue;

 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;

 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
@@ -37,6 +43,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.ClassRule;
@@ -65,6 +72,18 @@ public class TestBucketCacheRefCnt {
       queueSize, PERSISTENCE_PATH);
   }

+  private static MyBucketCache createMyBucketCache(int writerSize, int queueSize)
+      throws IOException {
+    return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
+      queueSize, PERSISTENCE_PATH);
+  }
+
+  private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize)
+      throws IOException {
+    return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
+      queueSize, PERSISTENCE_PATH);
+  }
+
   private static HFileBlock createBlock(int offset, int size) {
     return createBlock(offset, size, ByteBuffAllocator.HEAP);
   }
@@ -133,8 +152,10 @@ public class TestBucketCacheRefCnt {
     }
   }

-  private void waitUntilFlushedToCache(BlockCacheKey key) throws InterruptedException {
-    while (!cache.backingMap.containsKey(key) || cache.ramCache.containsKey(key)) {
+  private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey)
+      throws InterruptedException {
+    while (!bucketCache.backingMap.containsKey(blockCacheKey)
+        || bucketCache.ramCache.containsKey(blockCacheKey)) {
       Thread.sleep(100);
     }
     Thread.sleep(1000);
@@ -148,7 +169,7 @@ public class TestBucketCacheRefCnt {
     HFileBlock blk = createBlock(200, 1020, alloc);
     BlockCacheKey key = createKey("testHFile-00", 200);
     cache.cacheBlock(key, blk);
-    waitUntilFlushedToCache(key);
+    waitUntilFlushedToCache(cache, key);
     assertEquals(1, blk.refCnt());

     Cacheable block = cache.getBlock(key, false, false, false);
@@ -180,17 +201,18 @@ public class TestBucketCacheRefCnt {
       assertFalse(block.release());
       assertEquals(1, block.refCnt());

-      newBlock = cache.getBlock(key, false, false, false);
-      assertEquals(2, block.refCnt());
-      assertEquals(2, newBlock.refCnt());
+      /**
+       * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache},
+       * so {@link BucketCache#getBlock} return null.
+       */
+      Cacheable newestBlock = cache.getBlock(key, false, false, false);
+      assertNull(newestBlock);
+      assertEquals(1, block.refCnt());
       assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);

       // Release the block
-      assertFalse(block.release());
-      assertEquals(1, block.refCnt());
-
-      // Release the newBlock;
-      assertTrue(newBlock.release());
+      assertTrue(block.release());
+      assertEquals(0, block.refCnt());
       assertEquals(0, newBlock.refCnt());
     } finally {
       cache.shutdown();
@@ -247,7 +269,7 @@ public class TestBucketCacheRefCnt {
       HFileBlock blk = createBlock(200, 1020);
       BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
       cache.cacheBlock(key, blk);
-      waitUntilFlushedToCache(key);
+      waitUntilFlushedToCache(cache, key);
       assertEquals(1, blk.refCnt());
       assertNotNull(cache.backingMap.get(key));
       assertEquals(1, cache.backingMap.get(key).refCnt());
@@ -260,7 +282,7 @@ public class TestBucketCacheRefCnt {
       assertEquals(2, be1.refCnt());

       // We've some RPC reference, so it won't have any effect.
-      assertFalse(be1.markStaleAsEvicted());
+      assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
       assertEquals(2, block1.refCnt());
       assertEquals(2, cache.backingMap.get(key).refCnt());

@@ -270,7 +292,7 @@ public class TestBucketCacheRefCnt {
       assertEquals(1, cache.backingMap.get(key).refCnt());

       // Mark the stale as evicted again, it'll do the de-allocation.
-      assertTrue(be1.markStaleAsEvicted());
+      assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
       assertEquals(0, block1.refCnt());
       assertNull(cache.backingMap.get(key));
       assertEquals(0, cache.size());
@ -278,4 +300,445 @@ public class TestBucketCacheRefCnt {
|
|||||||
cache.shutdown();
|
cache.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <pre>
|
||||||
|
* This test is for HBASE-26281,
|
||||||
|
* test two threads for replacing Block and getting Block execute concurrently.
|
||||||
|
* The threads sequence is:
|
||||||
|
* 1. Block1 was cached successfully,the {@link RefCnt} of Block1 is 1.
|
||||||
|
* 2. Thread1 caching the same {@link BlockCacheKey} with Block2 satisfied
|
||||||
|
* {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
|
||||||
|
* replace Block1, but thread1 stopping before {@link BucketCache#cacheBlockWithWaitInternal}
|
||||||
|
* 3. Thread2 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
|
||||||
|
* which returned Block1, the {@link RefCnt} of Block1 is 2.
|
||||||
|
* 4. Thread1 continues caching Block2, in {@link BucketCache.WriterThread#putIntoBackingMap},
|
||||||
|
* the old Block1 is freed directly which {@link RefCnt} is 2, but the Block1 is still used
|
||||||
|
* by Thread2 and the content of Block1 would be overwritten after it is freed, which may
|
||||||
|
* cause a serious error.
|
||||||
|
* </pre>
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testReplacingBlockAndGettingBlockConcurrently() throws Exception {
|
||||||
|
ByteBuffAllocator byteBuffAllocator =
|
||||||
|
ByteBuffAllocator.create(HBaseConfiguration.create(), true);
|
||||||
|
final MyBucketCache myBucketCache = createMyBucketCache(1, 1000);
|
||||||
|
try {
|
||||||
|
HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
|
||||||
|
final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200);
|
||||||
|
myBucketCache.cacheBlock(blockCacheKey, hfileBlock);
|
||||||
|
waitUntilFlushedToCache(myBucketCache, blockCacheKey);
|
||||||
|
assertEquals(1, hfileBlock.refCnt());
|
||||||
|
|
||||||
|
assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey));
|
||||||
|
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
|
||||||
|
Thread cacheBlockThread = new Thread(() -> {
|
||||||
|
try {
|
||||||
|
HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator);
|
||||||
|
myBucketCache.cacheBlock(blockCacheKey, newHFileBlock);
|
||||||
|
waitUntilFlushedToCache(myBucketCache, blockCacheKey);
|
||||||
|
|
||||||
|
} catch (Throwable exception) {
|
||||||
|
exceptionRef.set(exception);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME);
|
||||||
|
cacheBlockThread.start();
|
||||||
|
|
||||||
|
String oldThreadName = Thread.currentThread().getName();
|
||||||
|
HFileBlock gotHFileBlock = null;
|
||||||
|
try {
|
||||||
|
|
||||||
|
Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME);
|
||||||
|
|
||||||
|
gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false));
|
||||||
|
assertTrue(gotHFileBlock.equals(hfileBlock));
|
||||||
|
assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
|
||||||
|
assertEquals(2, gotHFileBlock.refCnt());
|
||||||
|
/**
|
||||||
|
* Release the second cyclicBarrier.await in
|
||||||
|
* {@link MyBucketCache#cacheBlockWithWaitInternal}
|
||||||
|
*/
|
||||||
|
myBucketCache.cyclicBarrier.await();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
Thread.currentThread().setName(oldThreadName);
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheBlockThread.join();
|
||||||
|
assertTrue(exceptionRef.get() == null);
|
||||||
|
assertEquals(1, gotHFileBlock.refCnt());
|
||||||
|
assertTrue(gotHFileBlock.equals(hfileBlock));
|
||||||
|
assertTrue(myBucketCache.overwiteByteBuff == null);
|
||||||
|
assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0);
|
||||||
|
|
||||||
|
gotHFileBlock.release();
|
||||||
|
assertEquals(0, gotHFileBlock.refCnt());
|
||||||
|
assertTrue(myBucketCache.overwiteByteBuff != null);
|
||||||
|
assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1);
|
||||||
|
assertTrue(myBucketCache.replaceCounter.get() == 1);
|
||||||
|
assertTrue(myBucketCache.blockEvictCounter.get() == 1);
|
||||||
|
} finally {
|
||||||
|
myBucketCache.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <pre>
|
||||||
|
* This test also is for HBASE-26281,
|
||||||
|
* test three threads for evicting Block,caching Block and getting Block
|
||||||
|
* execute concurrently.
|
||||||
|
* 1. Thread1 caching Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap},
|
||||||
|
* the {@link RefCnt} of Block1 is 1.
|
||||||
|
* 2. Thread2 invoking {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
|
||||||
|
* but stopping after {@link BucketCache#removeFromRamCache}.
|
||||||
|
* 3. Thread3 invoking {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
|
||||||
|
* which returned Block1, the {@link RefCnt} of Block1 is 2.
|
||||||
|
* 4. Thread1 continues caching block1,but finding that {@link BucketCache.RAMCache#remove}
|
||||||
|
* returning false, so invoking {@link BucketCache#blockEvicted} to free the the Block1
|
||||||
|
* directly which {@link RefCnt} is 2 and the Block1 is still used by Thread3.
|
||||||
|
* </pre>
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception {
|
||||||
|
ByteBuffAllocator byteBuffAllocator =
|
||||||
|
ByteBuffAllocator.create(HBaseConfiguration.create(), true);
|
||||||
|
final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000);
|
||||||
|
try {
|
||||||
|
final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
|
||||||
|
final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200);
|
||||||
|
final AtomicReference<Throwable> cacheBlockThreadExceptionRef =
|
||||||
|
new AtomicReference<Throwable>();
|
||||||
|
Thread cacheBlockThread = new Thread(() -> {
|
||||||
|
try {
|
||||||
|
myBucketCache2.cacheBlock(blockCacheKey, hfileBlock);
|
||||||
|
/**
|
||||||
|
* Wait for Caching Block completed.
|
||||||
|
*/
|
||||||
|
myBucketCache2.writeThreadDoneCyclicBarrier.await();
|
||||||
|
} catch (Throwable exception) {
|
||||||
|
cacheBlockThreadExceptionRef.set(exception);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME);
|
||||||
|
cacheBlockThread.start();
|
||||||
|
|
||||||
|
final AtomicReference<Throwable> evictBlockThreadExceptionRef =
|
||||||
|
new AtomicReference<Throwable>();
|
||||||
|
Thread evictBlockThread = new Thread(() -> {
|
||||||
|
try {
|
||||||
|
myBucketCache2.evictBlock(blockCacheKey);
|
||||||
|
} catch (Throwable exception) {
|
||||||
|
evictBlockThreadExceptionRef.set(exception);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME);
|
||||||
|
evictBlockThread.start();
|
||||||
|
|
||||||
|
String oldThreadName = Thread.currentThread().getName();
|
||||||
|
HFileBlock gotHFileBlock = null;
|
||||||
|
try {
|
||||||
|
Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME);
|
||||||
|
gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false));
|
||||||
|
assertTrue(gotHFileBlock.equals(hfileBlock));
|
||||||
|
assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
|
||||||
|
assertEquals(2, gotHFileBlock.refCnt());
|
||||||
|
try {
|
||||||
|
/**
|
||||||
|
* Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap} for
|
||||||
|
* {@link BucketCache.WriterThread},getBlock completed,{@link BucketCache.WriterThread}
|
||||||
|
* could continue.
|
||||||
|
*/
|
||||||
|
myBucketCache2.putCyclicBarrier.await();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
Thread.currentThread().setName(oldThreadName);
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheBlockThread.join();
|
||||||
|
evictBlockThread.join();
|
||||||
|
assertTrue(cacheBlockThreadExceptionRef.get() == null);
|
||||||
|
assertTrue(evictBlockThreadExceptionRef.get() == null);
|
||||||
|
|
||||||
|
assertTrue(gotHFileBlock.equals(hfileBlock));
|
||||||
|
assertEquals(1, gotHFileBlock.refCnt());
|
||||||
|
assertTrue(myBucketCache2.overwiteByteBuff == null);
|
||||||
|
assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0);
|
||||||
|
|
||||||
|
gotHFileBlock.release();
|
||||||
|
assertEquals(0, gotHFileBlock.refCnt());
|
||||||
|
assertTrue(myBucketCache2.overwiteByteBuff != null);
|
||||||
|
assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1);
|
||||||
|
assertTrue(myBucketCache2.blockEvictCounter.get() == 1);
|
||||||
|
} finally {
|
||||||
|
myBucketCache2.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
}

  static class MyBucketCache extends BucketCache {
    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";

    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
    private final AtomicInteger replaceCounter = new AtomicInteger(0);
    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
    private ByteBuff overwiteByteBuff = null;

    public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
        int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
          persistencePath);
    }

    /**
     * Simulate that the block could be replaced.
     */
    @Override
    protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
      replaceCounter.incrementAndGet();
      return true;
    }

    @Override
    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
        boolean updateCacheMetrics) {
      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
        /**
         * Wait on the first cyclicBarrier.await() in
         * {@link MyBucketCache#cacheBlockWithWaitInternal}, so that {@link BucketCache#getBlock}
         * executes after the {@link BucketEntry#isRpcRef} check.
         */
        try {
          cyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
      return result;
    }

    @Override
    protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
        boolean inMemory, boolean wait) {
      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
        /**
         * Wait on the cyclicBarrier.await() in {@link MyBucketCache#getBlock}.
         */
        try {
          cyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
        /**
         * Wait on the cyclicBarrier.await() in
         * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} until
         * {@link MyBucketCache#getBlock} and its assertions have completed.
         */
        try {
          cyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
    }

    @Override
    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
        boolean decrementBlockNumber) {
      blockEvictCounter.incrementAndGet();
      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber);
    }

    /**
     * Overwrite the {@link BucketEntry} content with 0xff to simulate that it may be overwritten
     * after the {@link BucketEntry} is freed.
     */
    @Override
    void freeBucketEntry(BucketEntry bucketEntry) {
      freeBucketEntryCounter.incrementAndGet();
      super.freeBucketEntry(bucketEntry);
      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
      try {
        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }
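MyBucketCache relies on a two-party CyclicBarrier to force a deterministic interleaving between the get thread and the cache thread. A minimal standalone sketch of that rendezvous pattern — generic java.util.concurrent, not HBase-specific:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierRendezvous {
  public static void main(String[] args) throws Exception {
    // Both parties block at await() until the other arrives, pinning the
    // interleaving: neither side can run past its barrier point alone.
    CyclicBarrier barrier = new CyclicBarrier(2);

    Thread getThread = new Thread(() -> {
      try {
        barrier.await();   // rendezvous: wait for the cache thread
        // ... getBlock-like work, now guaranteed to run after the rendezvous ...
      } catch (InterruptedException | BrokenBarrierException e) {
        throw new RuntimeException(e);
      }
    }, "_getBlockThread");

    Thread cacheThread = new Thread(() -> {
      try {
        // ... work that must happen before the get thread proceeds ...
        barrier.await();   // rendezvous: release the get thread
      } catch (InterruptedException | BrokenBarrierException e) {
        throw new RuntimeException(e);
      }
    }, "_cacheBlockThread");

    getThread.start();
    cacheThread.start();
    getThread.join();
    cacheThread.join();
  }
}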

  static class MyBucketCache2 extends BucketCache {
    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
    private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread";

    private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2);
    private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2);
    private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2);
    /**
     * This is used by the {@link BucketCache.WriterThread}, {@link #CACHE_BLOCK_THREAD_NAME} and
     * {@link #EVICT_BLOCK_THREAD_NAME} to wait until block caching has completed.
     */
    private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3);
    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
    private final AtomicInteger removeRamCounter = new AtomicInteger(0);
    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
    private ByteBuff overwiteByteBuff = null;

    public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
        int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
          persistencePath);
    }

    @Override
    protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
      super.putIntoBackingMap(key, bucketEntry);
      /**
       * The {@link BucketCache.WriterThread} waits on evictCyclicBarrier.await, which the
       * {@link #EVICT_BLOCK_THREAD_NAME} reaches before {@link MyBucketCache2#removeFromRamCache}.
       */
      try {
        evictCyclicBarrier.await();
      } catch (Throwable e) {
        throw new RuntimeException(e);
      }

      /**
       * Wait on the putCyclicBarrier.await() in
       * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} until
       * {@link MyBucketCache2#getBlock} and its assertions have completed.
       */
      try {
        putCyclicBarrier.await();
      } catch (Throwable e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
      super.doDrain(entries);
      if (entries.size() > 0) {
        /**
         * Block caching has completed; release {@link #GET_BLOCK_THREAD_NAME} and
         * {@link #EVICT_BLOCK_THREAD_NAME}.
         */
        try {
          writeThreadDoneCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
    }

    @Override
    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
        boolean updateCacheMetrics) {
      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
        /**
         * Wait for the second getCyclicBarrier.await in
         * {@link MyBucketCache2#removeFromRamCache}, which is reached after
         * {@link BucketCache#removeFromRamCache}.
         */
        try {
          getCyclicBarrier.await();
        } catch (Throwable e) {
          throw new RuntimeException(e);
        }
      }
      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
      return result;
    }

    @Override
    protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
      boolean firstTime = false;
      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
        int count = this.removeRamCounter.incrementAndGet();
        firstTime = (count == 1);
        if (firstTime) {
          /**
           * The {@link #EVICT_BLOCK_THREAD_NAME} waits on evictCyclicBarrier.await, which is
           * reached after {@link BucketCache#putIntoBackingMap}.
           */
          try {
            evictCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }
      boolean result = super.removeFromRamCache(cacheKey);
      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
        if (firstTime) {
          /**
           * Wait on the getCyclicBarrier.await before {@link BucketCache#getBlock}.
           */
          try {
            getCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
          /**
           * Wait until block caching has completed; only after that can evictBlock continue.
           */
          try {
            writeThreadDoneCyclicBarrier.await();
          } catch (Throwable e) {
            throw new RuntimeException(e);
          }
        }
      }

      return result;
    }

    @Override
    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
        boolean decrementBlockNumber) {
      /**
       * This is only invoked by the {@link BucketCache.WriterThread}. {@link MyBucketCache2}
       * creates only one {@link BucketCache.WriterThread}.
       */
      assertTrue(Thread.currentThread() == this.writerThreads[0]);

      blockEvictCounter.incrementAndGet();
      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber);
    }

    /**
     * Overwrite the {@link BucketEntry} content with 0xff to simulate that it may be overwritten
     * after the {@link BucketEntry} is freed.
     */
    @Override
    void freeBucketEntry(BucketEntry bucketEntry) {
      freeBucketEntryCounter.incrementAndGet();
      super.freeBucketEntry(bucketEntry);
      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
      try {
        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) {
    int byteSize = bucketEntry.getLength();
    byte[] data = new byte[byteSize];
    Arrays.fill(data, (byte) 0xff);
    return ByteBuff.wrap(ByteBuffer.wrap(data));
  }
}
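getOverwriteByteBuff produces the 0xff poison pattern written over a freed entry's region, so the tests only have to check whether a block still reads its original bytes. A short sketch of the detection idea — plain java.nio, illustrative only, not the IOEngine API:

import java.nio.ByteBuffer;
import java.util.Arrays;

public class PoisonCheck {
  public static void main(String[] args) {
    byte[] region = "block-bytes".getBytes();  // stands in for the IOEngine region

    // Simulate freeBucketEntry: poison the freed region with 0xff.
    Arrays.fill(region, (byte) 0xff);

    // A reader still holding a raw pointer into the region now sees garbage;
    // a reader holding a refcount would have prevented the free entirely.
    ByteBuffer reader = ByteBuffer.wrap(region);
    boolean corrupted = (reader.get(0) == (byte) 0xff);
    System.out.println("region poisoned: " + corrupted);  // prints true
  }
}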

@ -73,7 +73,6 @@ public class TestBucketWriterThread {
   /**
    * Set up variables and get BucketCache and WriterThread into state where tests can manually
    * control the running of WriterThread and BucketCache is empty.
-   * @throws Exception
    */
   @Before
   public void setUp() throws Exception {
@ -141,7 +140,7 @@ public class TestBucketWriterThread {
     RAMQueueEntry rqe = q.remove();
     RAMQueueEntry spiedRqe = Mockito.spy(rqe);
     Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
-      writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
+      writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
     // Cache disabled when ioes w/o ever healing.
@ -163,7 +162,7 @@ public class TestBucketWriterThread {
     BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
     Mockito.doThrow(cfe).
       doReturn(mockedBucketEntry).
-      when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
+      when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
   }
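The two hunks above track a signature change: writeToCache now takes a fourth argument, so each stub needs a fourth Mockito.any() matcher. As a generic reminder of the stubbing pattern — Worker is a hypothetical type standing in for RAMQueueEntry, not the HBase class:

import static org.mockito.Mockito.*;

// Hypothetical interface standing in for RAMQueueEntry's writeToCache.
interface Worker {
  String write(Object a, Object b, Object c, Object d) throws Exception;
}

class StubbingSketch {
  void demo(Worker real) throws Exception {
    Worker spied = spy(real);
    // First call throws, second call returns normally. Every declared
    // parameter needs its own any() matcher, including the new fourth one.
    doThrow(new RuntimeException("Mocked!"))
        .doReturn("ok")
        .when(spied).write(any(), any(), any(), any());
  }
}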
@ -172,7 +171,7 @@ public class TestBucketWriterThread {
       final BlockingQueue<RAMQueueEntry> q)
       throws InterruptedException {
     List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
-    wt.doDrain(rqes);
+    bc.doDrain(rqes);
     assertTrue(q.isEmpty());
     assertTrue(bc.ramCache.isEmpty());
     assertEquals(0, bc.heapSize());
@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Assert;
@ -50,7 +49,7 @@ public class TestByteBufferIOEngine {
     private long off;

     MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
-      super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator);
+      super(offset & 0xFF00, length, 0, false, null, allocator);
       this.off = offset;
     }

@ -91,7 +91,7 @@ public class TestRAMCache {
     MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
       ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
       new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
-    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, ByteBuffAllocator.NONE);
+    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false);

     Assert.assertNull(cache.putIfAbsent(key, re));
     Assert.assertEquals(cache.putIfAbsent(key, re), re);
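Taken together with the RAMQueueEntry constructor losing its recycler argument, the extra writeToCache parameter suggests the recycler is now created at write time rather than captured at enqueue time — consistent with the new java.util.function.Function import. A hedged sketch of that shape only; the names and signatures below are hypothetical, not the actual HBase API:

import java.util.function.Function;

// Hypothetical sketch of deferring recycler creation to write time.
final class QueueEntrySketch<K> {
  private final K key;

  QueueEntrySketch(K key) {  // no recycler captured at construction
    this.key = key;
  }

  // The recycler factory is supplied only when the entry is written out,
  // so no recycler exists for an entry that is dropped before the write.
  void writeToCache(Function<K, Runnable> recyclerFactory) {
    Runnable recycler = recyclerFactory.apply(key);
    // ... write the bytes and hand the recycler to the new bucket entry ...
  }
}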