HBASE-20789 TestBucketCache#testCacheBlockNextBlockMetadataMissing is flaky
This commit is contained in:
parent
f94ad9d78f
commit
a25cf7bbdb
|
@ -24,8 +24,6 @@ import java.util.NavigableSet;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.ConcurrentSkipListSet;
|
import java.util.concurrent.ConcurrentSkipListSet;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
@ -190,31 +188,60 @@ public class BlockCacheUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Validate that the existing and newBlock are the same without including the nextBlockMetadata,
|
* Validate that the existing and newBlock are the same without including the nextBlockMetadata,
|
||||||
* if not, throw an exception. If they are the same without the nextBlockMetadata,
|
* if not, throw an exception. If they are the same without the nextBlockMetadata, return the
|
||||||
* return the comparison.
|
* comparison.
|
||||||
*
|
|
||||||
* @param existing block that is existing in the cache.
|
* @param existing block that is existing in the cache.
|
||||||
* @param newBlock block that is trying to be cached.
|
* @param newBlock block that is trying to be cached.
|
||||||
* @param cacheKey the cache key of the blocks.
|
* @param cacheKey the cache key of the blocks.
|
||||||
* @return comparison of the existing block to the newBlock.
|
* @return comparison of the existing block to the newBlock.
|
||||||
*/
|
*/
|
||||||
public static int validateBlockAddition(Cacheable existing, Cacheable newBlock,
|
public static int validateBlockAddition(Cacheable existing, Cacheable newBlock,
|
||||||
BlockCacheKey cacheKey) {
|
BlockCacheKey cacheKey) {
|
||||||
int comparison = compareCacheBlock(existing, newBlock, true);
|
int comparison = compareCacheBlock(existing, newBlock, false);
|
||||||
if (comparison != 0) {
|
if (comparison != 0) {
|
||||||
LOG.debug("Cached block contents differ, trying to just compare the block contents " +
|
throw new RuntimeException(
|
||||||
"without the next block. CacheKey: " + cacheKey);
|
"Cached block contents differ, which should not have happened." + "cacheKey:" + cacheKey);
|
||||||
|
}
|
||||||
// compare the contents, if they are not equal, we are in big trouble
|
if ((existing instanceof HFileBlock) && (newBlock instanceof HFileBlock)) {
|
||||||
int comparisonWithoutNextBlockMetadata = compareCacheBlock(existing, newBlock, false);
|
comparison = ((HFileBlock) existing).getNextBlockOnDiskSize()
|
||||||
|
- ((HFileBlock) newBlock).getNextBlockOnDiskSize();
|
||||||
Preconditions.checkArgument(comparisonWithoutNextBlockMetadata == 0,
|
|
||||||
"Cached block contents differ, which should not have happened. cacheKey:"
|
|
||||||
+ cacheKey);
|
|
||||||
}
|
}
|
||||||
return comparison;
|
return comparison;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Because of the region splitting, it's possible that the split key locate in the middle of a
|
||||||
|
* block. So it's possible that both the daughter regions load the same block from their parent
|
||||||
|
* HFile. When pread, we don't force the read to read all of the next block header. So when two
|
||||||
|
* threads try to cache the same block, it's possible that one thread read all of the next block
|
||||||
|
* header but the other one didn't. if the already cached block hasn't next block header but the
|
||||||
|
* new block to cache has, then we can replace the existing block with the new block for better
|
||||||
|
* performance.(HBASE-20447)
|
||||||
|
* @param blockCache BlockCache to check
|
||||||
|
* @param cacheKey the block cache key
|
||||||
|
* @param newBlock the new block which try to put into the block cache.
|
||||||
|
* @return true means need to replace existing block with new block for the same block cache key.
|
||||||
|
* false means just keep the existing block.
|
||||||
|
*/
|
||||||
|
public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache,
|
||||||
|
BlockCacheKey cacheKey, Cacheable newBlock) {
|
||||||
|
Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false);
|
||||||
|
int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey);
|
||||||
|
if (comparison < 0) {
|
||||||
|
LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the new block has "
|
||||||
|
+ "nextBlockOnDiskSize set. Caching new block.");
|
||||||
|
return true;
|
||||||
|
} else if (comparison > 0) {
|
||||||
|
LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the existing block has "
|
||||||
|
+ "nextBlockOnDiskSize set, Keeping cached block.");
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
LOG.warn("Caching an already cached block: " + cacheKey
|
||||||
|
+ ". This is harmless and can happen in rare " + "cases (see HBASE-8547)");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use one of these to keep a running account of cached blocks by file. Throw it away when done.
|
* Use one of these to keep a running account of cached blocks by file. Throw it away when done.
|
||||||
* This is different than metrics in that it is stats on current state of a cache.
|
* This is different than metrics in that it is stats on current state of a cache.
|
||||||
|
|
|
@ -371,21 +371,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
}
|
}
|
||||||
|
|
||||||
LruCachedBlock cb = map.get(cacheKey);
|
LruCachedBlock cb = map.get(cacheKey);
|
||||||
if (cb != null) {
|
if (!cacheDataInL1 && cb != null
|
||||||
int comparison = BlockCacheUtil.validateBlockAddition(cb.getBuffer(), buf, cacheKey);
|
&& !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
|
||||||
if (comparison != 0) {
|
return;
|
||||||
if (comparison < 0) {
|
|
||||||
LOG.debug("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block.");
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
LOG.debug("Cached block contents differ by nextBlockOnDiskSize. Caching new block.");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
|
|
||||||
msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
|
|
||||||
LOG.debug(msg);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
long currentSize = size.get();
|
long currentSize = size.get();
|
||||||
long currentAcceptableSize = acceptableSize();
|
long currentAcceptableSize = acceptableSize();
|
||||||
|
|
|
@ -414,42 +414,35 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
*/
|
*/
|
||||||
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
|
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
|
||||||
boolean cacheDataInL1, boolean wait) {
|
boolean cacheDataInL1, boolean wait) {
|
||||||
|
if (cacheEnabled) {
|
||||||
|
if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
|
||||||
|
if (!cacheDataInL1
|
||||||
|
&& BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
|
||||||
|
cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, cacheDataInL1, wait);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, cacheDataInL1, wait);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
|
||||||
|
boolean inMemory, boolean cacheDataInL1, boolean wait) {
|
||||||
if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem);
|
if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem);
|
||||||
if (!cacheEnabled) {
|
if (!cacheEnabled) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
|
// Stuff the entry into the RAM cache so it can get drained to the persistent store
|
||||||
/*
|
|
||||||
* Compare already cached block only if lruBlockCache is not used to cache data blocks
|
|
||||||
*/
|
|
||||||
if (!cacheDataInL1) {
|
|
||||||
Cacheable existingBlock = getBlock(cacheKey, false, false, false);
|
|
||||||
|
|
||||||
int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, cachedItem, cacheKey);
|
|
||||||
if (comparison != 0) {
|
|
||||||
if (comparison < 0) {
|
|
||||||
LOG.debug("Cached block contents differ by nextBlockOnDiskSize. " +
|
|
||||||
"Keeping cached block which has nextBlockOnDiskSize populated.");
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
LOG.debug("Cached block contents differ by nextBlockOnDiskSize. " +
|
|
||||||
"Caching new block which has nextBlockOnDiskSize populated.");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
String msg = "Caching an already cached block: " + cacheKey;
|
|
||||||
msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
|
|
||||||
LOG.debug(msg);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Stuff the entry into the RAM cache so it can get drained to the persistent store
|
|
||||||
*/
|
|
||||||
RAMQueueEntry re =
|
RAMQueueEntry re =
|
||||||
new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
|
new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
|
||||||
|
/**
|
||||||
|
* Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
|
||||||
|
* key in ramCache, the heap size of bucket cache need to update if replacing entry from
|
||||||
|
* ramCache. But WriterThread will also remove entry from ramCache and update heap size, if
|
||||||
|
* using ramCache.put(), It's possible that the removed entry in WriterThread is not the correct
|
||||||
|
* one, then the heap size will mess up (HBASE-20789)
|
||||||
|
*/
|
||||||
if (ramCache.putIfAbsent(cacheKey, re) != null) {
|
if (ramCache.putIfAbsent(cacheKey, re) != null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -872,6 +865,31 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
|
LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
|
||||||
|
* cache with a new block for the same cache key. there's a corner case: one thread cache a
|
||||||
|
* block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another
|
||||||
|
* new block with the same cache key do the same thing for the same cache key, so if not evict
|
||||||
|
* the previous bucket entry, then memory leak happen because the previous bucketEntry is gone
|
||||||
|
* but the bucketAllocator do not free its memory.
|
||||||
|
* @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
|
||||||
|
* cacheKey, Cacheable newBlock)
|
||||||
|
* @param key Block cache key
|
||||||
|
* @param bucketEntry Bucket entry to put into backingMap.
|
||||||
|
*/
|
||||||
|
private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
|
||||||
|
BucketEntry previousEntry = backingMap.put(key, bucketEntry);
|
||||||
|
if (previousEntry != null && previousEntry != bucketEntry) {
|
||||||
|
ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset());
|
||||||
|
lock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
blockEvicted(key, previousEntry, false);
|
||||||
|
} finally {
|
||||||
|
lock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
|
* Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
|
||||||
* Process all that are passed in even if failure being sure to remove from ramCache else we'll
|
* Process all that are passed in even if failure being sure to remove from ramCache else we'll
|
||||||
|
@ -953,7 +971,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
BlockCacheKey key = entries.get(i).getKey();
|
BlockCacheKey key = entries.get(i).getKey();
|
||||||
// Only add if non-null entry.
|
// Only add if non-null entry.
|
||||||
if (bucketEntries[i] != null) {
|
if (bucketEntries[i] != null) {
|
||||||
backingMap.put(key, bucketEntries[i]);
|
putIntoBackingMap(key, bucketEntries[i]);
|
||||||
}
|
}
|
||||||
// Always remove from ramCache even if we failed adding it to the block cache above.
|
// Always remove from ramCache even if we failed adding it to the block cache above.
|
||||||
RAMQueueEntry ramCacheEntry = ramCache.remove(key);
|
RAMQueueEntry ramCacheEntry = ramCache.remove(key);
|
||||||
|
@ -987,7 +1005,7 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
* @param receptical Where to stash the elements taken from queue. We clear before we use it
|
* @param receptical Where to stash the elements taken from queue. We clear before we use it
|
||||||
* just in case.
|
* just in case.
|
||||||
* @param q The queue to take from.
|
* @param q The queue to take from.
|
||||||
* @return <code>receptical laden with elements taken from the queue or empty if none found.
|
* @return receptical laden with elements taken from the queue or empty if none found.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
|
static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry> q,
|
||||||
|
|
|
@ -191,14 +191,19 @@ public class TestBucketCache {
|
||||||
CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
|
CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
|
||||||
|
throws InterruptedException {
|
||||||
|
while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
|
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
|
||||||
// threads will flush it to the bucket and put reference entry in backingMap.
|
// threads will flush it to the bucket and put reference entry in backingMap.
|
||||||
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
|
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
|
||||||
Cacheable block) throws InterruptedException {
|
Cacheable block) throws InterruptedException {
|
||||||
cache.cacheBlock(cacheKey, block);
|
cache.cacheBlock(cacheKey, block);
|
||||||
while (!cache.backingMap.containsKey(cacheKey)) {
|
waitUntilFlushedToBucket(cache, cacheKey);
|
||||||
Thread.sleep(100);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -388,7 +393,7 @@ public class TestBucketCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCacheBlockNextBlockMetadataMissing() {
|
public void testCacheBlockNextBlockMetadataMissing() throws Exception {
|
||||||
int size = 100;
|
int size = 100;
|
||||||
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
|
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
|
||||||
byte[] byteArr = new byte[length];
|
byte[] byteArr = new byte[length];
|
||||||
|
@ -410,6 +415,7 @@ public class TestBucketCache {
|
||||||
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata,
|
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata,
|
||||||
actualBuffer, block1Buffer);
|
actualBuffer, block1Buffer);
|
||||||
|
|
||||||
|
waitUntilFlushedToBucket(cache, key);
|
||||||
//Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
|
//Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
|
||||||
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata,
|
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata,
|
||||||
actualBuffer, block1Buffer);
|
actualBuffer, block1Buffer);
|
||||||
|
@ -420,6 +426,7 @@ public class TestBucketCache {
|
||||||
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata,
|
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata,
|
||||||
actualBuffer, block2Buffer);
|
actualBuffer, block2Buffer);
|
||||||
|
|
||||||
|
waitUntilFlushedToBucket(cache, key);
|
||||||
//Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
|
//Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
|
||||||
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata,
|
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata,
|
||||||
actualBuffer, block1Buffer);
|
actualBuffer, block1Buffer);
|
||||||
|
|
Loading…
Reference in New Issue