HBASE-15560 W-TinyLFU based BlockCache
Signed-off-by: Andrew Purtell <apurtell@apache.org>
parent 86ce519849, commit cadf321d53
@@ -895,6 +895,11 @@ possible configurations would overwhelm and obscure the important.
    <description>
      The default thread pool size if parallel-seeking feature enabled.</description>
  </property>
  <property>
    <name>hfile.block.cache.policy</name>
    <value>LRU</value>
    <description>The eviction policy for the L1 block cache (LRU or TinyLFU).</description>
  </property>
  <property>
    <name>hfile.block.cache.size</name>
    <value>0.4</value>
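Note: a minimal sketch of selecting the new policy from code rather than hbase-site.xml. The class and method names here are illustrative only; the two property keys are the ones added by this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class TinyLfuConfigExample {
  public static Configuration withTinyLfuL1() {
    Configuration conf = HBaseConfiguration.create();
    // Select the W-TinyLFU eviction policy for the L1 block cache ("LRU" is the default).
    conf.set("hfile.block.cache.policy", "TinyLFU");
    // Keep the default fraction of heap given to the block cache.
    conf.setFloat("hfile.block.cache.size", 0.4f);
    return conf;
  }
}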
@ -2928,4 +2928,30 @@ Copyright (c) 2007-2017 The JRuby project
|
|||
</licenses>
|
||||
</project>
|
||||
</supplement>
|
||||
<supplement>
|
||||
<project>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
<licenses>
|
||||
<license>
|
||||
<name>Apache License, Version 2.0</name>
|
||||
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
|
||||
<distribution>repo</distribution>
|
||||
</license>
|
||||
</licenses>
|
||||
</project>
|
||||
</supplement>
|
||||
<supplement>
|
||||
<project>
|
||||
<groupId>com.google.errorprone</groupId>
|
||||
<artifactId>error_prone_annotations</artifactId>
|
||||
<licenses>
|
||||
<license>
|
||||
<name>Apache License, Version 2.0</name>
|
||||
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
|
||||
<distribution>repo</distribution>
|
||||
</license>
|
||||
</licenses>
|
||||
</project>
|
||||
</supplement>
|
||||
</supplementalDataModels>
|
||||
|
|
|
@ -455,6 +455,10 @@
|
|||
<artifactId>findbugs-annotations</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
|
|
|
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;

import java.io.IOException;
import java.util.concurrent.ForkJoinPool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
@@ -41,6 +42,12 @@ public final class BlockCacheFactory {
   * Configuration keys for Bucket cache
   */

  /**
   * Configuration key to cache block policy (Lru, TinyLfu).
   */
  public static final String BLOCKCACHE_POLICY_KEY = "hfile.block.cache.policy";
  public static final String BLOCKCACHE_POLICY_DEFAULT = "LRU";

  /**
   * If the chosen ioengine can persist its state across restarts, the path to the file to persist
   * to. This file is NOT the data file. It is a file into which we will serialize the map of
@@ -85,16 +92,16 @@ public final class BlockCacheFactory {
  }

  public static BlockCache createBlockCache(Configuration conf) {
    LruBlockCache onHeapCache = createOnHeapCache(conf);
    if (onHeapCache == null) {
    FirstLevelBlockCache l1Cache = createFirstLevelCache(conf);
    if (l1Cache == null) {
      return null;
    }
    boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
    if (useExternal) {
      BlockCache l2CacheInstance = createExternalBlockcache(conf);
      return l2CacheInstance == null ?
        onHeapCache :
        new InclusiveCombinedBlockCache(onHeapCache, l2CacheInstance);
        l1Cache :
        new InclusiveCombinedBlockCache(l1Cache, l2CacheInstance);
    } else {
      // otherwise use the bucket cache.
      BucketCache bucketCache = createBucketCache(conf);
@@ -103,20 +110,26 @@ public final class BlockCacheFactory {
        LOG.warn(
            "From HBase 2.0 onwards only combined mode of LRU cache and bucket cache is available");
      }
      return bucketCache == null ? onHeapCache : new CombinedBlockCache(onHeapCache, bucketCache);
      return bucketCache == null ? l1Cache : new CombinedBlockCache(l1Cache, bucketCache);
    }
  }

  private static LruBlockCache createOnHeapCache(final Configuration c) {
  private static FirstLevelBlockCache createFirstLevelCache(final Configuration c) {
    final long cacheSize = MemorySizeUtil.getOnHeapCacheSize(c);
    if (cacheSize < 0) {
      return null;
    }
    String policy = c.get(BLOCKCACHE_POLICY_KEY, BLOCKCACHE_POLICY_DEFAULT);
    int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE);
    LOG.info(
        "Allocating onheap LruBlockCache size=" + StringUtils.byteDesc(cacheSize) + ", blockSize="
            + StringUtils.byteDesc(blockSize));
    return new LruBlockCache(cacheSize, blockSize, true, c);
    LOG.info("Allocating BlockCache size=" +
        StringUtils.byteDesc(cacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize));
    if (policy.equalsIgnoreCase("LRU")) {
      return new LruBlockCache(cacheSize, blockSize, true, c);
    } else if (policy.equalsIgnoreCase("TinyLFU")) {
      return new TinyLfuBlockCache(cacheSize, blockSize, ForkJoinPool.commonPool(), c);
    } else {
      throw new IllegalArgumentException("Unknown policy: " + policy);
    }
  }

  /**
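For orientation, a hedged fragment (not part of the patch) showing how a caller picks up whichever L1 implementation the configuration selects; it mirrors what the tests further down do:

Configuration conf = HBaseConfiguration.create();
conf.set(BlockCacheFactory.BLOCKCACHE_POLICY_KEY, "TinyLFU");
BlockCache cache = BlockCacheFactory.createBlockCache(conf);
// With no BucketCache configured this is a TinyLfuBlockCache; with one configured it is a
// CombinedBlockCache whose first level is the TinyLfuBlockCache.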
@@ -30,22 +30,23 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti

/**
 * CombinedBlockCache is an abstraction layer that combines
 * {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used
 * {@link FirstLevelBlockCache} and {@link BucketCache}. The smaller lruCache is used
 * to cache bloom blocks and index blocks. The larger Cache is used to
 * cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean, boolean)} reads
 * first from the smaller lruCache before looking for the block in the l2Cache.
 * first from the smaller l1Cache before looking for the block in the l2Cache. Blocks evicted
 * from l1Cache are put into the bucket cache.
 * Metrics are the combined size and hits and misses of both caches.
 */
@InterfaceAudience.Private
public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
  protected final LruBlockCache onHeapCache;
  protected final FirstLevelBlockCache l1Cache;
  protected final BlockCache l2Cache;
  protected final CombinedCacheStats combinedCacheStats;

  public CombinedBlockCache(LruBlockCache onHeapCache, BlockCache l2Cache) {
    this.onHeapCache = onHeapCache;
  public CombinedBlockCache(FirstLevelBlockCache l1Cache, BlockCache l2Cache) {
    this.l1Cache = l1Cache;
    this.l2Cache = l2Cache;
    this.combinedCacheStats = new CombinedCacheStats(onHeapCache.getStats(),
    this.combinedCacheStats = new CombinedCacheStats(l1Cache.getStats(),
      l2Cache.getStats());
  }

@ -55,14 +56,14 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
|
|||
if (l2Cache instanceof HeapSize) {
|
||||
l2size = ((HeapSize) l2Cache).heapSize();
|
||||
}
|
||||
return onHeapCache.heapSize() + l2size;
|
||||
return l1Cache.heapSize() + l2size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
|
||||
boolean metaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA;
|
||||
if (metaBlock) {
|
||||
onHeapCache.cacheBlock(cacheKey, buf, inMemory);
|
||||
l1Cache.cacheBlock(cacheKey, buf, inMemory);
|
||||
} else {
|
||||
l2Cache.cacheBlock(cacheKey, buf, inMemory);
|
||||
}
|
||||
|
@ -80,19 +81,19 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
|
|||
// we end up calling l2Cache.getBlock.
|
||||
// We are not in a position to exactly look at LRU cache or BC as BlockType may not be getting
|
||||
// passed always.
|
||||
return onHeapCache.containsBlock(cacheKey)?
|
||||
onHeapCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics):
|
||||
return l1Cache.containsBlock(cacheKey)?
|
||||
l1Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics):
|
||||
l2Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean evictBlock(BlockCacheKey cacheKey) {
|
||||
return onHeapCache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey);
|
||||
return l1Cache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int evictBlocksByHfileName(String hfileName) {
|
||||
return onHeapCache.evictBlocksByHfileName(hfileName)
|
||||
return l1Cache.evictBlocksByHfileName(hfileName)
|
||||
+ l2Cache.evictBlocksByHfileName(hfileName);
|
||||
}
|
||||
|
||||
|
@ -103,43 +104,43 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
|
|||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
onHeapCache.shutdown();
|
||||
l1Cache.shutdown();
|
||||
l2Cache.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long size() {
|
||||
return onHeapCache.size() + l2Cache.size();
|
||||
return l1Cache.size() + l2Cache.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxSize() {
|
||||
return onHeapCache.getMaxSize() + l2Cache.getMaxSize();
|
||||
return l1Cache.getMaxSize() + l2Cache.getMaxSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCurrentDataSize() {
|
||||
return onHeapCache.getCurrentDataSize() + l2Cache.getCurrentDataSize();
|
||||
return l1Cache.getCurrentDataSize() + l2Cache.getCurrentDataSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFreeSize() {
|
||||
return onHeapCache.getFreeSize() + l2Cache.getFreeSize();
|
||||
return l1Cache.getFreeSize() + l2Cache.getFreeSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCurrentSize() {
|
||||
return onHeapCache.getCurrentSize() + l2Cache.getCurrentSize();
|
||||
return l1Cache.getCurrentSize() + l2Cache.getCurrentSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBlockCount() {
|
||||
return onHeapCache.getBlockCount() + l2Cache.getBlockCount();
|
||||
return l1Cache.getBlockCount() + l2Cache.getBlockCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDataBlockCount() {
|
||||
return onHeapCache.getDataBlockCount() + l2Cache.getDataBlockCount();
|
||||
return l1Cache.getDataBlockCount() + l2Cache.getDataBlockCount();
|
||||
}
|
||||
|
||||
public static class CombinedCacheStats extends CacheStats {
|
||||
|
@ -332,7 +333,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
|
|||
lruCacheStats.rollMetricsPeriod();
|
||||
bucketCacheStats.rollMetricsPeriod();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getFailedInserts() {
|
||||
return lruCacheStats.getFailedInserts() + bucketCacheStats.getFailedInserts();
|
||||
|
@ -343,13 +344,13 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
|
|||
return lruCacheStats.getSumHitCountsPastNPeriods()
|
||||
+ bucketCacheStats.getSumHitCountsPastNPeriods();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getSumRequestCountsPastNPeriods() {
|
||||
return lruCacheStats.getSumRequestCountsPastNPeriods()
|
||||
+ bucketCacheStats.getSumRequestCountsPastNPeriods();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long getSumHitCachingCountsPastNPeriods() {
|
||||
return lruCacheStats.getSumHitCachingCountsPastNPeriods()
|
||||
|
@ -370,12 +371,12 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
|
|||
|
||||
@Override
|
||||
public BlockCache[] getBlockCaches() {
|
||||
return new BlockCache [] {this.onHeapCache, this.l2Cache};
|
||||
return new BlockCache [] {this.l1Cache, this.l2Cache};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxSize(long size) {
|
||||
this.onHeapCache.setMaxSize(size);
|
||||
this.l1Cache.setMaxSize(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -390,7 +391,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
|
|||
? ((BucketCache) this.l2Cache).getRefCount(cacheKey) : 0;
|
||||
}
|
||||
|
||||
public LruBlockCache getOnHeapCache() {
|
||||
return onHeapCache;
|
||||
public FirstLevelBlockCache getFirstLevelCache() {
|
||||
return l1Cache;
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -0,0 +1,45 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * In-memory BlockCache that may be backed by secondary layer(s).
 */
@InterfaceAudience.Private
public interface FirstLevelBlockCache extends ResizableBlockCache, HeapSize {

  /**
   * Whether the cache contains the block with specified cacheKey
   *
   * @param cacheKey cache key for the block
   * @return true if it contains the block
   */
  boolean containsBlock(BlockCacheKey cacheKey);

  /**
   * Specifies the secondary cache. An entry that is evicted from this cache due to a size
   * constraint will be inserted into the victim cache.
   *
   * @param victimCache the second level cache
   * @throws IllegalArgumentException if the victim cache had already been set
   */
  void setVictimCache(BlockCache victimCache);
}
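To make the victim-cache contract concrete, a small sketch of the L1-to-L2 chaining this interface enables. The sizes and the choice of a second TinyLfuBlockCache as the victim are illustrative only; in BlockCacheFactory the victim is normally a BucketCache.

// Illustrative wiring, not part of the patch: size-based evictions from l1 are offered to l2.
FirstLevelBlockCache l1 = new TinyLfuBlockCache(32L << 20, 64 << 10, 16L << 20, Runnable::run);
BlockCache l2 = new TinyLfuBlockCache(256L << 20, 64 << 10, 16L << 20, Runnable::run);
l1.setVictimCache(l2);
// l1.cacheBlock(...) may now spill evicted blocks into l2, and l1.getBlock(...) falls back
// to l2 on a miss (see TinyLfuBlockCache#getBlock below).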
@@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class InclusiveCombinedBlockCache extends CombinedBlockCache {
  public InclusiveCombinedBlockCache(LruBlockCache l1, BlockCache l2) {
  public InclusiveCombinedBlockCache(FirstLevelBlockCache l1, BlockCache l2) {
    super(l1,l2);
    l1.setVictimCache(l2);
  }
@@ -34,7 +34,7 @@ public class InclusiveCombinedBlockCache extends CombinedBlockCache {
    // On all external cache set ups the lru should have the l2 cache set as the victimHandler
    // Because of that all requests that miss inside of the lru block cache will be
    // tried in the l2 block cache.
    return onHeapCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
    return l1Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
  }

  /**
@@ -48,7 +48,7 @@ public class InclusiveCombinedBlockCache extends CombinedBlockCache {
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
    // This is the inclusive part of the combined block cache.
    // Every block is placed into both block caches.
    onHeapCache.cacheBlock(cacheKey, buf, inMemory);
    l1Cache.cacheBlock(cacheKey, buf, inMemory);

    // This assumes that insertion into the L2 block cache is either async or very fast.
    l2Cache.cacheBlock(cacheKey, buf, inMemory);
@@ -56,7 +56,7 @@ public class InclusiveCombinedBlockCache extends CombinedBlockCache {

  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    boolean l1Result = this.onHeapCache.evictBlock(cacheKey);
    boolean l1Result = this.l1Cache.evictBlock(cacheKey);
    boolean l2Result = this.l2Cache.evictBlock(cacheKey);
    return l1Result || l2Result;
  }
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.EnumMap;
|
||||
import java.util.Iterator;
|
||||
|
@ -91,7 +93,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
|
|||
* to the relative sizes and usage.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||
public class LruBlockCache implements FirstLevelBlockCache {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);
|
||||
|
||||
|
@ -252,8 +254,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
|||
DEFAULT_MEMORY_FACTOR,
|
||||
DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
|
||||
false,
|
||||
DEFAULT_MAX_BLOCK_SIZE
|
||||
);
|
||||
DEFAULT_MAX_BLOCK_SIZE);
|
||||
}
|
||||
|
||||
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
|
||||
|
@ -269,8 +270,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
|||
conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
|
||||
DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
|
||||
conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
|
||||
conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
|
||||
);
|
||||
conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
|
||||
}
|
||||
|
||||
public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
|
||||
|
@@ -338,6 +338,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
        STAT_THREAD_PERIOD, TimeUnit.SECONDS);
  }

  @Override
  public void setVictimCache(BlockCache victimCache) {
    if (victimHandler != null) {
      throw new IllegalArgumentException("The victim cache has already been set");
    }
    victimHandler = requireNonNull(victimCache);
  }

  @Override
  public void setMaxSize(long maxSize) {
    this.maxSize = maxSize;
@@ -505,6 +513,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
   *
   * @return true if contains the block
   */
  @Override
  public boolean containsBlock(BlockCacheKey cacheKey) {
    return map.containsKey(cacheKey);
  }
@ -1155,11 +1164,6 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
|||
return counts;
|
||||
}
|
||||
|
||||
public void setVictimCache(BlockCache handler) {
|
||||
assert victimHandler == null;
|
||||
victimHandler = handler;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
|
||||
return map;
|
||||
|
|
|
@ -0,0 +1,417 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.github.benmanes.caffeine.cache.Policy.Eviction;
|
||||
import com.github.benmanes.caffeine.cache.RemovalCause;
|
||||
import com.github.benmanes.caffeine.cache.RemovalListener;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* A block cache that is memory-aware using {@link HeapSize}, memory bounded using the W-TinyLFU
|
||||
* eviction algorithm, and concurrent. This implementation delegates to a Caffeine cache to provide
|
||||
* O(1) read and write operations.
|
||||
* <ul>
|
||||
* <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
|
||||
* <li>Caffeine: https://github.com/ben-manes/caffeine</li>
|
||||
* <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
|
||||
* </ul>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class TinyLfuBlockCache implements FirstLevelBlockCache {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCache.class);
|
||||
|
||||
private static final String MAX_BLOCK_SIZE = "hbase.tinylfu.max.block.size";
|
||||
private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
|
||||
private static final int STAT_THREAD_PERIOD_SECONDS = 5 * 60;
|
||||
|
||||
private final Eviction<BlockCacheKey, Cacheable> policy;
|
||||
private final ScheduledExecutorService statsThreadPool;
|
||||
private final long maxBlockSize;
|
||||
private final CacheStats stats;
|
||||
|
||||
private BlockCache victimCache;
|
||||
|
||||
@VisibleForTesting
|
||||
final Cache<BlockCacheKey, Cacheable> cache;
|
||||
|
||||
/**
|
||||
* Creates a block cache.
|
||||
*
|
||||
* @param maximumSizeInBytes maximum size of this cache, in bytes
|
||||
* @param avgBlockSize expected average size of blocks, in bytes
|
||||
* @param executor the cache's executor
|
||||
* @param conf additional configuration
|
||||
*/
|
||||
public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize,
|
||||
Executor executor, Configuration conf) {
|
||||
this(maximumSizeInBytes, avgBlockSize,
|
||||
conf.getLong(MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), executor);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a block cache.
|
||||
*
|
||||
* @param maximumSizeInBytes maximum size of this cache, in bytes
|
||||
* @param avgBlockSize expected average size of blocks, in bytes
|
||||
* @param maxBlockSize maximum size of a block, in bytes
|
||||
* @param executor the cache's executor
|
||||
*/
|
||||
public TinyLfuBlockCache(long maximumSizeInBytes,
|
||||
long avgBlockSize, long maxBlockSize, Executor executor) {
|
||||
this.cache = Caffeine.newBuilder()
|
||||
.executor(executor)
|
||||
.maximumWeight(maximumSizeInBytes)
|
||||
.removalListener(new EvictionListener())
|
||||
.weigher((BlockCacheKey key, Cacheable value) ->
|
||||
(int) Math.min(value.heapSize(), Integer.MAX_VALUE))
|
||||
.initialCapacity((int) Math.ceil((1.2 * maximumSizeInBytes) / avgBlockSize))
|
||||
.build();
|
||||
this.maxBlockSize = maxBlockSize;
|
||||
this.policy = cache.policy().eviction().get();
|
||||
this.stats = new CacheStats(getClass().getSimpleName());
|
||||
|
||||
statsThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
|
||||
.setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build());
|
||||
statsThreadPool.scheduleAtFixedRate(this::logStats,
|
||||
STAT_THREAD_PERIOD_SECONDS, STAT_THREAD_PERIOD_SECONDS, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setVictimCache(BlockCache victimCache) {
|
||||
if (this.victimCache != null) {
|
||||
throw new IllegalArgumentException("The victim cache has already been set");
|
||||
}
|
||||
this.victimCache = requireNonNull(victimCache);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long size() {
|
||||
return policy.getMaximum();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFreeSize() {
|
||||
return size() - getCurrentSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCurrentSize() {
|
||||
return policy.weightedSize().getAsLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBlockCount() {
|
||||
return cache.estimatedSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long heapSize() {
|
||||
return getCurrentSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxSize(long size) {
|
||||
policy.setMaximum(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsBlock(BlockCacheKey cacheKey) {
|
||||
return cache.asMap().containsKey(cacheKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cacheable getBlock(BlockCacheKey cacheKey,
|
||||
boolean caching, boolean repeat, boolean updateCacheMetrics) {
|
||||
Cacheable value = cache.getIfPresent(cacheKey);
|
||||
if (value == null) {
|
||||
if (repeat) {
|
||||
return null;
|
||||
}
|
||||
if (updateCacheMetrics) {
|
||||
stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
|
||||
}
|
||||
if (victimCache != null) {
|
||||
value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
|
||||
if ((value != null) && caching) {
|
||||
if ((value instanceof HFileBlock) && ((HFileBlock) value).usesSharedMemory()) {
|
||||
value = ((HFileBlock) value).deepClone();
|
||||
}
|
||||
cacheBlock(cacheKey, value);
|
||||
}
|
||||
}
|
||||
} else if (updateCacheMetrics) {
|
||||
stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory) {
|
||||
cacheBlock(cacheKey, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cacheBlock(BlockCacheKey key, Cacheable value) {
|
||||
if (value.heapSize() > maxBlockSize) {
|
||||
// If there are a lot of blocks that are too big this can make the logs too noisy (2% logged)
|
||||
if (stats.failInsert() % 50 == 0) {
|
||||
LOG.warn(String.format(
|
||||
"Trying to cache too large a block %s @ %,d is %,d which is larger than %,d",
|
||||
key.getHfileName(), key.getOffset(), value.heapSize(), DEFAULT_MAX_BLOCK_SIZE));
|
||||
}
|
||||
} else {
|
||||
cache.put(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean evictBlock(BlockCacheKey cacheKey) {
|
||||
Cacheable value = cache.asMap().remove(cacheKey);
|
||||
return (value != null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int evictBlocksByHfileName(String hfileName) {
|
||||
int evicted = 0;
|
||||
for (BlockCacheKey key : cache.asMap().keySet()) {
|
||||
if (key.getHfileName().equals(hfileName) && evictBlock(key)) {
|
||||
evicted++;
|
||||
}
|
||||
}
|
||||
if (victimCache != null) {
|
||||
evicted += victimCache.evictBlocksByHfileName(hfileName);
|
||||
}
|
||||
return evicted;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CacheStats getStats() {
|
||||
return stats;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (victimCache != null) {
|
||||
victimCache.shutdown();
|
||||
}
|
||||
statsThreadPool.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockCache[] getBlockCaches() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<CachedBlock> iterator() {
|
||||
long now = System.nanoTime();
|
||||
return cache.asMap().entrySet().stream()
|
||||
.map(entry -> (CachedBlock) new CachedBlockView(entry.getKey(), entry.getValue(), now))
|
||||
.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
|
||||
// There is no SHARED type here in L1. But the block might have been served from the L2 victim
|
||||
// cache (when the Combined mode = false). So just try return this block to the victim cache.
|
||||
// Note : In case of CombinedBlockCache we will have this victim cache configured for L1
|
||||
// cache. But CombinedBlockCache will only call returnBlock on L2 cache.
|
||||
if (victimCache != null) {
|
||||
victimCache.returnBlock(cacheKey, block);
|
||||
}
|
||||
}
|
||||
|
||||
private void logStats() {
|
||||
LOG.info(
|
||||
"totalSize=" + StringUtils.byteDesc(heapSize()) + ", " +
|
||||
"freeSize=" + StringUtils.byteDesc(getFreeSize()) + ", " +
|
||||
"max=" + StringUtils.byteDesc(size()) + ", " +
|
||||
"blockCount=" + getBlockCount() + ", " +
|
||||
"accesses=" + stats.getRequestCount() + ", " +
|
||||
"hits=" + stats.getHitCount() + ", " +
|
||||
"hitRatio=" + (stats.getHitCount() == 0 ?
|
||||
"0," : StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ") +
|
||||
"cachingAccesses=" + stats.getRequestCachingCount() + ", " +
|
||||
"cachingHits=" + stats.getHitCachingCount() + ", " +
|
||||
"cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
|
||||
"0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
|
||||
"evictions=" + stats.getEvictionCount() + ", " +
|
||||
"evicted=" + stats.getEvictedCount());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("blockCount", getBlockCount())
|
||||
.add("currentSize", getCurrentSize())
|
||||
.add("freeSize", getFreeSize())
|
||||
.add("maxSize", size())
|
||||
.add("heapSize", heapSize())
|
||||
.add("victimCache", (victimCache != null))
|
||||
.toString();
|
||||
}
|
||||
|
||||
/** A removal listener to asynchronously record evictions and populate the victim cache. */
|
||||
private final class EvictionListener implements RemovalListener<BlockCacheKey, Cacheable> {
|
||||
|
||||
@Override
|
||||
public void onRemoval(BlockCacheKey key, Cacheable value, RemovalCause cause) {
|
||||
if (!cause.wasEvicted()) {
|
||||
// An explicit eviction (invalidation) is not added to the victim cache as the data may
|
||||
// no longer be valid for subsequent queries.
|
||||
return;
|
||||
}
|
||||
|
||||
recordEviction();
|
||||
|
||||
if (victimCache == null) {
|
||||
return;
|
||||
} else if (victimCache instanceof BucketCache) {
|
||||
BucketCache victimBucketCache = (BucketCache) victimCache;
|
||||
victimBucketCache.cacheBlockWithWait(key, value, /* inMemory */ true, /* wait */ true);
|
||||
} else {
|
||||
victimCache.cacheBlock(key, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Records an eviction. The number of eviction operations and evicted blocks are identical, as
|
||||
* an eviction is triggered immediately when the capacity has been exceeded. An eviction is
|
||||
* performed asynchronously. See the library's documentation for details on write buffers,
|
||||
* batching, and maintenance behavior.
|
||||
*/
|
||||
private void recordEviction() {
|
||||
// FIXME: Currently does not capture the insertion time
|
||||
stats.evicted(Long.MAX_VALUE, true);
|
||||
stats.evict();
|
||||
}
|
||||
|
||||
private static final class CachedBlockView implements CachedBlock {
|
||||
private static final Comparator<CachedBlock> COMPARATOR = Comparator
|
||||
.comparing(CachedBlock::getFilename)
|
||||
.thenComparing(CachedBlock::getOffset)
|
||||
.thenComparing(CachedBlock::getCachedTime);
|
||||
|
||||
private final BlockCacheKey key;
|
||||
private final Cacheable value;
|
||||
private final long now;
|
||||
|
||||
public CachedBlockView(BlockCacheKey key, Cacheable value, long now) {
|
||||
this.now = now;
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockPriority getBlockPriority() {
|
||||
// This does not appear to be used in any meaningful way and is irrelevant to this cache
|
||||
return BlockPriority.MEMORY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockType getBlockType() {
|
||||
return value.getBlockType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getOffset() {
|
||||
return key.getOffset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSize() {
|
||||
return value.heapSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCachedTime() {
|
||||
// This does not appear to be used in any meaningful way, so not captured
|
||||
return 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getFilename() {
|
||||
return key.getHfileName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(CachedBlock other) {
|
||||
return COMPARATOR.compare(this, other);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == this) {
|
||||
return true;
|
||||
} else if (!(obj instanceof CachedBlock)) {
|
||||
return false;
|
||||
}
|
||||
CachedBlock other = (CachedBlock) obj;
|
||||
return compareTo(other) == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return key.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return BlockCacheUtil.toString(this, now);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxSize() {
|
||||
return size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCurrentDataSize() {
|
||||
return getCurrentSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDataBlockCount() {
|
||||
return getBlockCount();
|
||||
}
|
||||
}
|
|
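Background on the Caffeine calls the new TinyLfuBlockCache relies on (builder, weigher, removal listener). This is a self-contained sketch against the public Caffeine 2.x API; the String/byte[] types and sizes are stand-ins, not HBase classes.

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

public class WeightedCacheSketch {
  public static void main(String[] args) {
    // Size-bounded cache: entries are weighed by byte length and evicted by W-TinyLFU.
    Cache<String, byte[]> cache = Caffeine.newBuilder()
        .maximumWeight(1_000_000)  // total weight budget, analogous to the cache's byte capacity
        .weigher((String key, byte[] value) -> value.length)
        .removalListener((String key, byte[] value, RemovalCause cause) -> {
          if (cause.wasEvicted()) {
            // A victim cache would be populated here, as the block cache does on eviction.
            System.out.println("evicted " + key);
          }
        })
        .build();

    cache.put("block-0", new byte[128]);
    byte[] hit = cache.getIfPresent("block-0");  // null on a miss
    System.out.println(hit != null ? "hit" : "miss");
  }
}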
@@ -94,11 +94,10 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
 *
 * <p>BucketCache can be used as mainly a block cache (see
 * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
 * LruBlockCache to decrease CMS GC and heap fragmentation.
 * a BlockCache to decrease CMS GC and heap fragmentation.
 *
 * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
 * blocks) to enlarge cache space via
 * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
 * blocks) to enlarge cache space via a victim cache.
 */
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
@@ -427,7 +426,7 @@ public class BucketCache implements BlockCache, HeapSize {
   * @param inMemory if block is in-memory
   * @param wait if true, blocking wait when queue is full
   */
  private void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
      boolean wait) {
    if (cacheEnabled) {
      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
@@ -698,7 +697,7 @@ public class BucketCache implements BlockCache, HeapSize {
    return this.realCacheSize.sum();
  }

  private long acceptableSize() {
  public long acceptableSize() {
    return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
  }
@@ -124,7 +124,7 @@ public class HeapMemoryManager {

  private ResizableBlockCache toResizableBlockCache(BlockCache blockCache) {
    if (blockCache instanceof CombinedBlockCache) {
      return (ResizableBlockCache) ((CombinedBlockCache) blockCache).getOnHeapCache();
      return (ResizableBlockCache) ((CombinedBlockCache) blockCache).getFirstLevelCache();
    } else {
      return (ResizableBlockCache) blockCache;
    }
@ -98,7 +98,6 @@ public class TestBlockCacheReporting {
|
|||
CacheConfig cc = new CacheConfig(this.conf);
|
||||
assertTrue(CacheConfig.DEFAULT_IN_MEMORY == cc.isInMemory());
|
||||
BlockCache blockCache = BlockCacheFactory.createBlockCache(this.conf);
|
||||
assertTrue(blockCache instanceof LruBlockCache);
|
||||
logPerBlock(blockCache);
|
||||
addDataAndHits(blockCache, 3);
|
||||
// The below has no asserts. It is just exercising toString and toJSON code.
|
||||
|
@ -140,7 +139,7 @@ public class TestBlockCacheReporting {
|
|||
}
|
||||
LOG.info("filename=" + e.getKey() + ", count=" + count + ", countData=" + countData +
|
||||
", size=" + size + ", sizeData=" + sizeData);
|
||||
LOG.info(BlockCacheUtil.toJSON(e.getKey(), e.getValue()));
|
||||
//LOG.info(BlockCacheUtil.toJSON(e.getKey(), e.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -148,7 +147,7 @@ public class TestBlockCacheReporting {
|
|||
BlockCacheUtil.CachedBlocksByFile cbsbf = new BlockCacheUtil.CachedBlocksByFile();
|
||||
for (CachedBlock cb : bc) {
|
||||
LOG.info(cb.toString());
|
||||
LOG.info(BlockCacheUtil.toJSON(bc));
|
||||
//LOG.info(BlockCacheUtil.toJSON(bc));
|
||||
cbsbf.update(cb);
|
||||
}
|
||||
return cbsbf;
|
||||
|
|
|
@ -336,7 +336,7 @@ public class TestCacheConfig {
|
|||
assertTrue(blockCache instanceof CombinedBlockCache);
|
||||
// TODO: Assert sizes allocated are right and proportions.
|
||||
CombinedBlockCache cbc = (CombinedBlockCache) blockCache;
|
||||
LruBlockCache lbc = cbc.onHeapCache;
|
||||
FirstLevelBlockCache lbc = cbc.l1Cache;
|
||||
assertEquals(lruExpectedSize, lbc.getMaxSize());
|
||||
BlockCache bc = cbc.l2Cache;
|
||||
// getMaxSize comes back in bytes but we specified size in MB
|
||||
|
@ -350,7 +350,7 @@ public class TestCacheConfig {
|
|||
assertEquals(initialL1BlockCount + 1, lbc.getBlockCount());
|
||||
assertEquals(initialL2BlockCount, bc.getBlockCount());
|
||||
// Force evictions by putting in a block too big.
|
||||
final long justTooBigSize = lbc.acceptableSize() + 1;
|
||||
final long justTooBigSize = ((LruBlockCache)lbc).acceptableSize() + 1;
|
||||
lbc.cacheBlock(new BlockCacheKey("bck2", 0), new DataCacheEntry() {
|
||||
@Override
|
||||
public long heapSize() {
|
||||
|
|
|
@ -123,16 +123,17 @@ public class TestScannerSelectionUsingKeyRange {
|
|||
}
|
||||
|
||||
Scan scan = new Scan(Bytes.toBytes("aaa"), Bytes.toBytes("aaz"));
|
||||
LruBlockCache cache = (LruBlockCache) BlockCacheFactory.createBlockCache(conf);
|
||||
cache.clearCache();
|
||||
BlockCache cache = BlockCacheFactory.createBlockCache(conf);
|
||||
InternalScanner scanner = region.getScanner(scan);
|
||||
List<Cell> results = new ArrayList<>();
|
||||
while (scanner.next(results)) {
|
||||
}
|
||||
scanner.close();
|
||||
assertEquals(0, results.size());
|
||||
Set<String> accessedFiles = cache.getCachedFileNamesForTest();
|
||||
assertEquals(expectedCount, accessedFiles.size());
|
||||
if (cache instanceof LruBlockCache) {
|
||||
Set<String> accessedFiles = ((LruBlockCache)cache).getCachedFileNamesForTest();
|
||||
assertEquals(expectedCount, accessedFiles.size());
|
||||
}
|
||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,309 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io.hfile;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.testclassification.IOTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.ClassSize;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Tests the concurrent TinyLfuBlockCache.
|
||||
*/
|
||||
@Category({IOTests.class, SmallTests.class})
|
||||
public class TestTinyLfuBlockCache {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestTinyLfuBlockCache.class);
|
||||
|
||||
@Test
|
||||
public void testCacheSimple() throws Exception {
|
||||
|
||||
long maxSize = 1000000;
|
||||
long blockSize = calculateBlockSizeDefault(maxSize, 101);
|
||||
|
||||
TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
|
||||
|
||||
CachedItem [] blocks = generateRandomBlocks(100, blockSize);
|
||||
|
||||
long expectedCacheSize = cache.heapSize();
|
||||
|
||||
// Confirm empty
|
||||
for (CachedItem block : blocks) {
|
||||
assertTrue(cache.getBlock(block.cacheKey, true, false, true) == null);
|
||||
}
|
||||
|
||||
// Add blocks
|
||||
for (CachedItem block : blocks) {
|
||||
cache.cacheBlock(block.cacheKey, block);
|
||||
expectedCacheSize += block.heapSize();
|
||||
}
|
||||
|
||||
// Verify correctly calculated cache heap size
|
||||
assertEquals(expectedCacheSize, cache.heapSize());
|
||||
|
||||
// Check if all blocks are properly cached and retrieved
|
||||
for (CachedItem block : blocks) {
|
||||
HeapSize buf = cache.getBlock(block.cacheKey, true, false, true);
|
||||
assertTrue(buf != null);
|
||||
assertEquals(buf.heapSize(), block.heapSize());
|
||||
}
|
||||
|
||||
// Re-add same blocks and ensure nothing has changed
|
||||
long expectedBlockCount = cache.getBlockCount();
|
||||
for (CachedItem block : blocks) {
|
||||
cache.cacheBlock(block.cacheKey, block);
|
||||
}
|
||||
assertEquals(
|
||||
"Cache should ignore cache requests for blocks already in cache",
|
||||
expectedBlockCount, cache.getBlockCount());
|
||||
|
||||
// Verify correctly calculated cache heap size
|
||||
assertEquals(expectedCacheSize, cache.heapSize());
|
||||
|
||||
// Check if all blocks are properly cached and retrieved
|
||||
for (CachedItem block : blocks) {
|
||||
HeapSize buf = cache.getBlock(block.cacheKey, true, false, true);
|
||||
assertTrue(buf != null);
|
||||
assertEquals(buf.heapSize(), block.heapSize());
|
||||
}
|
||||
|
||||
// Expect no evictions
|
||||
assertEquals(0, cache.getStats().getEvictionCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCacheEvictionSimple() throws Exception {
|
||||
|
||||
long maxSize = 100000;
|
||||
long blockSize = calculateBlockSizeDefault(maxSize, 10);
|
||||
|
||||
TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
|
||||
|
||||
CachedItem [] blocks = generateFixedBlocks(11, blockSize, "block");
|
||||
|
||||
// Add all the blocks
|
||||
for (CachedItem block : blocks) {
|
||||
cache.cacheBlock(block.cacheKey, block);
|
||||
}
|
||||
|
||||
// A single eviction run should have occurred
|
||||
assertEquals(1, cache.getStats().getEvictionCount());
|
||||
|
||||
// The cache did not grow beyond max
|
||||
assertTrue(cache.heapSize() < maxSize);
|
||||
|
||||
// All blocks except one should be in the cache
|
||||
assertEquals(10, cache.getBlockCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScanResistance() throws Exception {
|
||||
|
||||
long maxSize = 100000;
|
||||
long blockSize = calculateBlockSize(maxSize, 10);
|
||||
|
||||
TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
|
||||
|
||||
CachedItem [] singleBlocks = generateFixedBlocks(20, blockSize, "single");
|
||||
CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
|
||||
|
||||
// Add 5 blocks from each
|
||||
for(int i=0; i<5; i++) {
|
||||
cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
|
||||
cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]);
|
||||
}
|
||||
|
||||
// Add frequency
|
||||
for (int i = 0; i < 5; i++) {
|
||||
for (int j = 0; j < 10; j++) {
|
||||
CachedItem block = multiBlocks[i];
|
||||
cache.getBlock(block.cacheKey, true, false, true);
|
||||
}
|
||||
}
|
||||
|
||||
// Let's keep "scanning" by adding single blocks. From here on we only
|
||||
// expect evictions from the single bucket.
|
||||
|
||||
for(int i=5;i<18;i++) {
|
||||
cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
|
||||
}
|
||||
|
||||
for (CachedItem block : multiBlocks) {
|
||||
assertTrue(cache.cache.asMap().containsKey(block.cacheKey));
|
||||
}
|
||||
|
||||
assertEquals(10, cache.getBlockCount());
|
||||
assertEquals(13, cache.getStats().getEvictionCount());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxBlockSize() throws Exception {
|
||||
long maxSize = 100000;
|
||||
long blockSize = calculateBlockSize(maxSize, 10);
|
||||
|
||||
TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
|
||||
CachedItem [] tooLong = generateFixedBlocks(10, 2 * blockSize, "long");
|
||||
CachedItem [] small = generateFixedBlocks(15, blockSize / 2, "small");
|
||||
|
||||
for (CachedItem i:tooLong) {
|
||||
cache.cacheBlock(i.cacheKey, i);
|
||||
}
|
||||
for (CachedItem i:small) {
|
||||
cache.cacheBlock(i.cacheKey, i);
|
||||
}
|
||||
assertEquals(15,cache.getBlockCount());
|
||||
for (CachedItem i:small) {
|
||||
assertNotNull(cache.getBlock(i.cacheKey, true, false, false));
|
||||
}
|
||||
for (CachedItem i:tooLong) {
|
||||
assertNull(cache.getBlock(i.cacheKey, true, false, false));
|
||||
}
|
||||
|
||||
assertEquals(10, cache.getStats().getFailedInserts());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResizeBlockCache() throws Exception {
|
||||
|
||||
long maxSize = 100000;
|
||||
long blockSize = calculateBlockSize(maxSize, 10);
|
||||
|
||||
TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
|
||||
|
||||
CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block");
|
||||
|
||||
for(CachedItem block : blocks) {
|
||||
cache.cacheBlock(block.cacheKey, block);
|
||||
}
|
||||
|
||||
// Do not expect any evictions yet
|
||||
assertEquals(10, cache.getBlockCount());
|
||||
assertEquals(0, cache.getStats().getEvictionCount());
|
||||
|
||||
// Resize to half capacity plus an extra block (otherwise we evict an extra)
|
||||
cache.setMaxSize(maxSize / 2);
|
||||
|
||||
// And we expect 1/2 of the blocks to be evicted
|
||||
assertEquals(5, cache.getBlockCount());
|
||||
assertEquals(5, cache.getStats().getEvictedCount());
|
||||
}
|
||||
|
||||
private CachedItem [] generateFixedBlocks(int numBlocks, int size, String pfx) {
|
||||
CachedItem [] blocks = new CachedItem[numBlocks];
|
||||
for(int i=0;i<numBlocks;i++) {
|
||||
blocks[i] = new CachedItem(pfx + i, size);
|
||||
}
|
||||
return blocks;
|
||||
}
|
||||
|
||||
private CachedItem [] generateFixedBlocks(int numBlocks, long size, String pfx) {
|
||||
return generateFixedBlocks(numBlocks, (int)size, pfx);
|
||||
}
|
||||
|
||||
private CachedItem [] generateRandomBlocks(int numBlocks, long maxSize) {
|
||||
CachedItem [] blocks = new CachedItem[numBlocks];
|
||||
Random r = new Random();
|
||||
for(int i=0;i<numBlocks;i++) {
|
||||
blocks[i] = new CachedItem("block" + i, r.nextInt((int)maxSize)+1);
|
||||
}
|
||||
return blocks;
|
||||
}
|
||||
|
||||
private long calculateBlockSize(long maxSize, int numBlocks) {
|
||||
long roughBlockSize = maxSize / numBlocks;
|
||||
int numEntries = (int)Math.ceil((1.2)*maxSize/roughBlockSize);
|
||||
long totalOverhead = LruBlockCache.CACHE_FIXED_OVERHEAD +
|
||||
ClassSize.CONCURRENT_HASHMAP +
|
||||
(numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
|
||||
(LruBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
|
||||
long negateBlockSize = totalOverhead/numEntries;
|
||||
negateBlockSize += LruCachedBlock.PER_BLOCK_OVERHEAD;
|
||||
return ClassSize.align((long)Math.floor((roughBlockSize - negateBlockSize)*0.99f));
|
||||
}
|
||||
|
||||
private long calculateBlockSizeDefault(long maxSize, int numBlocks) {
|
||||
long roughBlockSize = maxSize / numBlocks;
|
||||
int numEntries = (int)Math.ceil((1.2)*maxSize/roughBlockSize);
|
||||
long totalOverhead = LruBlockCache.CACHE_FIXED_OVERHEAD +
|
||||
ClassSize.CONCURRENT_HASHMAP +
|
||||
(numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
|
||||
(LruBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
|
||||
long negateBlockSize = totalOverhead / numEntries;
|
||||
negateBlockSize += LruCachedBlock.PER_BLOCK_OVERHEAD;
|
||||
return ClassSize.align((long)Math.floor((roughBlockSize - negateBlockSize)*
|
||||
LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
|
||||
}
|
||||
|
||||
private static class CachedItem implements Cacheable {
|
||||
BlockCacheKey cacheKey;
|
||||
int size;
|
||||
|
||||
CachedItem(String blockName, int size) {
|
||||
this.cacheKey = new BlockCacheKey(blockName, 0);
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
/** The size of this item reported to the block cache layer */
|
||||
@Override
|
||||
public long heapSize() {
|
||||
return ClassSize.align(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSerializedLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CacheableDeserializer<Cacheable> getDeserializer() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockType getBlockType() {
|
||||
return BlockType.DATA;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MemoryType getMemoryType() {
|
||||
return MemoryType.EXCLUSIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -171,6 +171,10 @@
|
|||
<pattern>com.fasterxml</pattern>
|
||||
<shadedPattern>${shaded.prefix}.com.fasterxml</shadedPattern>
|
||||
</relocation>
|
||||
<relocation>
|
||||
<pattern>com.github.benmanes.caffeine</pattern>
|
||||
<shadedPattern>${shaded.prefix}.com.github.benmanes.caffeine</shadedPattern>
|
||||
</relocation>
|
||||
<relocation>
|
||||
<pattern>com.google</pattern>
|
||||
<shadedPattern>${shaded.prefix}.com.google</shadedPattern>
|
||||
|
|
pom.xml
@@ -1378,6 +1378,7 @@
    <!-- end HBASE-15925 default hadoop compatibility values -->
    <audience-annotations.version>0.5.0</audience-annotations.version>
    <avro.version>1.7.7</avro.version>
    <caffeine.version>2.6.2</caffeine.version>
    <commons-codec.version>1.10</commons-codec.version>
    <!-- pretty outdated -->
    <commons-io.version>2.5</commons-io.version>
@@ -1783,6 +1784,11 @@
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>com.github.ben-manes.caffeine</groupId>
      <artifactId>caffeine</artifactId>
      <version>${caffeine.version}</version>
    </dependency>
    <dependency>
      <groupId>io.dropwizard.metrics</groupId>
      <artifactId>metrics-core</artifactId>