HBASE-20789 TestBucketCache#testCacheBlockNextBlockMetadataMissing is flaky

This commit is contained in:
huzheng 2018-06-28 12:01:03 +08:00 committed by huzheng
parent 66ad9fdef8
commit 0454878e71
4 changed files with 118 additions and 69 deletions

View File

@ -198,22 +198,57 @@ public class BlockCacheUtil {
*/ */
public static int validateBlockAddition(Cacheable existing, Cacheable newBlock, public static int validateBlockAddition(Cacheable existing, Cacheable newBlock,
BlockCacheKey cacheKey) { BlockCacheKey cacheKey) {
int comparison = compareCacheBlock(existing, newBlock, true); int comparison = compareCacheBlock(existing, newBlock, false);
if (comparison != 0) { if (comparison != 0) {
LOG.warn("Cached block contents differ, trying to just compare the block contents " +
"without the next block. CacheKey: " + cacheKey);
// compare the contents, if they are not equal, we are in big trouble
int comparisonWithoutNextBlockMetadata = compareCacheBlock(existing, newBlock, false);
if (comparisonWithoutNextBlockMetadata != 0) {
throw new RuntimeException("Cached block contents differ, which should not have happened." throw new RuntimeException("Cached block contents differ, which should not have happened."
+ "cacheKey:" + cacheKey); + "cacheKey:" + cacheKey);
} }
if ((existing instanceof HFileBlock) && (newBlock instanceof HFileBlock)) {
comparison = ((HFileBlock) existing).getNextBlockOnDiskSize()
- ((HFileBlock) newBlock).getNextBlockOnDiskSize();
} }
return comparison; return comparison;
} }
/**
* Because of the region splitting, it's possible that the split key locate in the middle of a
* block. So it's possible that both the daughter regions load the same block from their parent
* HFile. When pread, we don't force the read to read all of the next block header. So when two
* threads try to cache the same block, it's possible that one thread read all of the next block
* header but the other one didn't. if the already cached block hasn't next block header but the
* new block to cache has, then we can replace the existing block with the new block for better
* performance.(HBASE-20447)
* @param blockCache BlockCache to check
* @param cacheKey the block cache key
* @param newBlock the new block which try to put into the block cache.
* @return true means need to replace existing block with new block for the same block cache key.
* false means just keep the existing block.
*/
public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache,
BlockCacheKey cacheKey, Cacheable newBlock) {
Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false);
try {
int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey);
if (comparison < 0) {
LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the new block has "
+ "nextBlockOnDiskSize set. Caching new block.");
return true;
} else if (comparison > 0) {
LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the existing block has "
+ "nextBlockOnDiskSize set, Keeping cached block.");
return false;
} else {
LOG.warn("Caching an already cached block: {}. This is harmless and can happen in rare "
+ "cases (see HBASE-8547)",
cacheKey);
return false;
}
} finally {
// return the block since we need to decrement the count
blockCache.returnBlock(cacheKey, existingBlock);
}
}
/** /**
* Use one of these to keep a running account of cached blocks by file. Throw it away when done. * Use one of these to keep a running account of cached blocks by file. Throw it away when done.
* This is different than metrics in that it is stats on current state of a cache. * This is different than metrics in that it is stats on current state of a cache.

View File

@ -379,21 +379,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
} }
LruCachedBlock cb = map.get(cacheKey); LruCachedBlock cb = map.get(cacheKey);
if (cb != null) { if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
int comparison = BlockCacheUtil.validateBlockAddition(cb.getBuffer(), buf, cacheKey);
if (comparison != 0) {
if (comparison < 0) {
LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block.");
return; return;
} else {
LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Caching new block.");
}
} else {
String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
LOG.debug(msg);
return;
}
} }
long currentSize = size.get(); long currentSize = size.get();
long currentAcceptableSize = acceptableSize(); long currentAcceptableSize = acceptableSize();

View File

@ -418,42 +418,35 @@ public class BucketCache implements BlockCache, HeapSize {
* @param inMemory if block is in-memory * @param inMemory if block is in-memory
* @param wait if true, blocking wait when queue is full * @param wait if true, blocking wait when queue is full
*/ */
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, private void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean wait) { boolean wait) {
if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem); if (cacheEnabled) {
if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
}
} else {
cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
}
}
}
private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
boolean inMemory, boolean wait) {
if (!cacheEnabled) { if (!cacheEnabled) {
return; return;
} }
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { // Stuff the entry into the RAM cache so it can get drained to the persistent store
Cacheable existingBlock = getBlock(cacheKey, false, false, false);
try {
int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, cachedItem, cacheKey);
if (comparison != 0) {
if (comparison < 0) {
LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block.");
return;
} else {
LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Caching new block.");
}
} else {
String msg = "Caching an already cached block: " + cacheKey;
msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
LOG.warn(msg);
return;
}
} finally {
// return the block since we need to decrement the count
returnBlock(cacheKey, existingBlock);
}
}
/*
* Stuff the entry into the RAM cache so it can get drained to the persistent store
*/
RAMQueueEntry re = RAMQueueEntry re =
new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
/**
* Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
* key in ramCache, the heap size of bucket cache need to update if replacing entry from
* ramCache. But WriterThread will also remove entry from ramCache and update heap size, if
* using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct
* one, then the heap size will mess up (HBASE-20789)
*/
if (ramCache.putIfAbsent(cacheKey, re) != null) { if (ramCache.putIfAbsent(cacheKey, re) != null) {
return; return;
} }
@ -936,6 +929,31 @@ public class BucketCache implements BlockCache, HeapSize {
LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
} }
/**
* Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
* cache with a new block for the same cache key. there's a corner case: one thread cache a
* block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another
* new block with the same cache key do the same thing for the same cache key, so if not evict
* the previous bucket entry, then memory leak happen because the previous bucketEntry is gone
* but the bucketAllocator do not free its memory.
* @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
* cacheKey, Cacheable newBlock)
* @param key Block cache key
* @param bucketEntry Bucket entry to put into backingMap.
*/
private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
BucketEntry previousEntry = backingMap.put(key, bucketEntry);
if (previousEntry != null && previousEntry != bucketEntry) {
ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset());
lock.writeLock().lock();
try {
blockEvicted(key, previousEntry, false);
} finally {
lock.writeLock().unlock();
}
}
}
/** /**
* Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
* Process all that are passed in even if failure being sure to remove from ramCache else we'll * Process all that are passed in even if failure being sure to remove from ramCache else we'll
@ -1017,7 +1035,7 @@ public class BucketCache implements BlockCache, HeapSize {
BlockCacheKey key = entries.get(i).getKey(); BlockCacheKey key = entries.get(i).getKey();
// Only add if non-null entry. // Only add if non-null entry.
if (bucketEntries[i] != null) { if (bucketEntries[i] != null) {
backingMap.put(key, bucketEntries[i]); putIntoBackingMap(key, bucketEntries[i]);
} }
// Always remove from ramCache even if we failed adding it to the block cache above. // Always remove from ramCache even if we failed adding it to the block cache above.
RAMQueueEntry ramCacheEntry = ramCache.remove(key); RAMQueueEntry ramCacheEntry = ramCache.remove(key);

View File

@ -197,14 +197,19 @@ public class TestBucketCache {
CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
} }
private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
throws InterruptedException {
while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
Thread.sleep(100);
}
}
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
// threads will flush it to the bucket and put reference entry in backingMap. // threads will flush it to the bucket and put reference entry in backingMap.
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
Cacheable block) throws InterruptedException { Cacheable block) throws InterruptedException {
cache.cacheBlock(cacheKey, block); cache.cacheBlock(cacheKey, block);
while (!cache.backingMap.containsKey(cacheKey)) { waitUntilFlushedToBucket(cache, cacheKey);
Thread.sleep(100);
}
} }
@Test @Test
@ -409,7 +414,7 @@ public class TestBucketCache {
} }
@Test @Test
public void testCacheBlockNextBlockMetadataMissing() { public void testCacheBlockNextBlockMetadataMissing() throws Exception {
int size = 100; int size = 100;
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
byte[] byteArr = new byte[length]; byte[] byteArr = new byte[length];
@ -431,6 +436,8 @@ public class TestBucketCache {
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
block1Buffer); block1Buffer);
waitUntilFlushedToBucket(cache, key);
// Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back. // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
block1Buffer); block1Buffer);
@ -441,6 +448,8 @@ public class TestBucketCache {
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
block2Buffer); block2Buffer);
waitUntilFlushedToBucket(cache, key);
// Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace. // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
block1Buffer); block1Buffer);