diff --git a/dev-support/spotbugs-exclude.xml b/dev-support/spotbugs-exclude.xml index f57faaf65bc..1137f5fdc14 100644 --- a/dev-support/spotbugs-exclude.xml +++ b/dev-support/spotbugs-exclude.xml @@ -247,4 +247,9 @@ + + + + + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java index 2b9732092ce..234c92104a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java @@ -145,6 +145,8 @@ public final class BlockCacheFactory { return new LruBlockCache(cacheSize, blockSize, true, c); } else if (policy.equalsIgnoreCase("TinyLFU")) { return new TinyLfuBlockCache(cacheSize, blockSize, ForkJoinPool.commonPool(), c); + } else if (policy.equalsIgnoreCase("AdaptiveLRU")) { + return new LruAdaptiveBlockCache(cacheSize, blockSize, true, c); } else { throw new IllegalArgumentException("Unknown policy: " + policy); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java new file mode 100644 index 00000000000..083f1098af5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java @@ -0,0 +1,1421 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import static java.util.Objects.requireNonNull; + +import java.lang.ref.WeakReference; +import java.util.EnumMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.util.StringUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; +import org.apache.hbase.thirdparty.com.google.common.base.Objects; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * This realisation improve performance of classical LRU + * cache up to 3 times via reduce GC job. + *

+ * The classical block cache implementation that is memory-aware using {@link HeapSize}, + * memory-bound using an + * LRU eviction algorithm, and concurrent: backed by a {@link ConcurrentHashMap} and with a + * non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} + * operations. + *

+ * Contains three levels of block priority to allow for scan-resistance and in-memory families + * {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder#setInMemory(boolean)} (An + * in-memory column family is a column family that should be served from memory if possible): + * single-access, multiple-accesses, and in-memory priority. A block is added with an in-memory + * priority flag if {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptor#isInMemory()}, + * otherwise a block becomes a single access priority the first time it is read into this block + * cache. If a block is accessed again while in cache, it is marked as a multiple access priority + * block. This delineation of blocks is used to prevent scans from thrashing the cache adding a + * least-frequently-used element to the eviction algorithm. + *

+ * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each + * priority will retain close to its maximum size, however, if any priority is not using its entire + * chunk the others are able to grow beyond their chunk size. + *

+ * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The + * block size is not especially important as this cache is fully dynamic in its sizing of blocks. It + * is only used for pre-allocating data structures and in initial heap estimation of the map. + *

+ * The detailed constructor defines the sizes for the three priorities (they should total to the + * maximum size defined). It also sets the levels that trigger and control the eviction + * thread. + *

+ * The acceptable size is the cache size level which triggers the eviction process to + * start. It evicts enough blocks to get the size below the minimum size specified. + *

+ * Eviction happens in a separate thread and involves a single full-scan of the map. It determines + * how many bytes must be freed to reach the minimum size, and then while scanning determines the + * fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times + * bytes to free). It then uses the priority chunk sizes to evict fairly according to the relative + * sizes and usage. + *

+ * Adaptive LRU cache lets speed up performance while we are reading much more data than can fit + * into BlockCache and it is the cause of a high rate of evictions. This in turn leads to heavy + * Garbage Collector works. So a lot of blocks put into BlockCache but never read, but spending + * a lot of CPU resources for cleaning. We could avoid this situation via parameters: + *

+ * hbase.lru.cache.heavy.eviction.count.limit - set how many times we have to run the + * eviction process that starts to avoid putting data to BlockCache. By default it is 0 and it + * meats the feature will start at the beginning. But if we have some times short reading the same + * data and some times long-term reading - we can divide it by this parameter. For example we know + * that our short reading used to be about 1 minutes, then we have to set the parameter about 10 + * and it will enable the feature only for long time massive reading (after ~100 seconds). So when + * we use short-reading and want all of them in the cache we will have it (except for eviction of + * course). When we use long-term heavy reading the feature will be enabled after some time and + * bring better performance. + *

+ * hbase.lru.cache.heavy.eviction.mb.size.limit - set how many bytes in 10 seconds desirable + * putting into BlockCache (and evicted from it). The feature will try to reach this value and + * maintain it. Don't try to set it too small because it leads to premature exit from this mode. + * For powerful CPUs (about 20-40 physical cores) it could be about 400-500 MB. Average system + * (~10 cores) 200-300 MB. Some weak systems (2-5 cores) may be good with 50-100 MB. + * How it works: we set the limit and after each ~10 second calculate how many bytes were freed. + * Overhead = Freed Bytes Sum (MB) * 100 / Limit (MB) - 100; + * For example we set the limit = 500 and were evicted 2000 MB. Overhead is: + * 2000 * 100 / 500 - 100 = 300% + * The feature is going to reduce a percent caching data blocks and fit evicted bytes closer to + * 100% (500 MB). Some kind of an auto-scaling. + * If freed bytes less then the limit we have got negative overhead. + * For example if were freed 200 MB: + * 200 * 100 / 500 - 100 = -60% + * The feature will increase the percent of caching blocks. + * That leads to fit evicted bytes closer to 100% (500 MB). + * The current situation we can find out in the log of RegionServer: + * BlockCache evicted (MB): 0, overhead (%): -100, heavy eviction counter: 0, current caching + * DataBlock (%): 100 - means no eviction, 100% blocks is caching + * BlockCache evicted (MB): 2000, overhead (%): 300, heavy eviction counter: 1, current caching + * DataBlock (%): 97 - means eviction begin, reduce of caching blocks by 3%. + * It help to tune your system and find out what value is better set. Don't try to reach 0% + * overhead, it is impossible. Quite good 50-100% overhead, + * it prevents premature exit from this mode. + *

+ * hbase.lru.cache.heavy.eviction.overhead.coefficient - set how fast we want to get the + * result. If we know that our reading is heavy for a long time, we don't want to wait and can + * increase the coefficient and get good performance sooner. But if we aren't sure we can do it + * slowly and it could prevent premature exit from this mode. So, when the coefficient is higher + * we can get better performance when heavy reading is stable. But when reading is changing we + * can adjust to it and set the coefficient to lower value. + * For example, we set the coefficient = 0.01. It means the overhead (see above) will be + * multiplied by 0.01 and the result is the value of reducing percent caching blocks. For example, + * if the overhead = 300% and the coefficient = 0.01, + * then percent of caching blocks will reduce by 3%. + * Similar logic when overhead has got negative value (overshooting). Maybe it is just short-term + * fluctuation and we will try to stay in this mode. It helps avoid premature exit during + * short-term fluctuation. Backpressure has simple logic: more overshooting - more caching blocks. + *

+ * Find more information about improvement: https://issues.apache.org/jira/browse/HBASE-23887 + */ +@InterfaceAudience.Private +public class LruAdaptiveBlockCache implements FirstLevelBlockCache { + + private static final Logger LOG = LoggerFactory.getLogger(LruAdaptiveBlockCache.class); + + /** + * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep + * evicting during an eviction run till the cache size is down to 80% of the total. + */ + private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor"; + + /** + * Acceptable size of cache (no evictions if size < acceptable) + */ + private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = + "hbase.lru.blockcache.acceptable.factor"; + + /** + * Hard capacity limit of cache, will reject any put if size > this * acceptable + */ + static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME = + "hbase.lru.blockcache.hard.capacity.limit.factor"; + private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = + "hbase.lru.blockcache.single.percentage"; + private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = + "hbase.lru.blockcache.multi.percentage"; + private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = + "hbase.lru.blockcache.memory.percentage"; + + /** + * Configuration key to force data-block always (except in-memory are too much) + * cached in memory for in-memory hfile, unlike inMemory, which is a column-family + * configuration, inMemoryForceMode is a cluster-wide configuration + */ + private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = + "hbase.lru.rs.inmemoryforcemode"; + + /* Default Configuration Parameters*/ + + /* Backing Concurrent Map Configuration */ + static final float DEFAULT_LOAD_FACTOR = 0.75f; + static final int DEFAULT_CONCURRENCY_LEVEL = 16; + + /* Eviction thresholds */ + private static final float DEFAULT_MIN_FACTOR = 0.95f; + static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f; + + /* Priority buckets */ + private static final float DEFAULT_SINGLE_FACTOR = 0.25f; + private static final float DEFAULT_MULTI_FACTOR = 0.50f; + private static final float DEFAULT_MEMORY_FACTOR = 0.25f; + + private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f; + + private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false; + + /* Statistics thread */ + private static final int STAT_THREAD_PERIOD = 60 * 5; + private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size"; + private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L; + + private static final String LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT + = "hbase.lru.cache.heavy.eviction.count.limit"; + // Default value actually equal to disable feature of increasing performance. + // Because 2147483647 is about ~680 years (after that it will start to work) + // We can set it to 0-10 and get the profit right now. + // (see details https://issues.apache.org/jira/browse/HBASE-23887). + private static final int DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT = Integer.MAX_VALUE; + + private static final String LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT + = "hbase.lru.cache.heavy.eviction.mb.size.limit"; + private static final long DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT = 500; + + private static final String LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT + = "hbase.lru.cache.heavy.eviction.overhead.coefficient"; + private static final float DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT = 0.01f; + + /** + * Defined the cache map as {@link ConcurrentHashMap} here, because in + * {@link LruAdaptiveBlockCache#getBlock}, we need to guarantee the atomicity + * of map#computeIfPresent (key, func). Besides, the func method must execute exactly once only + * when the key is present and under the lock context, otherwise the reference count will be + * messed up. Notice that the + * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. + */ + private transient final ConcurrentHashMap map; + + /** Eviction lock (locked when eviction in process) */ + private transient final ReentrantLock evictionLock = new ReentrantLock(true); + + private final long maxBlockSize; + + /** Volatile boolean to track if we are in an eviction process or not */ + private volatile boolean evictionInProgress = false; + + /** Eviction thread */ + private transient final EvictionThread evictionThread; + + /** Statistics thread schedule pool (for heavy debugging, could remove) */ + private transient final ScheduledExecutorService scheduleThreadPool = + Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setNameFormat("LruAdaptiveBlockCacheStatsExecutor").setDaemon(true).build()); + + /** Current size of cache */ + private final AtomicLong size; + + /** Current size of data blocks */ + private final LongAdder dataBlockSize; + + /** Current number of cached elements */ + private final AtomicLong elements; + + /** Current number of cached data block elements */ + private final LongAdder dataBlockElements; + + /** Cache access count (sequential ID) */ + private final AtomicLong count; + + /** hard capacity limit */ + private final float hardCapacityLimitFactor; + + /** Cache statistics */ + private final CacheStats stats; + + /** Maximum allowable size of cache (block put if size > max, evict) */ + private long maxSize; + + /** Approximate block size */ + private final long blockSize; + + /** Acceptable size of cache (no evictions if size < acceptable) */ + private final float acceptableFactor; + + /** Minimum threshold of cache (when evicting, evict until size < min) */ + private final float minFactor; + + /** Single access bucket size */ + private final float singleFactor; + + /** Multiple access bucket size */ + private final float multiFactor; + + /** In-memory bucket size */ + private final float memoryFactor; + + /** Overhead of the structure itself */ + private final long overhead; + + /** Whether in-memory hfile's data block has higher priority when evicting */ + private boolean forceInMemory; + + /** + * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an + * external cache as L2. + * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache + */ + private transient BlockCache victimHandler = null; + + /** Percent of cached data blocks */ + private volatile int cacheDataBlockPercent; + + /** Limit of count eviction process when start to avoid to cache blocks */ + private final int heavyEvictionCountLimit; + + /** Limit of volume eviction process when start to avoid to cache blocks */ + private final long heavyEvictionMbSizeLimit; + + /** Adjust auto-scaling via overhead of evition rate */ + private final float heavyEvictionOverheadCoefficient; + + /** + * Default constructor. Specify maximum size and expected average block + * size (approximation is fine). + * + *

