diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
index 5e97b803052..93a95b00835 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java
@@ -24,63 +24,234 @@ import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult;
+import org.apache.hadoop.hbase.util.RollingStatCalculator;
- * The default implementation for the HeapMemoryTuner. This will do simple checks to decide
- * whether there should be changes in the heap size of memstore/block cache. When there is no block
- * cache eviction at all but there are flushes because of global heap pressure, it will increase the
- * memstore heap size and decrease block cache size. The step value for this heap size change can be
- * specified using the config hbase.regionserver.heapmemory.autotuner.step. When there is no
- * memstore flushes because of heap pressure but there is block cache evictions it will increase the
- * block cache heap.
+ * The default implementation for the HeapMemoryTuner. This will do statistical checks on
+ * number of evictions, cache misses and flushes to decide whether there should be changes
+ * in the heap size of memstore/block cache. During each tuner operation tuner takes a step
+ * which can either be INCREASE_BLOCK_CACHE_SIZE (increase block cache size),
+ * INCREASE_MEMSTORE_SIZE (increase memstore size) and by default it is NEUTRAL (no change).
+ * We say block cache is sufficient when there is no block cache eviction at all or major amount of
+ * memory allocated to block cache is empty, similarly we say memory allocated for memstore is
+ * sufficient when there is no memstore flushes because of heap pressure or major amount of
+ * memory allocated to memstore is empty. If both are sufficient we do nothing, if exactly one of
+ * them is found to be sufficient we decrease its size by step and increase the other by
+ * same amount. If none of them is sufficient we do statistical analysis on number of cache misses
+ * and flushes to determine tuner direction. Based on these statistics we decide the tuner
+ * direction. If we are not confident about which step direction to take we do nothing and wait for
+ * next iteration. On expectation we will be tuning for at least 22% tuner calls. The number of
+ * past periods to consider for statistics calculation can be specified in config by
+ * hbase.regionserver.heapmemory.autotuner.lookup.periods. Also these many initial calls to
+ * tuner will be ignored (cache is warming up and we leave the system to reach steady state).
+ * After the tuner takes a step, in next call we insure that last call was indeed helpful and did
+ * not do us any harm. If not then we revert the previous step. The step size is dynamic and it
+ * changes based on current and previous tuning direction. When last tuner step was NEUTRAL
+ * and current tuning step is not NEUTRAL then we assume we are restarting the tuning process and
+ * step size is changed to maximum allowed size which can be specified in config by
+ * hbase.regionserver.heapmemory.autotuner.step.max. If we are reverting the previous step
+ * then we decrease step size to half. This decrease is similar to binary search where we try to
+ * reach the most desired value. The minimum step size can be specified in config by
+ * hbase.regionserver.heapmemory.autotuner.step.max. In other cases we leave step size
+ * unchanged.
class DefaultHeapMemoryTuner implements HeapMemoryTuner {
- public static final String STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step";
- public static final float DEFAULT_STEP_VALUE = 0.02f; // 2%
- private static final TunerResult TUNER_RESULT = new TunerResult(true);
+ public static final String MAX_STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step.max";
+ public static final String MIN_STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step.min";
+ public static final String SUFFICIENT_MEMORY_LEVEL_KEY =
+ "hbase.regionserver.heapmemory.autotuner.sufficient.memory.level";
+ public static final String LOOKUP_PERIODS_KEY =
+ "hbase.regionserver.heapmemory.autotuner.lookup.periods";
+ public static final String NUM_PERIODS_TO_IGNORE =
+ "hbase.regionserver.heapmemory.autotuner.ignored.periods";
+ // Maximum step size that the tuner can take
+ public static final float DEFAULT_MAX_STEP_VALUE = 0.08f; // 8%
+ // Minimum step size that the tuner can take
+ public static final float DEFAULT_MIN_STEP_VALUE = 0.005f; // 0.5%
+ // If current block cache size or memstore size in use is below this level relative to memory
+ // provided to it then corresponding component will be considered to have sufficient memory
+ public static final float DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE = 0.5f; // 50%
+ // Number of tuner periods that will be considered while calculating mean and deviation
+ // If set to zero, all stats will be calculated from the start
+ public static final int DEFAULT_LOOKUP_PERIODS = 60;
+ public static final int DEFAULT_NUM_PERIODS_IGNORED = 60;
private static final TunerResult NO_OP_TUNER_RESULT = new TunerResult(false);
+ private Log LOG = LogFactory.getLog(DefaultHeapMemoryTuner.class);
+ private TunerResult TUNER_RESULT = new TunerResult(true);
private Configuration conf;
- private float step = DEFAULT_STEP_VALUE;
+ private float sufficientMemoryLevel = DEFAULT_SUFFICIENT_MEMORY_LEVEL_VALUE;
+ private float maximumStepSize = DEFAULT_MAX_STEP_VALUE;
+ private float minimumStepSize = DEFAULT_MIN_STEP_VALUE;
+ private int tunerLookupPeriods = DEFAULT_LOOKUP_PERIODS;
+ private int numPeriodsToIgnore = DEFAULT_NUM_PERIODS_IGNORED;
+ // Counter to ignore few initial periods while cache is still warming up
+ // Memory tuner will do no operation for the first "tunerLookupPeriods"
+ private int ignoreInitialPeriods = 0;
private float globalMemStorePercentMinRange;
private float globalMemStorePercentMaxRange;
private float blockCachePercentMinRange;
private float blockCachePercentMaxRange;
+ // Store statistics about the corresponding parameters for memory tuning
+ private RollingStatCalculator rollingStatsForCacheMisses;
+ private RollingStatCalculator rollingStatsForFlushes;
+ private RollingStatCalculator rollingStatsForEvictions;
+ // Set step size to max value for tuning, this step size will adjust dynamically while tuning
+ private float step = DEFAULT_MAX_STEP_VALUE;
+ private StepDirection prevTuneDirection = StepDirection.NEUTRAL;
public TunerResult tune(TunerContext context) {
long blockedFlushCount = context.getBlockedFlushCount();
long unblockedFlushCount = context.getUnblockedFlushCount();
long evictCount = context.getEvictCount();
- boolean memstoreSufficient = blockedFlushCount == 0 && unblockedFlushCount == 0;
- boolean blockCacheSufficient = evictCount == 0;
- if (memstoreSufficient && blockCacheSufficient) {
+ long cacheMissCount = context.getCacheMissCount();
+ long totalFlushCount = blockedFlushCount+unblockedFlushCount;
+ rollingStatsForCacheMisses.insertDataValue(cacheMissCount);
+ rollingStatsForFlushes.insertDataValue(totalFlushCount);
+ rollingStatsForEvictions.insertDataValue(evictCount);
+ StepDirection newTuneDirection = StepDirection.NEUTRAL;
+ if (ignoreInitialPeriods < numPeriodsToIgnore) {
+ // Ignoring the first few tuner periods
+ ignoreInitialPeriods++;
+ String tunerLog = "";
+ // We can consider memstore or block cache to be sufficient if
+ // we are using only a minor fraction of what have been already provided to it.
+ boolean earlyMemstoreSufficientCheck = totalFlushCount == 0
+ || context.getCurMemStoreUsed() < context.getCurMemStoreSize()*sufficientMemoryLevel;
+ boolean earlyBlockCacheSufficientCheck = evictCount == 0 ||
+ context.getCurBlockCacheUsed() < context.getCurBlockCacheSize()*sufficientMemoryLevel;
float newMemstoreSize;
float newBlockCacheSize;
- if (memstoreSufficient) {
- // Increase the block cache size and corresponding decrease in memstore size
- newBlockCacheSize = context.getCurBlockCacheSize() + step;
- newMemstoreSize = context.getCurMemStoreSize() - step;
- } else if (blockCacheSufficient) {
- // Increase the memstore size and corresponding decrease in block cache size
- newBlockCacheSize = context.getCurBlockCacheSize() - step;
- newMemstoreSize = context.getCurMemStoreSize() + step;
+ if (earlyMemstoreSufficientCheck && earlyBlockCacheSufficientCheck) {
+ // Both memstore and block cache memory seems to be sufficient. No operation required.
+ newTuneDirection = StepDirection.NEUTRAL;
+ } else if (earlyMemstoreSufficientCheck) {
+ // Increase the block cache size and corresponding decrease in memstore size.
+ newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE;
+ } else if (earlyBlockCacheSufficientCheck) {
+ // Increase the memstore size and corresponding decrease in block cache size.
+ newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE;
} else {
- // As of now not making any tuning in write/read heavy scenario.
+ // Early checks for sufficient memory failed. Tuning memory based on past statistics.
+ // Boolean indicator to show if we need to revert previous step or not.
+ boolean isReverting = false;
+ switch (prevTuneDirection) {
+ // Here we are using number of evictions rather than cache misses because it is more
+ // strong indicator for deficient cache size. Improving caching is what we
+ // would like to optimize for in steady state.
+ if ((double)evictCount > rollingStatsForEvictions.getMean() ||
+ (double)totalFlushCount > rollingStatsForFlushes.getMean() +
+ rollingStatsForFlushes.getDeviation()/2.00) {
+ // Reverting previous step as it was not useful.
+ // Tuning failed to decrease evictions or tuning resulted in large number of flushes.
+ newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE;
+ tunerLog += "Reverting previous tuning.";
+ if ((double)evictCount > rollingStatsForEvictions.getMean()) {
+ tunerLog += " As could not decrease evctions sufficiently.";
+ } else {
+ tunerLog += " As number of flushes rose significantly.";
+ }
+ isReverting = true;
+ }
+ break;
+ if ((double)totalFlushCount > rollingStatsForFlushes.getMean() ||
+ (double)evictCount > rollingStatsForEvictions.getMean() +
+ rollingStatsForEvictions.getDeviation()/2.00) {
+ // Reverting previous step as it was not useful.
+ // Tuning failed to decrease flushes or tuning resulted in large number of evictions.
+ newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE;
+ tunerLog += "Reverting previous tuning.";
+ if ((double)totalFlushCount > rollingStatsForFlushes.getMean()) {
+ tunerLog += " As could not decrease flushes sufficiently.";
+ } else {
+ tunerLog += " As number of evictions rose significantly.";
+ }
+ isReverting = true;
+ }
+ break;
+ default:
+ // Last step was neutral, revert doesn't not apply here.
+ break;
+ }
+ // If we are not reverting. We try to tune memory sizes by looking at cache misses / flushes.
+ if (!isReverting){
+ // mean +- deviation/2 is considered to be normal
+ // below it its consider low and above it is considered high.
+ // We can safely assume that the number cache misses, flushes are normally distributed over
+ // past periods and hence on all the above mentioned classes (normal, high and low)
+ // are equally likely with 33% probability each. Hence there is very good probability that
+ // we will not always fall in default step.
+ if ((double)cacheMissCount < rollingStatsForCacheMisses.getMean() -
+ rollingStatsForCacheMisses.getDeviation()/2.00 &&
+ (double)totalFlushCount < rollingStatsForFlushes.getMean() -
+ rollingStatsForFlushes.getDeviation()/2.00) {
+ // Everything is fine no tuning required
+ newTuneDirection = StepDirection.NEUTRAL;
+ } else if ((double)cacheMissCount > rollingStatsForCacheMisses.getMean() +
+ rollingStatsForCacheMisses.getDeviation()/2.00 &&
+ (double)totalFlushCount < rollingStatsForFlushes.getMean() -
+ rollingStatsForFlushes.getDeviation()/2.00) {
+ // more misses , increasing cache size
+ newTuneDirection = StepDirection.INCREASE_BLOCK_CACHE_SIZE;
+ tunerLog +=
+ "Increasing block cache size as observed increase in number of cache misses.";
+ } else if ((double)cacheMissCount < rollingStatsForCacheMisses.getMean() -
+ rollingStatsForCacheMisses.getDeviation()/2.00 &&
+ (double)totalFlushCount > rollingStatsForFlushes.getMean() +
+ rollingStatsForFlushes.getDeviation()/2.00) {
+ // more flushes , increasing memstore size
+ newTuneDirection = StepDirection.INCREASE_MEMSTORE_SIZE;
+ tunerLog += "Increasing memstore size as observed increase in number of flushes.";
+ } else {
+ // Default. Not enough facts to do tuning.
+ newTuneDirection = StepDirection.NEUTRAL;
+ }
+ }
+ // Adjusting step size for tuning to get to steady state.
+ // Even if the step size was 4% and 32 GB memory size, we will be shifting 1 GB back and forth
+ // per tuner operation and it can affect the performance of cluster
+ if (prevTuneDirection == StepDirection.NEUTRAL && newTuneDirection != StepDirection.NEUTRAL) {
+ // Restarting the tuning from steady state.
+ step = maximumStepSize;
+ } else if (prevTuneDirection != newTuneDirection) {
+ // Decrease the step size to reach the steady state. Similar procedure as binary search.
+ step = step/2.00f;
+ if (step < minimumStepSize) {
+ // Ensure step size does not gets too small.
+ step = minimumStepSize;
+ }
+ }
+ // Increase / decrease the memstore / block cahce sizes depending on new tuner step.
+ switch (newTuneDirection) {
+ newBlockCacheSize = context.getCurBlockCacheSize() + step;
+ newMemstoreSize = context.getCurMemStoreSize() - step;
+ break;
+ newBlockCacheSize = context.getCurBlockCacheSize() - step;
+ newMemstoreSize = context.getCurMemStoreSize() + step;
+ break;
+ default:
+ prevTuneDirection = StepDirection.NEUTRAL;
+ }
+ // Check we are within max/min bounds.
if (newMemstoreSize > globalMemStorePercentMaxRange) {
newMemstoreSize = globalMemStorePercentMaxRange;
} else if (newMemstoreSize < globalMemStorePercentMinRange) {
@@ -93,6 +264,10 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(tunerLog);
+ }
+ prevTuneDirection = newTuneDirection;
@@ -104,7 +279,12 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
public void setConf(Configuration conf) {
this.conf = conf;
- this.step = conf.getFloat(STEP_KEY, DEFAULT_STEP_VALUE);
+ this.maximumStepSize = conf.getFloat(MAX_STEP_KEY, DEFAULT_MAX_STEP_VALUE);
+ this.minimumStepSize = conf.getFloat(MIN_STEP_KEY, DEFAULT_MIN_STEP_VALUE);
+ this.step = this.maximumStepSize;
+ this.sufficientMemoryLevel = conf.getFloat(SUFFICIENT_MEMORY_LEVEL_KEY,
+ this.tunerLookupPeriods = conf.getInt(LOOKUP_PERIODS_KEY, DEFAULT_LOOKUP_PERIODS);
this.blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY,
this.blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY,
@@ -113,5 +293,19 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
this.globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
HeapMemorySizeUtil.getGlobalMemStorePercent(conf, false));
+ // Default value of periods to ignore is number of lookup periods
+ this.numPeriodsToIgnore = conf.getInt(NUM_PERIODS_TO_IGNORE, this.tunerLookupPeriods);
+ this.rollingStatsForCacheMisses = new RollingStatCalculator(this.tunerLookupPeriods);
+ this.rollingStatsForFlushes = new RollingStatCalculator(this.tunerLookupPeriods);
+ this.rollingStatsForEvictions = new RollingStatCalculator(this.tunerLookupPeriods);
+ }
+ private enum StepDirection{
+ // block cache size was increased
+ // memstore size was increased
+ // no operation was performed
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a08877debde..ae739b380a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1380,7 +1380,8 @@ public class HRegionServer extends HasThread implements
private void startHeapMemoryManager() {
- this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
+ this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher,
+ this, this.regionServerAccounting);
if (this.hMemManager != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
index 3deb2580c1c..8f001a16621 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
@@ -76,6 +76,7 @@ public class HeapMemoryManager {
private final ResizableBlockCache blockCache;
private final FlushRequester memStoreFlusher;
private final Server server;
+ private final RegionServerAccounting regionServerAccounting;
private HeapMemoryTunerChore heapMemTunerChore = null;
private final boolean tunerOn;
@@ -85,21 +86,23 @@ public class HeapMemoryManager {
private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
- Server server) {
+ Server server, RegionServerAccounting regionServerAccounting) {
BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
if (blockCache instanceof ResizableBlockCache) {
- return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server);
+ return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server,
+ regionServerAccounting);
return null;
HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher,
- Server server) {
+ Server server, RegionServerAccounting regionServerAccounting) {
Configuration conf = server.getConfiguration();
this.blockCache = blockCache;
this.memStoreFlusher = memStoreFlusher;
this.server = server;
+ this.regionServerAccounting = regionServerAccounting;
this.tunerOn = doInit(conf);
this.defaultChorePeriod = conf.getInt(HBASE_RS_HEAP_MEMORY_TUNER_PERIOD,
@@ -217,6 +220,7 @@ public class HeapMemoryManager {
private AtomicLong blockedFlushCount = new AtomicLong();
private AtomicLong unblockedFlushCount = new AtomicLong();
private long evictCount = 0L;
+ private long cacheMissCount = 0L;
private TunerContext tunerContext = new TunerContext();
private boolean alarming = false;
@@ -264,11 +268,21 @@ public class HeapMemoryManager {
private void tune() {
- long curEvictCount = blockCache.getStats().getEvictedCount();
+ // TODO check if we can increase the memory boundaries
+ // while remaining in the limits
+ long curEvictCount;
+ long curCacheMisCount;
+ curEvictCount = blockCache.getStats().getEvictedCount();
tunerContext.setEvictCount(curEvictCount - evictCount);
evictCount = curEvictCount;
+ curCacheMisCount = blockCache.getStats().getMissCachingCount();
+ tunerContext.setCacheMissCount(curCacheMisCount-cacheMissCount);
+ cacheMissCount = curCacheMisCount;
+ tunerContext.setCurBlockCacheUsed((float)blockCache.getCurrentSize() / maxHeapSize);
+ tunerContext.setCurMemStoreUsed(
+ (float)regionServerAccounting.getGlobalMemstoreSize() / maxHeapSize);
TunerResult result = null;
@@ -321,6 +335,8 @@ public class HeapMemoryManager {
globalMemStorePercent = memstoreSize;
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("No changes made by HeapMemoryTuner.");
@@ -349,6 +365,9 @@ public class HeapMemoryManager {
private long blockedFlushCount;
private long unblockedFlushCount;
private long evictCount;
+ private long cacheMissCount;
+ private float curBlockCacheUsed;
+ private float curMemStoreUsed;
private float curMemStoreSize;
private float curBlockCacheSize;
@@ -391,6 +410,30 @@ public class HeapMemoryManager {
public void setCurBlockCacheSize(float curBlockCacheSize) {
this.curBlockCacheSize = curBlockCacheSize;
+ public long getCacheMissCount() {
+ return cacheMissCount;
+ }
+ public void setCacheMissCount(long cacheMissCount) {
+ this.cacheMissCount = cacheMissCount;
+ }
+ public float getCurBlockCacheUsed() {
+ return curBlockCacheUsed;
+ }
+ public void setCurBlockCacheUsed(float curBlockCacheUsed) {
+ this.curBlockCacheUsed = curBlockCacheUsed;
+ }
+ public float getCurMemStoreUsed() {
+ return curMemStoreUsed;
+ }
+ public void setCurMemStoreUsed(float d) {
+ this.curMemStoreUsed = d;
+ }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RollingStatCalculator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RollingStatCalculator.java
new file mode 100644
index 00000000000..554d6f51a71
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RollingStatCalculator.java
@@ -0,0 +1,113 @@
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+ * This class maintains mean and variation for any sequence of input provided to it.
+ * It is initialized with number of rolling periods which basically means the number of past
+ * inputs whose data will be considered to maintain mean and variation.
+ * It will use O(N) memory to maintain these statistics, where N is number of look up periods it
+ * was initialized with.
+ * If zero is passed during initialization then it will maintain mean and variance from the
+ * start. It will use O(1) memory only. But note that since it will maintain mean / variance
+ * from the start the statistics may behave like constants and may ignore short trends.
+ * All operations are O(1) except the initialization which is O(N).
+ */
+public class RollingStatCalculator {
+ private double currentSum;
+ private double currentSqrSum;
+ // Total number of data values whose statistic is currently present
+ private long numberOfDataValues;
+ private int rollingPeriod;
+ private int currentIndexPosition;
+ // to be used only if we have non-zero rolling period
+ private long [] dataValues;
+ /**
+ * Creates a RollingStatCalculator with given number of rolling periods.
+ * @param rollingPeriod
+ */
+ public RollingStatCalculator(int rollingPeriod) {
+ this.rollingPeriod = rollingPeriod;
+ this.dataValues = fillWithZeros(rollingPeriod);
+ this.currentSum = 0.0;
+ this.currentSqrSum = 0.0;
+ this.currentIndexPosition = 0;
+ this.numberOfDataValues = 0;
+ }
+ /**
+ * Inserts given data value to array of data values to be considered for statistics calculation
+ * @param data
+ */
+ public void insertDataValue(long data) {
+ // if current number of data points already equals rolling period and rolling period is
+ // non-zero then remove one data and update the statistics
+ if(numberOfDataValues >= rollingPeriod && rollingPeriod > 0) {
+ this.removeData(dataValues[currentIndexPosition]);
+ }
+ numberOfDataValues++;
+ currentSum = currentSum + (double)data;
+ currentSqrSum = currentSqrSum + ((double)data * data);
+ if (rollingPeriod >0)
+ {
+ dataValues[currentIndexPosition] = data;
+ currentIndexPosition = (currentIndexPosition + 1) % rollingPeriod;
+ }
+ }
+ /**
+ * Update the statistics after removing the given data value
+ * @param data
+ */
+ private void removeData(long data) {
+ currentSum = currentSum - (double)data;
+ currentSqrSum = currentSqrSum - ((double)data * data);
+ numberOfDataValues--;
+ }
+ /**
+ * @return mean of the data values that are in the current list of data values
+ */
+ public double getMean() {
+ return this.currentSum / (double)numberOfDataValues;
+ }
+ /**
+ * @return deviation of the data values that are in the current list of data values
+ */
+ public double getDeviation() {
+ double variance = (currentSqrSum - (currentSum*currentSum)/(double)(numberOfDataValues))/
+ numberOfDataValues;
+ return Math.sqrt(variance);
+ }
+ /**
+ * @param size
+ * @return an array of given size initialized with zeros
+ */
+ private long [] fillWithZeros(int size) {
+ long [] zeros = new long [size];
+ for (int i=0; i 0) {
- assertTrue(expctedMinDelta <= (newHeapSpace - oldHeapSpace));
+ assertTrue(expctedMinDelta*error <= (double)(newHeapSpace - oldHeapSpace));
+ assertTrue(expctedMinDelta/error >= (double)(newHeapSpace - oldHeapSpace));
} else {
- assertTrue(expctedMinDelta <= (oldHeapSpace - newHeapSpace));
+ assertTrue(-expctedMinDelta*error <= (double)(oldHeapSpace - newHeapSpace));
+ assertTrue(-expctedMinDelta/error >= (double)(oldHeapSpace - newHeapSpace));
private static class BlockCacheStub implements ResizableBlockCache {
CacheStats stats = new CacheStats("test");
long maxSize = 0;
+ private long testBlockSize = 0;
public BlockCacheStub(long size){
this.maxSize = size;
@@ -378,7 +508,7 @@ public class TestHeapMemoryManager {
public long getCurrentSize() {
- return 0;
+ return this.testBlockSize;
@@ -400,6 +530,10 @@ public class TestHeapMemoryManager {
public BlockCache[] getBlockCaches() {
return null;
+ public void setTestBlockSize(long testBlockSize) {
+ this.testBlockSize = testBlockSize;
+ }
private static class MemstoreFlusherStub implements FlushRequester {
@@ -526,4 +660,15 @@ public class TestHeapMemoryManager {
return result;
+ private static class RegionServerAccountingStub extends RegionServerAccounting {
+ private long testMemstoreSize = 0;
+ @Override
+ public long getGlobalMemstoreSize() {
+ return testMemstoreSize;
+ }
+ public void setTestMemstoreSize(long testMemstoreSize) {
+ this.testMemstoreSize = testMemstoreSize;
+ }
+ }