HBASE-13876 Improving performance of HeapMemoryManager

This commit is contained in:
Elliott Clark 2015-06-16 13:06:15 -07:00
parent 1b5ad45b85
commit 623fd63827
5 changed files with 569 additions and 73 deletions

View File

@ -24,63 +24,234 @@ import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult;
import org.apache.hadoop.hbase.util.RollingStatCalculator;
/**
* The default implementation for the HeapMemoryTuner. This will do simple checks to decide
* whether there should be changes in the heap size of memstore/block cache. When there is no block
* cache eviction at all but there are flushes because of global heap pressure, it will increase the
* memstore heap size and decrease block cache size. The step value for this heap size change can be
* specified using the config <i>hbase.regionserver.heapmemory.autotuner.step</i>. When there is no
* memstore flushes because of heap pressure but there is block cache evictions it will increase the
* block cache heap.
* The default implementation for the HeapMemoryTuner. This will do statistical checks on
* number of evictions, cache misses and flushes to decide whether there should be changes
* in the heap size of memstore/block cache. During each tuner operation tuner takes a step
* which can either be INCREASE_BLOCK_CACHE_SIZE (increase block cache size),
* INCREASE_MEMSTORE_SIZE (increase memstore size) and by default it is NEUTRAL (no change).
* We say block cache is sufficient when there is no block cache eviction at all or major amount of
* memory allocated to block cache is empty, similarly we say memory allocated for memstore is
* sufficient when there is no memstore flushes because of heap pressure or major amount of
* memory allocated to memstore is empty. If both are sufficient we do nothing, if exactly one of
* them is found to be sufficient we decrease its size by <i>step</i> and increase the other by
* same amount. If none of them is sufficient we do statistical analysis on number of cache misses
* and flushes to determine tuner direction. Based on these statistics we decide the tuner
* direction. If we are not confident about which step direction to take we do nothing and wait for
* next iteration. On expectation we will be tuning for at least 22% tuner calls. The number of
* past periods to consider for statistics calculation can be specified in config by
* <i>hbase.regionserver.heapmemory.autotuner.lookup.periods</i>. Also these many initial calls to
* tuner will be ignored (cache is warming up and we leave the system to reach steady state).
* After the tuner takes a step, in next call we insure that last call was indeed helpful and did
* not do us any harm. If not then we revert the previous step. The step size is dynamic and it
* changes based on current and previous tuning direction. When last tuner step was NEUTRAL
* and current tuning step is not NEUTRAL then we assume we are restarting the tuning process and
* step size is changed to maximum allowed size which can be specified in config by
* <i>hbase.regionserver.heapmemory.autotuner.step.max</i>. If we are reverting the previous step
* then we decrease step size to half. This decrease is similar to binary search where we try to
* reach the most desired value. The minimum step size can be specified in config by
* <i>hbase.regionserver.heapmemory.autotuner.step.max</i>. In other cases we leave step size
* unchanged.
*/
@InterfaceAudience.Private
class DefaultHeapMemoryTuner implements HeapMemoryTuner {
public static final String STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step";
public static final float DEFAULT_STEP_VALUE = 0.02f; // 2%
private static final TunerResult TUNER_RESULT = new TunerResult(true);
public static final String MAX_STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step.max";
public static final String MIN_STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step.min";
public static final String SUFFICIENT_MEMORY_LEVEL_KEY =
"hbase.regionserver.heapmemory.autotuner.sufficient.memory.level";
public static final String LOOKUP_PERIODS_KEY =
"hbase.regionserver.heapmemory.autotuner.lookup.periods";
public static final String NUM_PERIODS_TO_IGNORE =
"hbase.regionserver.heapmemory.autotuner.ignored.periods";
// Maximum step size that the tuner can take
public static final float DEFAULT_MAX_STEP_VALUE = 0.08f; // 8%
// Minimum step size that the tuner can take
public static final float DEFAULT_MIN_STEP_VALUE = 0.005f; // 0.5%
// If current block cache size or memstore size in use is below this level relative to memory
// provided to it then corresponding component will be considered to have sufficient memory
public static final float DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE = 0.5f; // 50%
// Number of tuner periods that will be considered while calculating mean and deviation
// If set to zero, all stats will be calculated from the start
public static final int DEFAULT_LOOKUP_PERIODS = 60;
public static final int DEFAULT_NUM_PERIODS_IGNORED = 60;
private static final TunerResult NO_OP_TUNER_RESULT = new TunerResult(false);
private Log LOG = LogFactory.getLog(DefaultHeapMemoryTuner.class);
private TunerResult TUNER_RESULT = new TunerResult(true);
private Configuration conf;
private float step = DEFAULT_STEP_VALUE;
private float sufficientMemoryLevel = DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE;
private float maximumStepSize = DEFAULT_MAX_STEP_VALUE;
private float minimumStepSize = DEFAULT_MIN_STEP_VALUE;
private int tunerLookupPeriods = DEFAULT_LOOKUP_PERIODS;
private int numPeriodsToIgnore = DEFAULT_NUM_PERIODS_IGNORED;
// Counter to ignore few initial periods while cache is still warming up
// Memory tuner will do no operation for the first "tunerLookupPeriods"
private int ignoreInitialPeriods = 0;
private float globalMemStorePercentMinRange;
private float globalMemStorePercentMaxRange;
private float blockCachePercentMinRange;
private float blockCachePercentMaxRange;
// Store statistics about the corresponding parameters for memory tuning
private RollingStatCalculator rollingStatsForCacheMisses;
private RollingStatCalculator rollingStatsForFlushes;
private RollingStatCalculator rollingStatsForEvictions;
// Set step size to max value for tuning, this step size will adjust dynamically while tuning
private float step = DEFAULT_MAX_STEP_VALUE;
private StepDirection prevTuneDirection = StepDirection.NEUTRAL;
@Override
public TunerResult tune(TunerContext context) {
long blockedFlushCount = context.getBlockedFlushCount();
long unblockedFlushCount = context.getUnblockedFlushCount();
long evictCount = context.getEvictCount();
boolean memstoreSufficient = blockedFlushCount == 0 && unblockedFlushCount == 0;
boolean blockCacheSufficient = evictCount == 0;
if (memstoreSufficient && blockCacheSufficient) {
long cacheMissCount = context.getCacheMissCount();
long totalFlushCount = blockedFlushCount+unblockedFlushCount;
rollingStatsForCacheMisses.insertDataValue(cacheMissCount);
rollingStatsForFlushes.insertDataValue(totalFlushCount);
rollingStatsForEvictions.insertDataValue(evictCount);
StepDirection newTuneDirection = StepDirection.NEUTRAL;
if (ignoreInitialPeriods < numPeriodsToIgnore) {
// Ignoring the first few tuner periods
ignoreInitialPeriods++;
return NO_OP_TUNER_RESULT;
}
String tunerLog = "";
// We can consider memstore or block cache to be sufficient if
// we are using only a minor fraction of what have been already provided to it.
boolean earlyMemstoreSufficientCheck = totalFlushCount == 0
|| context.getCurMemStoreUsed() < context.getCurMemStoreSize()*sufficientMemoryLevel;
boolean earlyBlockCacheSufficientCheck = evictCount == 0 ||
context.getCurBlockCacheUsed() < context.getCurBlockCacheSize()*sufficientMemoryLevel;
float newMemstoreSize;
float newBlockCacheSize;
if (memstoreSufficient) {
// Increase the block cache size and corresponding decrease in memstore size
newBlockCacheSize = context.getCurBlockCacheSize() + step;
newMemstoreSize = context.getCurMemStoreSize() - step;
} else if (blockCacheSufficient) {
// Increase the memstore size and corresponding decrease in block cache size
newBlockCacheSize = context.getCurBlockCacheSize() - step;
newMemstoreSize = context.getCurMemStoreSize() + step;
if (earlyMemstoreSufficientCheck && earlyBlockCacheSufficientCheck) {
// Both memstore and block cache memory seems to be sufficient. No operation required.
newTuneDirection = StepDirection.NEUTRAL;
} else if (earlyMemstoreSufficientCheck) {
// Increase the block cache size and corresponding decrease in memstore size.
newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE;
} else if (earlyBlockCacheSufficientCheck) {
// Increase the memstore size and corresponding decrease in block cache size.
newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE;
} else {
return NO_OP_TUNER_RESULT;
// As of now not making any tuning in write/read heavy scenario.
// Early checks for sufficient memory failed. Tuning memory based on past statistics.
// Boolean indicator to show if we need to revert previous step or not.
boolean isReverting = false;
switch (prevTuneDirection) {
// Here we are using number of evictions rather than cache misses because it is more
// strong indicator for deficient cache size. Improving caching is what we
// would like to optimize for in steady state.
case INCREASE_BLOCK_CACHE_SIZE:
if ((double)evictCount > rollingStatsForEvictions.getMean() ||
(double)totalFlushCount > rollingStatsForFlushes.getMean() +
rollingStatsForFlushes.getDeviation()/2.00) {
// Reverting previous step as it was not useful.
// Tuning failed to decrease evictions or tuning resulted in large number of flushes.
newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE;
tunerLog += "Reverting previous tuning.";
if ((double)evictCount > rollingStatsForEvictions.getMean()) {
tunerLog += " As could not decrease evctions sufficiently.";
} else {
tunerLog += " As number of flushes rose significantly.";
}
isReverting = true;
}
break;
case INCREASE_MEMSTORE_SIZE:
if ((double)totalFlushCount > rollingStatsForFlushes.getMean() ||
(double)evictCount > rollingStatsForEvictions.getMean() +
rollingStatsForEvictions.getDeviation()/2.00) {
// Reverting previous step as it was not useful.
// Tuning failed to decrease flushes or tuning resulted in large number of evictions.
newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE;
tunerLog += "Reverting previous tuning.";
if ((double)totalFlushCount > rollingStatsForFlushes.getMean()) {
tunerLog += " As could not decrease flushes sufficiently.";
} else {
tunerLog += " As number of evictions rose significantly.";
}
isReverting = true;
}
break;
default:
// Last step was neutral, revert doesn't not apply here.
break;
}
// If we are not reverting. We try to tune memory sizes by looking at cache misses / flushes.
if (!isReverting){
// mean +- deviation/2 is considered to be normal
// below it its consider low and above it is considered high.
// We can safely assume that the number cache misses, flushes are normally distributed over
// past periods and hence on all the above mentioned classes (normal, high and low)
// are equally likely with 33% probability each. Hence there is very good probability that
// we will not always fall in default step.
if ((double)cacheMissCount < rollingStatsForCacheMisses.getMean() -
rollingStatsForCacheMisses.getDeviation()/2.00 &&
(double)totalFlushCount < rollingStatsForFlushes.getMean() -
rollingStatsForFlushes.getDeviation()/2.00) {
// Everything is fine no tuning required
newTuneDirection = StepDirection.NEUTRAL;
} else if ((double)cacheMissCount > rollingStatsForCacheMisses.getMean() +
rollingStatsForCacheMisses.getDeviation()/2.00 &&
(double)totalFlushCount < rollingStatsForFlushes.getMean() -
rollingStatsForFlushes.getDeviation()/2.00) {
// more misses , increasing cache size
newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE;
tunerLog +=
"Increasing block cache size as observed increase in number of cache misses.";
} else if ((double)cacheMissCount < rollingStatsForCacheMisses.getMean() -
rollingStatsForCacheMisses.getDeviation()/2.00 &&
(double)totalFlushCount > rollingStatsForFlushes.getMean() +
rollingStatsForFlushes.getDeviation()/2.00) {
// more flushes , increasing memstore size
newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE;
tunerLog += "Increasing memstore size as observed increase in number of flushes.";
} else {
// Default. Not enough facts to do tuning.
newTuneDirection = StepDirection.NEUTRAL;
}
}
}
// Adjusting step size for tuning to get to steady state.
// Even if the step size was 4% and 32 GB memory size, we will be shifting 1 GB back and forth
// per tuner operation and it can affect the performance of cluster
if (prevTuneDirection == StepDirection.NEUTRAL && newTuneDirection != StepDirection.NEUTRAL) {
// Restarting the tuning from steady state.
step = maximumStepSize;
} else if (prevTuneDirection != newTuneDirection) {
// Decrease the step size to reach the steady state. Similar procedure as binary search.
step = step/2.00f;
if (step < minimumStepSize) {
// Ensure step size does not gets too small.
step = minimumStepSize;
}
}
// Increase / decrease the memstore / block cahce sizes depending on new tuner step.
switch (newTuneDirection) {
case INCREASE_BLOCK_CACHE_SIZE:
newBlockCacheSize = context.getCurBlockCacheSize() + step;
newMemstoreSize = context.getCurMemStoreSize() - step;
break;
case INCREASE_MEMSTORE_SIZE:
newBlockCacheSize = context.getCurBlockCacheSize() - step;
newMemstoreSize = context.getCurMemStoreSize() + step;
break;
default:
prevTuneDirection = StepDirection.NEUTRAL;
return NO_OP_TUNER_RESULT;
}
// Check we are within max/min bounds.
if (newMemstoreSize > globalMemStorePercentMaxRange) {
newMemstoreSize = globalMemStorePercentMaxRange;
} else if (newMemstoreSize < globalMemStorePercentMinRange) {
@ -93,6 +264,10 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
}
TUNER_RESULT.setBlockCacheSize(newBlockCacheSize);
TUNER_RESULT.setMemstoreSize(newMemstoreSize);
if (LOG.isDebugEnabled()) {
LOG.debug(tunerLog);
}
prevTuneDirection = newTuneDirection;
return TUNER_RESULT;
}
@ -104,7 +279,12 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.step = conf.getFloat(STEP_KEY, DEFAULT_STEP_VALUE);
this.maximumStepSize = conf.getFloat(MAX_STEP_KEY, DEFAULT_MAX_STEP_VALUE);
this.minimumStepSize = conf.getFloat(MIN_STEP_KEY, DEFAULT_MIN_STEP_VALUE);
this.step = this.maximumStepSize;
this.sufficientMemoryLevel = conf.getFloat(SUFFICIENT_MEMORY_LEVEL_KEY,
DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE);
this.tunerLookupPeriods = conf.getInt(LOOKUP_PERIODS_KEY, DEFAULT_LOOKUP_PERIODS);
this.blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY,
conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT));
this.blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY,
@ -113,5 +293,19 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
this.globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
// Default value of periods to ignore is number of lookup periods
this.numPeriodsToIgnore = conf.getInt(NUM_PERIODS_TO_IGNORE, this.tunerLookupPeriods);
this.rollingStatsForCacheMisses = new RollingStatCalculator(this.tunerLookupPeriods);
this.rollingStatsForFlushes = new RollingStatCalculator(this.tunerLookupPeriods);
this.rollingStatsForEvictions = new RollingStatCalculator(this.tunerLookupPeriods);
}
private enum StepDirection{
// block cache size was increased
INCREASE_BLOCK_CACHE_SIZE,
// memstore size was increased
INCREASE_MEMSTORE_SIZE,
// no operation was performed
NEUTRAL
}
}

