Revert "Code cleanup of LruBlockCache"
Missing JIRA number
This reverts commit ad857d1b77
.
This commit is contained in:
parent
8f8daafee0
commit
f3524a06bb
|
@ -1,4 +1,4 @@
|
||||||
/*
|
/**
|
||||||
*
|
*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
@ -57,16 +57,16 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
* {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
|
* {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
|
||||||
* constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
|
* constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
|
||||||
*
|
*
|
||||||
* Contains three levels of block priority to allow for scan-resistance and in-memory families
|
* Contains three levels of block priority to allow for scan-resistance and in-memory families
|
||||||
* {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column
|
* {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)} (An in-memory column
|
||||||
* family is a column family that should be served from memory if possible):
|
* family is a column family that should be served from memory if possible):
|
||||||
* single-access, multiple-accesses, and in-memory priority.
|
* single-access, multiple-accesses, and in-memory priority.
|
||||||
* A block is added with an in-memory priority flag if
|
* A block is added with an in-memory priority flag if
|
||||||
* {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a
|
* {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}, otherwise a block becomes a
|
||||||
* single access priority the first time it is read into this block cache. If a block is
|
* single access priority the first time it is read into this block cache. If a block is
|
||||||
* accessed again while in cache, it is marked as a multiple access priority block. This
|
* accessed again while in cache, it is marked as a multiple access priority block. This
|
||||||
* delineation of blocks is used to prevent scans from thrashing the cache adding a
|
* delineation of blocks is used to prevent scans from thrashing the cache adding a
|
||||||
* least-frequently-used element to the eviction algorithm.<p>
|
* least-frequently-used element to the eviction algorithm.<p>
|
||||||
*
|
*
|
||||||
* Each priority is given its own chunk of the total cache to ensure
|
* Each priority is given its own chunk of the total cache to ensure
|
||||||
* fairness during eviction. Each priority will retain close to its maximum
|
* fairness during eviction. Each priority will retain close to its maximum
|
||||||
|
@ -103,60 +103,55 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
* Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
|
* Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
|
||||||
* evicting during an eviction run till the cache size is down to 80% of the total.
|
* evicting during an eviction run till the cache size is down to 80% of the total.
|
||||||
*/
|
*/
|
||||||
private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
|
static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Acceptable size of cache (no evictions if size < acceptable)
|
* Acceptable size of cache (no evictions if size < acceptable)
|
||||||
*/
|
*/
|
||||||
private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
|
static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
|
||||||
"hbase.lru.blockcache.acceptable.factor";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Hard capacity limit of cache, will reject any put if size > this * acceptable
|
* Hard capacity limit of cache, will reject any put if size > this * acceptable
|
||||||
*/
|
*/
|
||||||
static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
|
static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.hard.capacity.limit.factor";
|
||||||
"hbase.lru.blockcache.hard.capacity.limit.factor";
|
static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
|
||||||
private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
|
static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
|
||||||
"hbase.lru.blockcache.single.percentage";
|
static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
|
||||||
private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
|
|
||||||
"hbase.lru.blockcache.multi.percentage";
|
|
||||||
private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
|
|
||||||
"hbase.lru.blockcache.memory.percentage";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configuration key to force data-block always (except in-memory are too much)
|
* Configuration key to force data-block always (except in-memory are too much)
|
||||||
* cached in memory for in-memory hfile, unlike inMemory, which is a column-family
|
* cached in memory for in-memory hfile, unlike inMemory, which is a column-family
|
||||||
* configuration, inMemoryForceMode is a cluster-wide configuration
|
* configuration, inMemoryForceMode is a cluster-wide configuration
|
||||||
*/
|
*/
|
||||||
private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
|
static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
|
||||||
"hbase.lru.rs.inmemoryforcemode";
|
|
||||||
|
|
||||||
/* Default Configuration Parameters*/
|
/** Default Configuration Parameters*/
|
||||||
|
|
||||||
/* Backing Concurrent Map Configuration */
|
/** Backing Concurrent Map Configuration */
|
||||||
static final float DEFAULT_LOAD_FACTOR = 0.75f;
|
static final float DEFAULT_LOAD_FACTOR = 0.75f;
|
||||||
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
|
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
|
||||||
|
|
||||||
/* Eviction thresholds */
|
/** Eviction thresholds */
|
||||||
private static final float DEFAULT_MIN_FACTOR = 0.95f;
|
static final float DEFAULT_MIN_FACTOR = 0.95f;
|
||||||
static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
|
static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
|
||||||
|
|
||||||
/* Priority buckets */
|
/** Priority buckets */
|
||||||
private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
|
static final float DEFAULT_SINGLE_FACTOR = 0.25f;
|
||||||
private static final float DEFAULT_MULTI_FACTOR = 0.50f;
|
static final float DEFAULT_MULTI_FACTOR = 0.50f;
|
||||||
private static final float DEFAULT_MEMORY_FACTOR = 0.25f;
|
static final float DEFAULT_MEMORY_FACTOR = 0.25f;
|
||||||
|
|
||||||
private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;
|
/** default hard capacity limit */
|
||||||
|
static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;
|
||||||
|
|
||||||
private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
|
static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
|
||||||
|
|
||||||
/* Statistics thread */
|
/** Statistics thread */
|
||||||
private static final int STAT_THREAD_PERIOD = 60 * 5;
|
static final int statThreadPeriod = 60 * 5;
|
||||||
private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
|
private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
|
||||||
private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
|
private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
|
||||||
|
|
||||||
/** Concurrent map (the cache) */
|
/** Concurrent map (the cache) */
|
||||||
private final Map<BlockCacheKey, LruCachedBlock> map;
|
private final Map<BlockCacheKey,LruCachedBlock> map;
|
||||||
|
|
||||||
/** Eviction lock (locked when eviction in process) */
|
/** Eviction lock (locked when eviction in process) */
|
||||||
private final ReentrantLock evictionLock = new ReentrantLock(true);
|
private final ReentrantLock evictionLock = new ReentrantLock(true);
|
||||||
|
@ -223,8 +218,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
*
|
*
|
||||||
* <p>All other factors will be calculated based on defaults specified in
|
* <p>All other factors will be calculated based on defaults specified in
|
||||||
* this class.
|
* this class.
|
||||||
*
|
* @param maxSize maximum size of cache, in bytes
|
||||||
* @param maxSize maximum size of cache, in bytes
|
|
||||||
* @param blockSize approximate size of each block, in bytes
|
* @param blockSize approximate size of each block, in bytes
|
||||||
*/
|
*/
|
||||||
public LruBlockCache(long maxSize, long blockSize) {
|
public LruBlockCache(long maxSize, long blockSize) {
|
||||||
|
@ -236,7 +230,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
*/
|
*/
|
||||||
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
|
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
|
||||||
this(maxSize, blockSize, evictionThread,
|
this(maxSize, blockSize, evictionThread,
|
||||||
(int) Math.ceil(1.2 * maxSize / blockSize),
|
(int)Math.ceil(1.2*maxSize/blockSize),
|
||||||
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
|
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
|
||||||
DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
|
DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
|
||||||
DEFAULT_SINGLE_FACTOR,
|
DEFAULT_SINGLE_FACTOR,
|
||||||
|
@ -250,7 +244,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
|
|
||||||
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
|
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
|
||||||
this(maxSize, blockSize, evictionThread,
|
this(maxSize, blockSize, evictionThread,
|
||||||
(int) Math.ceil(1.2 * maxSize / blockSize),
|
(int)Math.ceil(1.2*maxSize/blockSize),
|
||||||
DEFAULT_LOAD_FACTOR,
|
DEFAULT_LOAD_FACTOR,
|
||||||
DEFAULT_CONCURRENCY_LEVEL,
|
DEFAULT_CONCURRENCY_LEVEL,
|
||||||
conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
|
conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
|
||||||
|
@ -258,11 +252,10 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
|
conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
|
||||||
conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
|
conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
|
||||||
conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
|
conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
|
||||||
conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
|
conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
|
||||||
DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
|
|
||||||
conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
|
conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
|
||||||
conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
|
conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
|
public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
|
||||||
|
@ -271,18 +264,17 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configurable constructor. Use this constructor if not using defaults.
|
* Configurable constructor. Use this constructor if not using defaults.
|
||||||
*
|
* @param maxSize maximum size of this cache, in bytes
|
||||||
* @param maxSize maximum size of this cache, in bytes
|
* @param blockSize expected average size of blocks, in bytes
|
||||||
* @param blockSize expected average size of blocks, in bytes
|
* @param evictionThread whether to run evictions in a bg thread or not
|
||||||
* @param evictionThread whether to run evictions in a bg thread or not
|
* @param mapInitialSize initial size of backing ConcurrentHashMap
|
||||||
* @param mapInitialSize initial size of backing ConcurrentHashMap
|
* @param mapLoadFactor initial load factor of backing ConcurrentHashMap
|
||||||
* @param mapLoadFactor initial load factor of backing ConcurrentHashMap
|
|
||||||
* @param mapConcurrencyLevel initial concurrency factor for backing CHM
|
* @param mapConcurrencyLevel initial concurrency factor for backing CHM
|
||||||
* @param minFactor percentage of total size that eviction will evict until
|
* @param minFactor percentage of total size that eviction will evict until
|
||||||
* @param acceptableFactor percentage of total size that triggers eviction
|
* @param acceptableFactor percentage of total size that triggers eviction
|
||||||
* @param singleFactor percentage of total size for single-access blocks
|
* @param singleFactor percentage of total size for single-access blocks
|
||||||
* @param multiFactor percentage of total size for multiple-access blocks
|
* @param multiFactor percentage of total size for multiple-access blocks
|
||||||
* @param memoryFactor percentage of total size for in-memory blocks
|
* @param memoryFactor percentage of total size for in-memory blocks
|
||||||
*/
|
*/
|
||||||
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
|
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
|
||||||
int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
|
int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
|
||||||
|
@ -295,16 +287,17 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
throw new IllegalArgumentException("Single, multi, and memory factors " +
|
throw new IllegalArgumentException("Single, multi, and memory factors " +
|
||||||
" should be non-negative and total 1.0");
|
" should be non-negative and total 1.0");
|
||||||
}
|
}
|
||||||
if (minFactor >= acceptableFactor) {
|
if(minFactor >= acceptableFactor) {
|
||||||
throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
|
throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
|
||||||
}
|
}
|
||||||
if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
|
if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
|
||||||
throw new IllegalArgumentException("all factors must be < 1");
|
throw new IllegalArgumentException("all factors must be < 1");
|
||||||
}
|
}
|
||||||
this.maxSize = maxSize;
|
this.maxSize = maxSize;
|
||||||
this.blockSize = blockSize;
|
this.blockSize = blockSize;
|
||||||
this.forceInMemory = forceInMemory;
|
this.forceInMemory = forceInMemory;
|
||||||
map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
|
map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
|
||||||
|
mapLoadFactor, mapConcurrencyLevel);
|
||||||
this.minFactor = minFactor;
|
this.minFactor = minFactor;
|
||||||
this.acceptableFactor = acceptableFactor;
|
this.acceptableFactor = acceptableFactor;
|
||||||
this.singleFactor = singleFactor;
|
this.singleFactor = singleFactor;
|
||||||
|
@ -316,7 +309,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
|
this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
|
||||||
this.size = new AtomicLong(this.overhead);
|
this.size = new AtomicLong(this.overhead);
|
||||||
this.hardCapacityLimitFactor = hardLimitFactor;
|
this.hardCapacityLimitFactor = hardLimitFactor;
|
||||||
if (evictionThread) {
|
if(evictionThread) {
|
||||||
this.evictionThread = new EvictionThread(this);
|
this.evictionThread = new EvictionThread(this);
|
||||||
this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
|
this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
|
||||||
} else {
|
} else {
|
||||||
|
@ -324,14 +317,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
}
|
}
|
||||||
// TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
|
// TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
|
||||||
// every five minutes.
|
// every five minutes.
|
||||||
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
|
this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
|
||||||
STAT_THREAD_PERIOD, TimeUnit.SECONDS);
|
statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setMaxSize(long maxSize) {
|
public void setMaxSize(long maxSize) {
|
||||||
this.maxSize = maxSize;
|
this.maxSize = maxSize;
|
||||||
if (this.size.get() > acceptableSize() && !evictionInProgress) {
|
if(this.size.get() > acceptableSize() && !evictionInProgress) {
|
||||||
runEviction();
|
runEviction();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -343,10 +336,10 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
* <p>
|
* <p>
|
||||||
* It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
|
* It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
|
||||||
* this can happen, for which we compare the buffer contents.
|
* this can happen, for which we compare the buffer contents.
|
||||||
*
|
|
||||||
* @param cacheKey block's cache key
|
* @param cacheKey block's cache key
|
||||||
* @param buf block buffer
|
* @param buf block buffer
|
||||||
* @param inMemory if block is in-memory
|
* @param inMemory if block is in-memory
|
||||||
|
* @param cacheDataInL1
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
|
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
|
||||||
|
@ -385,9 +378,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
stats.failInsert();
|
stats.failInsert();
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
|
LOG.trace("LruBlockCache current size " + StringUtils.byteDesc(currentSize)
|
||||||
+ " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
|
+ " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + " too many."
|
||||||
+ " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
|
+ " the hard limit size is " + StringUtils.byteDesc(hardLimitSize) + ", failed to put cacheKey:"
|
||||||
+ ", failed to put cacheKey:" + cacheKey + " into LruBlockCache.");
|
+ cacheKey + " into LruBlockCache.");
|
||||||
}
|
}
|
||||||
if (!evictionInProgress) {
|
if (!evictionInProgress) {
|
||||||
runEviction();
|
runEviction();
|
||||||
|
@ -438,9 +431,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
/**
|
/**
|
||||||
* Cache the block with the specified name and buffer.
|
* Cache the block with the specified name and buffer.
|
||||||
* <p>
|
* <p>
|
||||||
*
|
|
||||||
* @param cacheKey block's cache key
|
* @param cacheKey block's cache key
|
||||||
* @param buf block buffer
|
* @param buf block buffer
|
||||||
*/
|
*/
|
||||||
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
|
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
|
||||||
cacheBlock(cacheKey, buf, false, false);
|
cacheBlock(cacheKey, buf, false, false);
|
||||||
|
@ -450,8 +442,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
* Helper function that updates the local size counter and also updates any
|
* Helper function that updates the local size counter and also updates any
|
||||||
* per-cf or per-blocktype metrics it can discern from given
|
* per-cf or per-blocktype metrics it can discern from given
|
||||||
* {@link LruCachedBlock}
|
* {@link LruCachedBlock}
|
||||||
|
*
|
||||||
|
* @param cb
|
||||||
|
* @param evict
|
||||||
*/
|
*/
|
||||||
private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
|
protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
|
||||||
long heapsize = cb.heapSize();
|
long heapsize = cb.heapSize();
|
||||||
if (evict) {
|
if (evict) {
|
||||||
heapsize *= -1;
|
heapsize *= -1;
|
||||||
|
@ -461,14 +456,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the buffer of the block with the specified name.
|
* Get the buffer of the block with the specified name.
|
||||||
*
|
* @param cacheKey block's cache key
|
||||||
* @param cacheKey block's cache key
|
* @param caching true if the caller caches blocks on cache misses
|
||||||
* @param caching true if the caller caches blocks on cache misses
|
* @param repeat Whether this is a repeat lookup for the same block
|
||||||
* @param repeat Whether this is a repeat lookup for the same block
|
* (used to avoid double counting cache misses when doing double-check locking)
|
||||||
* (used to avoid double counting cache misses when doing double-check
|
|
||||||
* locking)
|
|
||||||
* @param updateCacheMetrics Whether to update cache metrics or not
|
* @param updateCacheMetrics Whether to update cache metrics or not
|
||||||
*
|
|
||||||
* @return buffer of specified cache key, or null if not in cache
|
* @return buffer of specified cache key, or null if not in cache
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@ -503,7 +495,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether the cache contains block with specified cacheKey
|
* Whether the cache contains block with specified cacheKey
|
||||||
*
|
* @param cacheKey
|
||||||
* @return true if contains the block
|
* @return true if contains the block
|
||||||
*/
|
*/
|
||||||
public boolean containsBlock(BlockCacheKey cacheKey) {
|
public boolean containsBlock(BlockCacheKey cacheKey) {
|
||||||
|
@ -513,7 +505,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
@Override
|
@Override
|
||||||
public boolean evictBlock(BlockCacheKey cacheKey) {
|
public boolean evictBlock(BlockCacheKey cacheKey) {
|
||||||
LruCachedBlock cb = map.get(cacheKey);
|
LruCachedBlock cb = map.get(cacheKey);
|
||||||
return cb != null && evictBlock(cb, false) > 0;
|
if (cb == null) return false;
|
||||||
|
return evictBlock(cb, false) > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -544,7 +537,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
/**
|
/**
|
||||||
* Evict the block, and it will be cached by the victim handler if exists &&
|
* Evict the block, and it will be cached by the victim handler if exists &&
|
||||||
* block may be read again later
|
* block may be read again later
|
||||||
*
|
* @param block
|
||||||
* @param evictedByEvictionProcess true if the given block is evicted by
|
* @param evictedByEvictionProcess true if the given block is evicted by
|
||||||
* EvictionThread
|
* EvictionThread
|
||||||
* @return the heap size of evicted block
|
* @return the heap size of evicted block
|
||||||
|
@ -582,7 +575,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
* Multi-threaded call to run the eviction process.
|
* Multi-threaded call to run the eviction process.
|
||||||
*/
|
*/
|
||||||
private void runEviction() {
|
private void runEviction() {
|
||||||
if (evictionThread == null) {
|
if(evictionThread == null) {
|
||||||
evict();
|
evict();
|
||||||
} else {
|
} else {
|
||||||
evictionThread.evict();
|
evictionThread.evict();
|
||||||
|
@ -618,16 +611,19 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
StringUtils.byteDesc(currentSize));
|
StringUtils.byteDesc(currentSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bytesToFree <= 0) return;
|
if(bytesToFree <= 0) return;
|
||||||
|
|
||||||
// Instantiate priority buckets
|
// Instantiate priority buckets
|
||||||
BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, singleSize());
|
BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
|
||||||
BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
|
singleSize());
|
||||||
BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, memorySize());
|
BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
|
||||||
|
multiSize());
|
||||||
|
BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
|
||||||
|
memorySize());
|
||||||
|
|
||||||
// Scan entire map putting into appropriate buckets
|
// Scan entire map putting into appropriate buckets
|
||||||
for (LruCachedBlock cachedBlock : map.values()) {
|
for(LruCachedBlock cachedBlock : map.values()) {
|
||||||
switch (cachedBlock.getPriority()) {
|
switch(cachedBlock.getPriority()) {
|
||||||
case SINGLE: {
|
case SINGLE: {
|
||||||
bucketSingle.add(cachedBlock);
|
bucketSingle.add(cachedBlock);
|
||||||
break;
|
break;
|
||||||
|
@ -683,7 +679,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);
|
PriorityQueue<BlockBucket> bucketQueue =
|
||||||
|
new PriorityQueue<BlockBucket>(3);
|
||||||
|
|
||||||
bucketQueue.add(bucketSingle);
|
bucketQueue.add(bucketSingle);
|
||||||
bucketQueue.add(bucketMulti);
|
bucketQueue.add(bucketMulti);
|
||||||
|
@ -692,11 +689,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
int remainingBuckets = 3;
|
int remainingBuckets = 3;
|
||||||
|
|
||||||
BlockBucket bucket;
|
BlockBucket bucket;
|
||||||
while ((bucket = bucketQueue.poll()) != null) {
|
while((bucket = bucketQueue.poll()) != null) {
|
||||||
long overflow = bucket.overflow();
|
long overflow = bucket.overflow();
|
||||||
if (overflow > 0) {
|
if(overflow > 0) {
|
||||||
long bucketBytesToFree =
|
long bucketBytesToFree = Math.min(overflow,
|
||||||
Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
|
(bytesToFree - bytesFreed) / remainingBuckets);
|
||||||
bytesFreed += bucket.free(bucketBytesToFree);
|
bytesFreed += bucket.free(bucketBytesToFree);
|
||||||
}
|
}
|
||||||
remainingBuckets--;
|
remainingBuckets--;
|
||||||
|
@ -794,7 +791,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object that) {
|
public boolean equals(Object that) {
|
||||||
if (that == null || !(that instanceof BlockBucket)) {
|
if (that == null || !(that instanceof BlockBucket)){
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return compareTo((BlockBucket)that) == 0;
|
return compareTo((BlockBucket)that) == 0;
|
||||||
|
@ -817,7 +814,6 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the maximum size of this cache.
|
* Get the maximum size of this cache.
|
||||||
*
|
|
||||||
* @return max size in bytes
|
* @return max size in bytes
|
||||||
*/
|
*/
|
||||||
public long getMaxSize() {
|
public long getMaxSize() {
|
||||||
|
@ -855,7 +851,6 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
* Thread is triggered into action by {@link LruBlockCache#runEviction()}
|
* Thread is triggered into action by {@link LruBlockCache#runEviction()}
|
||||||
*/
|
*/
|
||||||
static class EvictionThread extends HasThread {
|
static class EvictionThread extends HasThread {
|
||||||
|
|
||||||
private WeakReference<LruBlockCache> cache;
|
private WeakReference<LruBlockCache> cache;
|
||||||
private volatile boolean go = true;
|
private volatile boolean go = true;
|
||||||
// flag set after enter the run method, used for test
|
// flag set after enter the run method, used for test
|
||||||
|
@ -864,17 +859,17 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
public EvictionThread(LruBlockCache cache) {
|
public EvictionThread(LruBlockCache cache) {
|
||||||
super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
|
super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
|
||||||
setDaemon(true);
|
setDaemon(true);
|
||||||
this.cache = new WeakReference<>(cache);
|
this.cache = new WeakReference<LruBlockCache>(cache);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
enteringRun = true;
|
enteringRun = true;
|
||||||
while (this.go) {
|
while (this.go) {
|
||||||
synchronized (this) {
|
synchronized(this) {
|
||||||
try {
|
try {
|
||||||
this.wait(1000 * 10/*Don't wait for ever*/);
|
this.wait(1000 * 10/*Don't wait for ever*/);
|
||||||
} catch (InterruptedException e) {
|
} catch(InterruptedException e) {
|
||||||
LOG.warn("Interrupted eviction thread ", e);
|
LOG.warn("Interrupted eviction thread ", e);
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
|
@ -888,7 +883,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
|
||||||
justification="This is what we want")
|
justification="This is what we want")
|
||||||
public void evict() {
|
public void evict() {
|
||||||
synchronized (this) {
|
synchronized(this) {
|
||||||
this.notifyAll();
|
this.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -910,7 +905,6 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
* Statistics thread. Periodically prints the cache statistics to the log.
|
* Statistics thread. Periodically prints the cache statistics to the log.
|
||||||
*/
|
*/
|
||||||
static class StatisticsThread extends Thread {
|
static class StatisticsThread extends Thread {
|
||||||
|
|
||||||
private final LruBlockCache lru;
|
private final LruBlockCache lru;
|
||||||
|
|
||||||
public StatisticsThread(LruBlockCache lru) {
|
public StatisticsThread(LruBlockCache lru) {
|
||||||
|
@ -966,11 +960,12 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
return getCurrentSize();
|
return getCurrentSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
|
public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
|
||||||
// FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
|
// FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
|
||||||
return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
|
return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
|
||||||
+ ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
|
((long)Math.ceil(maxSize*1.2/blockSize)
|
||||||
+ ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
|
* ClassSize.CONCURRENT_HASHMAP_ENTRY) +
|
||||||
|
((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1031,7 +1026,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
diff = Long.compare(this.getOffset(), other.getOffset());
|
diff = Long.compare(this.getOffset(), other.getOffset());
|
||||||
if (diff != 0) return diff;
|
if (diff != 0) return diff;
|
||||||
if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
|
if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
|
||||||
throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
|
throw new IllegalStateException("" + this.getCachedTime() + ", " +
|
||||||
|
other.getCachedTime());
|
||||||
}
|
}
|
||||||
return Long.compare(other.getCachedTime(), this.getCachedTime());
|
return Long.compare(other.getCachedTime(), this.getCachedTime());
|
||||||
}
|
}
|
||||||
|
@ -1075,13 +1071,12 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
|
return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
|
||||||
}
|
}
|
||||||
private long memorySize() {
|
private long memorySize() {
|
||||||
return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
|
return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
if (victimHandler != null) {
|
if (victimHandler != null)
|
||||||
victimHandler.shutdown();
|
victimHandler.shutdown();
|
||||||
}
|
|
||||||
this.scheduleThreadPool.shutdown();
|
this.scheduleThreadPool.shutdown();
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
if (!this.scheduleThreadPool.isShutdown()) {
|
if (!this.scheduleThreadPool.isShutdown()) {
|
||||||
|
@ -1111,12 +1106,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used in testing. May be very inefficient.
|
* Used in testing. May be very inefficient.
|
||||||
*
|
|
||||||
* @return the set of cached file names
|
* @return the set of cached file names
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
SortedSet<String> getCachedFileNamesForTest() {
|
SortedSet<String> getCachedFileNamesForTest() {
|
||||||
SortedSet<String> fileNames = new TreeSet<>();
|
SortedSet<String> fileNames = new TreeSet<String>();
|
||||||
for (BlockCacheKey cacheKey : map.keySet()) {
|
for (BlockCacheKey cacheKey : map.keySet()) {
|
||||||
fileNames.add(cacheKey.getHfileName());
|
fileNames.add(cacheKey.getHfileName());
|
||||||
}
|
}
|
||||||
|
@ -1125,9 +1119,10 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
Map<BlockType, Integer> getBlockTypeCountsForTest() {
|
Map<BlockType, Integer> getBlockTypeCountsForTest() {
|
||||||
Map<BlockType, Integer> counts = new EnumMap<>(BlockType.class);
|
Map<BlockType, Integer> counts =
|
||||||
|
new EnumMap<BlockType, Integer>(BlockType.class);
|
||||||
for (LruCachedBlock cb : map.values()) {
|
for (LruCachedBlock cb : map.values()) {
|
||||||
BlockType blockType = cb.getBuffer().getBlockType();
|
BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
|
||||||
Integer count = counts.get(blockType);
|
Integer count = counts.get(blockType);
|
||||||
counts.put(blockType, (count == null ? 0 : count) + 1);
|
counts.put(blockType, (count == null ? 0 : count) + 1);
|
||||||
}
|
}
|
||||||
|
@ -1136,9 +1131,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
|
public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
|
||||||
Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
|
Map<DataBlockEncoding, Integer> counts =
|
||||||
|
new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
|
||||||
for (LruCachedBlock block : map.values()) {
|
for (LruCachedBlock block : map.values()) {
|
||||||
DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
|
DataBlockEncoding encoding =
|
||||||
|
((HFileBlock) block.getBuffer()).getDataBlockEncoding();
|
||||||
Integer count = counts.get(encoding);
|
Integer count = counts.get(encoding);
|
||||||
counts.put(encoding, (count == null ? 0 : count) + 1);
|
counts.put(encoding, (count == null ? 0 : count) + 1);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue