HBASE-27365 Minimise block addition failures due to no space in bucket cache writers queue by introducing wait time

This commit is contained in:
Rajeshbabu Chintaguntla 2022-10-05 04:03:45 +05:30
parent 804844c018
commit 5fa5b0364f
8 changed files with 111 additions and 23 deletions

View File

@ -34,6 +34,19 @@ public interface BlockCache extends Iterable<CachedBlock> {
*/
void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory);
/**
* Add block to cache.
* @param cacheKey The block's cache key.
* @param buf The block contents wrapped in a ByteBuffer.
* @param inMemory Whether block should be treated as in-memory
* @param waitWhenCache Whether to wait for the cache to be flushed mainly when BucketCache is
* configured.
*/
default void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
boolean waitWhenCache) {
cacheBlock(cacheKey, buf, inMemory);
}
/**
* Add block to cache (defaults to not in-memory).
* @param cacheKey The block's cache key.

View File

@ -53,11 +53,17 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
cacheBlock(cacheKey, buf, inMemory, false);
}
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
boolean waitWhenCache) {
boolean metaBlock = buf.getBlockType().getCategory() != BlockType.BlockCategory.DATA;
if (metaBlock) {
l1Cache.cacheBlock(cacheKey, buf, inMemory);
} else {
l2Cache.cacheBlock(cacheKey, buf, inMemory);
l2Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache);
}
}

View File

@ -1324,7 +1324,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
}
});
@ -1337,8 +1337,8 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cacheConf.isInMemory());
// Using the wait on cache during compaction and prefetching.
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, cacheOnly);
}
});
if (unpacked != hfileBlock) {

View File

@ -534,7 +534,7 @@ public class HFileWriterImpl implements HFile.Writer {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
try {
cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
cacheFormatBlock);
cacheFormatBlock, cacheConf.isInMemory(), true);
} finally {
// refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
cacheFormatBlock.release();

View File

@ -167,13 +167,6 @@ public class BucketCache implements BlockCache, HeapSize {
private static final int DEFAULT_CACHE_WAIT_TIME = 50;
/**
* Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip
* some blocks when caching. If the flag is true, we will wait until blocks are flushed to
* IOEngine.
*/
boolean wait_when_cache = false;
private final BucketCacheStats cacheStats = new BucketCacheStats();
private final String persistencePath;
@ -239,6 +232,10 @@ public class BucketCache implements BlockCache, HeapSize {
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
private static final String QUEUE_ADDITION_WAIT_TIME =
"hbase.bucketcache.queue.addition.waittime";
private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
private long queueAdditionWaitTime;
/**
* Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
* integrity, default algorithm is MD5
@ -273,6 +270,8 @@ public class BucketCache implements BlockCache, HeapSize {
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
this.queueAdditionWaitTime =
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
sanityCheckConfigs();
@ -415,7 +414,19 @@ public class BucketCache implements BlockCache, HeapSize {
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
}
/**
* Cache the block with the specified name and buffer.
* @param cacheKey block's cache key
* @param cachedItem block buffer
* @param inMemory if block is in-memory
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean waitWhenCache) {
cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
}
/**
@ -471,7 +482,7 @@ public class BucketCache implements BlockCache, HeapSize {
boolean successfulAddition = false;
if (wait) {
try {
successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -120,7 +121,6 @@ public class TestBucketCache {
int writerThreads, int writerQLen, String persistencePath) throws IOException {
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
persistencePath);
super.wait_when_cache = true;
}
@Override
@ -242,8 +242,8 @@ public class TestBucketCache {
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
// threads will flush it to the bucket and put reference entry in backingMap.
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
Cacheable block) throws InterruptedException {
cache.cacheBlock(cacheKey, block);
Cacheable block, boolean waitWhenCache) throws InterruptedException {
cache.cacheBlock(cacheKey, block, false, waitWhenCache);
waitUntilFlushedToBucket(cache, cacheKey);
}
@ -251,7 +251,7 @@ public class TestBucketCache {
public void testMemoryLeak() throws Exception {
final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
long lockId = cache.backingMap.get(cacheKey).offset();
ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
lock.writeLock().lock();
@ -266,7 +266,7 @@ public class TestBucketCache {
cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
assertEquals(0, cache.getBlockCount());
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
new CacheTestUtils.ByteArrayCacheable(new byte[10]), false);
assertEquals(1, cache.getBlockCount());
lock.writeLock().unlock();
evictThread.join();
@ -312,7 +312,8 @@ public class TestBucketCache {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
false);
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
@ -640,7 +641,7 @@ public class TestBucketCache {
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
hfileBlockPair.getBlock());
hfileBlockPair.getBlock(), false);
}
usedByteSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedByteSize);
@ -665,4 +666,58 @@ public class TestBucketCache {
}
}
@Test
public void testBlockAdditionWaitWhenCache() throws Exception {
try {
final Path dataTestDir = createAndGetTestDir();
String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, 1, 1, persistencePath);
long usedByteSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedByteSize);
HFileBlockPair[] hfileBlockPairs =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
// Add blocks
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
true);
}
// Max wait for 10 seconds.
long timeout = 10000;
// Wait for blocks size to match the number of blocks.
while (bucketCache.backingMap.size() != 10) {
if (timeout <= 0) break;
Threads.sleep(100);
timeout = -100;
}
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
}
usedByteSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedByteSize);
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
// restore cache from file
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
bucketCache.evictBlock(blockCacheKey);
}
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
} finally {
HBASE_TESTING_UTILITY.cleanupTestDir();
}
}
}

View File

@ -110,8 +110,6 @@ public class TestBucketCacheRefCnt {
// Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
public void testBlockInRAMCache() throws IOException {
cache = create(1, 1000);
// Set this to true;
cache.wait_when_cache = true;
disableWriter();
final String prefix = "testBlockInRamCache";
try {

View File

@ -282,4 +282,9 @@
<value>3</value>
<description>Default is unbounded</description>
</property>
<property>
<name>hbase.bucketcache.queue.addition.waittime</name>
<value>1000</value>
<description>Default is 0</description>
</property>
</configuration>