View File

@ -1380,7 +1380,8 @@ public class HRegionServer extends HasThread implements
}
private void startHeapMemoryManager() {
this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher,
this, this.regionServerAccounting);
if (this.hMemManager != null) {
this.hMemManager.start(getChoreService());
}

View File

@ -76,6 +76,7 @@ public class HeapMemoryManager {
private final ResizableBlockCache blockCache;
private final FlushRequester memStoreFlusher;
private final Server server;
private final RegionServerAccounting regionServerAccounting;
private HeapMemoryTunerChore heapMemTunerChore = null;
private final boolean tunerOn;
@ -85,21 +86,23 @@ public class HeapMemoryManager {
private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
Server server) {
Server server, RegionServerAccounting regionServerAccounting) {
BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
if (blockCache instanceof ResizableBlockCache) {
return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server);
return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server,
regionServerAccounting);
}
return null;
}
@VisibleForTesting
HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher,
Server server) {
Server server, RegionServerAccounting regionServerAccounting) {
Configuration conf = server.getConfiguration();
this.blockCache = blockCache;
this.memStoreFlusher = memStoreFlusher;
this.server = server;
this.regionServerAccounting = regionServerAccounting;
this.tunerOn = doInit(conf);
this.defaultChorePeriod = conf.getInt(HBASE_RS_HEAP_MEMORY_TUNER_PERIOD,
HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD);
@ -217,6 +220,7 @@ public class HeapMemoryManager {
private AtomicLong blockedFlushCount = new AtomicLong();
private AtomicLong unblockedFlushCount = new AtomicLong();
private long evictCount = 0L;
private long cacheMissCount = 0L;
private TunerContext tunerContext = new TunerContext();
private boolean alarming = false;
@ -264,11 +268,21 @@ public class HeapMemoryManager {
}
private void tune() {
long curEvictCount = blockCache.getStats().getEvictedCount();
// TODO check if we can increase the memory boundaries
// while remaining in the limits
long curEvictCount;
long curCacheMisCount;
curEvictCount = blockCache.getStats().getEvictedCount();
tunerContext.setEvictCount(curEvictCount - evictCount);
evictCount = curEvictCount;
curCacheMisCount = blockCache.getStats().getMissCachingCount();
tunerContext.setCacheMissCount(curCacheMisCount-cacheMissCount);
cacheMissCount = curCacheMisCount;
tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0));
tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0));
tunerContext.setCurBlockCacheUsed((float)blockCache.getCurrentSize() / maxHeapSize);
tunerContext.setCurMemStoreUsed(
(float)regionServerAccounting.getGlobalMemstoreSize() / maxHeapSize);
tunerContext.setCurBlockCacheSize(blockCachePercent);
tunerContext.setCurMemStoreSize(globalMemStorePercent);
TunerResult result = null;
@ -321,6 +335,8 @@ public class HeapMemoryManager {
globalMemStorePercent = memstoreSize;
memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
}
} else if (LOG.isDebugEnabled()) {
LOG.debug("No changes made by HeapMemoryTuner.");
}
}
@ -349,6 +365,9 @@ public class HeapMemoryManager {
private long blockedFlushCount;
private long unblockedFlushCount;
private long evictCount;
private long cacheMissCount;
private float curBlockCacheUsed;
private float curMemStoreUsed;
private float curMemStoreSize;
private float curBlockCacheSize;
@ -391,6 +410,30 @@ public class HeapMemoryManager {
public void setCurBlockCacheSize(float curBlockCacheSize) {
this.curBlockCacheSize = curBlockCacheSize;
}
public long getCacheMissCount() {
return cacheMissCount;
}
public void setCacheMissCount(long cacheMissCount) {
this.cacheMissCount = cacheMissCount;
}
public float getCurBlockCacheUsed() {
return curBlockCacheUsed;
}
public void setCurBlockCacheUsed(float curBlockCacheUsed) {
this.curBlockCacheUsed = curBlockCacheUsed;
}
public float getCurMemStoreUsed() {
return curMemStoreUsed;
}
public void setCurMemStoreUsed(float d) {
this.curMemStoreUsed = d;
}
}
/**

View File

@ -0,0 +1,113 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
/**
* This class maintains mean and variation for any sequence of input provided to it.
* It is initialized with number of rolling periods which basically means the number of past
* inputs whose data will be considered to maintain mean and variation.
* It will use O(N) memory to maintain these statistics, where N is number of look up periods it
* was initialized with.
* If zero is passed during initialization then it will maintain mean and variance from the
* start. It will use O(1) memory only. But note that since it will maintain mean / variance
* from the start the statistics may behave like constants and may ignore short trends.
* All operations are O(1) except the initialization which is O(N).
*/
public class RollingStatCalculator {
private double currentSum;
private double currentSqrSum;
// Total number of data values whose statistic is currently present
private long numberOfDataValues;
private int rollingPeriod;
private int currentIndexPosition;
// to be used only if we have non-zero rolling period
private long [] dataValues;
/**
* Creates a RollingStatCalculator with given number of rolling periods.
* @param rollingPeriod
*/
public RollingStatCalculator(int rollingPeriod) {
this.rollingPeriod = rollingPeriod;
this.dataValues = fillWithZeros(rollingPeriod);
this.currentSum = 0.0;
this.currentSqrSum = 0.0;
this.currentIndexPosition = 0;
this.numberOfDataValues = 0;
}
/**
* Inserts given data value to array of data values to be considered for statistics calculation
* @param data
*/
public void insertDataValue(long data) {
// if current number of data points already equals rolling period and rolling period is
// non-zero then remove one data and update the statistics
if(numberOfDataValues >= rollingPeriod && rollingPeriod > 0) {
this.removeData(dataValues[currentIndexPosition]);
}
numberOfDataValues++;
currentSum = currentSum + (double)data;
currentSqrSum = currentSqrSum + ((double)data * data);
if (rollingPeriod >0)
{
dataValues[currentIndexPosition] = data;
currentIndexPosition = (currentIndexPosition + 1) % rollingPeriod;
}
}
/**
* Update the statistics after removing the given data value
* @param data
*/
private void removeData(long data) {
currentSum = currentSum - (double)data;
currentSqrSum = currentSqrSum - ((double)data * data);
numberOfDataValues--;
}
/**
* @return mean of the data values that are in the current list of data values
*/
public double getMean() {
return this.currentSum / (double)numberOfDataValues;
}
/**
* @return deviation of the data values that are in the current list of data values
*/
public double getDeviation() {
double variance = (currentSqrSum - (currentSum*currentSum)/(double)(numberOfDataValues))/
numberOfDataValues;
return Math.sqrt(variance);
}
/**
* @param size
* @return an array of given size initialized with zeros
*/
private long [] fillWithZeros(int size) {
long [] zeros = new long [size];
for (int i=0; i<size; i++) {
zeros[i] = 0L;
}
return zeros;
}
}

View File

@ -61,7 +61,7 @@ public class TestHeapMemoryManager {
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0),
new MemstoreFlusherStub(0), new RegionServerStub(conf));
new MemstoreFlusherStub(0), new RegionServerStub(conf), new RegionServerAccountingStub());
assertFalse(manager.isTunerOn());
}
@ -71,7 +71,7 @@ public class TestHeapMemoryManager {
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.05f);
HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0),
new MemstoreFlusherStub(0), new RegionServerStub(conf));
new MemstoreFlusherStub(0), new RegionServerStub(conf), new RegionServerAccountingStub());
assertFalse(manager.isTunerOn());
}
@ -83,7 +83,8 @@ public class TestHeapMemoryManager {
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.06f);
try {
new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf));
new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf), new RegionServerAccountingStub());
fail();
} catch (RuntimeException e) {
}
@ -91,25 +92,31 @@ public class TestHeapMemoryManager {
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.2f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
try {
new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf));
new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf), new RegionServerAccountingStub());
fail();
} catch (RuntimeException e) {
}
}
@Test
public void testWhenClusterIsWriteHeavy() throws Exception {
public void testWhenClusterIsWriteHeavyWithEmptyMemstore() throws Exception {
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
// Empty block cache and memstore
blockCache.setTestBlockSize(0);
regionServerAccounting.setTestMemstoreSize(0);
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
new RegionServerStub(conf), regionServerAccounting);
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
@ -121,36 +128,29 @@ public class TestHeapMemoryManager {
memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
memStoreFlusher.requestFlush(null, false);
Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldBlockCacheSize,
blockCache.maxSize);
oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
oldBlockCacheSize = blockCache.maxSize;
// Do some more flushes before the next run of HeapMemoryTuner
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
Thread.sleep(1500);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldBlockCacheSize,
blockCache.maxSize);
// No changes should be made by tuner as we already have lot of empty space
assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
assertEquals(oldBlockCacheSize, blockCache.maxSize);
}
@Test
public void testWhenClusterIsReadHeavy() throws Exception {
public void testWhenClusterIsReadHeavyWithEmptyBlockCache() throws Exception {
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
// Empty block cache and memstore
blockCache.setTestBlockSize(0);
regionServerAccounting.setTestMemstoreSize(0);
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
new RegionServerStub(conf), regionServerAccounting);
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
@ -159,18 +159,140 @@ public class TestHeapMemoryManager {
blockCache.evictBlock(null);
blockCache.evictBlock(null);
Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldMemstoreHeapSize,
// No changes should be made by tuner as we already have lot of empty space
assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
assertEquals(oldBlockCacheSize, blockCache.maxSize);
}
@Test
public void testWhenClusterIsWriteHeavy() throws Exception {
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
// Empty block cache and but nearly filled memstore
blockCache.setTestBlockSize(0);
regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8));
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf), regionServerAccounting);
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
memStoreFlusher.requestFlush(null, false);
Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldBlockCacheSize,
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE), oldBlockCacheSize,
blockCache.maxSize);
oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
oldBlockCacheSize = blockCache.maxSize;
// Do some more flushes before the next run of HeapMemoryTuner
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
Thread.sleep(1500);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE), oldBlockCacheSize,
blockCache.maxSize);
}
@Test
public void testWhenClusterIsReadHeavy() throws Exception {
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
// Empty memstore and but nearly filled block cache
blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8));
regionServerAccounting.setTestMemstoreSize(0);
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf), new RegionServerAccountingStub());
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
blockCache.evictBlock(null);
blockCache.evictBlock(null);
blockCache.evictBlock(null);
Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE), oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldBlockCacheSize,
blockCache.maxSize);
oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
oldBlockCacheSize = blockCache.maxSize;
// Do some more evictions before the next run of HeapMemoryTuner
blockCache.evictBlock(null);
Thread.sleep(1500);
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldMemstoreHeapSize,
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE), oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldBlockCacheSize,
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldBlockCacheSize,
blockCache.maxSize);
}
@Test
public void testWhenClusterIsHavingMoreWritesThanReads() throws Exception {
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
// Both memstore and block cache are nearly filled
blockCache.setTestBlockSize(0);
regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8));
blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8));
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf), regionServerAccounting);
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
blockCache.evictBlock(null);
memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
memStoreFlusher.requestFlush(null, false);
Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
// No changes should happen as there is undefined increase in flushes and evictions
assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
assertEquals(oldBlockCacheSize, blockCache.maxSize);
// Do some more flushes before the next run of HeapMemoryTuner
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
Thread.sleep(1500);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE), oldBlockCacheSize,
blockCache.maxSize);
}
@ -184,11 +306,12 @@ public class TestHeapMemoryManager {
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.02f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
HeapMemoryTuner.class);
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
new RegionServerStub(conf), new RegionServerAccountingStub());
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
// Now we wants to be in write mode. Set bigger memstore size from CustomHeapMemoryTuner
@ -215,10 +338,11 @@ public class TestHeapMemoryManager {
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
HeapMemoryTuner.class);
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
new RegionServerStub(conf), new RegionServerAccountingStub());
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
CustomHeapMemoryTuner.memstoreSize = 0.78f;
@ -240,10 +364,11 @@ public class TestHeapMemoryManager {
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
HeapMemoryTuner.class);
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
new RegionServerStub(conf), new RegionServerAccountingStub());
long oldMemstoreSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
@ -265,7 +390,7 @@ public class TestHeapMemoryManager {
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.1f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f);
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
conf.setFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, 0.4F);
conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.3F);
conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0.1F);
@ -276,8 +401,8 @@ public class TestHeapMemoryManager {
HeapMemoryTuner.class);
try {
heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(
conf));
heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf), new RegionServerAccountingStub());
fail("Should have failed as the collective heap memory need is above 80%");
} catch (Exception e) {
}
@ -285,8 +410,8 @@ public class TestHeapMemoryManager {
// Change the max/min ranges for memstore and bock cache so as to pass the criteria check
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.6f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.6f);
heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(
conf));
heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf), new RegionServerAccountingStub());
long oldMemstoreSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
@ -310,19 +435,24 @@ public class TestHeapMemoryManager {
assertEquals(expected, currentHeapSpace);
}
private void assertHeapSpaceDelta(float expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) {
long expctedMinDelta = (long) (this.maxHeapSize * expectedDeltaPercent);
private void assertHeapSpaceDelta(double expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) {
double expctedMinDelta = (double) (this.maxHeapSize * expectedDeltaPercent);
// Tolerable error
double error = 0.999;
if (expectedDeltaPercent > 0) {
assertTrue(expctedMinDelta <= (newHeapSpace - oldHeapSpace));
assertTrue(expctedMinDelta*error <= (double)(newHeapSpace - oldHeapSpace));
assertTrue(expctedMinDelta/error >= (double)(newHeapSpace - oldHeapSpace));
} else {
assertTrue(expctedMinDelta <= (oldHeapSpace - newHeapSpace));
assertTrue(-expctedMinDelta*error <= (double)(oldHeapSpace - newHeapSpace));
assertTrue(-expctedMinDelta/error >= (double)(oldHeapSpace - newHeapSpace));
}
}
private static class BlockCacheStub implements ResizableBlockCache {
CacheStats stats = new CacheStats("test");
long maxSize = 0;
private long testBlockSize = 0;
public BlockCacheStub(long size){
this.maxSize = size;
}
@ -378,7 +508,7 @@ public class TestHeapMemoryManager {
@Override
public long getCurrentSize() {
return 0;
return this.testBlockSize;
}
@Override
@ -400,6 +530,10 @@ public class TestHeapMemoryManager {
public BlockCache[] getBlockCaches() {
return null;
}
public void setTestBlockSize(long testBlockSize) {
this.testBlockSize = testBlockSize;
}
}
private static class MemstoreFlusherStub implements FlushRequester {
@ -526,4 +660,15 @@ public class TestHeapMemoryManager {
return result;
}
}
private static class RegionServerAccountingStub extends RegionServerAccounting {
private long testMemstoreSize = 0;
@Override
public long getGlobalMemstoreSize() {
return testMemstoreSize;
}
public void setTestMemstoreSize(long testMemstoreSize) {
this.testMemstoreSize = testMemstoreSize;
}
}
}