All other factors will be calculated based on defaults specified in + * this class. + * + * @param maxSize maximum size of cache, in bytes + * @param blockSize approximate size of each block, in bytes + */ + public LruAdaptiveBlockCache(long maxSize, long blockSize) { + this(maxSize, blockSize, true); + } + + /** + * Constructor used for testing. Allows disabling of the eviction thread. + */ + public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread) { + this(maxSize, blockSize, evictionThread, + (int) Math.ceil(1.2 * maxSize / blockSize), + DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, + DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR, + DEFAULT_SINGLE_FACTOR, + DEFAULT_MULTI_FACTOR, + DEFAULT_MEMORY_FACTOR, + DEFAULT_HARD_CAPACITY_LIMIT_FACTOR, + false, + DEFAULT_MAX_BLOCK_SIZE, + DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT, + DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT, + DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT); + } + + public LruAdaptiveBlockCache(long maxSize, long blockSize, + boolean evictionThread, Configuration conf) { + this(maxSize, blockSize, evictionThread, + (int) Math.ceil(1.2 * maxSize / blockSize), + DEFAULT_LOAD_FACTOR, + DEFAULT_CONCURRENCY_LEVEL, + conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR), + conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR), + conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR), + conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR), + conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR), + conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, + DEFAULT_HARD_CAPACITY_LIMIT_FACTOR), + conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE), + conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), + conf.getInt(LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT, + DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT), + conf.getLong(LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT, + DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT), + conf.getFloat(LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT, + DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT)); + } + + public LruAdaptiveBlockCache(long maxSize, long blockSize, Configuration conf) { + this(maxSize, blockSize, true, conf); + } + + /** + * Configurable constructor. Use this constructor if not using defaults. + * + * @param maxSize maximum size of this cache, in bytes + * @param blockSize expected average size of blocks, in bytes + * @param evictionThread whether to run evictions in a bg thread or not + * @param mapInitialSize initial size of backing ConcurrentHashMap + * @param mapLoadFactor initial load factor of backing ConcurrentHashMap + * @param mapConcurrencyLevel initial concurrency factor for backing CHM + * @param minFactor percentage of total size that eviction will evict until + * @param acceptableFactor percentage of total size that triggers eviction + * @param singleFactor percentage of total size for single-access blocks + * @param multiFactor percentage of total size for multiple-access blocks + * @param memoryFactor percentage of total size for in-memory blocks + */ + public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread, + int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, + float minFactor, float acceptableFactor, float singleFactor, + float multiFactor, float memoryFactor, float hardLimitFactor, + boolean forceInMemory, long maxBlockSize, + int heavyEvictionCountLimit, long heavyEvictionMbSizeLimit, + float heavyEvictionOverheadCoefficient) { + this.maxBlockSize = maxBlockSize; + if(singleFactor + multiFactor + memoryFactor != 1 || + singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) { + throw new IllegalArgumentException("Single, multi, and memory factors " + + " should be non-negative and total 1.0"); + } + if (minFactor >= acceptableFactor) { + throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); + } + if (minFactor >= 1.0f || acceptableFactor >= 1.0f) { + throw new IllegalArgumentException("all factors must be < 1"); + } + this.maxSize = maxSize; + this.blockSize = blockSize; + this.forceInMemory = forceInMemory; + map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); + this.minFactor = minFactor; + this.acceptableFactor = acceptableFactor; + this.singleFactor = singleFactor; + this.multiFactor = multiFactor; + this.memoryFactor = memoryFactor; + this.stats = new CacheStats(this.getClass().getSimpleName()); + this.count = new AtomicLong(0); + this.elements = new AtomicLong(0); + this.dataBlockElements = new LongAdder(); + this.dataBlockSize = new LongAdder(); + this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); + this.size = new AtomicLong(this.overhead); + this.hardCapacityLimitFactor = hardLimitFactor; + if (evictionThread) { + this.evictionThread = new EvictionThread(this); + this.evictionThread.start(); // FindBugs SC_START_IN_CTOR + } else { + this.evictionThread = null; + } + + // check the bounds + this.heavyEvictionCountLimit = Math.max(heavyEvictionCountLimit, 0); + this.heavyEvictionMbSizeLimit = Math.max(heavyEvictionCountLimit, 1); + this.cacheDataBlockPercent = 100; + heavyEvictionOverheadCoefficient = Math.min(heavyEvictionOverheadCoefficient, 1.0f); + heavyEvictionOverheadCoefficient = Math.max(heavyEvictionOverheadCoefficient, 0.001f); + this.heavyEvictionOverheadCoefficient = heavyEvictionOverheadCoefficient; + + // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log + // every five minutes. + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD, + STAT_THREAD_PERIOD, TimeUnit.SECONDS); + } + + @Override + public void setVictimCache(BlockCache victimCache) { + if (victimHandler != null) { + throw new IllegalArgumentException("The victim cache has already been set"); + } + victimHandler = requireNonNull(victimCache); + } + + @Override + public void setMaxSize(long maxSize) { + this.maxSize = maxSize; + if (this.size.get() > acceptableSize() && !evictionInProgress) { + runEviction(); + } + } + + public int getCacheDataBlockPercent() { + return cacheDataBlockPercent; + } + + /** + * The block cached in LruAdaptiveBlockCache will always be an heap block: on the one side, + * the heap access will be more faster then off-heap, the small index block or meta block + * cached in CombinedBlockCache will benefit a lot. on other side, the LruAdaptiveBlockCache + * size is always calculated based on the total heap size, if caching an off-heap block in + * LruAdaptiveBlockCache, the heap size will be messed up. Here we will clone the block into an + * heap block if it's an off-heap block, otherwise just use the original block. The key point is + * maintain the refCnt of the block (HBASE-22127):
+ * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle;
+ * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's + * reservoir, if both RPC and LruAdaptiveBlockCache release the block, then it can be garbage + * collected by JVM, so need a retain here. + * @param buf the original block + * @return an block with an heap memory backend. + */ + private Cacheable asReferencedHeapBlock(Cacheable buf) { + if (buf instanceof HFileBlock) { + HFileBlock blk = ((HFileBlock) buf); + if (blk.isSharedMem()) { + return HFileBlock.deepCloneOnHeap(blk); + } + } + // The block will be referenced by this LruAdaptiveBlockCache, + // so should increase its refCnt here. + return buf.retain(); + } + + // BlockCache implementation + + /** + * Cache the block with the specified name and buffer. + *

+ * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547) + * this can happen, for which we compare the buffer contents. + * + * @param cacheKey block's cache key + * @param buf block buffer + * @param inMemory if block is in-memory + */ + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { + + // Some data blocks will not put into BlockCache when eviction rate too much. + // It is good for performance + // (see details: https://issues.apache.org/jira/browse/HBASE-23887) + // How to calculate it can find inside EvictionThread class. + if (cacheDataBlockPercent != 100 && buf.getBlockType().isData()) { + // It works like filter - blocks which two last digits of offset + // more than we calculate in Eviction Thread will not put into BlockCache + if (cacheKey.getOffset() % 100 >= cacheDataBlockPercent) { + return; + } + } + + if (buf.heapSize() > maxBlockSize) { + // If there are a lot of blocks that are too + // big this can make the logs way too noisy. + // So we log 2% + if (stats.failInsert() % 50 == 0) { + LOG.warn("Trying to cache too large a block " + + cacheKey.getHfileName() + " @ " + + cacheKey.getOffset() + + " is " + buf.heapSize() + + " which is larger than " + maxBlockSize); + } + return; + } + + LruCachedBlock cb = map.get(cacheKey); + if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, + cacheKey, buf)) { + return; + } + long currentSize = size.get(); + long currentAcceptableSize = acceptableSize(); + long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize); + if (currentSize >= hardLimitSize) { + stats.failInsert(); + if (LOG.isTraceEnabled()) { + LOG.trace("LruAdaptiveBlockCache current size " + StringUtils.byteDesc(currentSize) + + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "." + + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize) + + ", failed to put cacheKey:" + cacheKey + " into LruAdaptiveBlockCache."); + } + if (!evictionInProgress) { + runEviction(); + } + return; + } + // Ensure that the block is an heap one. + buf = asReferencedHeapBlock(buf); + cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); + long newSize = updateSizeMetrics(cb, false); + map.put(cacheKey, cb); + long val = elements.incrementAndGet(); + if (buf.getBlockType().isData()) { + dataBlockElements.increment(); + } + if (LOG.isTraceEnabled()) { + long size = map.size(); + assertCounterSanity(size, val); + } + if (newSize > currentAcceptableSize && !evictionInProgress) { + runEviction(); + } + } + + /** + * Sanity-checking for parity between actual block cache content and metrics. + * Intended only for use with TRACE level logging and -ea JVM. + */ + private static void assertCounterSanity(long mapSize, long counterVal) { + if (counterVal < 0) { + LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal + + ", mapSize=" + mapSize); + return; + } + if (mapSize < Integer.MAX_VALUE) { + double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.); + if (pct_diff > 0.05) { + LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal + + ", mapSize=" + mapSize); + } + } + } + + /** + * Cache the block with the specified name and buffer. + *

+ * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache + * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an + * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap, + * otherwise the caching size is based on off-heap. + * @param cacheKey block's cache key + * @param buf block buffer + */ + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { + cacheBlock(cacheKey, buf, false); + } + + /** + * Helper function that updates the local size counter and also updates any + * per-cf or per-blocktype metrics it can discern from given + * {@link LruCachedBlock} + */ + private long updateSizeMetrics(LruCachedBlock cb, boolean evict) { + long heapsize = cb.heapSize(); + BlockType bt = cb.getBuffer().getBlockType(); + if (evict) { + heapsize *= -1; + } + if (bt != null && bt.isData()) { + dataBlockSize.add(heapsize); + } + return size.addAndGet(heapsize); + } + + /** + * Get the buffer of the block with the specified name. + * + * @param cacheKey block's cache key + * @param caching true if the caller caches blocks on cache misses + * @param repeat Whether this is a repeat lookup for the same block + * (used to avoid double counting cache misses when doing double-check + * locking) + * @param updateCacheMetrics Whether to update cache metrics or not + * + * @return buffer of specified cache key, or null if not in cache + */ + @Override + public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat, + boolean updateCacheMetrics) { + LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> { + // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside + // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove + // the block and release, then we're retaining a block with refCnt=0 which is disallowed. + // see HBASE-22422. + val.getBuffer().retain(); + return val; + }); + if (cb == null) { + if (!repeat && updateCacheMetrics) { + stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); + } + // If there is another block cache then try and read there. + // However if this is a retry ( second time in double checked locking ) + // And it's already a miss then the l2 will also be a miss. + if (victimHandler != null && !repeat) { + // The handler will increase result's refCnt for RPC, so need no extra retain. + Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics); + // Promote this to L1. + if (result != null) { + if (caching) { + cacheBlock(cacheKey, result, /* inMemory = */ false); + } + } + return result; + } + return null; + } + if (updateCacheMetrics) { + stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); + } + cb.access(count.incrementAndGet()); + return cb.getBuffer(); + } + + /** + * Whether the cache contains block with specified cacheKey + * + * @return true if contains the block + */ + @Override + public boolean containsBlock(BlockCacheKey cacheKey) { + return map.containsKey(cacheKey); + } + + @Override + public boolean evictBlock(BlockCacheKey cacheKey) { + LruCachedBlock cb = map.get(cacheKey); + return cb != null && evictBlock(cb, false) > 0; + } + + /** + * Evicts all blocks for a specific HFile. This is an + * expensive operation implemented as a linear-time search through all blocks + * in the cache. Ideally this should be a search in a log-access-time map. + * + *

+ * This is used for evict-on-close to remove all blocks of a specific HFile. + * + * @return the number of blocks evicted + */ + @Override + public int evictBlocksByHfileName(String hfileName) { + int numEvicted = (int) map.keySet().stream().filter(key -> key.getHfileName().equals(hfileName)) + .filter(this::evictBlock).count(); + if (victimHandler != null) { + numEvicted += victimHandler.evictBlocksByHfileName(hfileName); + } + return numEvicted; + } + + /** + * Evict the block, and it will be cached by the victim handler if exists && + * block may be read again later + * + * @param evictedByEvictionProcess true if the given block is evicted by + * EvictionThread + * @return the heap size of evicted block + */ + protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) { + LruCachedBlock previous = map.remove(block.getCacheKey()); + if (previous == null) { + return 0; + } + updateSizeMetrics(block, true); + long val = elements.decrementAndGet(); + if (LOG.isTraceEnabled()) { + long size = map.size(); + assertCounterSanity(size, val); + } + if (block.getBuffer().getBlockType().isData()) { + dataBlockElements.decrement(); + } + if (evictedByEvictionProcess) { + // When the eviction of the block happened because of invalidation of HFiles, no need to + // update the stats counter. + stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary()); + if (victimHandler != null) { + victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer()); + } + } + // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO + // NOT move this up because if do that then the victimHandler may access the buffer with + // refCnt = 0 which is disallowed. + previous.getBuffer().release(); + return block.heapSize(); + } + + /** + * Multi-threaded call to run the eviction process. + */ + private void runEviction() { + if (evictionThread == null) { + evict(); + } else { + evictionThread.evict(); + } + } + + boolean isEvictionInProgress() { + return evictionInProgress; + } + + long getOverhead() { + return overhead; + } + + /** + * Eviction method. + * + * Evict items in order of use, allowing delete items + * which haven't been used for the longest amount of time. + * + * @return how many bytes were freed + */ + long evict() { + + // Ensure only one eviction at a time + if (!evictionLock.tryLock()) { + return 0; + } + + long bytesToFree = 0L; + + try { + evictionInProgress = true; + long currentSize = this.size.get(); + bytesToFree = currentSize - minSize(); + + if (LOG.isTraceEnabled()) { + LOG.trace("Block cache LRU eviction started; Attempting to free " + + StringUtils.byteDesc(bytesToFree) + " of total=" + + StringUtils.byteDesc(currentSize)); + } + + if (bytesToFree <= 0) { + return 0; + } + + // Instantiate priority buckets + BlockBucket bucketSingle + = new BlockBucket("single", bytesToFree, blockSize, singleSize()); + BlockBucket bucketMulti + = new BlockBucket("multi", bytesToFree, blockSize, multiSize()); + BlockBucket bucketMemory + = new BlockBucket("memory", bytesToFree, blockSize, memorySize()); + + // Scan entire map putting into appropriate buckets + for (LruCachedBlock cachedBlock : map.values()) { + switch (cachedBlock.getPriority()) { + case SINGLE: { + bucketSingle.add(cachedBlock); + break; + } + case MULTI: { + bucketMulti.add(cachedBlock); + break; + } + case MEMORY: { + bucketMemory.add(cachedBlock); + break; + } + } + } + + long bytesFreed = 0; + if (forceInMemory || memoryFactor > 0.999f) { + long s = bucketSingle.totalSize(); + long m = bucketMulti.totalSize(); + if (bytesToFree > (s + m)) { + // this means we need to evict blocks in memory bucket to make room, + // so the single and multi buckets will be emptied + bytesFreed = bucketSingle.free(s); + bytesFreed += bucketMulti.free(m); + if (LOG.isTraceEnabled()) { + LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + + " from single and multi buckets"); + } + bytesFreed += bucketMemory.free(bytesToFree - bytesFreed); + if (LOG.isTraceEnabled()) { + LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + + " total from all three buckets "); + } + } else { + // this means no need to evict block in memory bucket, + // and we try best to make the ratio between single-bucket and + // multi-bucket is 1:2 + long bytesRemain = s + m - bytesToFree; + if (3 * s <= bytesRemain) { + // single-bucket is small enough that no eviction happens for it + // hence all eviction goes from multi-bucket + bytesFreed = bucketMulti.free(bytesToFree); + } else if (3 * m <= 2 * bytesRemain) { + // multi-bucket is small enough that no eviction happens for it + // hence all eviction goes from single-bucket + bytesFreed = bucketSingle.free(bytesToFree); + } else { + // both buckets need to evict some blocks + bytesFreed = bucketSingle.free(s - bytesRemain / 3); + if (bytesFreed < bytesToFree) { + bytesFreed += bucketMulti.free(bytesToFree - bytesFreed); + } + } + } + } else { + PriorityQueue bucketQueue = new PriorityQueue<>(3); + + bucketQueue.add(bucketSingle); + bucketQueue.add(bucketMulti); + bucketQueue.add(bucketMemory); + + int remainingBuckets = bucketQueue.size(); + + BlockBucket bucket; + while ((bucket = bucketQueue.poll()) != null) { + long overflow = bucket.overflow(); + if (overflow > 0) { + long bucketBytesToFree = + Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets); + bytesFreed += bucket.free(bucketBytesToFree); + } + remainingBuckets--; + } + } + if (LOG.isTraceEnabled()) { + long single = bucketSingle.totalSize(); + long multi = bucketMulti.totalSize(); + long memory = bucketMemory.totalSize(); + LOG.trace("Block cache LRU eviction completed; " + + "freed=" + StringUtils.byteDesc(bytesFreed) + ", " + + "total=" + StringUtils.byteDesc(this.size.get()) + ", " + + "single=" + StringUtils.byteDesc(single) + ", " + + "multi=" + StringUtils.byteDesc(multi) + ", " + + "memory=" + StringUtils.byteDesc(memory)); + } + } finally { + stats.evict(); + evictionInProgress = false; + evictionLock.unlock(); + } + return bytesToFree; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("blockCount", getBlockCount()) + .add("currentSize", StringUtils.byteDesc(getCurrentSize())) + .add("freeSize", StringUtils.byteDesc(getFreeSize())) + .add("maxSize", StringUtils.byteDesc(getMaxSize())) + .add("heapSize", StringUtils.byteDesc(heapSize())) + .add("minSize", StringUtils.byteDesc(minSize())) + .add("minFactor", minFactor) + .add("multiSize", StringUtils.byteDesc(multiSize())) + .add("multiFactor", multiFactor) + .add("singleSize", StringUtils.byteDesc(singleSize())) + .add("singleFactor", singleFactor) + .toString(); + } + + /** + * Used to group blocks into priority buckets. There will be a BlockBucket + * for each priority (single, multi, memory). Once bucketed, the eviction + * algorithm takes the appropriate number of elements out of each according + * to configuration parameters and their relatives sizes. + */ + private class BlockBucket implements Comparable { + + private final String name; + private final LruCachedBlockQueue queue; + private long totalSize = 0; + private final long bucketSize; + + public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) { + this.name = name; + this.bucketSize = bucketSize; + queue = new LruCachedBlockQueue(bytesToFree, blockSize); + totalSize = 0; + } + + public void add(LruCachedBlock block) { + totalSize += block.heapSize(); + queue.add(block); + } + + public long free(long toFree) { + if (LOG.isTraceEnabled()) { + LOG.trace("freeing {} from {}", StringUtils.byteDesc(toFree), this); + } + LruCachedBlock cb; + long freedBytes = 0; + while ((cb = queue.pollLast()) != null) { + freedBytes += evictBlock(cb, true); + if (freedBytes >= toFree) { + return freedBytes; + } + } + if (LOG.isTraceEnabled()) { + LOG.trace("freeing {} from {}", StringUtils.byteDesc(toFree), this); + } + return freedBytes; + } + + public long overflow() { + return totalSize - bucketSize; + } + + public long totalSize() { + return totalSize; + } + + @Override + public int compareTo(BlockBucket that) { + return Long.compare(this.overflow(), that.overflow()); + } + + @Override + public boolean equals(Object that) { + if (!(that instanceof BlockBucket)) { + return false; + } + return compareTo((BlockBucket)that) == 0; + } + + @Override + public int hashCode() { + return Objects.hashCode(name, bucketSize, queue, totalSize); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("name", name) + .add("totalSize", StringUtils.byteDesc(totalSize)) + .add("bucketSize", StringUtils.byteDesc(bucketSize)) + .toString(); + } + } + + /** + * Get the maximum size of this cache. + * + * @return max size in bytes + */ + + @Override + public long getMaxSize() { + return this.maxSize; + } + + @Override + public long getCurrentSize() { + return this.size.get(); + } + + @Override + public long getCurrentDataSize() { + return this.dataBlockSize.sum(); + } + + @Override + public long getFreeSize() { + return getMaxSize() - getCurrentSize(); + } + + @Override + public long size() { + return getMaxSize(); + } + + @Override + public long getBlockCount() { + return this.elements.get(); + } + + @Override + public long getDataBlockCount() { + return this.dataBlockElements.sum(); + } + + EvictionThread getEvictionThread() { + return this.evictionThread; + } + + /* + * Eviction thread. Sits in waiting state until an eviction is triggered + * when the cache size grows above the acceptable level.

+ * + * Thread is triggered into action by {@link LruAdaptiveBlockCache#runEviction()} + */ + static class EvictionThread extends Thread { + + private WeakReference cache; + private volatile boolean go = true; + // flag set after enter the run method, used for test + private boolean enteringRun = false; + + public EvictionThread(LruAdaptiveBlockCache cache) { + super(Thread.currentThread().getName() + ".LruAdaptiveBlockCache.EvictionThread"); + setDaemon(true); + this.cache = new WeakReference<>(cache); + } + + @Override + public void run() { + enteringRun = true; + long freedSumMb = 0; + int heavyEvictionCount = 0; + int freedDataOverheadPercent = 0; + long startTime = System.currentTimeMillis(); + while (this.go) { + synchronized (this) { + try { + this.wait(1000 * 10/*Don't wait for ever*/); + } catch (InterruptedException e) { + LOG.warn("Interrupted eviction thread ", e); + Thread.currentThread().interrupt(); + } + } + LruAdaptiveBlockCache cache = this.cache.get(); + if (cache == null) { + break; + } + freedSumMb += cache.evict()/1024/1024; + /* + * Sometimes we are reading more data than can fit into BlockCache + * and it is the cause a high rate of evictions. + * This in turn leads to heavy Garbage Collector works. + * So a lot of blocks put into BlockCache but never read, + * but spending a lot of CPU resources. + * Here we will analyze how many bytes were freed and decide + * decide whether the time has come to reduce amount of caching blocks. + * It help avoid put too many blocks into BlockCache + * when evict() works very active and save CPU for other jobs. + * More delails: https://issues.apache.org/jira/browse/HBASE-23887 + */ + + // First of all we have to control how much time + // has passed since previuos evict() was launched + // This is should be almost the same time (+/- 10s) + // because we get comparable volumes of freed bytes each time. + // 10s because this is default period to run evict() (see above this.wait) + long stopTime = System.currentTimeMillis(); + if ((stopTime - startTime) > 1000 * 10 - 1) { + // Here we have to calc what situation we have got. + // We have the limit "hbase.lru.cache.heavy.eviction.bytes.size.limit" + // and can calculte overhead on it. + // We will use this information to decide, + // how to change percent of caching blocks. + freedDataOverheadPercent = + (int) (freedSumMb * 100 / cache.heavyEvictionMbSizeLimit) - 100; + if (freedSumMb > cache.heavyEvictionMbSizeLimit) { + // Now we are in the situation when we are above the limit + // But maybe we are going to ignore it because it will end quite soon + heavyEvictionCount++; + if (heavyEvictionCount > cache.heavyEvictionCountLimit) { + // It is going for a long time and we have to reduce of caching + // blocks now. So we calculate here how many blocks we want to skip. + // It depends on: + // 1. Overhead - if overhead is big we could more aggressive + // reducing amount of caching blocks. + // 2. How fast we want to get the result. If we know that our + // heavy reading for a long time, we don't want to wait and can + // increase the coefficient and get good performance quite soon. + // But if we don't sure we can do it slowly and it could prevent + // premature exit from this mode. So, when the coefficient is + // higher we can get better performance when heavy reading is stable. + // But when reading is changing we can adjust to it and set + // the coefficient to lower value. + int change = + (int) (freedDataOverheadPercent * cache.heavyEvictionOverheadCoefficient); + // But practice shows that 15% of reducing is quite enough. + // We are not greedy (it could lead to premature exit). + change = Math.min(15, change); + change = Math.max(0, change); // I think it will never happen but check for sure + // So this is the key point, here we are reducing % of caching blocks + cache.cacheDataBlockPercent -= change; + // If we go down too deep we have to stop here, 1% any way should be. + cache.cacheDataBlockPercent = Math.max(1, cache.cacheDataBlockPercent); + } + } else { + // Well, we have got overshooting. + // Mayby it is just short-term fluctuation and we can stay in this mode. + // It help avoid permature exit during short-term fluctuation. + // If overshooting less than 90%, we will try to increase the percent of + // caching blocks and hope it is enough. + if (freedSumMb >= cache.heavyEvictionMbSizeLimit * 0.1) { + // Simple logic: more overshooting - more caching blocks (backpressure) + int change = (int) (-freedDataOverheadPercent * 0.1 + 1); + cache.cacheDataBlockPercent += change; + // But it can't be more then 100%, so check it. + cache.cacheDataBlockPercent = Math.min(100, cache.cacheDataBlockPercent); + } else { + // Looks like heavy reading is over. + // Just exit form this mode. + heavyEvictionCount = 0; + cache.cacheDataBlockPercent = 100; + } + } + LOG.info("BlockCache evicted (MB): {}, overhead (%): {}, " + + "heavy eviction counter: {}, " + + "current caching DataBlock (%): {}", + freedSumMb, freedDataOverheadPercent, + heavyEvictionCount, cache.cacheDataBlockPercent); + + freedSumMb = 0; + startTime = stopTime; + } + } + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", + justification="This is what we want") + public void evict() { + synchronized (this) { + this.notifyAll(); + } + } + + synchronized void shutdown() { + this.go = false; + this.notifyAll(); + } + + /** + * Used for the test. + */ + boolean isEnteringRun() { + return this.enteringRun; + } + } + + /* + * Statistics thread. Periodically prints the cache statistics to the log. + */ + static class StatisticsThread extends Thread { + + private final LruAdaptiveBlockCache lru; + + public StatisticsThread(LruAdaptiveBlockCache lru) { + super("LruAdaptiveBlockCacheStats"); + setDaemon(true); + this.lru = lru; + } + + @Override + public void run() { + lru.logStats(); + } + } + + public void logStats() { + // Log size + long totalSize = heapSize(); + long freeSize = maxSize - totalSize; + LruAdaptiveBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + + "max=" + StringUtils.byteDesc(this.maxSize) + ", " + + "blockCount=" + getBlockCount() + ", " + + "accesses=" + stats.getRequestCount() + ", " + + "hits=" + stats.getHitCount() + ", " + + "hitRatio=" + (stats.getHitCount() == 0 ? + "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " + + "cachingAccesses=" + stats.getRequestCachingCount() + ", " + + "cachingHits=" + stats.getHitCachingCount() + ", " + + "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ? + "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) + + "evictions=" + stats.getEvictionCount() + ", " + + "evicted=" + stats.getEvictedCount() + ", " + + "evictedPerRun=" + stats.evictedPerEviction()); + } + + /** + * Get counter statistics for this cache. + * + *

Includes: total accesses, hits, misses, evicted blocks, and runs + * of the eviction processes. + */ + @Override + public CacheStats getStats() { + return this.stats; + } + + public final static long CACHE_FIXED_OVERHEAD = + ClassSize.estimateBase(LruAdaptiveBlockCache.class, false); + + @Override + public long heapSize() { + return getCurrentSize(); + } + + private static long calculateOverhead(long maxSize, long blockSize, int concurrency) { + // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG + return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY) + + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT); + } + + @Override + public Iterator iterator() { + final Iterator iterator = map.values().iterator(); + + return new Iterator() { + private final long now = System.nanoTime(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public CachedBlock next() { + final LruCachedBlock b = iterator.next(); + return new CachedBlock() { + @Override + public String toString() { + return BlockCacheUtil.toString(this, now); + } + + @Override + public BlockPriority getBlockPriority() { + return b.getPriority(); + } + + @Override + public BlockType getBlockType() { + return b.getBuffer().getBlockType(); + } + + @Override + public long getOffset() { + return b.getCacheKey().getOffset(); + } + + @Override + public long getSize() { + return b.getBuffer().heapSize(); + } + + @Override + public long getCachedTime() { + return b.getCachedTime(); + } + + @Override + public String getFilename() { + return b.getCacheKey().getHfileName(); + } + + @Override + public int compareTo(CachedBlock other) { + int diff = this.getFilename().compareTo(other.getFilename()); + if (diff != 0) { + return diff; + } + diff = Long.compare(this.getOffset(), other.getOffset()); + if (diff != 0) { + return diff; + } + if (other.getCachedTime() < 0 || this.getCachedTime() < 0) { + throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime()); + } + return Long.compare(other.getCachedTime(), this.getCachedTime()); + } + + @Override + public int hashCode() { + return b.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof CachedBlock) { + CachedBlock cb = (CachedBlock)obj; + return compareTo(cb) == 0; + } else { + return false; + } + } + }; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + // Simple calculators of sizes given factors and maxSize + + long acceptableSize() { + return (long)Math.floor(this.maxSize * this.acceptableFactor); + } + private long minSize() { + return (long)Math.floor(this.maxSize * this.minFactor); + } + private long singleSize() { + return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor); + } + private long multiSize() { + return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor); + } + private long memorySize() { + return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor); + } + + @Override + public void shutdown() { + if (victimHandler != null) { + victimHandler.shutdown(); + } + this.scheduleThreadPool.shutdown(); + for (int i = 0; i < 10; i++) { + if (!this.scheduleThreadPool.isShutdown()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + LOG.warn("Interrupted while sleeping"); + Thread.currentThread().interrupt(); + break; + } + } + } + + if (!this.scheduleThreadPool.isShutdown()) { + List runnables = this.scheduleThreadPool.shutdownNow(); + LOG.debug("Still running " + runnables); + } + this.evictionThread.shutdown(); + } + + /** Clears the cache. Used in tests. */ + public void clearCache() { + this.map.clear(); + this.elements.set(0); + } + + public Map getEncodingCountsForTest() { + Map counts = new EnumMap<>(DataBlockEncoding.class); + for (LruCachedBlock block : map.values()) { + DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding(); + Integer count = counts.get(encoding); + counts.put(encoding, (count == null ? 0 : count) + 1); + } + return counts; + } + + Map getMapForTests() { + return map; + } + + @Override + public BlockCache[] getBlockCaches() { + if (victimHandler != null) { + return new BlockCache[] { this, this.victimHandler }; + } + return null; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruAdaptiveBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruAdaptiveBlockCache.java new file mode 100644 index 00000000000..f29d12ac315 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruAdaptiveBlockCache.java @@ -0,0 +1,1174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.io.hfile.LruAdaptiveBlockCache.EvictionThread; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.ClassSize; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests the concurrent LruAdaptiveBlockCache.

+ * + * Tests will ensure it grows and shrinks in size properly, + * evictions run when they're supposed to and do what they should, + * and that cached blocks are accessible when expected to be. + */ +@Category({IOTests.class, SmallTests.class}) +public class TestLruAdaptiveBlockCache { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestLruAdaptiveBlockCache.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestLruAdaptiveBlockCache.class); + + @Test + public void testCacheEvictionThreadSafe() throws Exception { + long maxSize = 100000; + int numBlocks = 9; + int testRuns = 10; + final long blockSize = calculateBlockSizeDefault(maxSize, numBlocks); + assertTrue("calculateBlockSize appears broken.", blockSize * numBlocks <= maxSize); + + final Configuration conf = HBaseConfiguration.create(); + final LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize); + EvictionThread evictionThread = cache.getEvictionThread(); + assertNotNull(evictionThread); + while (!evictionThread.isEnteringRun()) { + Thread.sleep(1000); + } + final String hfileName = "hfile"; + int threads = 10; + final int blocksPerThread = 5 * numBlocks; + for (int run = 0; run != testRuns; ++run) { + final AtomicInteger blockCount = new AtomicInteger(0); + ExecutorService service = Executors.newFixedThreadPool(threads); + for (int i = 0; i != threads; ++i) { + service.execute(() -> { + for (int blockIndex = 0; blockIndex < blocksPerThread + || (!cache.isEvictionInProgress()); ++blockIndex) { + CachedItem block = new CachedItem(hfileName, (int) blockSize, + blockCount.getAndIncrement()); + boolean inMemory = Math.random() > 0.5; + cache.cacheBlock(block.cacheKey, block, inMemory); + } + cache.evictBlocksByHfileName(hfileName); + }); + } + service.shutdown(); + // The test may fail here if the evict thread frees the blocks too fast + service.awaitTermination(10, TimeUnit.MINUTES); + Waiter.waitFor(conf, 10000, 100, new ExplainingPredicate() { + @Override + public boolean evaluate() throws Exception { + return cache.getBlockCount() == 0; + } + + @Override + public String explainFailure() throws Exception { + return "Cache block count failed to return to 0"; + } + }); + assertEquals(0, cache.getBlockCount()); + assertEquals(cache.getOverhead(), cache.getCurrentSize()); + } + } + + @Test + public void testBackgroundEvictionThread() throws Exception { + long maxSize = 100000; + int numBlocks = 9; + long blockSize = calculateBlockSizeDefault(maxSize, numBlocks); + assertTrue("calculateBlockSize appears broken.", + blockSize * numBlocks <= maxSize); + + LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize,blockSize); + EvictionThread evictionThread = cache.getEvictionThread(); + assertNotNull(evictionThread); + + CachedItem[] blocks = generateFixedBlocks(numBlocks + 1, blockSize, "block"); + + // Make sure eviction thread has entered run method + while (!evictionThread.isEnteringRun()) { + Thread.sleep(1); + } + + // Add all the blocks + for (CachedItem block : blocks) { + cache.cacheBlock(block.cacheKey, block); + } + + // wait until at least one eviction has run + int n = 0; + while(cache.getStats().getEvictionCount() == 0) { + Thread.sleep(200); + assertTrue("Eviction never happened.", n++ < 20); + } + + // let cache stabilize + // On some systems, the cache will run multiple evictions before it attains + // steady-state. For instance, after populating the cache with 10 blocks, + // the first eviction evicts a single block and then a second eviction + // evicts another. I think this is due to the delta between minSize and + // acceptableSize, combined with variance between object overhead on + // different environments. + n = 0; + for (long prevCnt = 0 /* < number of blocks added */, + curCnt = cache.getBlockCount(); + prevCnt != curCnt; prevCnt = curCnt, curCnt = cache.getBlockCount()) { + Thread.sleep(200); + assertTrue("Cache never stabilized.", n++ < 20); + } + + long evictionCount = cache.getStats().getEvictionCount(); + assertTrue(evictionCount >= 1); + System.out.println("Background Evictions run: " + evictionCount); + } + + @Test + public void testCacheSimple() throws Exception { + + long maxSize = 1000000; + long blockSize = calculateBlockSizeDefault(maxSize, 101); + + LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize); + + CachedItem [] blocks = generateRandomBlocks(100, blockSize); + + long expectedCacheSize = cache.heapSize(); + + // Confirm empty + for (CachedItem block : blocks) { + assertTrue(cache.getBlock(block.cacheKey, true, false, + true) == null); + } + + // Add blocks + for (CachedItem block : blocks) { + cache.cacheBlock(block.cacheKey, block); + expectedCacheSize += block.cacheBlockHeapSize(); + } + + // Verify correctly calculated cache heap size + assertEquals(expectedCacheSize, cache.heapSize()); + + // Check if all blocks are properly cached and retrieved + for (CachedItem block : blocks) { + HeapSize buf = cache.getBlock(block.cacheKey, true, false, + true); + assertTrue(buf != null); + assertEquals(buf.heapSize(), block.heapSize()); + } + + // Re-add same blocks and ensure nothing has changed + long expectedBlockCount = cache.getBlockCount(); + for (CachedItem block : blocks) { + cache.cacheBlock(block.cacheKey, block); + } + assertEquals( + "Cache should ignore cache requests for blocks already in cache", + expectedBlockCount, cache.getBlockCount()); + + // Verify correctly calculated cache heap size + assertEquals(expectedCacheSize, cache.heapSize()); + + // Check if all blocks are properly cached and retrieved + for (CachedItem block : blocks) { + HeapSize buf = cache.getBlock(block.cacheKey, true, false, + true); + assertTrue(buf != null); + assertEquals(buf.heapSize(), block.heapSize()); + } + + // Expect no evictions + assertEquals(0, cache.getStats().getEvictionCount()); + Thread t = new LruAdaptiveBlockCache.StatisticsThread(cache); + t.start(); + t.join(); + } + + @Test + public void testCacheEvictionSimple() throws Exception { + + long maxSize = 100000; + long blockSize = calculateBlockSizeDefault(maxSize, 10); + + LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize,blockSize,false); + + CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block"); + + long expectedCacheSize = cache.heapSize(); + + // Add all the blocks + for (CachedItem block : blocks) { + cache.cacheBlock(block.cacheKey, block); + expectedCacheSize += block.cacheBlockHeapSize(); + } + + // A single eviction run should have occurred + assertEquals(1, cache.getStats().getEvictionCount()); + + // Our expected size overruns acceptable limit + assertTrue(expectedCacheSize > + (maxSize * LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR)); + + // But the cache did not grow beyond max + assertTrue(cache.heapSize() < maxSize); + + // And is still below the acceptable limit + assertTrue(cache.heapSize() < + (maxSize * LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR)); + + // All blocks except block 0 should be in the cache + assertTrue(cache.getBlock(blocks[0].cacheKey, true, false, + true) == null); + for(int i=1;i + (maxSize * LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR)); + + // But the cache did not grow beyond max + assertTrue(cache.heapSize() <= maxSize); + + // And is now below the acceptable limit + assertTrue(cache.heapSize() <= + (maxSize * LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR)); + + // We expect fairness across the two priorities. + // This test makes multi go barely over its limit, in-memory + // empty, and the rest in single. Two single evictions and + // one multi eviction expected. + assertTrue(cache.getBlock(singleBlocks[0].cacheKey, true, false, + true) == null); + assertTrue(cache.getBlock(multiBlocks[0].cacheKey, true, false, + true) == null); + + // And all others to be cached + for(int i=1;i<4;i++) { + assertEquals(cache.getBlock(singleBlocks[i].cacheKey, true, false, + true), + singleBlocks[i]); + assertEquals(cache.getBlock(multiBlocks[i].cacheKey, true, false, + true), + multiBlocks[i]); + } + } + + @Test + public void testCacheEvictionThreePriorities() throws Exception { + + long maxSize = 100000; + long blockSize = calculateBlockSize(maxSize, 10); + + LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false, + (int)Math.ceil(1.2*maxSize/blockSize), + LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR, + LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL, + 0.98f, // min + 0.99f, // acceptable + 0.33f, // single + 0.33f, // multi + 0.34f, // memory + 1.2f, // limit + false, + 16 * 1024 * 1024, + 10, + 500, + 0.01f); + + CachedItem [] singleBlocks = generateFixedBlocks(5, blockSize, "single"); + CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); + CachedItem [] memoryBlocks = generateFixedBlocks(5, blockSize, "memory"); + + long expectedCacheSize = cache.heapSize(); + + // Add 3 blocks from each priority + for(int i=0;i<3;i++) { + + // Just add single blocks + cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]); + expectedCacheSize += singleBlocks[i].cacheBlockHeapSize(); + + // Add and get multi blocks + cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]); + expectedCacheSize += multiBlocks[i].cacheBlockHeapSize(); + cache.getBlock(multiBlocks[i].cacheKey, true, false, true); + + // Add memory blocks as such + cache.cacheBlock(memoryBlocks[i].cacheKey, memoryBlocks[i], true); + expectedCacheSize += memoryBlocks[i].cacheBlockHeapSize(); + + } + + // Do not expect any evictions yet + assertEquals(0, cache.getStats().getEvictionCount()); + + // Verify cache size + assertEquals(expectedCacheSize, cache.heapSize()); + + // Insert a single block, oldest single should be evicted + cache.cacheBlock(singleBlocks[3].cacheKey, singleBlocks[3]); + + // Single eviction, one thing evicted + assertEquals(1, cache.getStats().getEvictionCount()); + assertEquals(1, cache.getStats().getEvictedCount()); + + // Verify oldest single block is the one evicted + assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false, + true)); + + // Change the oldest remaining single block to a multi + cache.getBlock(singleBlocks[1].cacheKey, true, false, true); + + // Insert another single block + cache.cacheBlock(singleBlocks[4].cacheKey, singleBlocks[4]); + + // Two evictions, two evicted. + assertEquals(2, cache.getStats().getEvictionCount()); + assertEquals(2, cache.getStats().getEvictedCount()); + + // Oldest multi block should be evicted now + assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false, + true)); + + // Insert another memory block + cache.cacheBlock(memoryBlocks[3].cacheKey, memoryBlocks[3], true); + + // Three evictions, three evicted. + assertEquals(3, cache.getStats().getEvictionCount()); + assertEquals(3, cache.getStats().getEvictedCount()); + + // Oldest memory block should be evicted now + assertEquals(null, cache.getBlock(memoryBlocks[0].cacheKey, true, false, + true)); + + // Add a block that is twice as big (should force two evictions) + CachedItem [] bigBlocks = generateFixedBlocks(3, blockSize*3, "big"); + cache.cacheBlock(bigBlocks[0].cacheKey, bigBlocks[0]); + + // Four evictions, six evicted (inserted block 3X size, expect +3 evicted) + assertEquals(4, cache.getStats().getEvictionCount()); + assertEquals(6, cache.getStats().getEvictedCount()); + + // Expect three remaining singles to be evicted + assertEquals(null, cache.getBlock(singleBlocks[2].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(singleBlocks[3].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(singleBlocks[4].cacheKey, true, false, + true)); + + // Make the big block a multi block + cache.getBlock(bigBlocks[0].cacheKey, true, false, true); + + // Cache another single big block + cache.cacheBlock(bigBlocks[1].cacheKey, bigBlocks[1]); + + // Five evictions, nine evicted (3 new) + assertEquals(5, cache.getStats().getEvictionCount()); + assertEquals(9, cache.getStats().getEvictedCount()); + + // Expect three remaining multis to be evicted + assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(multiBlocks[2].cacheKey, true, false, + true)); + + // Cache a big memory block + cache.cacheBlock(bigBlocks[2].cacheKey, bigBlocks[2], true); + + // Six evictions, twelve evicted (3 new) + assertEquals(6, cache.getStats().getEvictionCount()); + assertEquals(12, cache.getStats().getEvictedCount()); + + // Expect three remaining in-memory to be evicted + assertEquals(null, cache.getBlock(memoryBlocks[1].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(memoryBlocks[2].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(memoryBlocks[3].cacheKey, true, false, + true)); + } + + @Test + public void testCacheEvictionInMemoryForceMode() throws Exception { + long maxSize = 100000; + long blockSize = calculateBlockSize(maxSize, 10); + + LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false, + (int)Math.ceil(1.2*maxSize/blockSize), + LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR, + LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL, + 0.98f, // min + 0.99f, // acceptable + 0.2f, // single + 0.3f, // multi + 0.5f, // memory + 1.2f, // limit + true, + 16 * 1024 * 1024, + 10, + 500, + 0.01f); + + CachedItem [] singleBlocks = generateFixedBlocks(10, blockSize, "single"); + CachedItem [] multiBlocks = generateFixedBlocks(10, blockSize, "multi"); + CachedItem [] memoryBlocks = generateFixedBlocks(10, blockSize, "memory"); + + long expectedCacheSize = cache.heapSize(); + + // 0. Add 5 single blocks and 4 multi blocks to make cache full, si:mu:me = 5:4:0 + for(int i = 0; i < 4; i++) { + // Just add single blocks + cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]); + expectedCacheSize += singleBlocks[i].cacheBlockHeapSize(); + // Add and get multi blocks + cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]); + expectedCacheSize += multiBlocks[i].cacheBlockHeapSize(); + cache.getBlock(multiBlocks[i].cacheKey, true, false, true); + } + // 5th single block + cache.cacheBlock(singleBlocks[4].cacheKey, singleBlocks[4]); + expectedCacheSize += singleBlocks[4].cacheBlockHeapSize(); + // Do not expect any evictions yet + assertEquals(0, cache.getStats().getEvictionCount()); + // Verify cache size + assertEquals(expectedCacheSize, cache.heapSize()); + + // 1. Insert a memory block, oldest single should be evicted, si:mu:me = 4:4:1 + cache.cacheBlock(memoryBlocks[0].cacheKey, memoryBlocks[0], true); + // Single eviction, one block evicted + assertEquals(1, cache.getStats().getEvictionCount()); + assertEquals(1, cache.getStats().getEvictedCount()); + // Verify oldest single block (index = 0) is the one evicted + assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false, + true)); + + // 2. Insert another memory block, another single evicted, si:mu:me = 3:4:2 + cache.cacheBlock(memoryBlocks[1].cacheKey, memoryBlocks[1], true); + // Two evictions, two evicted. + assertEquals(2, cache.getStats().getEvictionCount()); + assertEquals(2, cache.getStats().getEvictedCount()); + // Current oldest single block (index = 1) should be evicted now + assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false, + true)); + + // 3. Insert 4 memory blocks, 2 single and 2 multi evicted, si:mu:me = 1:2:6 + cache.cacheBlock(memoryBlocks[2].cacheKey, memoryBlocks[2], true); + cache.cacheBlock(memoryBlocks[3].cacheKey, memoryBlocks[3], true); + cache.cacheBlock(memoryBlocks[4].cacheKey, memoryBlocks[4], true); + cache.cacheBlock(memoryBlocks[5].cacheKey, memoryBlocks[5], true); + // Three evictions, three evicted. + assertEquals(6, cache.getStats().getEvictionCount()); + assertEquals(6, cache.getStats().getEvictedCount()); + // two oldest single blocks and two oldest multi blocks evicted + assertEquals(null, cache.getBlock(singleBlocks[2].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(singleBlocks[3].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false, + true)); + + // 4. Insert 3 memory blocks, the remaining 1 single and 2 multi evicted + // si:mu:me = 0:0:9 + cache.cacheBlock(memoryBlocks[6].cacheKey, memoryBlocks[6], true); + cache.cacheBlock(memoryBlocks[7].cacheKey, memoryBlocks[7], true); + cache.cacheBlock(memoryBlocks[8].cacheKey, memoryBlocks[8], true); + // Three evictions, three evicted. + assertEquals(9, cache.getStats().getEvictionCount()); + assertEquals(9, cache.getStats().getEvictedCount()); + // one oldest single block and two oldest multi blocks evicted + assertEquals(null, cache.getBlock(singleBlocks[4].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(multiBlocks[2].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(multiBlocks[3].cacheKey, true, false, + true)); + + // 5. Insert one memory block, the oldest memory evicted + // si:mu:me = 0:0:9 + cache.cacheBlock(memoryBlocks[9].cacheKey, memoryBlocks[9], true); + // one eviction, one evicted. + assertEquals(10, cache.getStats().getEvictionCount()); + assertEquals(10, cache.getStats().getEvictedCount()); + // oldest memory block evicted + assertEquals(null, cache.getBlock(memoryBlocks[0].cacheKey, true, false, + true)); + + // 6. Insert one new single block, itself evicted immediately since + // all blocks in cache are memory-type which have higher priority + // si:mu:me = 0:0:9 (no change) + cache.cacheBlock(singleBlocks[9].cacheKey, singleBlocks[9]); + // one eviction, one evicted. + assertEquals(11, cache.getStats().getEvictionCount()); + assertEquals(11, cache.getStats().getEvictedCount()); + // the single block just cached now evicted (can't evict memory) + assertEquals(null, cache.getBlock(singleBlocks[9].cacheKey, true, false, + true)); + } + + // test scan resistance + @Test + public void testScanResistance() throws Exception { + + long maxSize = 100000; + long blockSize = calculateBlockSize(maxSize, 10); + + LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false, + (int)Math.ceil(1.2*maxSize/blockSize), + LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR, + LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL, + 0.66f, // min + 0.99f, // acceptable + 0.33f, // single + 0.33f, // multi + 0.34f, // memory + 1.2f, // limit + false, + 16 * 1024 * 1024, + 10, + 500, + 0.01f); + + CachedItem [] singleBlocks = generateFixedBlocks(20, blockSize, "single"); + CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); + + // Add 5 multi blocks + for (CachedItem block : multiBlocks) { + cache.cacheBlock(block.cacheKey, block); + cache.getBlock(block.cacheKey, true, false, true); + } + + // Add 5 single blocks + for(int i=0;i<5;i++) { + cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]); + } + + // An eviction ran + assertEquals(1, cache.getStats().getEvictionCount()); + + // To drop down to 2/3 capacity, we'll need to evict 4 blocks + assertEquals(4, cache.getStats().getEvictedCount()); + + // Should have been taken off equally from single and multi + assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false, + true)); + assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false, + true)); + + // Let's keep "scanning" by adding single blocks. From here on we only + // expect evictions from the single bucket. + + // Every time we reach 10 total blocks (every 4 inserts) we get 4 single + // blocks evicted. Inserting 13 blocks should yield 3 more evictions and + // 12 more evicted. + + for(int i=5;i<18;i++) { + cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]); + } + + // 4 total evictions, 16 total evicted + assertEquals(4, cache.getStats().getEvictionCount()); + assertEquals(16, cache.getStats().getEvictedCount()); + + // Should now have 7 total blocks + assertEquals(7, cache.getBlockCount()); + + } + + @Test + public void testMaxBlockSize() throws Exception { + long maxSize = 100000; + long blockSize = calculateBlockSize(maxSize, 10); + + LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false, + (int)Math.ceil(1.2*maxSize/blockSize), + LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR, + LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL, + 0.66f, // min + 0.99f, // acceptable + 0.33f, // single + 0.33f, // multi + 0.34f, // memory + 1.2f, // limit + false, + 1024, + 10, + 500, + 0.01f); + + CachedItem [] tooLong = generateFixedBlocks(10, 1024+5, "long"); + CachedItem [] small = generateFixedBlocks(15, 600, "small"); + + + for (CachedItem i:tooLong) { + cache.cacheBlock(i.cacheKey, i); + } + for (CachedItem i:small) { + cache.cacheBlock(i.cacheKey, i); + } + assertEquals(15,cache.getBlockCount()); + for (CachedItem i:small) { + assertNotNull(cache.getBlock(i.cacheKey, true, false, false)); + } + for (CachedItem i:tooLong) { + assertNull(cache.getBlock(i.cacheKey, true, false, false)); + } + + assertEquals(10, cache.getStats().getFailedInserts()); + } + + // test setMaxSize + @Test + public void testResizeBlockCache() throws Exception { + + long maxSize = 300000; + long blockSize = calculateBlockSize(maxSize, 31); + + LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false, + (int)Math.ceil(1.2*maxSize/blockSize), + LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR, + LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL, + 0.98f, // min + 0.99f, // acceptable + 0.33f, // single + 0.33f, // multi + 0.34f, // memory + 1.2f, // limit + false, + 16 * 1024 * 1024, + 10, + 500, + 0.01f); + + CachedItem [] singleBlocks = generateFixedBlocks(10, blockSize, "single"); + CachedItem [] multiBlocks = generateFixedBlocks(10, blockSize, "multi"); + CachedItem [] memoryBlocks = generateFixedBlocks(10, blockSize, "memory"); + + // Add all blocks from all priorities + for(int i=0;i<10;i++) { + + // Just add single blocks + cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]); + + // Add and get multi blocks + cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]); + cache.getBlock(multiBlocks[i].cacheKey, true, false, true); + + // Add memory blocks as such + cache.cacheBlock(memoryBlocks[i].cacheKey, memoryBlocks[i], true); + } + + // Do not expect any evictions yet + assertEquals(0, cache.getStats().getEvictionCount()); + + // Resize to half capacity plus an extra block (otherwise we evict an extra) + cache.setMaxSize((long)(maxSize * 0.5f)); + + // Should have run a single eviction + assertEquals(1, cache.getStats().getEvictionCount()); + + // And we expect 1/2 of the blocks to be evicted + assertEquals(15, cache.getStats().getEvictedCount()); + + // And the oldest 5 blocks from each category should be gone + for(int i=0;i<5;i++) { + assertEquals(null, cache.getBlock(singleBlocks[i].cacheKey, true, + false, true)); + assertEquals(null, cache.getBlock(multiBlocks[i].cacheKey, true, + false, true)); + assertEquals(null, cache.getBlock(memoryBlocks[i].cacheKey, true, + false, true)); + } + + // And the newest 5 blocks should still be accessible + for(int i=5;i<10;i++) { + assertEquals(singleBlocks[i], cache.getBlock(singleBlocks[i].cacheKey, true, + false, true)); + assertEquals(multiBlocks[i], cache.getBlock(multiBlocks[i].cacheKey, true, + false, true)); + assertEquals(memoryBlocks[i], cache.getBlock(memoryBlocks[i].cacheKey, true, + false, true)); + } + } + + // test metricsPastNPeriods + @Test + public void testPastNPeriodsMetrics() throws Exception { + double delta = 0.01; + + // 3 total periods + CacheStats stats = new CacheStats("test", 3); + + // No accesses, should be 0 + stats.rollMetricsPeriod(); + assertEquals(0.0, stats.getHitRatioPastNPeriods(), delta); + assertEquals(0.0, stats.getHitCachingRatioPastNPeriods(), delta); + + // period 1, 1 hit caching, 1 hit non-caching, 2 miss non-caching + // should be (2/4)=0.5 and (1/1)=1 + stats.hit(false, true, BlockType.DATA); + stats.hit(true, true, BlockType.DATA); + stats.miss(false, false, BlockType.DATA); + stats.miss(false, false, BlockType.DATA); + stats.rollMetricsPeriod(); + assertEquals(0.5, stats.getHitRatioPastNPeriods(), delta); + assertEquals(1.0, stats.getHitCachingRatioPastNPeriods(), delta); + + // period 2, 1 miss caching, 3 miss non-caching + // should be (2/8)=0.25 and (1/2)=0.5 + stats.miss(true, false, BlockType.DATA); + stats.miss(false, false, BlockType.DATA); + stats.miss(false, false, BlockType.DATA); + stats.miss(false, false, BlockType.DATA); + stats.rollMetricsPeriod(); + assertEquals(0.25, stats.getHitRatioPastNPeriods(), delta); + assertEquals(0.5, stats.getHitCachingRatioPastNPeriods(), delta); + + // period 3, 2 hits of each type + // should be (6/12)=0.5 and (3/4)=0.75 + stats.hit(false, true, BlockType.DATA); + stats.hit(true, true, BlockType.DATA); + stats.hit(false, true, BlockType.DATA); + stats.hit(true, true, BlockType.DATA); + stats.rollMetricsPeriod(); + assertEquals(0.5, stats.getHitRatioPastNPeriods(), delta); + assertEquals(0.75, stats.getHitCachingRatioPastNPeriods(), delta); + + // period 4, evict period 1, two caching misses + // should be (4/10)=0.4 and (2/5)=0.4 + stats.miss(true, false, BlockType.DATA); + stats.miss(true, false, BlockType.DATA); + stats.rollMetricsPeriod(); + assertEquals(0.4, stats.getHitRatioPastNPeriods(), delta); + assertEquals(0.4, stats.getHitCachingRatioPastNPeriods(), delta); + + // period 5, evict period 2, 2 caching misses, 2 non-caching hit + // should be (6/10)=0.6 and (2/6)=1/3 + stats.miss(true, false, BlockType.DATA); + stats.miss(true, false, BlockType.DATA); + stats.hit(false, true, BlockType.DATA); + stats.hit(false, true, BlockType.DATA); + stats.rollMetricsPeriod(); + assertEquals(0.6, stats.getHitRatioPastNPeriods(), delta); + assertEquals((double)1/3, stats.getHitCachingRatioPastNPeriods(), delta); + + // period 6, evict period 3 + // should be (2/6)=1/3 and (0/4)=0 + stats.rollMetricsPeriod(); + assertEquals((double)1/3, stats.getHitRatioPastNPeriods(), delta); + assertEquals(0.0, stats.getHitCachingRatioPastNPeriods(), delta); + + // period 7, evict period 4 + // should be (2/4)=0.5 and (0/2)=0 + stats.rollMetricsPeriod(); + assertEquals(0.5, stats.getHitRatioPastNPeriods(), delta); + assertEquals(0.0, stats.getHitCachingRatioPastNPeriods(), delta); + + // period 8, evict period 5 + // should be 0 and 0 + stats.rollMetricsPeriod(); + assertEquals(0.0, stats.getHitRatioPastNPeriods(), delta); + assertEquals(0.0, stats.getHitCachingRatioPastNPeriods(), delta); + + // period 9, one of each + // should be (2/4)=0.5 and (1/2)=0.5 + stats.miss(true, false, BlockType.DATA); + stats.miss(false, false, BlockType.DATA); + stats.hit(true, true, BlockType.DATA); + stats.hit(false, true, BlockType.DATA); + stats.rollMetricsPeriod(); + assertEquals(0.5, stats.getHitRatioPastNPeriods(), delta); + assertEquals(0.5, stats.getHitCachingRatioPastNPeriods(), delta); + } + + @Test + public void testCacheBlockNextBlockMetadataMissing() { + long maxSize = 100000; + long blockSize = calculateBlockSize(maxSize, 10); + int size = 100; + int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; + byte[] byteArr = new byte[length]; + ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size); + HFileContext meta = new HFileContextBuilder().build(); + HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, + -1, ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, + -1, meta, HEAP); + HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, + -1, ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, + -1, meta, HEAP); + + LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false, + (int)Math.ceil(1.2*maxSize/blockSize), + LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR, + LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL, + 0.66f, // min + 0.99f, // acceptable + 0.33f, // single + 0.33f, // multi + 0.34f, // memory + 1.2f, // limit + false, + 1024, + 10, + 500, + 0.01f); + + BlockCacheKey key = new BlockCacheKey("key1", 0); + ByteBuffer actualBuffer = ByteBuffer.allocate(length); + ByteBuffer block1Buffer = ByteBuffer.allocate(length); + ByteBuffer block2Buffer = ByteBuffer.allocate(length); + blockWithNextBlockMetadata.serialize(block1Buffer, true); + blockWithoutNextBlockMetadata.serialize(block2Buffer, true); + + //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back. + CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, + block1Buffer); + + //Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back. + CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, + block1Buffer); + + //Clear and add blockWithoutNextBlockMetadata + cache.clearCache(); + assertNull(cache.getBlock(key, false, false, false)); + CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer, + block2Buffer); + + //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace. + CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer, + block1Buffer); + } + + private CachedItem [] generateFixedBlocks(int numBlocks, int size, String pfx) { + CachedItem [] blocks = new CachedItem[numBlocks]; + for(int i=0;i getDeserializer() { + return null; + } + + @Override + public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) { + } + + @Override + public BlockType getBlockType() { + return BlockType.DATA; + } + } + + static void testMultiThreadGetAndEvictBlockInternal(BlockCache cache) throws Exception { + int size = 100; + int length = HConstants.HFILEBLOCK_HEADER_SIZE + size; + byte[] byteArr = new byte[length]; + HFileContext meta = new HFileContextBuilder().build(); + BlockCacheKey key = new BlockCacheKey("key1", 0); + HFileBlock blk = new HFileBlock(BlockType.DATA, size, size, -1, + ByteBuff.wrap(ByteBuffer.wrap(byteArr, 0, size)), HFileBlock.FILL_HEADER, -1, + 52, -1, meta, + HEAP); + AtomicBoolean err1 = new AtomicBoolean(false); + Thread t1 = new Thread(() -> { + for (int i = 0; i < 10000 && !err1.get(); i++) { + try { + cache.getBlock(key, false, false, true); + } catch (Exception e) { + err1.set(true); + LOG.info("Cache block or get block failure: ", e); + } + } + }); + + AtomicBoolean err2 = new AtomicBoolean(false); + Thread t2 = new Thread(() -> { + for (int i = 0; i < 10000 && !err2.get(); i++) { + try { + cache.evictBlock(key); + } catch (Exception e) { + err2.set(true); + LOG.info("Evict block failure: ", e); + } + } + }); + + AtomicBoolean err3 = new AtomicBoolean(false); + Thread t3 = new Thread(() -> { + for (int i = 0; i < 10000 && !err3.get(); i++) { + try { + cache.cacheBlock(key, blk); + } catch (Exception e) { + err3.set(true); + LOG.info("Cache block failure: ", e); + } + } + }); + t1.start(); + t2.start(); + t3.start(); + t1.join(); + t2.join(); + t3.join(); + Assert.assertFalse(err1.get()); + Assert.assertFalse(err2.get()); + Assert.assertFalse(err3.get()); + } + + @Test + public void testMultiThreadGetAndEvictBlock() throws Exception { + long maxSize = 100000; + long blockSize = calculateBlockSize(maxSize, 10); + LruAdaptiveBlockCache cache = + new LruAdaptiveBlockCache(maxSize, blockSize, false, + (int) Math.ceil(1.2 * maxSize / blockSize), + LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR, LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL, + 0.66f, // min + 0.99f, // acceptable + 0.33f, // single + 0.33f, // multi + 0.34f, // memory + 1.2f, // limit + false, 1024, + 10, + 500, + 0.01f); + testMultiThreadGetAndEvictBlockInternal(cache); + } + + public void testSkipCacheDataBlocksInteral(int heavyEvictionCountLimit) throws Exception { + long maxSize = 100000000; + int numBlocks = 100000; + final long blockSize = calculateBlockSizeDefault(maxSize, numBlocks); + assertTrue("calculateBlockSize appears broken.", + blockSize * numBlocks <= maxSize); + + final LruAdaptiveBlockCache cache = + new LruAdaptiveBlockCache(maxSize, blockSize, true, + (int) Math.ceil(1.2 * maxSize / blockSize), + LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR, LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL, + 0.5f, // min + 0.99f, // acceptable + 0.33f, // single + 0.33f, // multi + 0.34f, // memory + 1.2f, // limit + false, + maxSize, + heavyEvictionCountLimit, + 200, + 0.01f); + + EvictionThread evictionThread = cache.getEvictionThread(); + assertNotNull(evictionThread); + while (!evictionThread.isEnteringRun()) { + Thread.sleep(1); + } + + final String hfileName = "hfile"; + for (int blockIndex = 0; blockIndex <= numBlocks * 3000; ++blockIndex) { + CachedItem block = new CachedItem(hfileName, (int) blockSize, blockIndex); + cache.cacheBlock(block.cacheKey, block, false); + if (cache.getCacheDataBlockPercent() < 70) { + // enough for test + break; + } + } + + evictionThread.evict(); + Thread.sleep(100); + + if (heavyEvictionCountLimit == 0) { + // Check if all offset (last two digits) of cached blocks less than the percent. + // It means some of blocks haven't put into BlockCache + assertTrue(cache.getCacheDataBlockPercent() < 90); + for (BlockCacheKey key : cache.getMapForTests().keySet()) { + assertTrue(!(key.getOffset() % 100 > 90)); + } + } else { + // Check that auto-scaling is not working (all blocks in BlockCache) + assertTrue(cache.getCacheDataBlockPercent() == 100); + int counter = 0; + for (BlockCacheKey key : cache.getMapForTests().keySet()) { + if (key.getOffset() % 100 > 90) { + counter++; + } + } + assertTrue(counter > 1000); + } + evictionThread.shutdown(); + } + + @Test + public void testSkipCacheDataBlocks() throws Exception { + // Check that auto-scaling will work right after start + testSkipCacheDataBlocksInteral(0); + // Check that auto-scaling will not work right after start + // (have to finished before auto-scaling) + testSkipCacheDataBlocksInteral(100); + } +}