HBASE-27365 Minimise block addition failures due to no space in bucket cache writers queue by introducing wait time

This commit is contained in:
Rajeshbabu Chintaguntla 2022-10-04 23:35:56 +05:30
parent af28101ccc
commit 108479e95e
8 changed files with 104 additions and 24 deletions

View File

@ -34,6 +34,19 @@ public interface BlockCache extends Iterable<CachedBlock> {
*/
void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory);
/**
* Add block to cache.
* @param cacheKey The block's cache key.
* @param buf The block contents wrapped in a ByteBuffer.
* @param inMemory Whether block should be treated as in-memory
* @param waitWhenCache Whether to wait for the cache to be flushed mainly when BucketCache is
* configured.
*/
default void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
boolean waitWhenCache) {
cacheBlock(cacheKey, buf, inMemory);
}
/**
* Add block to cache (defaults to not in-memory).
* @param cacheKey The block's cache key.

View File

@ -1328,7 +1328,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
}
});
@ -1341,8 +1341,8 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cacheConf.isInMemory());
// Using the wait on cache during compaction and prefetching.
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, cacheOnly);
}
});
if (unpacked != hfileBlock) {

View File

@ -550,7 +550,7 @@ public class HFileWriterImpl implements HFile.Writer {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
try {
cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
cacheFormatBlock);
cacheFormatBlock, cacheConf.isInMemory(), true);
} finally {
// refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
cacheFormatBlock.release();

View File

@ -170,13 +170,6 @@ public class BucketCache implements BlockCache, HeapSize {
private static final int DEFAULT_CACHE_WAIT_TIME = 50;
/**
* Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip
* some blocks when caching. If the flag is true, we will wait until blocks are flushed to
* IOEngine.
*/
boolean wait_when_cache = false;
private final BucketCacheStats cacheStats = new BucketCacheStats();
private final String persistencePath;
@ -244,6 +237,10 @@ public class BucketCache implements BlockCache, HeapSize {
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
private static final String QUEUE_ADDITION_WAIT_TIME =
"hbase.bucketcache.queue.addition.waittime";
private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
private long queueAdditionWaitTime;
/**
* Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
* integrity, default algorithm is MD5
@ -278,6 +275,8 @@ public class BucketCache implements BlockCache, HeapSize {
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
this.queueAdditionWaitTime =
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
sanityCheckConfigs();
@ -421,7 +420,19 @@ public class BucketCache implements BlockCache, HeapSize {
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
}
/**
* Cache the block with the specified name and buffer.
* @param cacheKey block's cache key
* @param cachedItem block buffer
* @param inMemory if block is in-memory
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean waitWhenCache) {
cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
}
/**
@ -480,7 +491,7 @@ public class BucketCache implements BlockCache, HeapSize {
boolean successfulAddition = false;
if (wait) {
try {
successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -120,7 +121,6 @@ public class TestBucketCache {
int writerThreads, int writerQLen, String persistencePath) throws IOException {
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
persistencePath);
super.wait_when_cache = true;
}
@Override
@ -242,8 +242,8 @@ public class TestBucketCache {
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
// threads will flush it to the bucket and put reference entry in backingMap.
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
Cacheable block) throws InterruptedException {
cache.cacheBlock(cacheKey, block);
Cacheable block, boolean waitWhenCache) throws InterruptedException {
cache.cacheBlock(cacheKey, block, false, waitWhenCache);
waitUntilFlushedToBucket(cache, cacheKey);
}
@ -251,7 +251,7 @@ public class TestBucketCache {
public void testMemoryLeak() throws Exception {
final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
long lockId = cache.backingMap.get(cacheKey).offset();
ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
lock.writeLock().lock();
@ -266,7 +266,7 @@ public class TestBucketCache {
cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, true);
assertEquals(0, cache.getBlockCount());
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
new CacheTestUtils.ByteArrayCacheable(new byte[10]));
new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
assertEquals(1, cache.getBlockCount());
lock.writeLock().unlock();
evictThread.join();
@ -312,7 +312,8 @@ public class TestBucketCache {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
false);
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
@ -691,7 +692,7 @@ public class TestBucketCache {
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
hfileBlockPair.getBlock());
hfileBlockPair.getBlock(), false);
}
usedByteSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedByteSize);
@ -716,4 +717,58 @@ public class TestBucketCache {
}
}
@Test
public void testBlockAdditionWaitWhenCache() throws Exception {
try {
final Path dataTestDir = createAndGetTestDir();
String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, 1, 1, persistencePath);
long usedByteSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedByteSize);
HFileBlockPair[] hfileBlockPairs =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
// Add blocks
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
true);
}
// Max wait for 10 seconds.
long timeout = 10000;
// Wait for blocks size to match the number of blocks.
while (bucketCache.backingMap.size() != 10) {
if (timeout <= 0) break;
Threads.sleep(100);
timeout = -100;
}
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
}
usedByteSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedByteSize);
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
// restore cache from file
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
bucketCache.evictBlock(blockCacheKey);
}
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
} finally {
HBASE_TESTING_UTILITY.cleanupTestDir();
}
}
}

View File

@ -110,8 +110,6 @@ public class TestBucketCacheRefCnt {
// Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
public void testBlockInRAMCache() throws IOException {
cache = create(1, 1000);
// Set this to true;
cache.wait_when_cache = true;
disableWriter();
final String prefix = "testBlockInRamCache";
try {

View File

@ -117,7 +117,6 @@ public class TestPrefetchPersistence {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
bucketCache.wait_when_cache = true;
cacheConf = new CacheConfig(conf, bucketCache);
long usedSize = bucketCache.getAllocator().getUsedSize();
@ -137,7 +136,6 @@ public class TestPrefetchPersistence {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
bucketCache.wait_when_cache = true;
assertFalse(new File(testDir + "/bucket.persistence").exists());
assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertTrue(usedSize != 0);

View File

@ -277,4 +277,9 @@
<value>3</value>
<description>Default is unbounded</description>
</property>
<property>
<name>hbase.bucketcache.queue.addition.waittime</name>
<value>1000</value>
<description>Default is 0</description>
</property>
</configuration>