From a25cf7bbdbd23ac58b500cba30897ea1504eb06b Mon Sep 17 00:00:00 2001 From: huzheng Date: Thu, 28 Jun 2018 12:01:03 +0800 Subject: [PATCH] HBASE-20789 TestBucketCache#testCacheBlockNextBlockMetadataMissing is flaky --- .../hadoop/hbase/io/hfile/BlockCacheUtil.java | 59 ++++++++++---- .../hadoop/hbase/io/hfile/LruBlockCache.java | 18 +---- .../hbase/io/hfile/bucket/BucketCache.java | 80 ++++++++++++------- .../io/hfile/bucket/TestBucketCache.java | 15 +++- 4 files changed, 106 insertions(+), 66 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java index 0f1ae203cdd..cc4caf3b03e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java @@ -24,8 +24,6 @@ import java.util.NavigableSet; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; -import com.google.common.base.Preconditions; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -190,31 +188,60 @@ public class BlockCacheUtil { /** * Validate that the existing and newBlock are the same without including the nextBlockMetadata, - * if not, throw an exception. If they are the same without the nextBlockMetadata, - * return the comparison. - * + * if not, throw an exception. If they are the same without the nextBlockMetadata, return the + * comparison. * @param existing block that is existing in the cache. * @param newBlock block that is trying to be cached. * @param cacheKey the cache key of the blocks. * @return comparison of the existing block to the newBlock. 
*/ public static int validateBlockAddition(Cacheable existing, Cacheable newBlock, - BlockCacheKey cacheKey) { - int comparison = compareCacheBlock(existing, newBlock, true); + BlockCacheKey cacheKey) { + int comparison = compareCacheBlock(existing, newBlock, false); if (comparison != 0) { - LOG.debug("Cached block contents differ, trying to just compare the block contents " + - "without the next block. CacheKey: " + cacheKey); - - // compare the contents, if they are not equal, we are in big trouble - int comparisonWithoutNextBlockMetadata = compareCacheBlock(existing, newBlock, false); - - Preconditions.checkArgument(comparisonWithoutNextBlockMetadata == 0, - "Cached block contents differ, which should not have happened. cacheKey:" - + cacheKey); + throw new RuntimeException( + "Cached block contents differ, which should not have happened." + "cacheKey:" + cacheKey); + } + if ((existing instanceof HFileBlock) && (newBlock instanceof HFileBlock)) { + comparison = ((HFileBlock) existing).getNextBlockOnDiskSize() + - ((HFileBlock) newBlock).getNextBlockOnDiskSize(); } return comparison; } + /** + * Because of region splitting, the split key may be located in the middle of a block, so it's + * possible that both daughter regions load the same block from their parent HFile. With pread, + * we don't force the read to include all of the next block header. So when two threads try to + * cache the same block, it's possible that one thread read all of the next block header but + * the other one didn't. If the already cached block lacks the next block header but the new + * block to cache has it, then we can replace the existing block with the new block for better + * performance. (HBASE-20447) + * @param blockCache BlockCache to check + * @param cacheKey the block cache key + * @param newBlock the new block which we are trying to put into the block cache. + * @return true means need to replace existing block with new block for the same block cache key. 
+ * false means just keep the existing block. + */ + public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache, + BlockCacheKey cacheKey, Cacheable newBlock) { + Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false); + int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey); + if (comparison < 0) { + LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the new block has " + + "nextBlockOnDiskSize set. Caching new block."); + return true; + } else if (comparison > 0) { + LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the existing block has " + + "nextBlockOnDiskSize set, Keeping cached block."); + return false; + } else { + LOG.warn("Caching an already cached block: " + cacheKey + + ". This is harmless and can happen in rare " + "cases (see HBASE-8547)"); + return false; + } + } + /** * Use one of these to keep a running account of cached blocks by file. Throw it away when done. * This is different than metrics in that it is stats on current state of a cache. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 9cd2cca7529..d030f831a2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -371,21 +371,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { } LruCachedBlock cb = map.get(cacheKey); - if (cb != null) { - int comparison = BlockCacheUtil.validateBlockAddition(cb.getBuffer(), buf, cacheKey); - if (comparison != 0) { - if (comparison < 0) { - LOG.debug("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block."); - return; - } else { - LOG.debug("Cached block contents differ by nextBlockOnDiskSize. 
Caching new block."); - } - } else { - String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey(); - msg += ". This is harmless and can happen in rare cases (see HBASE-8547)"; - LOG.debug(msg); - return; - } + if (!cacheDataInL1 && cb != null + && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) { + return; } long currentSize = size.get(); long currentAcceptableSize = acceptableSize(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index dd0a056ad66..a2d38e03dd3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -414,42 +414,35 @@ public class BucketCache implements BlockCache, HeapSize { */ public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean cacheDataInL1, boolean wait) { + if (cacheEnabled) { + if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { + if (!cacheDataInL1 + && BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) { + cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, cacheDataInL1, wait); + } + } else { + cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, cacheDataInL1, wait); + } + } + } + + private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, + boolean inMemory, boolean cacheDataInL1, boolean wait) { if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem); if (!cacheEnabled) { return; } - if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { - /* - * Compare already cached block only if lruBlockCache is not used to cache data blocks - */ - if (!cacheDataInL1) { - Cacheable existingBlock = getBlock(cacheKey, false, false, false); - - int 
comparison = BlockCacheUtil.validateBlockAddition(existingBlock, cachedItem, cacheKey); - if (comparison != 0) { - if (comparison < 0) { - LOG.debug("Cached block contents differ by nextBlockOnDiskSize. " + - "Keeping cached block which has nextBlockOnDiskSize populated."); - return; - } else { - LOG.debug("Cached block contents differ by nextBlockOnDiskSize. " + - "Caching new block which has nextBlockOnDiskSize populated."); - } - } else { - String msg = "Caching an already cached block: " + cacheKey; - msg += ". This is harmless and can happen in rare cases (see HBASE-8547)"; - LOG.debug(msg); - return; - } - } - } - - /* - * Stuff the entry into the RAM cache so it can get drained to the persistent store - */ + // Stuff the entry into the RAM cache so it can get drained to the persistent store RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); + /** + * Don't use ramCache.put(cacheKey, re) here, because there may be an existing entry with the + * same key in ramCache, and the heap size of the bucket cache needs to be updated when an + * entry in ramCache is replaced. But WriterThread also removes entries from ramCache and + * updates the heap size; with ramCache.put(), it's possible that the entry removed in + * WriterThread is not the correct one, and the heap size gets messed up (HBASE-20789) + */ if (ramCache.putIfAbsent(cacheKey, re) != null) { return; } @@ -872,6 +865,31 @@ public class BucketCache implements BlockCache, HeapSize { LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); } + /** + * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing + * cache with a new block for the same cache key. There's a corner case: one thread caches a + * block in ramCache, copies to io-engine and adds a bucket entry to backingMap. 
Caching another + * new block with the same cache key does the same thing, so if the previous bucket entry is not + * evicted, a memory leak happens because the previous bucketEntry is gone but the + * bucketAllocator does not free its memory. + * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey + * cacheKey, Cacheable newBlock) + * @param key Block cache key + * @param bucketEntry Bucket entry to put into backingMap. + */ + private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { + BucketEntry previousEntry = backingMap.put(key, bucketEntry); + if (previousEntry != null && previousEntry != bucketEntry) { + ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset()); + lock.writeLock().lock(); + try { + blockEvicted(key, previousEntry, false); + } finally { + lock.writeLock().unlock(); + } + } + } + /** * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. * Process all that are passed in even if failure being sure to remove from ramCache else we'll @@ -953,7 +971,7 @@ public class BucketCache implements BlockCache, HeapSize { BlockCacheKey key = entries.get(i).getKey(); // Only add if non-null entry. if (bucketEntries[i] != null) { - backingMap.put(key, bucketEntries[i]); + putIntoBackingMap(key, bucketEntries[i]); } // Always remove from ramCache even if we failed adding it to the block cache above. RAMQueueEntry ramCacheEntry = ramCache.remove(key); @@ -987,7 +1005,7 @@ public class BucketCache implements BlockCache, HeapSize { * @param receptical Where to stash the elements taken from queue. We clear before we use it * just in case. * @param q The queue to take from. - * @return receptical laden with elements taken from the queue or empty if none found. + * @return receptical laden with elements taken from the queue or empty if none found. 
*/ @VisibleForTesting static List getRAMQueueEntries(final BlockingQueue q, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 14fb963770e..42c1c01e216 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -191,14 +191,19 @@ public class TestBucketCache { CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); } + private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) + throws InterruptedException { + while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { + Thread.sleep(100); + } + } + // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer // threads will flush it to the bucket and put reference entry in backingMap. private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, Cacheable block) throws InterruptedException { cache.cacheBlock(cacheKey, block); - while (!cache.backingMap.containsKey(cacheKey)) { - Thread.sleep(100); - } + waitUntilFlushedToBucket(cache, cacheKey); } @Test @@ -388,7 +393,7 @@ public class TestBucketCache { } @Test - public void testCacheBlockNextBlockMetadataMissing() { + public void testCacheBlockNextBlockMetadataMissing() throws Exception { int size = 100; int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; byte[] byteArr = new byte[length]; @@ -410,6 +415,7 @@ public class TestBucketCache { CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, block1Buffer); + waitUntilFlushedToBucket(cache, key); //Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back. 
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, block1Buffer); @@ -420,6 +426,7 @@ public class TestBucketCache { CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, block2Buffer); + waitUntilFlushedToBucket(cache, key); //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace. CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, block1Buffer);