From 108479e95e38c5a6ec5622a7d93e28b55dc2f717 Mon Sep 17 00:00:00 2001 From: Rajeshbabu Chintaguntla Date: Tue, 4 Oct 2022 23:35:56 +0530 Subject: [PATCH] HBASE-27365 Minimise block addition failures due to no space in bucket cache writers queue by introducing wait time --- .../hadoop/hbase/io/hfile/BlockCache.java | 13 ++++ .../hbase/io/hfile/HFileReaderImpl.java | 6 +- .../hbase/io/hfile/HFileWriterImpl.java | 2 +- .../hbase/io/hfile/bucket/BucketCache.java | 29 +++++--- .../io/hfile/bucket/TestBucketCache.java | 69 +++++++++++++++++-- .../hfile/bucket/TestBucketCacheRefCnt.java | 2 - .../hfile/bucket/TestPrefetchPersistence.java | 2 - .../src/test/resources/hbase-site.xml | 5 ++ 8 files changed, 104 insertions(+), 24 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 52eaa30317a..1caa1b76f5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -34,6 +34,19 @@ public interface BlockCache extends Iterable { */ void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory); + /** + * Add block to cache. + * @param cacheKey The block's cache key. + * @param buf The block contents wrapped in a ByteBuffer. + * @param inMemory Whether block should be treated as in-memory + * @param waitWhenCache Whether to wait for the cache to be flushed mainly when BucketCache is + * configured. + */ + default void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory, + boolean waitWhenCache) { + cacheBlock(cacheKey, buf, inMemory); + } + /** * Add block to cache (defaults to not in-memory). * @param cacheKey The block's cache key. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 2cf1c3df677..5c2e42e6b8a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1328,7 +1328,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable { // Cache the block if necessary cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { - cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory()); + cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly); } }); @@ -1341,8 +1341,8 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable { // Cache the block if necessary cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { - cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, - cacheConf.isInMemory()); + // Using the wait on cache during compaction and prefetching. + cache.cacheBlock(cacheKey, cacheCompressed ? 
hfileBlock : unpacked, cacheConf.isInMemory(), cacheOnly); } }); if (unpacked != hfileBlock) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index 6b7cf3caaa3..b33d471ae49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -550,7 +550,7 @@ public class HFileWriterImpl implements HFile.Writer { HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf); try { cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()), - cacheFormatBlock); + cacheFormatBlock, cacheConf.isInMemory(), true); } finally { // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent cacheFormatBlock.release(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 33c7399bed9..9620462e356 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -170,13 +170,6 @@ public class BucketCache implements BlockCache, HeapSize { private static final int DEFAULT_CACHE_WAIT_TIME = 50; - /** - * Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip - * some blocks when caching. If the flag is true, we will wait until blocks are flushed to - * IOEngine. 
- */ - boolean wait_when_cache = false; - private final BucketCacheStats cacheStats = new BucketCacheStats(); private final String persistencePath; @@ -244,6 +237,10 @@ public class BucketCache implements BlockCache, HeapSize { "hbase.bucketcache.persistent.file.integrity.check.algorithm"; private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5"; + private static final String QUEUE_ADDITION_WAIT_TIME = + "hbase.bucketcache.queue.addition.waittime"; + private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0; + private long queueAdditionWaitTime; /** * Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file * integrity, default algorithm is MD5 @@ -278,6 +275,8 @@ public class BucketCache implements BlockCache, HeapSize { this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR); this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR); this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR); + this.queueAdditionWaitTime = + conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME); this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY); sanityCheckConfigs(); @@ -421,7 +420,19 @@ public class BucketCache implements BlockCache, HeapSize { */ @Override public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) { - cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache); + cacheBlockWithWait(cacheKey, cachedItem, inMemory, false); + } + + /** + * Cache the block with the specified name and buffer. 
+ * @param cacheKey block's cache key + * @param cachedItem block buffer + * @param inMemory if block is in-memory + */ + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory, + boolean waitWhenCache) { + cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0); } /** @@ -480,7 +491,7 @@ public class BucketCache implements BlockCache, HeapSize { boolean successfulAddition = false; if (wait) { try { - successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS); + successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 6f7d4e22e2b..4a53a0212ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -120,7 +121,6 @@ public class TestBucketCache { int writerThreads, int writerQLen, String persistencePath) throws IOException { super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen, persistencePath); - super.wait_when_cache = true; } @Override @@ -242,8 +242,8 @@ public class TestBucketCache { // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer // threads will flush it to the bucket and put reference entry in backingMap. 
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey, - Cacheable block) throws InterruptedException { - cache.cacheBlock(cacheKey, block); + Cacheable block, boolean waitWhenCache) throws InterruptedException { + cache.cacheBlock(cacheKey, block, false, waitWhenCache); waitUntilFlushedToBucket(cache, cacheKey); } @@ -251,7 +251,7 @@ public class TestBucketCache { public void testMemoryLeak() throws Exception { final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L); cacheAndWaitUntilFlushedToBucket(cache, cacheKey, - new CacheTestUtils.ByteArrayCacheable(new byte[10])); + new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); long lockId = cache.backingMap.get(cacheKey).offset(); ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId); lock.writeLock().lock(); @@ -266,7 +266,7 @@ public class TestBucketCache { cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true); assertEquals(0, cache.getBlockCount()); cacheAndWaitUntilFlushedToBucket(cache, cacheKey, - new CacheTestUtils.ByteArrayCacheable(new byte[10])); + new CacheTestUtils.ByteArrayCacheable(new byte[10]), true); assertEquals(1, cache.getBlockCount()); lock.writeLock().unlock(); evictThread.join(); @@ -312,7 +312,8 @@ public class TestBucketCache { bucketCache.cacheBlock(block.getBlockName(), block.getBlock()); } for (HFileBlockPair block : blocks) { - cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); + cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(), + false); } usedSize = bucketCache.getAllocator().getUsedSize(); assertNotEquals(0, usedSize); @@ -691,7 +692,7 @@ public class TestBucketCache { for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(), - hfileBlockPair.getBlock()); + hfileBlockPair.getBlock(), false); } usedByteSize = bucketCache.getAllocator().getUsedSize(); 
assertNotEquals(0, usedByteSize); @@ -716,4 +717,58 @@ public class TestBucketCache { } } + @Test + public void testBlockAdditionWaitWhenCache() throws Exception { + try { + final Path dataTestDir = createAndGetTestDir(); + + String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache"; + String persistencePath = dataTestDir + "/bucketNoRecycler.persistence"; + + BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, 1, 1, persistencePath); + long usedByteSize = bucketCache.getAllocator().getUsedSize(); + assertEquals(0, usedByteSize); + + HFileBlockPair[] hfileBlockPairs = + CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10); + // Add blocks + for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { + bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, + true); + } + + // Max wait for 10 seconds. + long timeout = 10000; + // Wait for blocks size to match the number of blocks. 
+ while (bucketCache.backingMap.size() != 10) { + if (timeout <= 0) break; + Threads.sleep(100); + timeout -= 100; + } + for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { + assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName())); + } + usedByteSize = bucketCache.getAllocator().getUsedSize(); + assertNotEquals(0, usedByteSize); + // persist cache to file + bucketCache.shutdown(); + assertTrue(new File(persistencePath).exists()); + + // restore cache from file + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath); + assertFalse(new File(persistencePath).exists()); + assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize()); + + for (HFileBlockPair hfileBlockPair : hfileBlockPairs) { + BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName(); + bucketCache.evictBlock(blockCacheKey); + } + assertEquals(0, bucketCache.getAllocator().getUsedSize()); + assertEquals(0, bucketCache.backingMap.size()); + } finally { + HBASE_TESTING_UTILITY.cleanupTestDir(); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java index 44a398bda44..a3f291b7949 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java @@ -110,8 +110,6 @@ public class TestBucketCacheRefCnt { // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2> public void testBlockInRAMCache() throws IOException { cache = create(1, 1000); - // Set this to true; - cache.wait_when_cache = true; disableWriter(); final String prefix = "testBlockInRamCache"; try { diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java index 667dabd47f3..900340e57ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java @@ -117,7 +117,6 @@ public class TestPrefetchPersistence { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); - bucketCache.wait_when_cache = true; cacheConf = new CacheConfig(conf, bucketCache); long usedSize = bucketCache.getAllocator().getUsedSize(); @@ -137,7 +136,6 @@ public class TestPrefetchPersistence { bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence", 60 * 1000, conf); - bucketCache.wait_when_cache = true; assertFalse(new File(testDir + "/bucket.persistence").exists()); assertFalse(new File(testDir + "/prefetch.persistence").exists()); assertTrue(usedSize != 0); diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml index 0b6f1d59a0e..23a84335d94 100644 --- a/hbase-server/src/test/resources/hbase-site.xml +++ b/hbase-server/src/test/resources/hbase-site.xml @@ -277,4 +277,9 @@ 3 Default is unbounded + + hbase.bucketcache.queue.addition.waittime + 1000 + Default is 0 +