diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java index 8a100aec437..36f9e61809c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java @@ -198,22 +198,57 @@ public class BlockCacheUtil { */ public static int validateBlockAddition(Cacheable existing, Cacheable newBlock, BlockCacheKey cacheKey) { - int comparison = compareCacheBlock(existing, newBlock, true); + int comparison = compareCacheBlock(existing, newBlock, false); if (comparison != 0) { - LOG.warn("Cached block contents differ, trying to just compare the block contents " + - "without the next block. CacheKey: " + cacheKey); - - // compare the contents, if they are not equal, we are in big trouble - int comparisonWithoutNextBlockMetadata = compareCacheBlock(existing, newBlock, false); - - if (comparisonWithoutNextBlockMetadata != 0) { - throw new RuntimeException("Cached block contents differ, which should not have happened." - + "cacheKey:" + cacheKey); - } + throw new RuntimeException("Cached block contents differ, which should not have happened." + + "cacheKey:" + cacheKey); + } + if ((existing instanceof HFileBlock) && (newBlock instanceof HFileBlock)) { + comparison = ((HFileBlock) existing).getNextBlockOnDiskSize() + - ((HFileBlock) newBlock).getNextBlockOnDiskSize(); } return comparison; } + /** + * Because of the region splitting, it's possible that the split key locate in the middle of a + * block. So it's possible that both the daughter regions load the same block from their parent + * HFile. When pread, we don't force the read to read all of the next block header. So when two + * threads try to cache the same block, it's possible that one thread read all of the next block + * header but the other one didn't. if the already cached block hasn't next block header but the + * new block to cache has, then we can replace the existing block with the new block for better + * performance.(HBASE-20447) + * @param blockCache BlockCache to check + * @param cacheKey the block cache key + * @param newBlock the new block which try to put into the block cache. + * @return true means need to replace existing block with new block for the same block cache key. + * false means just keep the existing block. + */ + public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache, + BlockCacheKey cacheKey, Cacheable newBlock) { + Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false); + try { + int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey); + if (comparison < 0) { + LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the new block has " + + "nextBlockOnDiskSize set. Caching new block."); + return true; + } else if (comparison > 0) { + LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the existing block has " + + "nextBlockOnDiskSize set, Keeping cached block."); + return false; + } else { + LOG.warn("Caching an already cached block: {}. This is harmless and can happen in rare " + + "cases (see HBASE-8547)", + cacheKey); + return false; + } + } finally { + // return the block since we need to decrement the count + blockCache.returnBlock(cacheKey, existingBlock); + } + } + /** * Use one of these to keep a running account of cached blocks by file. Throw it away when done. * This is different than metrics in that it is stats on current state of a cache. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 8b8bd88a7fa..1dab0537a8a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -379,21 +379,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { } LruCachedBlock cb = map.get(cacheKey); - if (cb != null) { - int comparison = BlockCacheUtil.validateBlockAddition(cb.getBuffer(), buf, cacheKey); - if (comparison != 0) { - if (comparison < 0) { - LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block."); - return; - } else { - LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Caching new block."); - } - } else { - String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey(); - msg += ". This is harmless and can happen in rare cases (see HBASE-8547)"; - LOG.debug(msg); - return; - } + if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) { + return; } long currentSize = size.get(); long currentAcceptableSize = acceptableSize(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index bc45d398862..40c0a00cb9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -418,42 +418,35 @@ public class BucketCache implements BlockCache, HeapSize { * @param inMemory if block is in-memory * @param wait if true, blocking wait when queue is full */ - public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, + private void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, boolean wait) { - if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem); + if (cacheEnabled) { + if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { + if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) { + cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); + } + } else { + cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait); + } + } + } + + private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem, + boolean inMemory, boolean wait) { if (!cacheEnabled) { return; } - - if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) { - Cacheable existingBlock = getBlock(cacheKey, false, false, false); - - try { - int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, cachedItem, cacheKey); - if (comparison != 0) { - if (comparison < 0) { - LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block."); - return; - } else { - LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Caching new block."); - } - } else { - String msg = "Caching an already cached block: " + cacheKey; - msg += ". This is harmless and can happen in rare cases (see HBASE-8547)"; - LOG.warn(msg); - return; - } - } finally { - // return the block since we need to decrement the count - returnBlock(cacheKey, existingBlock); - } - } - - /* - * Stuff the entry into the RAM cache so it can get drained to the persistent store - */ + LOG.trace("Caching key={}, item={}", cacheKey, cachedItem); + // Stuff the entry into the RAM cache so it can get drained to the persistent store RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory); + /** + * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same + * key in ramCache, the heap size of bucket cache need to update if replacing entry from + * ramCache. But WriterThread will also remove entry from ramCache and update heap size, if + * using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct + * one, then the heap size will mess up (HBASE-20789) + */ if (ramCache.putIfAbsent(cacheKey, re) != null) { return; } @@ -936,6 +929,31 @@ public class BucketCache implements BlockCache, HeapSize { LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled); } + /** + * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing + * cache with a new block for the same cache key. there's a corner case: one thread cache a + * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another + * new block with the same cache key do the same thing for the same cache key, so if not evict + * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone + * but the bucketAllocator do not free its memory. + * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey + * cacheKey, Cacheable newBlock) + * @param key Block cache key + * @param bucketEntry Bucket entry to put into backingMap. + */ + private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { + BucketEntry previousEntry = backingMap.put(key, bucketEntry); + if (previousEntry != null && previousEntry != bucketEntry) { + ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset()); + lock.writeLock().lock(); + try { + blockEvicted(key, previousEntry, false); + } finally { + lock.writeLock().unlock(); + } + } + } + /** * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. * Process all that are passed in even if failure being sure to remove from ramCache else we'll @@ -1017,7 +1035,7 @@ public class BucketCache implements BlockCache, HeapSize { BlockCacheKey key = entries.get(i).getKey(); // Only add if non-null entry. if (bucketEntries[i] != null) { - backingMap.put(key, bucketEntries[i]); + putIntoBackingMap(key, bucketEntries[i]); } // Always remove from ramCache even if we failed adding it to the block cache above. RAMQueueEntry ramCacheEntry = ramCache.remove(key); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index b3656694bfa..924dd027688 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -197,14 +197,19 @@ public class TestBucketCache { CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE); } + private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey) + throws InterruptedException { + while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) { + Thread.sleep(100); + } + } + // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer // threads will flush it to the bucket and put reference entry in backingMap. private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, Cacheable block) throws InterruptedException { cache.cacheBlock(cacheKey, block); - while (!cache.backingMap.containsKey(cacheKey)) { - Thread.sleep(100); - } + waitUntilFlushedToBucket(cache, cacheKey); } @Test @@ -409,7 +414,7 @@ public class TestBucketCache { } @Test - public void testCacheBlockNextBlockMetadataMissing() { + public void testCacheBlockNextBlockMetadataMissing() throws Exception { int size = 100; int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; byte[] byteArr = new byte[length]; @@ -427,22 +432,26 @@ public class TestBucketCache { blockWithNextBlockMetadata.serialize(block1Buffer, true); blockWithoutNextBlockMetadata.serialize(block2Buffer, true); - //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. + // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, - block1Buffer); + block1Buffer); - //Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back. + waitUntilFlushedToBucket(cache, key); + + // Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back. CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, - block1Buffer); + block1Buffer); - //Clear and add blockWithoutNextBlockMetadata + // Clear and add blockWithoutNextBlockMetadata cache.evictBlock(key); assertNull(cache.getBlock(key, false, false, false)); CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, - block2Buffer); + block2Buffer); - //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace. + waitUntilFlushedToBucket(cache, key); + + // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace. CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, - block1Buffer); + block1Buffer); } }