HBASE-15560 TinyLFU-based BlockCache (Ben Manes)

tedyu 2016-10-04 05:15:51 -07:00
parent 2508edcd4e
commit 9e0c2562a9
13 changed files with 897 additions and 79 deletions


@@ -806,6 +806,11 @@ possible configurations would overwhelm and obscure the important.
     <description>
     The default thread pool size if parallel-seeking feature enabled.</description>
   </property>
+  <property>
+    <name>hfile.block.cache.policy</name>
+    <value>LRU</value>
+    <description>The eviction policy for the L1 block cache (LRU or TinyLFU).</description>
+  </property>
   <property>
     <name>hfile.block.cache.size</name>
     <value>0.4</value>
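For reference: CacheConfig reads this property when it builds the L1 cache, so an operator who wants the new policy would typically override it in hbase-site.xml. A minimal sketch (only the property name and the TinyLFU value come from this patch; the <configuration> wrapper is the standard HBase site-file element):

  <configuration>
    <property>
      <name>hfile.block.cache.policy</name>
      <value>TinyLFU</value>
    </property>
  </configuration>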


@@ -150,6 +150,20 @@ under the License.
     </licenses>
   </project>
 </supplement>
+<supplement>
+  <project>
+    <groupId>com.github.ben-manes.caffeine</groupId>
+    <artifactId>caffeine</artifactId>
+    <licenses>
+      <license>
+        <name>Apache License, Version 2.0</name>
+        <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        <distribution>repo</distribution>
+      </license>
+    </licenses>
+  </project>
+</supplement>
 <supplement>
   <project>
     <groupId>com.lmax</groupId>
@@ -1668,7 +1682,7 @@ Mozilla Public License Version 2.0
 means any form of the work other than Source Code Form.
 1.7. "Larger Work"
 means a work that combines Covered Software with other material, in
 a separate file or files, that is not Covered Software.
 1.8. "License"


@@ -434,6 +434,10 @@
       <artifactId>findbugs-annotations</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
     <dependency>
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>


@@ -22,13 +22,14 @@ import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.util.concurrent.ForkJoinPool;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -43,6 +44,12 @@ import com.google.common.annotations.VisibleForTesting;
 public class CacheConfig {
   private static final Log LOG = LogFactory.getLog(CacheConfig.class.getName());
+  /**
+   * Configuration key to cache block policy (Lru, TinyLfu).
+   */
+  public static final String HFILE_BLOCK_CACHE_POLICY_KEY = "hfile.block.cache.policy";
+  public static final String HFILE_BLOCK_CACHE_POLICY_DEFAULT = "LRU";
   /**
    * Configuration key to cache data blocks on read. Bloom blocks and index blocks are always be
    * cached if the block cache is enabled.
@@ -96,7 +103,7 @@ public class CacheConfig {
   * is an in-memory map that needs to be persisted across restarts. Where to store this
   * in-memory state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>.
   */
  public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY =
      "hbase.bucketcache.persistent.path";
  /**
@@ -104,11 +111,11 @@ public class CacheConfig {
   * as indices and blooms are kept in the lru blockcache and the data blocks in the
   * bucket cache).
   */
  public static final String BUCKET_CACHE_COMBINED_KEY =
      "hbase.bucketcache.combinedcache.enabled";
  public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
  public static final String BUCKET_CACHE_WRITER_QUEUE_KEY =
      "hbase.bucketcache.writer.queuelength";
  /**
@@ -154,6 +161,7 @@ public class CacheConfig {
     memcached("org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache");
     // TODO(eclark): Consider more. Redis, etc.
     Class<? extends BlockCache> clazz;
+    @SuppressWarnings("unchecked")
     ExternalBlockCaches(String clazzName) {
       try {
         clazz = (Class<? extends BlockCache>) Class.forName(clazzName);
@@ -449,7 +457,9 @@ public class CacheConfig {
    * @return true if this {@link BlockCategory} should be compressed in blockcache, false otherwise
    */
   public boolean shouldCacheCompressed(BlockCategory category) {
-    if (!isBlockCacheEnabled()) return false;
+    if (!isBlockCacheEnabled()) {
+      return false;
+    }
     switch (category) {
       case DATA:
         return this.cacheDataOnRead && this.cacheDataCompressed;
@@ -531,13 +541,13 @@ public class CacheConfig {
   // Clear this if in tests you'd make more than one block cache instance.
   @VisibleForTesting
   static BlockCache GLOBAL_BLOCK_CACHE_INSTANCE;
-  private static LruBlockCache GLOBAL_L1_CACHE_INSTANCE;
+  private static FirstLevelBlockCache GLOBAL_L1_CACHE_INSTANCE;
   /** Boolean whether we have disabled the block cache entirely. */
   @VisibleForTesting
   static boolean blockCacheDisabled = false;
-  static long getLruCacheSize(final Configuration conf, final long xmx) {
+  static long getFirstLevelCacheSize(final Configuration conf, final long xmx) {
     float cachePercentage = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
       HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
     if (cachePercentage <= 0.0001f) {
@@ -555,26 +565,37 @@ public class CacheConfig {
   /**
    * @param c Configuration to use.
-   * @return An L1 instance. Currently an instance of LruBlockCache.
+   * @return An L1 instance
    */
-  public static LruBlockCache getL1(final Configuration c) {
-    return getL1(c, ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax());
+  public static FirstLevelBlockCache getL1(final Configuration c) {
+    long xmx = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+    long l1CacheSize = getFirstLevelCacheSize(c, xmx);
+    return getL1(l1CacheSize, c);
   }
   /**
    * @param c Configuration to use.
    * @param xmx Max heap memory
-   * @return An L1 instance. Currently an instance of LruBlockCache.
+   * @return An L1 instance.
    */
-  private synchronized static LruBlockCache getL1(final Configuration c, final long xmx) {
+  private synchronized static FirstLevelBlockCache getL1(long cacheSize, Configuration c) {
     if (GLOBAL_L1_CACHE_INSTANCE != null) return GLOBAL_L1_CACHE_INSTANCE;
     if (blockCacheDisabled) return null;
-    long lruCacheSize = getLruCacheSize(c, xmx);
-    if (lruCacheSize < 0) return null;
+    if (cacheSize < 0) return null;
+    String policy = c.get(HFILE_BLOCK_CACHE_POLICY_KEY, HFILE_BLOCK_CACHE_POLICY_DEFAULT);
     int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE);
-    LOG.info("Allocating LruBlockCache size=" +
-      StringUtils.byteDesc(lruCacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize));
-    GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(lruCacheSize, blockSize, true, c);
+    LOG.info("Allocating BlockCache size=" +
+      StringUtils.byteDesc(cacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize));
+    if (policy.equalsIgnoreCase("LRU")) {
+      GLOBAL_L1_CACHE_INSTANCE = new LruBlockCache(cacheSize, blockSize, true, c);
+    } else if (policy.equalsIgnoreCase("TinyLFU")) {
+      GLOBAL_L1_CACHE_INSTANCE = new TinyLfuBlockCache(
+          cacheSize, blockSize, ForkJoinPool.commonPool(), c);
+    } else {
+      throw new IllegalArgumentException("Unknown policy: " + policy);
+    }
     return GLOBAL_L1_CACHE_INSTANCE;
   }
@@ -601,7 +622,7 @@ public class CacheConfig {
   }
   private static BlockCache getExternalBlockcache(Configuration c) {
-    Class klass = null;
+    Class<?> klass = null;
     // Get the class, from the config. s
     try {
@@ -629,7 +650,9 @@ public class CacheConfig {
   private static BlockCache getBucketCache(Configuration c, long xmx) {
     // Check for L2. ioengine name must be non-null.
     String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null);
-    if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null;
+    if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) {
+      return null;
+    }
     int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE);
     float bucketCachePercentage = c.getFloat(BUCKET_CACHE_SIZE_KEY, 0F);
@@ -679,29 +702,37 @@ public class CacheConfig {
    * @return The block cache or <code>null</code>.
    */
   public static synchronized BlockCache instantiateBlockCache(Configuration conf) {
-    if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return GLOBAL_BLOCK_CACHE_INSTANCE;
-    if (blockCacheDisabled) return null;
+    if (GLOBAL_BLOCK_CACHE_INSTANCE != null) {
+      return GLOBAL_BLOCK_CACHE_INSTANCE;
+    }
+    if (blockCacheDisabled) {
+      return null;
+    }
+    // blockCacheDisabled is set as a side-effect of getFirstLevelCacheSize()
+    // so check it again after the call
     long xmx = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
-    LruBlockCache l1 = getL1(conf, xmx);
-    // blockCacheDisabled is set as a side-effect of getL1(), so check it again after the call.
-    if (blockCacheDisabled) return null;
+    long l1CacheSize = getFirstLevelCacheSize(conf, xmx);
+    if (blockCacheDisabled) {
+      return null;
+    }
     BlockCache l2 = getL2(conf, xmx);
+    FirstLevelBlockCache l1 = getL1(l1CacheSize, conf);
     if (l2 == null) {
       GLOBAL_BLOCK_CACHE_INSTANCE = l1;
     } else {
       boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
-      boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
+      boolean combinedWithL1 = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
         DEFAULT_BUCKET_CACHE_COMBINED);
       if (useExternal) {
         GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2);
       } else {
-        if (combinedWithLru) {
+        if (combinedWithL1) {
           GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2);
         } else {
-          // L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler
-          // mechanism. It is a little ugly but works according to the following: when the
-          // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get
-          // a block from the L1 cache, if not in L1, we will search L2.
+          // L1 and L2 are not 'combined'. They are connected via the FirstLevelBlockCache
+          // victimhandler mechanism. It is a little ugly but works according to the following:
+          // when the background eviction thread runs, blocks evicted from L1 will go to L2 AND when
+          // we get a block from the L1 cache, if not in L1, we will search L2.
           GLOBAL_BLOCK_CACHE_INSTANCE = l1;
         }
       }
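The victimhandler mechanism referenced in the comment above is the setVictimCache hook that this patch moves onto the new FirstLevelBlockCache interface (added later in this commit). As a rough sketch of the non-combined wiring, assuming the l1 and l2 instances built in this method; the actual call site is outside the lines shown in this hunk:

  // Hypothetical illustration, not the literal patch code: wire L2 as the victim cache of L1
  // so blocks evicted from L1 spill into L2, and L1 misses fall through to L2.
  FirstLevelBlockCache l1 = getL1(l1CacheSize, conf);   // LruBlockCache or TinyLfuBlockCache
  BlockCache l2 = getL2(conf, xmx);                     // e.g. a BucketCache
  if (l1 != null && l2 != null) {
    l1.setVictimCache(l2);
  }
  GLOBAL_BLOCK_CACHE_INSTANCE = l1;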


@@ -30,24 +30,24 @@ import com.google.common.annotations.VisibleForTesting;
 /**
  * CombinedBlockCache is an abstraction layer that combines
- * {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used
+ * {@link FirstLevelBlockCache} and {@link BucketCache}. The smaller lruCache is used
  * to cache bloom blocks and index blocks. The larger l2Cache is used to
  * cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean, boolean)} reads
- * first from the smaller lruCache before looking for the block in the l2Cache. Blocks evicted
- * from lruCache are put into the bucket cache.
+ * first from the smaller l1Cache before looking for the block in the l2Cache. Blocks evicted
+ * from l1Cache are put into the bucket cache.
  * Metrics are the combined size and hits and misses of both caches.
  *
  */
 @InterfaceAudience.Private
 public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
-  protected final LruBlockCache lruCache;
+  protected final FirstLevelBlockCache l1Cache;
   protected final BlockCache l2Cache;
   protected final CombinedCacheStats combinedCacheStats;
-  public CombinedBlockCache(LruBlockCache lruCache, BlockCache l2Cache) {
-    this.lruCache = lruCache;
+  public CombinedBlockCache(FirstLevelBlockCache l1Cache, BlockCache l2Cache) {
+    this.l1Cache = l1Cache;
     this.l2Cache = l2Cache;
-    this.combinedCacheStats = new CombinedCacheStats(lruCache.getStats(),
+    this.combinedCacheStats = new CombinedCacheStats(l1Cache.getStats(),
       l2Cache.getStats());
   }
@@ -57,7 +57,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
     if (l2Cache instanceof HeapSize) {
       l2size = ((HeapSize) l2Cache).heapSize();
     }
-    return lruCache.heapSize() + l2size;
+    return l1Cache.heapSize() + l2size;
   }
   @Override
@@ -65,7 +65,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
       final boolean cacheDataInL1) {
     boolean metaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA;
     if (metaBlock || cacheDataInL1) {
-      lruCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
+      l1Cache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
     } else {
       l2Cache.cacheBlock(cacheKey, buf, inMemory, false);
     }
@@ -81,19 +81,19 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
       boolean repeat, boolean updateCacheMetrics) {
     // TODO: is there a hole here, or just awkwardness since in the lruCache getBlock
     // we end up calling l2Cache.getBlock.
-    return lruCache.containsBlock(cacheKey)?
-        lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics):
+    return l1Cache.containsBlock(cacheKey)?
+        l1Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics):
         l2Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
   }
   @Override
   public boolean evictBlock(BlockCacheKey cacheKey) {
-    return lruCache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey);
+    return l1Cache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey);
   }
   @Override
   public int evictBlocksByHfileName(String hfileName) {
-    return lruCache.evictBlocksByHfileName(hfileName)
+    return l1Cache.evictBlocksByHfileName(hfileName)
         + l2Cache.evictBlocksByHfileName(hfileName);
   }
@@ -104,28 +104,28 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
   @Override
   public void shutdown() {
-    lruCache.shutdown();
+    l1Cache.shutdown();
     l2Cache.shutdown();
   }
   @Override
   public long size() {
-    return lruCache.size() + l2Cache.size();
+    return l1Cache.size() + l2Cache.size();
   }
   @Override
   public long getFreeSize() {
-    return lruCache.getFreeSize() + l2Cache.getFreeSize();
+    return l1Cache.getFreeSize() + l2Cache.getFreeSize();
   }
   @Override
   public long getCurrentSize() {
-    return lruCache.getCurrentSize() + l2Cache.getCurrentSize();
+    return l1Cache.getCurrentSize() + l2Cache.getCurrentSize();
   }
   @Override
   public long getBlockCount() {
-    return lruCache.getBlockCount() + l2Cache.getBlockCount();
+    return l1Cache.getBlockCount() + l2Cache.getBlockCount();
   }
   public static class CombinedCacheStats extends CacheStats {
@@ -310,7 +310,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
       lruCacheStats.rollMetricsPeriod();
       bucketCacheStats.rollMetricsPeriod();
     }
     @Override
     public long getFailedInserts() {
       return lruCacheStats.getFailedInserts() + bucketCacheStats.getFailedInserts();
@@ -321,13 +321,13 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
       return lruCacheStats.getSumHitCountsPastNPeriods()
           + bucketCacheStats.getSumHitCountsPastNPeriods();
     }
     @Override
     public long getSumRequestCountsPastNPeriods() {
       return lruCacheStats.getSumRequestCountsPastNPeriods()
           + bucketCacheStats.getSumRequestCountsPastNPeriods();
     }
     @Override
     public long getSumHitCachingCountsPastNPeriods() {
       return lruCacheStats.getSumHitCachingCountsPastNPeriods()
@@ -348,12 +348,12 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
   @Override
   public BlockCache[] getBlockCaches() {
-    return new BlockCache [] {this.lruCache, this.l2Cache};
+    return new BlockCache [] {this.l1Cache, this.l2Cache};
   }
   @Override
   public void setMaxSize(long size) {
-    this.lruCache.setMaxSize(size);
+    this.l1Cache.setMaxSize(size);
   }
   @Override


@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
/**
* In-memory BlockCache that may be backed by secondary layer(s).
*/
@InterfaceAudience.Private
public interface FirstLevelBlockCache extends ResizableBlockCache, HeapSize {
/**
* Whether the cache contains the block with specified cacheKey
*
* @param cacheKey
* @return true if it contains the block
*/
boolean containsBlock(BlockCacheKey cacheKey);
/**
* Specifies the secondary cache. An entry that is evicted from this cache due to a size
* constraint will be inserted into the victim cache.
*
* @param victimCache the second level cache
* @throws IllegalArgumentException if the victim cache had already been set
*/
void setVictimCache(BlockCache victimCache);
}


@@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class InclusiveCombinedBlockCache extends CombinedBlockCache implements BlockCache {
-  public InclusiveCombinedBlockCache(LruBlockCache l1, BlockCache l2) {
+  public InclusiveCombinedBlockCache(FirstLevelBlockCache l1, BlockCache l2) {
     super(l1,l2);
   }
@@ -34,7 +34,7 @@ public class InclusiveCombinedBlockCache extends CombinedBlockCache implements BlockCache {
     // On all external cache set ups the lru should have the l2 cache set as the victimHandler
     // Because of that all requests that miss inside of the lru block cache will be
     // tried in the l2 block cache.
-    return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+    return l1Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
   }
   /**
@@ -50,7 +50,7 @@ public class InclusiveCombinedBlockCache extends CombinedBlockCache implements BlockCache {
       final boolean cacheDataInL1) {
     // This is the inclusive part of the combined block cache.
     // Every block is placed into both block caches.
-    lruCache.cacheBlock(cacheKey, buf, inMemory, true);
+    l1Cache.cacheBlock(cacheKey, buf, inMemory, true);
     // This assumes that insertion into the L2 block cache is either async or very fast.
     l2Cache.cacheBlock(cacheKey, buf, inMemory, true);


@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
+import static java.util.Objects.requireNonNull;
 import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.util.EnumMap;
@@ -57,15 +59,15 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
  * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
  *
  * Contains three levels of block priority to allow for scan-resistance and in-memory families
  * {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column
  * family is a column family that should be served from memory if possible):
  * single-access, multiple-accesses, and in-memory priority.
  * A block is added with an in-memory priority flag if
  * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a
  * single access priority the first time it is read into this block cache. If a block is
  * accessed again while in cache, it is marked as a multiple access priority block. This
  * delineation of blocks is used to prevent scans from thrashing the cache adding a
  * least-frequently-used element to the eviction algorithm.<p>
  *
  * Each priority is given its own chunk of the total cache to ensure
@@ -95,7 +97,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 @InterfaceAudience.Private
 @JsonIgnoreProperties({"encodingCountsForTest"})
-public class LruBlockCache implements ResizableBlockCache, HeapSize {
+public class LruBlockCache implements FirstLevelBlockCache {
   private static final Log LOG = LogFactory.getLog(LruBlockCache.class);
@@ -238,8 +240,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
       DEFAULT_MEMORY_FACTOR,
       DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
       false,
-      DEFAULT_MAX_BLOCK_SIZE
-      );
+      DEFAULT_MAX_BLOCK_SIZE);
   }
   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
@@ -254,8 +255,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
       conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
       conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
       conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
-      conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
-      );
+      conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
   }
   public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
@@ -321,6 +321,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
       statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
   }
+  @Override
+  public void setVictimCache(BlockCache victimCache) {
+    if (victimHandler != null) {
+      throw new IllegalArgumentException("The victim cache has already been set");
+    }
+    victimHandler = requireNonNull(victimCache);
+  }
   @Override
   public void setMaxSize(long maxSize) {
     this.maxSize = maxSize;
@@ -434,6 +442,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
    * @param cacheKey block's cache key
    * @param buf block buffer
    */
+  @Override
   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
     cacheBlock(cacheKey, buf, false, false);
   }
@@ -498,6 +507,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
    * @param cacheKey
    * @return true if contains the block
    */
+  @Override
   public boolean containsBlock(BlockCacheKey cacheKey) {
     return map.containsKey(cacheKey);
   }
@@ -785,6 +795,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
       return totalSize;
     }
+    @Override
     public int compareTo(BlockBucket that) {
       return Long.compare(this.overflow(), that.overflow());
     }
@@ -946,6 +957,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
    * of the eviction processes.
    */
+  @Override
   public CacheStats getStats() {
     return this.stats;
   }
@@ -1074,6 +1086,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
   }
+  @Override
   public void shutdown() {
     if (victimHandler != null)
       victimHandler.shutdown();
@@ -1122,7 +1135,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
     Map<BlockType, Integer> counts =
         new EnumMap<BlockType, Integer>(BlockType.class);
     for (LruCachedBlock cb : map.values()) {
-      BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
+      BlockType blockType = cb.getBuffer().getBlockType();
       Integer count = counts.get(blockType);
       counts.put(blockType, (count == null ? 0 : count) + 1);
     }
@@ -1142,11 +1155,6 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
     return counts;
   }
-  public void setVictimCache(BlockCache handler) {
-    assert victimHandler == null;
-    victimHandler = handler;
-  }
   @VisibleForTesting
   Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
     return map;


@@ -0,0 +1,402 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static java.util.Objects.requireNonNull;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.util.StringUtils;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Policy.Eviction;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A block cache that is memory-aware using {@link HeapSize}, memory bounded using the W-TinyLFU
* eviction algorithm, and concurrent. This implementation delegates to a Caffeine cache to provide
* O(1) read and write operations.
* <ul>
* <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
* <li>Caffeine: https://github.com/ben-manes/caffeine</li>
* <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
* </ul>
*/
@InterfaceAudience.Private
public final class TinyLfuBlockCache implements FirstLevelBlockCache {
private static final Log LOG = LogFactory.getLog(TinyLfuBlockCache.class);
private static final String MAX_BLOCK_SIZE = "hbase.tinylfu.max.block.size";
private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
private static final int STAT_THREAD_PERIOD_SECONDS = 5 * 60;
private final Eviction<BlockCacheKey, Cacheable> policy;
private final ScheduledExecutorService statsThreadPool;
private final long maxBlockSize;
private final CacheStats stats;
private BlockCache victimCache;
@VisibleForTesting
final Cache<BlockCacheKey, Cacheable> cache;
/**
* Creates a block cache.
*
* @param maximumSizeInBytes maximum size of this cache, in bytes
* @param avgBlockSize expected average size of blocks, in bytes
* @param executor the cache's executor
* @param conf additional configuration
*/
public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize,
Executor executor, Configuration conf) {
this(maximumSizeInBytes, avgBlockSize,
conf.getLong(MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), executor);
}
/**
* Creates a block cache.
*
* @param maximumSizeInBytes maximum size of this cache, in bytes
* @param avgBlockSize expected average size of blocks, in bytes
* @param maxBlockSize maximum size of a block, in bytes
* @param executor the cache's executor
*/
public TinyLfuBlockCache(long maximumSizeInBytes,
long avgBlockSize, long maxBlockSize, Executor executor) {
this.cache = Caffeine.newBuilder()
.executor(executor)
.maximumWeight(maximumSizeInBytes)
.removalListener(new EvictionListener())
.weigher((BlockCacheKey key, Cacheable value) ->
(int) Math.min(value.heapSize(), Integer.MAX_VALUE))
.initialCapacity((int) Math.ceil((1.2 * maximumSizeInBytes) / avgBlockSize))
.build();
this.maxBlockSize = maxBlockSize;
this.policy = cache.policy().eviction().get();
this.stats = new CacheStats(getClass().getSimpleName());
statsThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
.setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build());
statsThreadPool.scheduleAtFixedRate(this::logStats,
STAT_THREAD_PERIOD_SECONDS, STAT_THREAD_PERIOD_SECONDS, TimeUnit.SECONDS);
}
@Override
public void setVictimCache(BlockCache victimCache) {
if (this.victimCache != null) {
throw new IllegalArgumentException("The victim cache has already been set");
}
this.victimCache = requireNonNull(victimCache);
}
@Override
public long size() {
return policy.getMaximum();
}
@Override
public long getFreeSize() {
return size() - getCurrentSize();
}
@Override
public long getCurrentSize() {
return policy.weightedSize().getAsLong();
}
@Override
public long getBlockCount() {
return cache.estimatedSize();
}
@Override
public long heapSize() {
return getCurrentSize();
}
@Override
public void setMaxSize(long size) {
policy.setMaximum(size);
}
@Override
public boolean containsBlock(BlockCacheKey cacheKey) {
return cache.asMap().containsKey(cacheKey);
}
@Override
public Cacheable getBlock(BlockCacheKey cacheKey,
boolean caching, boolean repeat, boolean updateCacheMetrics) {
Cacheable value = cache.getIfPresent(cacheKey);
if (value == null) {
if (repeat) {
return null;
}
if (updateCacheMetrics) {
stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
}
if (victimCache != null) {
value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
if ((value != null) && caching) {
if ((value instanceof HFileBlock) && ((HFileBlock) value).usesSharedMemory()) {
value = ((HFileBlock) value).deepClone();
}
cacheBlock(cacheKey, value);
}
}
} else if (updateCacheMetrics) {
stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
}
return value;
}
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable value,
boolean inMemory, boolean cacheDataInL1) {
cacheBlock(cacheKey, value);
}
@Override
public void cacheBlock(BlockCacheKey key, Cacheable value) {
if (value.heapSize() > maxBlockSize) {
// If there are a lot of blocks that are too big this can make the logs too noisy (2% logged)
if (stats.failInsert() % 50 == 0) {
LOG.warn(String.format(
"Trying to cache too large a block %s @ %,d is %,d which is larger than %,d",
key.getHfileName(), key.getOffset(), value.heapSize(), DEFAULT_MAX_BLOCK_SIZE));
}
} else {
cache.put(key, value);
}
}
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
Cacheable value = cache.asMap().remove(cacheKey);
return (value != null);
}
@Override
public int evictBlocksByHfileName(String hfileName) {
int evicted = 0;
for (BlockCacheKey key : cache.asMap().keySet()) {
if (key.getHfileName().equals(hfileName) && evictBlock(key)) {
evicted++;
}
}
if (victimCache != null) {
evicted += victimCache.evictBlocksByHfileName(hfileName);
}
return evicted;
}
@Override
public CacheStats getStats() {
return stats;
}
@Override
public void shutdown() {
if (victimCache != null) {
victimCache.shutdown();
}
statsThreadPool.shutdown();
}
@Override
public BlockCache[] getBlockCaches() {
return null;
}
@Override
public Iterator<CachedBlock> iterator() {
long now = System.nanoTime();
return cache.asMap().entrySet().stream()
.map(entry -> (CachedBlock) new CachedBlockView(entry.getKey(), entry.getValue(), now))
.iterator();
}
@Override
public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
// There is no SHARED type here in L1. But the block might have been served from the L2 victim
// cache (when the Combined mode = false). So just try return this block to the victim cache.
// Note : In case of CombinedBlockCache we will have this victim cache configured for L1
// cache. But CombinedBlockCache will only call returnBlock on L2 cache.
if (victimCache != null) {
victimCache.returnBlock(cacheKey, block);
}
}
private void logStats() {
LOG.info(
"totalSize=" + StringUtils.byteDesc(heapSize()) + ", " +
"freeSize=" + StringUtils.byteDesc(getFreeSize()) + ", " +
"max=" + StringUtils.byteDesc(size()) + ", " +
"blockCount=" + getBlockCount() + ", " +
"accesses=" + stats.getRequestCount() + ", " +
"hits=" + stats.getHitCount() + ", " +
"hitRatio=" + (stats.getHitCount() == 0 ?
"0," : StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ") +
"cachingAccesses=" + stats.getRequestCachingCount() + ", " +
"cachingHits=" + stats.getHitCachingCount() + ", " +
"cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
"0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
"evictions=" + stats.getEvictionCount() + ", " +
"evicted=" + stats.getEvictedCount());
}
@Override
public String toString() {
return Objects.toStringHelper(this)
.add("blockCount", getBlockCount())
.add("currentSize", getCurrentSize())
.add("freeSize", getFreeSize())
.add("maxSize", size())
.add("heapSize", heapSize())
.add("victimCache", (victimCache != null))
.toString();
}
/** A removal listener to asynchronously record evictions and populate the victim cache. */
private final class EvictionListener implements RemovalListener<BlockCacheKey, Cacheable> {
@Override
public void onRemoval(BlockCacheKey key, Cacheable value, RemovalCause cause) {
if (!cause.wasEvicted()) {
// An explicit eviction (invalidation) is not added to the victim cache as the data may
// no longer be valid for subsequent queries.
return;
}
recordEviction();
if (victimCache == null) {
return;
} else if (victimCache instanceof BucketCache) {
BucketCache victimBucketCache = (BucketCache) victimCache;
victimBucketCache.cacheBlockWithWait(key, value, /* inMemory */ true, /* wait */ true);
} else {
victimCache.cacheBlock(key, value);
}
}
}
/**
* Records an eviction. The number of eviction operations and evicted blocks are identical, as
* an eviction is triggered immediately when the capacity has been exceeded. An eviction is
* performed asynchronously. See the library's documentation for details on write buffers,
* batching, and maintenance behavior.
*/
private void recordEviction() {
// FIXME: Currently does not capture the insertion time
stats.evicted(Long.MAX_VALUE, true);
stats.evict();
}
private static final class CachedBlockView implements CachedBlock {
private static final Comparator<CachedBlock> COMPARATOR = Comparator
.comparing(CachedBlock::getFilename)
.thenComparing(CachedBlock::getOffset)
.thenComparing(CachedBlock::getCachedTime);
private final BlockCacheKey key;
private final Cacheable value;
private final long now;
public CachedBlockView(BlockCacheKey key, Cacheable value, long now) {
this.now = now;
this.key = key;
this.value = value;
}
@Override
public BlockPriority getBlockPriority() {
// This does not appear to be used in any meaningful way and is irrelevant to this cache
return BlockPriority.MEMORY;
}
@Override
public BlockType getBlockType() {
return value.getBlockType();
}
@Override
public long getOffset() {
return key.getOffset();
}
@Override
public long getSize() {
return value.heapSize();
}
@Override
public long getCachedTime() {
// This does not appear to be used in any meaningful way, so not captured
return 0L;
}
@Override
public String getFilename() {
return key.getHfileName();
}
@Override
public int compareTo(CachedBlock other) {
return COMPARATOR.compare(this, other);
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (!(obj instanceof CachedBlock)) {
return false;
}
CachedBlock other = (CachedBlock) obj;
return compareTo(other) == 0;
}
@Override
public int hashCode() {
return key.hashCode();
}
@Override
public String toString() {
return BlockCacheUtil.toString(this, now);
}
}
}


@@ -88,11 +88,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  *
  * <p>BucketCache can be used as mainly a block cache (see
  * {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
- * LruBlockCache to decrease CMS GC and heap fragmentation.
+ * a BlockCache to decrease CMS GC and heap fragmentation.
  *
  * <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
- * blocks) to enlarge cache space via
- * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
+ * blocks) to enlarge cache space via a victim cache.
  */
 @InterfaceAudience.Private
 public class BucketCache implements BlockCache, HeapSize {


@@ -37,12 +37,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.IOTests;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
 import org.junit.Before;
@@ -328,7 +328,7 @@ public class TestCacheConfig {
     BlockCache [] bcs = cbc.getBlockCaches();
     assertTrue(bcs[0] instanceof LruBlockCache);
     LruBlockCache lbc = (LruBlockCache)bcs[0];
-    assertEquals(CacheConfig.getLruCacheSize(this.conf,
+    assertEquals(CacheConfig.getFirstLevelCacheSize(this.conf,
         ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()), lbc.getMaxSize());
     assertTrue(bcs[1] instanceof BucketCache);
     BucketCache bc = (BucketCache)bcs[1];
@@ -347,7 +347,7 @@ public class TestCacheConfig {
     // from L1 happens, it does not fail because L2 can't take the eviction because block too big.
     this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.001f);
     MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
-    long lruExpectedSize = CacheConfig.getLruCacheSize(this.conf, mu.getMax());
+    long lruExpectedSize = CacheConfig.getFirstLevelCacheSize(this.conf, mu.getMax());
     final int bcSize = 100;
     long bcExpectedSize = 100 * 1024 * 1024; // MB.
     assertTrue(lruExpectedSize < bcExpectedSize);


@@ -0,0 +1,304 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassSize;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Tests the concurrent TinyLfuBlockCache.
*/
@Category({IOTests.class, SmallTests.class})
public class TestTinyLfuBlockCache {
@Test
public void testCacheSimple() throws Exception {
long maxSize = 1000000;
long blockSize = calculateBlockSizeDefault(maxSize, 101);
TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
CachedItem [] blocks = generateRandomBlocks(100, blockSize);
long expectedCacheSize = cache.heapSize();
// Confirm empty
for (CachedItem block : blocks) {
assertTrue(cache.getBlock(block.cacheKey, true, false, true) == null);
}
// Add blocks
for (CachedItem block : blocks) {
cache.cacheBlock(block.cacheKey, block);
expectedCacheSize += block.heapSize();
}
// Verify correctly calculated cache heap size
assertEquals(expectedCacheSize, cache.heapSize());
// Check if all blocks are properly cached and retrieved
for (CachedItem block : blocks) {
HeapSize buf = cache.getBlock(block.cacheKey, true, false, true);
assertTrue(buf != null);
assertEquals(buf.heapSize(), block.heapSize());
}
// Re-add same blocks and ensure nothing has changed
long expectedBlockCount = cache.getBlockCount();
for (CachedItem block : blocks) {
cache.cacheBlock(block.cacheKey, block);
}
assertEquals(
"Cache should ignore cache requests for blocks already in cache",
expectedBlockCount, cache.getBlockCount());
// Verify correctly calculated cache heap size
assertEquals(expectedCacheSize, cache.heapSize());
// Check if all blocks are properly cached and retrieved
for (CachedItem block : blocks) {
HeapSize buf = cache.getBlock(block.cacheKey, true, false, true);
assertTrue(buf != null);
assertEquals(buf.heapSize(), block.heapSize());
}
// Expect no evictions
assertEquals(0, cache.getStats().getEvictionCount());
}
@Test
public void testCacheEvictionSimple() throws Exception {
long maxSize = 100000;
long blockSize = calculateBlockSizeDefault(maxSize, 10);
TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
CachedItem [] blocks = generateFixedBlocks(11, blockSize, "block");
// Add all the blocks
for (CachedItem block : blocks) {
cache.cacheBlock(block.cacheKey, block);
}
// A single eviction run should have occurred
assertEquals(1, cache.getStats().getEvictionCount());
// The cache did not grow beyond max
assertTrue(cache.heapSize() < maxSize);
// All blocks except one should be in the cache
assertEquals(10, cache.getBlockCount());
}
@Test
public void testScanResistance() throws Exception {
long maxSize = 100000;
long blockSize = calculateBlockSize(maxSize, 10);
TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
CachedItem [] singleBlocks = generateFixedBlocks(20, blockSize, "single");
CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
// Add 5 blocks from each
for(int i=0; i<5; i++) {
cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]);
}
// Add frequency
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 10; j++) {
CachedItem block = multiBlocks[i];
cache.getBlock(block.cacheKey, true, false, true);
}
}
// Let's keep "scanning" by adding single blocks. From here on we only
// expect evictions from the single bucket.
for(int i=5;i<18;i++) {
cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
}
for (CachedItem block : multiBlocks) {
assertTrue(cache.cache.asMap().containsKey(block.cacheKey));
}
assertEquals(10, cache.getBlockCount());
assertEquals(13, cache.getStats().getEvictionCount());
}
@Test
public void testMaxBlockSize() throws Exception {
long maxSize = 100000;
long blockSize = calculateBlockSize(maxSize, 10);
TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
CachedItem [] tooLong = generateFixedBlocks(10, 2 * blockSize, "long");
CachedItem [] small = generateFixedBlocks(15, blockSize / 2, "small");
for (CachedItem i:tooLong) {
cache.cacheBlock(i.cacheKey, i);
}
for (CachedItem i:small) {
cache.cacheBlock(i.cacheKey, i);
}
assertEquals(15,cache.getBlockCount());
for (CachedItem i:small) {
assertNotNull(cache.getBlock(i.cacheKey, true, false, false));
}
for (CachedItem i:tooLong) {
assertNull(cache.getBlock(i.cacheKey, true, false, false));
}
assertEquals(10, cache.getStats().getFailedInserts());
}
@Test
public void testResizeBlockCache() throws Exception {
long maxSize = 100000;
long blockSize = calculateBlockSize(maxSize, 10);
TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block");
for(CachedItem block : blocks) {
cache.cacheBlock(block.cacheKey, block);
}
// Do not expect any evictions yet
assertEquals(10, cache.getBlockCount());
assertEquals(0, cache.getStats().getEvictionCount());
// Resize to half capacity plus an extra block (otherwise we evict an extra)
cache.setMaxSize(maxSize / 2);
// And we expect 1/2 of the blocks to be evicted
assertEquals(5, cache.getBlockCount());
assertEquals(5, cache.getStats().getEvictedCount());
}
private CachedItem [] generateFixedBlocks(int numBlocks, int size, String pfx) {
CachedItem [] blocks = new CachedItem[numBlocks];
for(int i=0;i<numBlocks;i++) {
blocks[i] = new CachedItem(pfx + i, size);
}
return blocks;
}
private CachedItem [] generateFixedBlocks(int numBlocks, long size, String pfx) {
return generateFixedBlocks(numBlocks, (int)size, pfx);
}
private CachedItem [] generateRandomBlocks(int numBlocks, long maxSize) {
CachedItem [] blocks = new CachedItem[numBlocks];
Random r = new Random();
for(int i=0;i<numBlocks;i++) {
blocks[i] = new CachedItem("block" + i, r.nextInt((int)maxSize)+1);
}
return blocks;
}
private long calculateBlockSize(long maxSize, int numBlocks) {
long roughBlockSize = maxSize / numBlocks;
int numEntries = (int)Math.ceil((1.2)*maxSize/roughBlockSize);
long totalOverhead = LruBlockCache.CACHE_FIXED_OVERHEAD +
ClassSize.CONCURRENT_HASHMAP +
(numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
(LruBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
long negateBlockSize = totalOverhead/numEntries;
negateBlockSize += LruCachedBlock.PER_BLOCK_OVERHEAD;
return ClassSize.align((long)Math.floor((roughBlockSize - negateBlockSize)*0.99f));
}
private long calculateBlockSizeDefault(long maxSize, int numBlocks) {
long roughBlockSize = maxSize / numBlocks;
int numEntries = (int)Math.ceil((1.2)*maxSize/roughBlockSize);
long totalOverhead = LruBlockCache.CACHE_FIXED_OVERHEAD +
ClassSize.CONCURRENT_HASHMAP +
(numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
(LruBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
long negateBlockSize = totalOverhead / numEntries;
negateBlockSize += LruCachedBlock.PER_BLOCK_OVERHEAD;
return ClassSize.align((long)Math.floor((roughBlockSize - negateBlockSize)*
LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
}
private static class CachedItem implements Cacheable {
BlockCacheKey cacheKey;
int size;
CachedItem(String blockName, int size) {
this.cacheKey = new BlockCacheKey(blockName, 0);
this.size = size;
}
/** The size of this item reported to the block cache layer */
@Override
public long heapSize() {
return ClassSize.align(size);
}
@Override
public int getSerializedLength() {
return 0;
}
@Override
public CacheableDeserializer<Cacheable> getDeserializer() {
return null;
}
@Override
public void serialize(ByteBuffer destination) {
}
@Override
public BlockType getBlockType() {
return BlockType.DATA;
}
@Override
public MemoryType getMemoryType() {
return MemoryType.EXCLUSIVE;
}
}
}


@@ -1198,6 +1198,7 @@
     <httpclient.version>4.5.2</httpclient.version>
     <httpcore.version>4.4.4</httpcore.version>
     <metrics-core.version>3.1.2</metrics-core.version>
+    <caffeine.version>2.3.3</caffeine.version>
     <guava.version>12.0.1</guava.version>
     <jackson.version>1.9.13</jackson.version>
     <jasper.version>5.5.23</jasper.version>
@@ -1481,6 +1482,11 @@
       <artifactId>slf4j-api</artifactId>
       <version>${slf4j.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+      <version>${caffeine.version}</version>
+    </dependency>
     <dependency>
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>