diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java index e7520a2e958..d3da9b43f2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/util/MemorySizeUtil.java @@ -157,9 +157,19 @@ public class MemorySizeUtil { + " Going with on heap global memstore size ('" + MEMSTORE_SIZE_KEY + "')"); } } + return new Pair(getOnheapGlobalMemstoreSize(conf), MemoryType.HEAP); + } + + /** + * Returns the onheap global memstore limit based on the config + * 'hbase.regionserver.global.memstore.size'. + * @param conf + * @return the onheap global memstore limt + */ + public static long getOnheapGlobalMemstoreSize(Configuration conf) { long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); float globalMemStorePercent = getGlobalMemStoreHeapPercent(conf, true); - return new Pair((long) (max * globalMemStorePercent), MemoryType.HEAP); + return ((long) (max * globalMemStorePercent)); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java index 1c7dfe2524b..29c1ee04755 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java @@ -139,6 +139,10 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner { } StepDirection newTuneDirection = getTuneDirection(context); + long blockedFlushCount = context.getBlockedFlushCount(); + long unblockedFlushCount = context.getUnblockedFlushCount(); + long totalOnheapFlushCount = blockedFlushCount + unblockedFlushCount; + boolean offheapMemstore = context.isOffheapMemstore(); float newMemstoreSize; float newBlockCacheSize; @@ -159,7 +163,10 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner { && decayingTunerStepSizeSum > 0)) { // Current step is opposite of past tuner actions so decrease the step size to reach steady // state. - step = step/2.00f; + if (!offheapMemstore && step != minimumStepSize) { + // we leave the step to be at minimumStepSize for offheap memstore + step = step / 2.00f; + } } if (step < minimumStepSize) { // If step size is too small then we do nothing. @@ -167,7 +174,17 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner { step = 0.0f; newTuneDirection = StepDirection.NEUTRAL; } - // Increase / decrease the memstore / block cahce sizes depending on new tuner step. + // There are no flushes due to onheap pressure and + // we have an offheap memstore and we are in need of more block_cache size. + if (totalOnheapFlushCount == 0 && offheapMemstore + && newTuneDirection == StepDirection.INCREASE_BLOCK_CACHE_SIZE) { + // we are sure that there are flushes only due to offheap pressure + // So don't do the memstore decrease equal to the step size. Instead do minimum stepSize + // decrease. But even if we have some flushes due to heap then it is better we tune + // the existing way. + step = minimumStepSize; + } + // Increase / decrease the memstore / block cache sizes depending on new tuner step. // We don't want to exert immediate pressure on memstore. So, we decrease its size gracefully; // we set a minimum bar in the middle of the total memstore size and the lower limit. float minMemstoreSize = ((globalMemStoreLimitLowMarkPercent + 1) * curMemstoreSize) / 2.00f; @@ -222,7 +239,7 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner { long unblockedFlushCount = context.getUnblockedFlushCount(); long evictCount = context.getEvictCount(); long cacheMissCount = context.getCacheMissCount(); - long totalFlushCount = blockedFlushCount+unblockedFlushCount; + long totalFlushCount = blockedFlushCount + unblockedFlushCount; float curMemstoreSize = context.getCurMemStoreSize(); float curBlockCacheSize = context.getCurBlockCacheSize(); StringBuilder tunerLog = new StringBuilder(); @@ -342,8 +359,8 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner { */ private void addToRollingStats(TunerContext context) { rollingStatsForCacheMisses.insertDataValue(context.getCacheMissCount()); - rollingStatsForFlushes.insertDataValue(context.getBlockedFlushCount() + - context.getUnblockedFlushCount()); + rollingStatsForFlushes + .insertDataValue(context.getBlockedFlushCount() + context.getUnblockedFlushCount()); rollingStatsForEvictions.insertDataValue(context.getEvictCount()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3e4a23e8fe1..37fca3ce8fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -48,6 +48,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.MalformedObjectNameException; @@ -574,7 +575,7 @@ public class HRegionServer extends HasThread implements // or process owner as default super user. Superusers.initialize(conf); - regionServerAccounting = new RegionServerAccounting(); + regionServerAccounting = new RegionServerAccounting(conf); cacheConfig = new CacheConfig(conf); mobCacheConfig = new MobCacheConfig(conf); uncaughtExceptionHandler = new UncaughtExceptionHandler() { @@ -1482,7 +1483,7 @@ public class HRegionServer extends HasThread implements // it. Pair pair = MemorySizeUtil.getGlobalMemstoreSize(conf); long globalMemStoreSize = pair.getFirst(); - boolean offheap = pair.getSecond() == MemoryType.NON_HEAP; + boolean offheap = this.regionServerAccounting.isOffheap(); // When off heap memstore in use, take full area for chunk pool. float poolSizePercentage = offheap ? 1.0F : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); @@ -3588,8 +3589,7 @@ public class HRegionServer extends HasThread implements // return 0 during RS initialization return 0.0; } - return getRegionServerAccounting().getGlobalMemstoreSize() * 1.0 - / cacheFlusher.globalMemStoreLimitLowMark; + return getRegionServerAccounting().getFlushPressure(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java index ad3978ef73a..96c90524f29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; import java.lang.management.ManagementFactory; +import java.lang.management.MemoryType; import java.lang.management.MemoryUsage; import java.util.ArrayList; import java.util.List; @@ -77,6 +78,7 @@ public class HeapMemoryManager { private float heapOccupancyPercent; private final ResizableBlockCache blockCache; + // TODO : remove this and mark regionServerAccounting as the observer directly private final FlushRequester memStoreFlusher; private final Server server; private final RegionServerAccounting regionServerAccounting; @@ -240,6 +242,8 @@ public class HeapMemoryManager { Class tunerKlass = server.getConfiguration().getClass( HBASE_RS_HEAP_MEMORY_TUNER_CLASS, DefaultHeapMemoryTuner.class, HeapMemoryTuner.class); heapMemTuner = ReflectionUtils.newInstance(tunerKlass, server.getConfiguration()); + tunerContext + .setOffheapMemstore(regionServerAccounting.isOffheap()); } @Override @@ -298,10 +302,10 @@ public class HeapMemoryManager { unblockedFlushCnt = unblockedFlushCount.getAndSet(0); tunerContext.setUnblockedFlushCount(unblockedFlushCnt); metricsHeapMemoryManager.updateUnblockedFlushCount(unblockedFlushCnt); + // TODO : add support for offheap metrics tunerContext.setCurBlockCacheUsed((float) blockCache.getCurrentSize() / maxHeapSize); metricsHeapMemoryManager.setCurBlockCacheSizeGauge(blockCache.getCurrentSize()); - long globalMemstoreHeapSize = regionServerAccounting.getGlobalMemstoreSize() - + regionServerAccounting.getGlobalMemstoreHeapOverhead(); + long globalMemstoreHeapSize = regionServerAccounting.getGlobalMemstoreSize(); tunerContext.setCurMemStoreUsed((float) globalMemstoreHeapSize / maxHeapSize); metricsHeapMemoryManager.setCurMemStoreSizeGauge(globalMemstoreHeapSize); tunerContext.setCurBlockCacheSize(blockCachePercent); @@ -354,14 +358,20 @@ public class HeapMemoryManager { metricsHeapMemoryManager.updateMemStoreDeltaSizeHistogram(memStoreDeltaSize); metricsHeapMemoryManager.updateBlockCacheDeltaSizeHistogram(blockCacheDeltaSize); long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize); + // we could have got an increase or decrease in size for the offheap memstore + // also if the flush had happened due to heap overhead. In that case it is ok + // to adjust the onheap memstore limit configs long newMemstoreSize = (long) (maxHeapSize * memstoreSize); LOG.info("Setting block cache heap size to " + newBlockCacheSize + " and memstore heap size to " + newMemstoreSize); blockCachePercent = blockCacheSize; blockCache.setMaxSize(newBlockCacheSize); globalMemStorePercent = memstoreSize; + // Internally sets it to RegionServerAccounting + // TODO : Set directly on RSAccounting?? memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize); for (HeapMemoryTuneObserver observer : tuneObservers) { + // Risky.. If this newMemstoreSize decreases we reduce the count in offheap chunk pool observer.onHeapMemoryTune(newMemstoreSize, newBlockCacheSize); } } @@ -376,14 +386,16 @@ public class HeapMemoryManager { @Override public void flushRequested(FlushType type, Region region) { switch (type) { - case ABOVE_HIGHER_MARK: + case ABOVE_ONHEAP_HIGHER_MARK: blockedFlushCount.incrementAndGet(); break; - case ABOVE_LOWER_MARK: + case ABOVE_ONHEAP_LOWER_MARK: unblockedFlushCount.incrementAndGet(); break; + // Removed the counting of the offheap related flushes (after reviews). Will add later if + // needed default: - // In case of normal flush don't do any action. + // In case of any other flush don't do any action. break; } } @@ -403,6 +415,7 @@ public class HeapMemoryManager { private float curMemStoreUsed; private float curMemStoreSize; private float curBlockCacheSize; + private boolean offheapMemstore; public long getBlockedFlushCount() { return blockedFlushCount; @@ -467,6 +480,14 @@ public class HeapMemoryManager { public void setCurMemStoreUsed(float d) { this.curMemStoreUsed = d; } + + public void setOffheapMemstore(boolean offheapMemstore) { + this.offheapMemstore = offheapMemstore; + } + + public boolean isOffheapMemstore() { + return this.offheapMemstore; + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java index 926dd7a12cf..b7ac21259ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java @@ -234,6 +234,11 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver { @Override public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { + // don't do any tuning in case of offheap memstore + if (this.offheap) { + LOG.warn("Not tuning the chunk pool as it is offheap"); + return; + } int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize); if (newMaxCount != this.maxCount) { // We need an adjustment in the chunks numbers diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index a123dd51637..2ee099c5e3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -49,12 +49,10 @@ import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.regionserver.Region.FlushResult; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; @@ -90,10 +88,6 @@ class MemStoreFlusher implements FlushRequester { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Object blockSignal = new Object(); - protected long globalMemStoreLimit; - protected float globalMemStoreLimitLowMarkPercent; - protected long globalMemStoreLimitLowMark; - private long blockingWaitTime; private final LongAdder updatesBlockedMsHighWater = new LongAdder(); @@ -111,32 +105,18 @@ class MemStoreFlusher implements FlushRequester { this.server = server; this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); - Pair pair = MemorySizeUtil.getGlobalMemstoreSize(conf); - this.globalMemStoreLimit = pair.getFirst(); - boolean onheap = pair.getSecond() == MemoryType.HEAP; - // When off heap memstore in use we configure the global off heap space for memstore as bytes - // not as % of max memory size. In such case, the lower water mark should be specified using the - // key "hbase.regionserver.global.memstore.size.lower.limit" which says % of the global upper - // bound and defaults to 95%. In on heap case also specifying this way is ideal. But in the past - // we used to take lower bound also as the % of xmx (38% as default). For backward compatibility - // for this deprecated config,we will fall back to read that config when new one is missing. - // Only for on heap case, do this fallback mechanism. For off heap it makes no sense. - // TODO When to get rid of the deprecated config? ie - // "hbase.regionserver.global.memstore.lowerLimit". Can get rid of this boolean passing then. - this.globalMemStoreLimitLowMarkPercent = MemorySizeUtil.getGlobalMemStoreHeapLowerMark(conf, - onheap); - this.globalMemStoreLimitLowMark = - (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent); - this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); this.flushHandlers = new FlushHandler[handlerCount]; LOG.info("globalMemStoreLimit=" - + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1) + + TraditionalBinaryPrefix + .long2String(this.server.getRegionServerAccounting().getGlobalMemstoreLimit(), "", 1) + ", globalMemStoreLimitLowMark=" - + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1) - + ", Offheap=" + !onheap); + + TraditionalBinaryPrefix.long2String( + this.server.getRegionServerAccounting().getGlobalMemstoreLimitLowMark(), "", 1) + + ", Offheap=" + + (this.server.getRegionServerAccounting().isOffheap())); } public LongAdder getUpdatesBlockedMsHighWater() { @@ -210,7 +190,7 @@ class MemStoreFlusher implements FlushRequester { LOG.info("Refreshing storefiles of region " + bestRegionReplica + " due to global heap pressure. Total memstore size=" + StringUtils - .humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) + .humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreDataSize()) + " memstore heap overhead=" + StringUtils.humanReadableInt( server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead())); flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica); @@ -222,7 +202,7 @@ class MemStoreFlusher implements FlushRequester { } else { LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " + "Total Memstore size=" - + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) + + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreDataSize()) + ", Region memstore size=" + humanReadableInt(regionToFlush.getMemstoreSize())); flushedOne = flushRegion(regionToFlush, true, true); @@ -251,9 +231,15 @@ class MemStoreFlusher implements FlushRequester { wakeupPending.set(false); // allow someone to wake us up again fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); if (fqe == null || fqe instanceof WakeupFlushThread) { - if (isAboveLowWaterMark()) { + FlushType type = isAboveLowWaterMark(); + if (type != FlushType.NORMAL) { LOG.debug("Flush thread woke up because memory above low water=" - + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1)); + + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getGlobalMemstoreLimitLowMark(), "", 1)); + // For offheap memstore, even if the lower water mark was breached due to heap overhead + // we still select the regions based on the region's memstore data size. + // TODO : If we want to decide based on heap over head it can be done without tracking + // it per region. if (!flushOneForGlobalPressure()) { // Wasn't able to flush any region, but we're above low water mark // This is unlikely to happen, but might happen when closing the @@ -355,17 +341,15 @@ class MemStoreFlusher implements FlushRequester { /** * Return true if global memory usage is above the high watermark */ - private boolean isAboveHighWaterMark() { - return server.getRegionServerAccounting().getGlobalMemstoreSize() - + server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead() >= globalMemStoreLimit; + private FlushType isAboveHighWaterMark() { + return server.getRegionServerAccounting().isAboveHighWaterMark(); } /** - * Return true if we're above the high watermark + * Return true if we're above the low watermark */ - private boolean isAboveLowWaterMark() { - return server.getRegionServerAccounting().getGlobalMemstoreSize() + server - .getRegionServerAccounting().getGlobalMemstoreHeapOverhead() >= globalMemStoreLimitLowMark; + private FlushType isAboveLowWaterMark() { + return server.getRegionServerAccounting().isAboveLowWaterMark(); } @Override @@ -548,9 +532,12 @@ class MemStoreFlusher implements FlushRequester { } private void notifyFlushRequest(Region region, boolean emergencyFlush) { - FlushType type = FlushType.NORMAL; + FlushType type = null; if (emergencyFlush) { - type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK; + type = isAboveHighWaterMark(); + if (type == null) { + type = isAboveLowWaterMark(); + } } for (FlushRequestListener listener : flushRequestListeners) { listener.flushRequested(type, region); @@ -586,7 +573,8 @@ class MemStoreFlusher implements FlushRequester { */ public void reclaimMemStoreMemory() { TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory"); - if (isAboveHighWaterMark()) { + FlushType flushType = isAboveHighWaterMark(); + if (flushType != FlushType.NORMAL) { if (Trace.isTracing()) { scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark."); } @@ -596,17 +584,24 @@ class MemStoreFlusher implements FlushRequester { long startTime = 0; boolean interrupted = false; try { - while (isAboveHighWaterMark() && !server.isStopped()) { + flushType = isAboveHighWaterMark(); + while (flushType != FlushType.NORMAL && !server.isStopped()) { if (!blocked) { startTime = EnvironmentEdgeManager.currentTime(); - LOG.info("Blocking updates on " + server.toString() + ": the global memstore size " - + TraditionalBinaryPrefix.long2String( - server.getRegionServerAccounting().getGlobalMemstoreSize(), "", 1) - + " + global memstore heap overhead " - + TraditionalBinaryPrefix.long2String( - server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead(), "", 1) - + " is >= than blocking " - + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size"); + if (!server.getRegionServerAccounting().isOffheap()) { + logMsg("the global memstore size", "global memstore heap overhead"); + } else { + switch (flushType) { + case ABOVE_OFFHEAP_HIGHER_MARK: + logMsg("the global offheap memstore size", "global memstore heap overhead"); + break; + case ABOVE_ONHEAP_HIGHER_MARK: + logMsg("global memstore heap overhead", ""); + break; + default: + break; + } + } } blocked = true; wakeupFlushThread(); @@ -620,6 +615,7 @@ class MemStoreFlusher implements FlushRequester { } long took = EnvironmentEdgeManager.currentTime() - start; LOG.warn("Memstore is above high water mark and block " + took + "ms"); + flushType = isAboveHighWaterMark(); } } finally { if (interrupted) { @@ -635,11 +631,24 @@ class MemStoreFlusher implements FlushRequester { LOG.info("Unblocking updates for server " + server.toString()); } } - } else if (isAboveLowWaterMark()) { + } else if (isAboveLowWaterMark() != FlushType.NORMAL) { wakeupFlushThread(); } scope.close(); } + + private void logMsg(String string1, String string2) { + LOG.info("Blocking updates on " + server.toString() + ": " + string1 + " " + + TraditionalBinaryPrefix + .long2String(server.getRegionServerAccounting().getGlobalMemstoreDataSize(), "", 1) + + " + " + string2 + " " + + TraditionalBinaryPrefix + .long2String(server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead(), "", 1) + + " is >= than blocking " + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getGlobalMemstoreLimit(), "", 1) + + " size"); + } + @Override public String toString() { return "flush_queue=" @@ -685,16 +694,10 @@ class MemStoreFlusher implements FlushRequester { */ @Override public void setGlobalMemstoreLimit(long globalMemStoreSize) { - this.globalMemStoreLimit = globalMemStoreSize; - this.globalMemStoreLimitLowMark = - (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize); + this.server.getRegionServerAccounting().setGlobalMemstoreLimits(globalMemStoreSize); reclaimMemStoreMemory(); } - public long getMemoryLimit() { - return this.globalMemStoreLimit; - } - interface FlushQueueEntry extends Delayed { } @@ -825,5 +828,11 @@ class MemStoreFlusher implements FlushRequester { } enum FlushType { - NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK; + NORMAL, + ABOVE_ONHEAP_LOWER_MARK, /* happens due to lower mark breach of onheap memstore settings + An offheap memstore can even breach the onheap_lower_mark*/ + ABOVE_ONHEAP_HIGHER_MARK,/* happens due to higher mark breach of onheap memstore settings + An offheap memstore can even breach the onheap_higher_mark*/ + ABOVE_OFFHEAP_LOWER_MARK,/* happens due to lower mark breach of offheap memstore settings*/ + ABOVE_OFFHEAP_HIGHER_MARK;/*/* happens due to higer mark breach of offheap memstore settings*/ } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManager.java index f670779b244..a76d3a38484 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManager.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import com.google.common.annotations.VisibleForTesting; /** * This class is for maintaining the various regionserver's heap memory manager statistics and diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java index cb8551ff6c9..39043931c43 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java @@ -18,39 +18,126 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.lang.management.MemoryType; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; /** * RegionServerAccounting keeps record of some basic real time information about - * the Region Server. Currently, it only keeps record the global memstore size. + * the Region Server. Currently, it keeps record the global memstore size and global memstore heap + * overhead. It also tracks the replay edits per region. */ @InterfaceAudience.Private public class RegionServerAccounting { + // memstore data size private final AtomicLong globalMemstoreDataSize = new AtomicLong(0); + // memstore heap over head size private final AtomicLong globalMemstoreHeapOverhead = new AtomicLong(0); - // Store the edits size during replaying WAL. Use this to roll back the + // Store the edits size during replaying WAL. Use this to roll back the // global memstore size once a region opening failed. private final ConcurrentMap replayEditsPerRegion = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); - /** - * @return the global Memstore size in the RegionServer - */ - public long getGlobalMemstoreSize() { - return globalMemstoreDataSize.get(); + private final Configuration conf; + + private long globalMemStoreLimit; + private final float globalMemStoreLimitLowMarkPercent; + private long globalMemStoreLimitLowMark; + private final MemoryType memType; + private long globalOnHeapMemstoreLimit; + private long globalOnHeapMemstoreLimitLowMark; + + public RegionServerAccounting(Configuration conf) { + this.conf = conf; + Pair globalMemstoreSizePair = MemorySizeUtil.getGlobalMemstoreSize(conf); + this.globalMemStoreLimit = globalMemstoreSizePair.getFirst(); + this.memType = globalMemstoreSizePair.getSecond(); + this.globalMemStoreLimitLowMarkPercent = + MemorySizeUtil.getGlobalMemStoreHeapLowerMark(conf, this.memType == MemoryType.HEAP); + // When off heap memstore in use we configure the global off heap space for memstore as bytes + // not as % of max memory size. In such case, the lower water mark should be specified using the + // key "hbase.regionserver.global.memstore.size.lower.limit" which says % of the global upper + // bound and defaults to 95%. In on heap case also specifying this way is ideal. But in the past + // we used to take lower bound also as the % of xmx (38% as default). For backward compatibility + // for this deprecated config,we will fall back to read that config when new one is missing. + // Only for on heap case, do this fallback mechanism. For off heap it makes no sense. + // TODO When to get rid of the deprecated config? ie + // "hbase.regionserver.global.memstore.lowerLimit". Can get rid of this boolean passing then. + this.globalMemStoreLimitLowMark = + (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent); + this.globalOnHeapMemstoreLimit = MemorySizeUtil.getOnheapGlobalMemstoreSize(conf); + this.globalOnHeapMemstoreLimitLowMark = + (long) (this.globalOnHeapMemstoreLimit * this.globalMemStoreLimitLowMarkPercent); } + public long getGlobalMemstoreLimit() { + return this.globalMemStoreLimit; + } + + public long getOnheapGlobalMemstoreLimit() { + return this.globalOnHeapMemstoreLimit; + } + + // Called by the tuners. + public void setGlobalMemstoreLimits(long newGlobalMemstoreLimit) { + if (this.memType == MemoryType.HEAP) { + this.globalMemStoreLimit = newGlobalMemstoreLimit; + this.globalMemStoreLimitLowMark = + (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent); + } else { + this.globalOnHeapMemstoreLimit = newGlobalMemstoreLimit; + this.globalOnHeapMemstoreLimitLowMark = + (long) (this.globalOnHeapMemstoreLimit * this.globalMemStoreLimitLowMarkPercent); + } + } + + public boolean isOffheap() { + return this.memType == MemoryType.NON_HEAP; + } + + public long getGlobalMemstoreLimitLowMark() { + return this.globalMemStoreLimitLowMark; + } + + public float getGlobalMemstoreLimitLowMarkPercent() { + return this.globalMemStoreLimitLowMarkPercent; + } + + /** + * @return the global Memstore data size in the RegionServer + */ + public long getGlobalMemstoreDataSize() { + return globalMemstoreDataSize.get(); + } + /** + * @return the global memstore heap overhead size in the RegionServer + */ public long getGlobalMemstoreHeapOverhead() { return this.globalMemstoreHeapOverhead.get(); } + /** + * @return the global memstore data size and heap overhead size for an onheap memstore + * whereas return the heap overhead size for an offheap memstore + */ + public long getGlobalMemstoreSize() { + if (isOffheap()) { + // get only the heap overhead for offheap memstore + return getGlobalMemstoreHeapOverhead(); + } else { + return getGlobalMemstoreDataSize() + getGlobalMemstoreHeapOverhead(); + } + } + /** * @param memStoreSize the Memstore size will be added to * the global Memstore size @@ -65,9 +152,80 @@ public class RegionServerAccounting { globalMemstoreHeapOverhead.addAndGet(-memStoreSize.getHeapOverhead()); } + /** + * Return true if we are above the memstore high water mark + * @return the flushtype + */ + public FlushType isAboveHighWaterMark() { + // for onheap memstore we check if the global memstore size and the + // global heap overhead is greater than the global memstore limit + if (memType == MemoryType.HEAP) { + if (getGlobalMemstoreDataSize() + getGlobalMemstoreHeapOverhead() >= globalMemStoreLimit) { + return FlushType.ABOVE_ONHEAP_HIGHER_MARK; + } + } else { + // If the configured memstore is offheap, check for two things + // 1) If the global memstore data size is greater than the configured + // 'hbase.regionserver.offheap.global.memstore.size' + // 2) If the global memstore heap size is greater than the configured onheap + // global memstore limit 'hbase.regionserver.global.memstore.size'. + // We do this to avoid OOME incase of scenarios where the heap is occupied with + // lot of onheap references to the cells in memstore + if (getGlobalMemstoreDataSize() >= globalMemStoreLimit) { + // Indicates that global memstore size is above the configured + // 'hbase.regionserver.offheap.global.memstore.size' + return FlushType.ABOVE_OFFHEAP_HIGHER_MARK; + } else if (getGlobalMemstoreHeapOverhead() >= this.globalOnHeapMemstoreLimit) { + // Indicates that the offheap memstore's heap overhead is greater than the + // configured 'hbase.regionserver.global.memstore.size'. + return FlushType.ABOVE_ONHEAP_HIGHER_MARK; + } + } + return FlushType.NORMAL; + } + + /** + * Return true if we're above the low watermark + */ + public FlushType isAboveLowWaterMark() { + // for onheap memstore we check if the global memstore size and the + // global heap overhead is greater than the global memstore lower mark limit + if (memType == MemoryType.HEAP) { + if (getGlobalMemstoreDataSize() + getGlobalMemstoreHeapOverhead() >= globalMemStoreLimitLowMark) { + return FlushType.ABOVE_ONHEAP_LOWER_MARK; + } + } else { + if (getGlobalMemstoreDataSize() >= globalMemStoreLimitLowMark) { + // Indicates that the offheap memstore's data size is greater than the global memstore + // lower limit + return FlushType.ABOVE_OFFHEAP_LOWER_MARK; + } else if (getGlobalMemstoreHeapOverhead() >= globalOnHeapMemstoreLimitLowMark) { + // Indicates that the offheap memstore's heap overhead is greater than the global memstore + // onheap lower limit + return FlushType.ABOVE_ONHEAP_LOWER_MARK; + } + } + return FlushType.NORMAL; + } + + /** + * @return the flush pressure of all stores on this regionserver. The value should be greater than + * or equal to 0.0, and any value greater than 1.0 means we enter the emergency state that + * global memstore size already exceeds lower limit. + */ + public double getFlushPressure() { + if (memType == MemoryType.HEAP) { + return (getGlobalMemstoreDataSize() + getGlobalMemstoreHeapOverhead()) * 1.0 + / globalMemStoreLimitLowMark; + } else { + return Math.max(getGlobalMemstoreDataSize() * 1.0 / globalMemStoreLimitLowMark, + getGlobalMemstoreHeapOverhead() * 1.0 / globalOnHeapMemstoreLimitLowMark); + } + } + /*** * Add memStoreSize to replayEditsPerRegion. - * + * * @param regionName region name. * @param memStoreSize the Memstore size will be added to replayEditsPerRegion. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 2fbd340a8b4..5a6c7ed3ae5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -264,6 +264,7 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi * or equal to 0.0, and any value greater than 1.0 means we enter the emergency state that * global memstore size already exceeds lower limit. */ + @Deprecated double getFlushPressure(); /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index e7c4f9864d8..316e2f63e9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -22,8 +22,7 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DE import java.io.IOException; import java.io.InterruptedIOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryUsage; +import java.lang.management.MemoryType; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; @@ -285,9 +284,9 @@ public abstract class AbstractFSWAL implements WAL { return Long.parseLong(chompedPath); } - private int calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize) { - MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); - return Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize); + private int calculateMaxLogFiles(Configuration conf, long logRollSize) { + Pair globalMemstoreSize = MemorySizeUtil.getGlobalMemstoreSize(conf); + return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize); } // must be power of 2 @@ -386,13 +385,12 @@ public abstract class AbstractFSWAL implements WAL { this.logrollsize = (long) (blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); - float memstoreRatio = MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false); boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null; if (maxLogsDefined) { LOG.warn("'hbase.regionserver.maxlogs' was deprecated."); } this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", - Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize))); + Math.max(32, calculateMaxLogFiles(conf, logrollsize))); LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java index e1f1f6fce62..d9b37c09747 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestGlobalMemStoreSize.java @@ -92,7 +92,7 @@ public class TestGlobalMemStoreSize { server.getFromOnlineRegions(regionInfo.getEncodedName()). getMemstoreSize(); } - assertEquals(server.getRegionServerAccounting().getGlobalMemstoreSize(), + assertEquals(server.getRegionServerAccounting().getGlobalMemstoreDataSize(), globalMemStoreSize); } @@ -100,7 +100,7 @@ public class TestGlobalMemStoreSize { int i = 0; for (HRegionServer server : getOnlineRegionServers()) { LOG.info("Starting flushes on " + server.getServerName() + - ", size=" + server.getRegionServerAccounting().getGlobalMemstoreSize()); + ", size=" + server.getRegionServerAccounting().getGlobalMemstoreDataSize()); for (HRegionInfo regionInfo : ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) { @@ -110,11 +110,11 @@ public class TestGlobalMemStoreSize { LOG.info("Post flush on " + server.getServerName()); long now = System.currentTimeMillis(); long timeout = now + 1000; - while(server.getRegionServerAccounting().getGlobalMemstoreSize() != 0 && + while(server.getRegionServerAccounting().getGlobalMemstoreDataSize() != 0 && timeout < System.currentTimeMillis()) { Threads.sleep(10); } - long size = server.getRegionServerAccounting().getGlobalMemstoreSize(); + long size = server.getRegionServerAccounting().getGlobalMemstoreDataSize(); if (size > 0) { // If size > 0, see if its because the meta region got edits while // our test was running.... @@ -131,7 +131,7 @@ public class TestGlobalMemStoreSize { } } } - size = server.getRegionServerAccounting().getGlobalMemstoreSize(); + size = server.getRegionServerAccounting().getGlobalMemstoreDataSize(); assertEquals("Server=" + server.getServerName() + ", i=" + i++, 0, size); } @@ -149,7 +149,7 @@ public class TestGlobalMemStoreSize { throws IOException { LOG.info("Flush " + r.toString() + " on " + server.getServerName() + ", " + r.flush(true) + ", size=" + - server.getRegionServerAccounting().getGlobalMemstoreSize()); + server.getRegionServerAccounting().getGlobalMemstoreDataSize()); } private List getOnlineRegionServers() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index fe0d350cc1b..ac10f8c39bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -170,7 +170,7 @@ public class TestHRegionReplayEvents { rss = mock(RegionServerServices.class); when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); when(rss.getConfiguration()).thenReturn(CONF); - when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting()); + when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF)); String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER .toString(); ExecutorService es = new ExecutorService(string); @@ -281,12 +281,12 @@ public class TestHRegionReplayEvents { } } - assertTrue(rss.getRegionServerAccounting().getGlobalMemstoreSize() > 0); + assertTrue(rss.getRegionServerAccounting().getGlobalMemstoreDataSize() > 0); // now close the region which should not cause hold because of un-committed flush secondaryRegion.close(); // verify that the memstore size is back to what it was - assertEquals(0, rss.getRegionServerAccounting().getGlobalMemstoreSize()); + assertEquals(0, rss.getRegionServerAccounting().getGlobalMemstoreDataSize()); } static int replayEdit(HRegion region, WAL.Entry entry) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index f620eb032b3..5a092746a8e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.lang.management.ManagementFactory; +import java.lang.management.MemoryType; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; @@ -65,8 +66,10 @@ public class TestHeapMemoryManager { conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.02f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.03f); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0), - new MemstoreFlusherStub(0), new RegionServerStub(conf), new RegionServerAccountingStub()); + new MemstoreFlusherStub(0), new RegionServerStub(conf), + regionServerAccounting); assertFalse(manager.isTunerOn()); } @@ -76,21 +79,24 @@ public class TestHeapMemoryManager { conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.02f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.03f); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0), - new MemstoreFlusherStub(0), new RegionServerStub(conf), new RegionServerAccountingStub()); + new MemstoreFlusherStub(0), new RegionServerStub(conf), + regionServerAccounting); assertFalse(manager.isTunerOn()); } @Test public void testWhenMemstoreAndBlockCacheMaxMinChecksFails() throws Exception { BlockCacheStub blockCache = new BlockCacheStub(0); - MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub(0); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.06f); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub(0); try { new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), regionServerAccounting); fail(); } catch (RuntimeException e) { } @@ -99,7 +105,7 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); try { new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), regionServerAccounting); fail(); } catch (RuntimeException e) { } @@ -107,18 +113,19 @@ public class TestHeapMemoryManager { @Test public void testWhenClusterIsWriteHeavyWithEmptyMemstore() throws Exception { - BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); - MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); - RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); - // Empty block cache and memstore - blockCache.setTestBlockSize(0); - regionServerAccounting.setTestMemstoreSize(0); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty block cache and memstore + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize(0); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, @@ -127,11 +134,11 @@ public class TestHeapMemoryManager { long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); - memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); - memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.requestFlush(null, false); // Allow the tuner to run once and do necessary memory up Thread.sleep(1500); @@ -140,14 +147,114 @@ public class TestHeapMemoryManager { assertEquals(oldBlockCacheSize, blockCache.maxSize); } + @Test + public void testHeapMemoryManagerWhenOffheapFlushesHappenUnderReadHeavyCase() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_LOWER_LIMIT_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf, true); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty memstore and but nearly filled block cache + blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); + regionServerAccounting.setTestMemstoreSize(0); + // Let the system start with default values for memstore heap and block cache size. + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf), regionServerAccounting); + long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + long oldBlockCacheSize = blockCache.maxSize; + float maxStepValue = DefaultHeapMemoryTuner.DEFAULT_MIN_STEP_VALUE; + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); + blockCache.evictBlock(null); + blockCache.evictBlock(null); + blockCache.evictBlock(null); + // do some offheap flushes also. So there should be decrease in memstore but + // not as that when we don't have offheap flushes + memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + // Allow the tuner to run once and do necessary memory up + waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(maxStepValue, oldBlockCacheSize, blockCache.maxSize); + oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + oldBlockCacheSize = blockCache.maxSize; + // Do some more evictions before the next run of HeapMemoryTuner + blockCache.evictBlock(null); + // Allow the tuner to run once and do necessary memory up + waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(maxStepValue, oldBlockCacheSize, blockCache.maxSize); + } + + @Test + public void testHeapMemoryManagerWithOffheapMemstoreAndMixedWorkload() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_LOWER_LIMIT_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf, true); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty memstore and but nearly filled block cache + blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); + regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); + // Let the system start with default values for memstore heap and block cache size. + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf), regionServerAccounting); + long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + long oldBlockCacheSize = blockCache.maxSize; + float maxStepValue = DefaultHeapMemoryTuner.DEFAULT_MIN_STEP_VALUE; + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); + blockCache.evictBlock(null); + blockCache.evictBlock(null); + blockCache.evictBlock(null); + // do some offheap flushes also. So there should be decrease in memstore but + // not as that when we don't have offheap flushes + memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + // Allow the tuner to run once and do necessary memory up + waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(maxStepValue, oldBlockCacheSize, blockCache.maxSize); + oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + oldBlockCacheSize = blockCache.maxSize; + // change memstore size + // regionServerAccounting.setTestMemstoreSize((long)(maxHeapSize * 0.4 * 0.8)); + // The memstore size would have decreased. Now again do some flushes and ensure the + // flushes are due to onheap overhead. This should once again call for increase in + // memstore size but that increase should be to the safe size + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + // Allow the tuner to run once and do necessary memory up + waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(-maxStepValue, oldBlockCacheSize, blockCache.maxSize); + } + @Test public void testWhenClusterIsReadHeavyWithEmptyBlockCache() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); - MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); - RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); - // Empty block cache and memstore - blockCache.setTestBlockSize(0); - regionServerAccounting.setTestMemstoreSize(0); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); @@ -155,6 +262,11 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty block cache and memstore + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize(0); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf), regionServerAccounting); @@ -175,11 +287,6 @@ public class TestHeapMemoryManager { @Test public void testWhenClusterIsWriteHeavy() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); - MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); - RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); - // Empty block cache and but nearly filled memstore - blockCache.setTestBlockSize(0); - regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); @@ -187,6 +294,12 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty block cache and but nearly filled memstore + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf), regionServerAccounting); @@ -194,7 +307,7 @@ public class TestHeapMemoryManager { long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); - memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); @@ -208,7 +321,7 @@ public class TestHeapMemoryManager { oldMemstoreHeapSize = memStoreFlusher.memstoreSize; oldBlockCacheSize = blockCache.maxSize; // Do some more flushes before the next run of HeapMemoryTuner - memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); // Allow the tuner to run once and do necessary memory up @@ -219,14 +332,45 @@ public class TestHeapMemoryManager { blockCache.maxSize); } + @Test + public void testWhenClusterIsWriteHeavyWithOffheapMemstore() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty block cache and but nearly filled memstore + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); + // Let the system start with default values for memstore heap and block cache size. + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf), regionServerAccounting); + long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + long oldBlockCacheSize = blockCache.maxSize; + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); + // this should not change anything with onheap memstore + memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + // Allow the tuner to run once and do necessary memory up + Thread.sleep(1500); + // No changes should be made by tuner as we already have lot of empty space + assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize); + assertEquals(oldBlockCacheSize, blockCache.maxSize); + } + @Test public void testWhenClusterIsReadHeavy() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); - MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); - RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); - // Empty memstore and but nearly filled block cache - blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); - regionServerAccounting.setTestMemstoreSize(0); Configuration conf = HBaseConfiguration.create(); conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_LOWER_LIMIT_KEY, 0.7f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); @@ -235,9 +379,15 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Empty memstore and but nearly filled block cache + blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); + regionServerAccounting.setTestMemstoreSize(0); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; long oldMemstoreLowerMarkSize = 7 * oldMemstoreHeapSize / 10; @@ -272,12 +422,6 @@ public class TestHeapMemoryManager { @Test public void testWhenClusterIsHavingMoreWritesThanReads() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); - MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); - RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); - // Both memstore and block cache are nearly filled - blockCache.setTestBlockSize(0); - regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); - blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); @@ -285,6 +429,13 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + MemstoreFlusherStub memStoreFlusher = + new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + // Both memstore and block cache are nearly filled + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); + blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf), regionServerAccounting); @@ -292,7 +443,7 @@ public class TestHeapMemoryManager { long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); - memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); @@ -303,7 +454,7 @@ public class TestHeapMemoryManager { assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize); assertEquals(oldBlockCacheSize, blockCache.maxSize); // Do some more flushes before the next run of HeapMemoryTuner - memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); @@ -319,11 +470,6 @@ public class TestHeapMemoryManager { public void testBlockedFlushesIncreaseMemstoreInSteadyState() throws Exception { BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); - RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(); - // Both memstore and block cache are nearly filled - blockCache.setTestBlockSize(0); - regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); - blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); Configuration conf = HBaseConfiguration.create(); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); @@ -331,6 +477,11 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0); + RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf); + // Both memstore and block cache are nearly filled + blockCache.setTestBlockSize(0); + regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8)); + blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8)); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf), regionServerAccounting); @@ -338,7 +489,7 @@ public class TestHeapMemoryManager { long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); - memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); @@ -350,7 +501,7 @@ public class TestHeapMemoryManager { assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize); assertEquals(oldBlockCacheSize, blockCache.maxSize); // Flushes that block updates - memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; + memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; memStoreFlusher.requestFlush(null, false); blockCache.evictBlock(null); blockCache.evictBlock(null); @@ -379,7 +530,7 @@ public class TestHeapMemoryManager { HeapMemoryTuner.class); // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); // Now we wants to be in write mode. Set bigger memstore size from CustomHeapMemoryTuner @@ -412,7 +563,7 @@ public class TestHeapMemoryManager { conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class, HeapMemoryTuner.class); HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); CustomHeapMemoryTuner.memstoreSize = 0.78f; @@ -438,7 +589,7 @@ public class TestHeapMemoryManager { conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class, HeapMemoryTuner.class); HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); long oldMemstoreSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); @@ -473,7 +624,7 @@ public class TestHeapMemoryManager { try { heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); fail("Should have failed as the collective heap memory need is above 80%"); } catch (Exception e) { } @@ -482,7 +633,7 @@ public class TestHeapMemoryManager { conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.6f); conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.6f); heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, - new RegionServerStub(conf), new RegionServerAccountingStub()); + new RegionServerStub(conf), new RegionServerAccountingStub(conf)); long oldMemstoreSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); @@ -490,7 +641,7 @@ public class TestHeapMemoryManager { CustomHeapMemoryTuner.memstoreSize = 0.4f; CustomHeapMemoryTuner.blockCacheSize = 0.4f; // Allow the tuner to run once and do necessary memory up - Thread.sleep(1500); + Thread.sleep(1500); // The size should not get changes as the collection of memstore size and L1 and L2 block cache // size will cross the ax allowed 80% mark assertEquals(oldMemstoreSize, memStoreFlusher.memstoreSize); @@ -508,7 +659,8 @@ public class TestHeapMemoryManager { assertEquals(expected, currentHeapSpace); } - private void assertHeapSpaceDelta(double expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) { + private void assertHeapSpaceDelta(double expectedDeltaPercent, long oldHeapSpace, + long newHeapSpace) { double expctedMinDelta = (double) (this.maxHeapSize * expectedDeltaPercent); // Tolerable error double error = 0.95; @@ -757,11 +909,34 @@ public class TestHeapMemoryManager { } private static class RegionServerAccountingStub extends RegionServerAccounting { + boolean offheap; + + public RegionServerAccountingStub(Configuration conf) { + super(conf); + } + + public RegionServerAccountingStub(Configuration conf, boolean offheap) { + super(conf); + this.offheap = offheap; + } + private long testMemstoreSize = 0; + @Override - public long getGlobalMemstoreSize() { + public long getGlobalMemstoreDataSize() { return testMemstoreSize; } + + @Override + public long getGlobalMemstoreHeapOverhead() { + return testMemstoreSize; + } + + @Override + public boolean isOffheap() { + return offheap; + } + public void setTestMemstoreSize(long testMemstoreSize) { this.testMemstoreSize = testMemstoreSize; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAccounting.java new file mode 100644 index 00000000000..fd45a126cec --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerAccounting.java @@ -0,0 +1,120 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestRegionServerAccounting { + + @Test + public void testOnheapMemstoreHigherWaterMarkLimits() { + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f); + // try for default cases + RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf); + MemstoreSize memstoreSize = + new MemstoreSize((long) (3l * 1024l * 1024l * 1024l), (long) (1l * 1024l * 1024l * 1024l)); + regionServerAccounting.incGlobalMemstoreSize(memstoreSize); + assertEquals(FlushType.ABOVE_ONHEAP_HIGHER_MARK, + regionServerAccounting.isAboveHighWaterMark()); + } + + @Test + public void testOnheapMemstoreLowerWaterMarkLimits() { + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f); + // try for default cases + RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf); + MemstoreSize memstoreSize = + new MemstoreSize((long) (3l * 1024l * 1024l * 1024l), (long) (1l * 1024l * 1024l * 1024l)); + regionServerAccounting.incGlobalMemstoreSize(memstoreSize); + assertEquals(FlushType.ABOVE_ONHEAP_LOWER_MARK, + regionServerAccounting.isAboveLowWaterMark()); + } + + @Test + public void testOffheapMemstoreHigherWaterMarkLimitsDueToDataSize() { + Configuration conf = HBaseConfiguration.create(); + // setting 1G as offheap data size + conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1l * 1024l)); + // try for default cases + RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf); + // this will breach offheap limit as data size is higher and not due to heap size + MemstoreSize memstoreSize = + new MemstoreSize((long) (3l * 1024l * 1024l * 1024l), (long) (1l * 1024l * 1024l * 1024l)); + regionServerAccounting.incGlobalMemstoreSize(memstoreSize); + assertEquals(FlushType.ABOVE_OFFHEAP_HIGHER_MARK, + regionServerAccounting.isAboveHighWaterMark()); + } + + @Test + public void testOffheapMemstoreHigherWaterMarkLimitsDueToHeapSize() { + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f); + // setting 1G as offheap data size + conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1l * 1024l)); + // try for default cases + RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf); + // this will breach higher limit as heap size is higher and not due to offheap size + MemstoreSize memstoreSize = + new MemstoreSize((long) (3l * 1024l * 1024l), (long) (2l * 1024l * 1024l * 1024l)); + regionServerAccounting.incGlobalMemstoreSize(memstoreSize); + assertEquals(FlushType.ABOVE_ONHEAP_HIGHER_MARK, + regionServerAccounting.isAboveHighWaterMark()); + } + + @Test + public void testOffheapMemstoreLowerWaterMarkLimitsDueToDataSize() { + Configuration conf = HBaseConfiguration.create(); + // setting 1G as offheap data size + conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1l * 1024l)); + // try for default cases + RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf); + // this will breach offheap limit as data size is higher and not due to heap size + MemstoreSize memstoreSize = + new MemstoreSize((long) (3l * 1024l * 1024l * 1024l), (long) (1l * 1024l * 1024l * 1024l)); + regionServerAccounting.incGlobalMemstoreSize(memstoreSize); + assertEquals(FlushType.ABOVE_OFFHEAP_LOWER_MARK, + regionServerAccounting.isAboveLowWaterMark()); + } + + @Test + public void testOffheapMemstoreLowerWaterMarkLimitsDueToHeapSize() { + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f); + // setting 1G as offheap data size + conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1l * 1024l)); + // try for default cases + RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf); + // this will breach higher limit as heap size is higher and not due to offheap size + MemstoreSize memstoreSize = + new MemstoreSize((long) (3l * 1024l * 1024l), (long) (2l * 1024l * 1024l * 1024l)); + regionServerAccounting.incGlobalMemstoreSize(memstoreSize); + assertEquals(FlushType.ABOVE_ONHEAP_LOWER_MARK, + regionServerAccounting.isAboveLowWaterMark()); + } +}