HBASE-15787 Change the flush related heuristics to work with offheap size
configured (Ram)
This commit is contained in:
parent
a2a7618d26
commit
d1147eeb7e
|
@ -157,9 +157,19 @@ public class MemorySizeUtil {
|
|||
+ " Going with on heap global memstore size ('" + MEMSTORE_SIZE_KEY + "')");
|
||||
}
|
||||
}
|
||||
return new Pair<Long, MemoryType>(getOnheapGlobalMemstoreSize(conf), MemoryType.HEAP);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the onheap global memstore limit based on the config
|
||||
* 'hbase.regionserver.global.memstore.size'.
|
||||
* @param conf
|
||||
* @return the onheap global memstore limt
|
||||
*/
|
||||
public static long getOnheapGlobalMemstoreSize(Configuration conf) {
|
||||
long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
|
||||
float globalMemStorePercent = getGlobalMemStoreHeapPercent(conf, true);
|
||||
return new Pair<Long, MemoryType>((long) (max * globalMemStorePercent), MemoryType.HEAP);
|
||||
return ((long) (max * globalMemStorePercent));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -139,6 +139,10 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
|
|||
}
|
||||
StepDirection newTuneDirection = getTuneDirection(context);
|
||||
|
||||
long blockedFlushCount = context.getBlockedFlushCount();
|
||||
long unblockedFlushCount = context.getUnblockedFlushCount();
|
||||
long totalOnheapFlushCount = blockedFlushCount + unblockedFlushCount;
|
||||
boolean offheapMemstore = context.isOffheapMemstore();
|
||||
float newMemstoreSize;
|
||||
float newBlockCacheSize;
|
||||
|
||||
|
@ -159,7 +163,10 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
|
|||
&& decayingTunerStepSizeSum > 0)) {
|
||||
// Current step is opposite of past tuner actions so decrease the step size to reach steady
|
||||
// state.
|
||||
step = step/2.00f;
|
||||
if (!offheapMemstore && step != minimumStepSize) {
|
||||
// we leave the step to be at minimumStepSize for offheap memstore
|
||||
step = step / 2.00f;
|
||||
}
|
||||
}
|
||||
if (step < minimumStepSize) {
|
||||
// If step size is too small then we do nothing.
|
||||
|
@ -167,7 +174,17 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
|
|||
step = 0.0f;
|
||||
newTuneDirection = StepDirection.NEUTRAL;
|
||||
}
|
||||
// Increase / decrease the memstore / block cahce sizes depending on new tuner step.
|
||||
// There are no flushes due to onheap pressure and
|
||||
// we have an offheap memstore and we are in need of more block_cache size.
|
||||
if (totalOnheapFlushCount == 0 && offheapMemstore
|
||||
&& newTuneDirection == StepDirection.INCREASE_BLOCK_CACHE_SIZE) {
|
||||
// we are sure that there are flushes only due to offheap pressure
|
||||
// So don't do the memstore decrease equal to the step size. Instead do minimum stepSize
|
||||
// decrease. But even if we have some flushes due to heap then it is better we tune
|
||||
// the existing way.
|
||||
step = minimumStepSize;
|
||||
}
|
||||
// Increase / decrease the memstore / block cache sizes depending on new tuner step.
|
||||
// We don't want to exert immediate pressure on memstore. So, we decrease its size gracefully;
|
||||
// we set a minimum bar in the middle of the total memstore size and the lower limit.
|
||||
float minMemstoreSize = ((globalMemStoreLimitLowMarkPercent + 1) * curMemstoreSize) / 2.00f;
|
||||
|
@ -222,7 +239,7 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
|
|||
long unblockedFlushCount = context.getUnblockedFlushCount();
|
||||
long evictCount = context.getEvictCount();
|
||||
long cacheMissCount = context.getCacheMissCount();
|
||||
long totalFlushCount = blockedFlushCount+unblockedFlushCount;
|
||||
long totalFlushCount = blockedFlushCount + unblockedFlushCount;
|
||||
float curMemstoreSize = context.getCurMemStoreSize();
|
||||
float curBlockCacheSize = context.getCurBlockCacheSize();
|
||||
StringBuilder tunerLog = new StringBuilder();
|
||||
|
@ -342,8 +359,8 @@ class DefaultHeapMemoryTuner implements HeapMemoryTuner {
|
|||
*/
|
||||
private void addToRollingStats(TunerContext context) {
|
||||
rollingStatsForCacheMisses.insertDataValue(context.getCacheMissCount());
|
||||
rollingStatsForFlushes.insertDataValue(context.getBlockedFlushCount() +
|
||||
context.getUnblockedFlushCount());
|
||||
rollingStatsForFlushes
|
||||
.insertDataValue(context.getBlockedFlushCount() + context.getUnblockedFlushCount());
|
||||
rollingStatsForEvictions.insertDataValue(context.getEvictCount());
|
||||
}
|
||||
|
||||
|
|
|
@ -48,6 +48,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import javax.management.MalformedObjectNameException;
|
||||
|
@ -574,7 +575,7 @@ public class HRegionServer extends HasThread implements
|
|||
// or process owner as default super user.
|
||||
Superusers.initialize(conf);
|
||||
|
||||
regionServerAccounting = new RegionServerAccounting();
|
||||
regionServerAccounting = new RegionServerAccounting(conf);
|
||||
cacheConfig = new CacheConfig(conf);
|
||||
mobCacheConfig = new MobCacheConfig(conf);
|
||||
uncaughtExceptionHandler = new UncaughtExceptionHandler() {
|
||||
|
@ -1482,7 +1483,7 @@ public class HRegionServer extends HasThread implements
|
|||
// it.
|
||||
Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemstoreSize(conf);
|
||||
long globalMemStoreSize = pair.getFirst();
|
||||
boolean offheap = pair.getSecond() == MemoryType.NON_HEAP;
|
||||
boolean offheap = this.regionServerAccounting.isOffheap();
|
||||
// When off heap memstore in use, take full area for chunk pool.
|
||||
float poolSizePercentage = offheap ? 1.0F
|
||||
: conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
|
||||
|
@ -3588,8 +3589,7 @@ public class HRegionServer extends HasThread implements
|
|||
// return 0 during RS initialization
|
||||
return 0.0;
|
||||
}
|
||||
return getRegionServerAccounting().getGlobalMemstoreSize() * 1.0
|
||||
/ cacheFlusher.globalMemStoreLimitLowMark;
|
||||
return getRegionServerAccounting().getFlushPressure();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.MemoryType;
|
||||
import java.lang.management.MemoryUsage;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -77,6 +78,7 @@ public class HeapMemoryManager {
|
|||
private float heapOccupancyPercent;
|
||||
|
||||
private final ResizableBlockCache blockCache;
|
||||
// TODO : remove this and mark regionServerAccounting as the observer directly
|
||||
private final FlushRequester memStoreFlusher;
|
||||
private final Server server;
|
||||
private final RegionServerAccounting regionServerAccounting;
|
||||
|
@ -240,6 +242,8 @@ public class HeapMemoryManager {
|
|||
Class<? extends HeapMemoryTuner> tunerKlass = server.getConfiguration().getClass(
|
||||
HBASE_RS_HEAP_MEMORY_TUNER_CLASS, DefaultHeapMemoryTuner.class, HeapMemoryTuner.class);
|
||||
heapMemTuner = ReflectionUtils.newInstance(tunerKlass, server.getConfiguration());
|
||||
tunerContext
|
||||
.setOffheapMemstore(regionServerAccounting.isOffheap());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -298,10 +302,10 @@ public class HeapMemoryManager {
|
|||
unblockedFlushCnt = unblockedFlushCount.getAndSet(0);
|
||||
tunerContext.setUnblockedFlushCount(unblockedFlushCnt);
|
||||
metricsHeapMemoryManager.updateUnblockedFlushCount(unblockedFlushCnt);
|
||||
// TODO : add support for offheap metrics
|
||||
tunerContext.setCurBlockCacheUsed((float) blockCache.getCurrentSize() / maxHeapSize);
|
||||
metricsHeapMemoryManager.setCurBlockCacheSizeGauge(blockCache.getCurrentSize());
|
||||
long globalMemstoreHeapSize = regionServerAccounting.getGlobalMemstoreSize()
|
||||
+ regionServerAccounting.getGlobalMemstoreHeapOverhead();
|
||||
long globalMemstoreHeapSize = regionServerAccounting.getGlobalMemstoreSize();
|
||||
tunerContext.setCurMemStoreUsed((float) globalMemstoreHeapSize / maxHeapSize);
|
||||
metricsHeapMemoryManager.setCurMemStoreSizeGauge(globalMemstoreHeapSize);
|
||||
tunerContext.setCurBlockCacheSize(blockCachePercent);
|
||||
|
@ -354,14 +358,20 @@ public class HeapMemoryManager {
|
|||
metricsHeapMemoryManager.updateMemStoreDeltaSizeHistogram(memStoreDeltaSize);
|
||||
metricsHeapMemoryManager.updateBlockCacheDeltaSizeHistogram(blockCacheDeltaSize);
|
||||
long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize);
|
||||
// we could have got an increase or decrease in size for the offheap memstore
|
||||
// also if the flush had happened due to heap overhead. In that case it is ok
|
||||
// to adjust the onheap memstore limit configs
|
||||
long newMemstoreSize = (long) (maxHeapSize * memstoreSize);
|
||||
LOG.info("Setting block cache heap size to " + newBlockCacheSize
|
||||
+ " and memstore heap size to " + newMemstoreSize);
|
||||
blockCachePercent = blockCacheSize;
|
||||
blockCache.setMaxSize(newBlockCacheSize);
|
||||
globalMemStorePercent = memstoreSize;
|
||||
// Internally sets it to RegionServerAccounting
|
||||
// TODO : Set directly on RSAccounting??
|
||||
memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
|
||||
for (HeapMemoryTuneObserver observer : tuneObservers) {
|
||||
// Risky.. If this newMemstoreSize decreases we reduce the count in offheap chunk pool
|
||||
observer.onHeapMemoryTune(newMemstoreSize, newBlockCacheSize);
|
||||
}
|
||||
}
|
||||
|
@ -376,14 +386,16 @@ public class HeapMemoryManager {
|
|||
@Override
|
||||
public void flushRequested(FlushType type, Region region) {
|
||||
switch (type) {
|
||||
case ABOVE_HIGHER_MARK:
|
||||
case ABOVE_ONHEAP_HIGHER_MARK:
|
||||
blockedFlushCount.incrementAndGet();
|
||||
break;
|
||||
case ABOVE_LOWER_MARK:
|
||||
case ABOVE_ONHEAP_LOWER_MARK:
|
||||
unblockedFlushCount.incrementAndGet();
|
||||
break;
|
||||
// Removed the counting of the offheap related flushes (after reviews). Will add later if
|
||||
// needed
|
||||
default:
|
||||
// In case of normal flush don't do any action.
|
||||
// In case of any other flush don't do any action.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -403,6 +415,7 @@ public class HeapMemoryManager {
|
|||
private float curMemStoreUsed;
|
||||
private float curMemStoreSize;
|
||||
private float curBlockCacheSize;
|
||||
private boolean offheapMemstore;
|
||||
|
||||
public long getBlockedFlushCount() {
|
||||
return blockedFlushCount;
|
||||
|
@ -467,6 +480,14 @@ public class HeapMemoryManager {
|
|||
public void setCurMemStoreUsed(float d) {
|
||||
this.curMemStoreUsed = d;
|
||||
}
|
||||
|
||||
public void setOffheapMemstore(boolean offheapMemstore) {
|
||||
this.offheapMemstore = offheapMemstore;
|
||||
}
|
||||
|
||||
public boolean isOffheapMemstore() {
|
||||
return this.offheapMemstore;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -234,6 +234,11 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
|
|||
|
||||
@Override
|
||||
public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
|
||||
// don't do any tuning in case of offheap memstore
|
||||
if (this.offheap) {
|
||||
LOG.warn("Not tuning the chunk pool as it is offheap");
|
||||
return;
|
||||
}
|
||||
int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize);
|
||||
if (newMaxCount != this.maxCount) {
|
||||
// We need an adjustment in the chunks numbers
|
||||
|
|
|
@ -49,12 +49,10 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
@ -90,10 +88,6 @@ class MemStoreFlusher implements FlushRequester {
|
|||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
private final Object blockSignal = new Object();
|
||||
|
||||
protected long globalMemStoreLimit;
|
||||
protected float globalMemStoreLimitLowMarkPercent;
|
||||
protected long globalMemStoreLimitLowMark;
|
||||
|
||||
private long blockingWaitTime;
|
||||
private final LongAdder updatesBlockedMsHighWater = new LongAdder();
|
||||
|
||||
|
@ -111,32 +105,18 @@ class MemStoreFlusher implements FlushRequester {
|
|||
this.server = server;
|
||||
this.threadWakeFrequency =
|
||||
conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
|
||||
Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemstoreSize(conf);
|
||||
this.globalMemStoreLimit = pair.getFirst();
|
||||
boolean onheap = pair.getSecond() == MemoryType.HEAP;
|
||||
// When off heap memstore in use we configure the global off heap space for memstore as bytes
|
||||
// not as % of max memory size. In such case, the lower water mark should be specified using the
|
||||
// key "hbase.regionserver.global.memstore.size.lower.limit" which says % of the global upper
|
||||
// bound and defaults to 95%. In on heap case also specifying this way is ideal. But in the past
|
||||
// we used to take lower bound also as the % of xmx (38% as default). For backward compatibility
|
||||
// for this deprecated config,we will fall back to read that config when new one is missing.
|
||||
// Only for on heap case, do this fallback mechanism. For off heap it makes no sense.
|
||||
// TODO When to get rid of the deprecated config? ie
|
||||
// "hbase.regionserver.global.memstore.lowerLimit". Can get rid of this boolean passing then.
|
||||
this.globalMemStoreLimitLowMarkPercent = MemorySizeUtil.getGlobalMemStoreHeapLowerMark(conf,
|
||||
onheap);
|
||||
this.globalMemStoreLimitLowMark =
|
||||
(long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
|
||||
|
||||
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
|
||||
90000);
|
||||
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
|
||||
this.flushHandlers = new FlushHandler[handlerCount];
|
||||
LOG.info("globalMemStoreLimit="
|
||||
+ TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
|
||||
+ TraditionalBinaryPrefix
|
||||
.long2String(this.server.getRegionServerAccounting().getGlobalMemstoreLimit(), "", 1)
|
||||
+ ", globalMemStoreLimitLowMark="
|
||||
+ TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
|
||||
+ ", Offheap=" + !onheap);
|
||||
+ TraditionalBinaryPrefix.long2String(
|
||||
this.server.getRegionServerAccounting().getGlobalMemstoreLimitLowMark(), "", 1)
|
||||
+ ", Offheap="
|
||||
+ (this.server.getRegionServerAccounting().isOffheap()));
|
||||
}
|
||||
|
||||
public LongAdder getUpdatesBlockedMsHighWater() {
|
||||
|
@ -210,7 +190,7 @@ class MemStoreFlusher implements FlushRequester {
|
|||
LOG.info("Refreshing storefiles of region " + bestRegionReplica
|
||||
+ " due to global heap pressure. Total memstore size="
|
||||
+ StringUtils
|
||||
.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
|
||||
.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreDataSize())
|
||||
+ " memstore heap overhead=" + StringUtils.humanReadableInt(
|
||||
server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead()));
|
||||
flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
|
||||
|
@ -222,7 +202,7 @@ class MemStoreFlusher implements FlushRequester {
|
|||
} else {
|
||||
LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
|
||||
+ "Total Memstore size="
|
||||
+ humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
|
||||
+ humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreDataSize())
|
||||
+ ", Region memstore size="
|
||||
+ humanReadableInt(regionToFlush.getMemstoreSize()));
|
||||
flushedOne = flushRegion(regionToFlush, true, true);
|
||||
|
@ -251,9 +231,15 @@ class MemStoreFlusher implements FlushRequester {
|
|||
wakeupPending.set(false); // allow someone to wake us up again
|
||||
fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
||||
if (fqe == null || fqe instanceof WakeupFlushThread) {
|
||||
if (isAboveLowWaterMark()) {
|
||||
FlushType type = isAboveLowWaterMark();
|
||||
if (type != FlushType.NORMAL) {
|
||||
LOG.debug("Flush thread woke up because memory above low water="
|
||||
+ TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
|
||||
+ TraditionalBinaryPrefix.long2String(
|
||||
server.getRegionServerAccounting().getGlobalMemstoreLimitLowMark(), "", 1));
|
||||
// For offheap memstore, even if the lower water mark was breached due to heap overhead
|
||||
// we still select the regions based on the region's memstore data size.
|
||||
// TODO : If we want to decide based on heap over head it can be done without tracking
|
||||
// it per region.
|
||||
if (!flushOneForGlobalPressure()) {
|
||||
// Wasn't able to flush any region, but we're above low water mark
|
||||
// This is unlikely to happen, but might happen when closing the
|
||||
|
@ -355,17 +341,15 @@ class MemStoreFlusher implements FlushRequester {
|
|||
/**
|
||||
* Return true if global memory usage is above the high watermark
|
||||
*/
|
||||
private boolean isAboveHighWaterMark() {
|
||||
return server.getRegionServerAccounting().getGlobalMemstoreSize()
|
||||
+ server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead() >= globalMemStoreLimit;
|
||||
private FlushType isAboveHighWaterMark() {
|
||||
return server.getRegionServerAccounting().isAboveHighWaterMark();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if we're above the high watermark
|
||||
* Return true if we're above the low watermark
|
||||
*/
|
||||
private boolean isAboveLowWaterMark() {
|
||||
return server.getRegionServerAccounting().getGlobalMemstoreSize() + server
|
||||
.getRegionServerAccounting().getGlobalMemstoreHeapOverhead() >= globalMemStoreLimitLowMark;
|
||||
private FlushType isAboveLowWaterMark() {
|
||||
return server.getRegionServerAccounting().isAboveLowWaterMark();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -548,9 +532,12 @@ class MemStoreFlusher implements FlushRequester {
|
|||
}
|
||||
|
||||
private void notifyFlushRequest(Region region, boolean emergencyFlush) {
|
||||
FlushType type = FlushType.NORMAL;
|
||||
FlushType type = null;
|
||||
if (emergencyFlush) {
|
||||
type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
|
||||
type = isAboveHighWaterMark();
|
||||
if (type == null) {
|
||||
type = isAboveLowWaterMark();
|
||||
}
|
||||
}
|
||||
for (FlushRequestListener listener : flushRequestListeners) {
|
||||
listener.flushRequested(type, region);
|
||||
|
@ -586,7 +573,8 @@ class MemStoreFlusher implements FlushRequester {
|
|||
*/
|
||||
public void reclaimMemStoreMemory() {
|
||||
TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
|
||||
if (isAboveHighWaterMark()) {
|
||||
FlushType flushType = isAboveHighWaterMark();
|
||||
if (flushType != FlushType.NORMAL) {
|
||||
if (Trace.isTracing()) {
|
||||
scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
|
||||
}
|
||||
|
@ -596,17 +584,24 @@ class MemStoreFlusher implements FlushRequester {
|
|||
long startTime = 0;
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
while (isAboveHighWaterMark() && !server.isStopped()) {
|
||||
flushType = isAboveHighWaterMark();
|
||||
while (flushType != FlushType.NORMAL && !server.isStopped()) {
|
||||
if (!blocked) {
|
||||
startTime = EnvironmentEdgeManager.currentTime();
|
||||
LOG.info("Blocking updates on " + server.toString() + ": the global memstore size "
|
||||
+ TraditionalBinaryPrefix.long2String(
|
||||
server.getRegionServerAccounting().getGlobalMemstoreSize(), "", 1)
|
||||
+ " + global memstore heap overhead "
|
||||
+ TraditionalBinaryPrefix.long2String(
|
||||
server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead(), "", 1)
|
||||
+ " is >= than blocking "
|
||||
+ TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
|
||||
if (!server.getRegionServerAccounting().isOffheap()) {
|
||||
logMsg("the global memstore size", "global memstore heap overhead");
|
||||
} else {
|
||||
switch (flushType) {
|
||||
case ABOVE_OFFHEAP_HIGHER_MARK:
|
||||
logMsg("the global offheap memstore size", "global memstore heap overhead");
|
||||
break;
|
||||
case ABOVE_ONHEAP_HIGHER_MARK:
|
||||
logMsg("global memstore heap overhead", "");
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
blocked = true;
|
||||
wakeupFlushThread();
|
||||
|
@ -620,6 +615,7 @@ class MemStoreFlusher implements FlushRequester {
|
|||
}
|
||||
long took = EnvironmentEdgeManager.currentTime() - start;
|
||||
LOG.warn("Memstore is above high water mark and block " + took + "ms");
|
||||
flushType = isAboveHighWaterMark();
|
||||
}
|
||||
} finally {
|
||||
if (interrupted) {
|
||||
|
@ -635,11 +631,24 @@ class MemStoreFlusher implements FlushRequester {
|
|||
LOG.info("Unblocking updates for server " + server.toString());
|
||||
}
|
||||
}
|
||||
} else if (isAboveLowWaterMark()) {
|
||||
} else if (isAboveLowWaterMark() != FlushType.NORMAL) {
|
||||
wakeupFlushThread();
|
||||
}
|
||||
scope.close();
|
||||
}
|
||||
|
||||
private void logMsg(String string1, String string2) {
|
||||
LOG.info("Blocking updates on " + server.toString() + ": " + string1 + " "
|
||||
+ TraditionalBinaryPrefix
|
||||
.long2String(server.getRegionServerAccounting().getGlobalMemstoreDataSize(), "", 1)
|
||||
+ " + " + string2 + " "
|
||||
+ TraditionalBinaryPrefix
|
||||
.long2String(server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead(), "", 1)
|
||||
+ " is >= than blocking " + TraditionalBinaryPrefix.long2String(
|
||||
server.getRegionServerAccounting().getGlobalMemstoreLimit(), "", 1)
|
||||
+ " size");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "flush_queue="
|
||||
|
@ -685,16 +694,10 @@ class MemStoreFlusher implements FlushRequester {
|
|||
*/
|
||||
@Override
|
||||
public void setGlobalMemstoreLimit(long globalMemStoreSize) {
|
||||
this.globalMemStoreLimit = globalMemStoreSize;
|
||||
this.globalMemStoreLimitLowMark =
|
||||
(long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
|
||||
this.server.getRegionServerAccounting().setGlobalMemstoreLimits(globalMemStoreSize);
|
||||
reclaimMemStoreMemory();
|
||||
}
|
||||
|
||||
public long getMemoryLimit() {
|
||||
return this.globalMemStoreLimit;
|
||||
}
|
||||
|
||||
interface FlushQueueEntry extends Delayed {
|
||||
}
|
||||
|
||||
|
@ -825,5 +828,11 @@ class MemStoreFlusher implements FlushRequester {
|
|||
}
|
||||
|
||||
enum FlushType {
|
||||
NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK;
|
||||
NORMAL,
|
||||
ABOVE_ONHEAP_LOWER_MARK, /* happens due to lower mark breach of onheap memstore settings
|
||||
An offheap memstore can even breach the onheap_lower_mark*/
|
||||
ABOVE_ONHEAP_HIGHER_MARK,/* happens due to higher mark breach of onheap memstore settings
|
||||
An offheap memstore can even breach the onheap_higher_mark*/
|
||||
ABOVE_OFFHEAP_LOWER_MARK,/* happens due to lower mark breach of offheap memstore settings*/
|
||||
ABOVE_OFFHEAP_HIGHER_MARK;/*/* happens due to higer mark breach of offheap memstore settings*/
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* This class is for maintaining the various regionserver's heap memory manager statistics and
|
||||
|
|
|
@ -18,21 +18,28 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.lang.management.MemoryType;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
/**
|
||||
* RegionServerAccounting keeps record of some basic real time information about
|
||||
* the Region Server. Currently, it only keeps record the global memstore size.
|
||||
* the Region Server. Currently, it keeps record the global memstore size and global memstore heap
|
||||
* overhead. It also tracks the replay edits per region.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionServerAccounting {
|
||||
|
||||
// memstore data size
|
||||
private final AtomicLong globalMemstoreDataSize = new AtomicLong(0);
|
||||
// memstore heap over head size
|
||||
private final AtomicLong globalMemstoreHeapOverhead = new AtomicLong(0);
|
||||
|
||||
// Store the edits size during replaying WAL. Use this to roll back the
|
||||
|
@ -40,17 +47,97 @@ public class RegionServerAccounting {
|
|||
private final ConcurrentMap<byte[], MemstoreSize> replayEditsPerRegion =
|
||||
new ConcurrentSkipListMap<byte[], MemstoreSize>(Bytes.BYTES_COMPARATOR);
|
||||
|
||||
/**
|
||||
* @return the global Memstore size in the RegionServer
|
||||
*/
|
||||
public long getGlobalMemstoreSize() {
|
||||
return globalMemstoreDataSize.get();
|
||||
private final Configuration conf;
|
||||
|
||||
private long globalMemStoreLimit;
|
||||
private final float globalMemStoreLimitLowMarkPercent;
|
||||
private long globalMemStoreLimitLowMark;
|
||||
private final MemoryType memType;
|
||||
private long globalOnHeapMemstoreLimit;
|
||||
private long globalOnHeapMemstoreLimitLowMark;
|
||||
|
||||
public RegionServerAccounting(Configuration conf) {
|
||||
this.conf = conf;
|
||||
Pair<Long, MemoryType> globalMemstoreSizePair = MemorySizeUtil.getGlobalMemstoreSize(conf);
|
||||
this.globalMemStoreLimit = globalMemstoreSizePair.getFirst();
|
||||
this.memType = globalMemstoreSizePair.getSecond();
|
||||
this.globalMemStoreLimitLowMarkPercent =
|
||||
MemorySizeUtil.getGlobalMemStoreHeapLowerMark(conf, this.memType == MemoryType.HEAP);
|
||||
// When off heap memstore in use we configure the global off heap space for memstore as bytes
|
||||
// not as % of max memory size. In such case, the lower water mark should be specified using the
|
||||
// key "hbase.regionserver.global.memstore.size.lower.limit" which says % of the global upper
|
||||
// bound and defaults to 95%. In on heap case also specifying this way is ideal. But in the past
|
||||
// we used to take lower bound also as the % of xmx (38% as default). For backward compatibility
|
||||
// for this deprecated config,we will fall back to read that config when new one is missing.
|
||||
// Only for on heap case, do this fallback mechanism. For off heap it makes no sense.
|
||||
// TODO When to get rid of the deprecated config? ie
|
||||
// "hbase.regionserver.global.memstore.lowerLimit". Can get rid of this boolean passing then.
|
||||
this.globalMemStoreLimitLowMark =
|
||||
(long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
|
||||
this.globalOnHeapMemstoreLimit = MemorySizeUtil.getOnheapGlobalMemstoreSize(conf);
|
||||
this.globalOnHeapMemstoreLimitLowMark =
|
||||
(long) (this.globalOnHeapMemstoreLimit * this.globalMemStoreLimitLowMarkPercent);
|
||||
}
|
||||
|
||||
public long getGlobalMemstoreLimit() {
|
||||
return this.globalMemStoreLimit;
|
||||
}
|
||||
|
||||
public long getOnheapGlobalMemstoreLimit() {
|
||||
return this.globalOnHeapMemstoreLimit;
|
||||
}
|
||||
|
||||
// Called by the tuners.
|
||||
public void setGlobalMemstoreLimits(long newGlobalMemstoreLimit) {
|
||||
if (this.memType == MemoryType.HEAP) {
|
||||
this.globalMemStoreLimit = newGlobalMemstoreLimit;
|
||||
this.globalMemStoreLimitLowMark =
|
||||
(long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
|
||||
} else {
|
||||
this.globalOnHeapMemstoreLimit = newGlobalMemstoreLimit;
|
||||
this.globalOnHeapMemstoreLimitLowMark =
|
||||
(long) (this.globalOnHeapMemstoreLimit * this.globalMemStoreLimitLowMarkPercent);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isOffheap() {
|
||||
return this.memType == MemoryType.NON_HEAP;
|
||||
}
|
||||
|
||||
public long getGlobalMemstoreLimitLowMark() {
|
||||
return this.globalMemStoreLimitLowMark;
|
||||
}
|
||||
|
||||
public float getGlobalMemstoreLimitLowMarkPercent() {
|
||||
return this.globalMemStoreLimitLowMarkPercent;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the global Memstore data size in the RegionServer
|
||||
*/
|
||||
public long getGlobalMemstoreDataSize() {
|
||||
return globalMemstoreDataSize.get();
|
||||
}
|
||||
/**
|
||||
* @return the global memstore heap overhead size in the RegionServer
|
||||
*/
|
||||
public long getGlobalMemstoreHeapOverhead() {
|
||||
return this.globalMemstoreHeapOverhead.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the global memstore data size and heap overhead size for an onheap memstore
|
||||
* whereas return the heap overhead size for an offheap memstore
|
||||
*/
|
||||
public long getGlobalMemstoreSize() {
|
||||
if (isOffheap()) {
|
||||
// get only the heap overhead for offheap memstore
|
||||
return getGlobalMemstoreHeapOverhead();
|
||||
} else {
|
||||
return getGlobalMemstoreDataSize() + getGlobalMemstoreHeapOverhead();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param memStoreSize the Memstore size will be added to
|
||||
* the global Memstore size
|
||||
|
@ -65,6 +152,77 @@ public class RegionServerAccounting {
|
|||
globalMemstoreHeapOverhead.addAndGet(-memStoreSize.getHeapOverhead());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if we are above the memstore high water mark
|
||||
* @return the flushtype
|
||||
*/
|
||||
public FlushType isAboveHighWaterMark() {
|
||||
// for onheap memstore we check if the global memstore size and the
|
||||
// global heap overhead is greater than the global memstore limit
|
||||
if (memType == MemoryType.HEAP) {
|
||||
if (getGlobalMemstoreDataSize() + getGlobalMemstoreHeapOverhead() >= globalMemStoreLimit) {
|
||||
return FlushType.ABOVE_ONHEAP_HIGHER_MARK;
|
||||
}
|
||||
} else {
|
||||
// If the configured memstore is offheap, check for two things
|
||||
// 1) If the global memstore data size is greater than the configured
|
||||
// 'hbase.regionserver.offheap.global.memstore.size'
|
||||
// 2) If the global memstore heap size is greater than the configured onheap
|
||||
// global memstore limit 'hbase.regionserver.global.memstore.size'.
|
||||
// We do this to avoid OOME incase of scenarios where the heap is occupied with
|
||||
// lot of onheap references to the cells in memstore
|
||||
if (getGlobalMemstoreDataSize() >= globalMemStoreLimit) {
|
||||
// Indicates that global memstore size is above the configured
|
||||
// 'hbase.regionserver.offheap.global.memstore.size'
|
||||
return FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
|
||||
} else if (getGlobalMemstoreHeapOverhead() >= this.globalOnHeapMemstoreLimit) {
|
||||
// Indicates that the offheap memstore's heap overhead is greater than the
|
||||
// configured 'hbase.regionserver.global.memstore.size'.
|
||||
return FlushType.ABOVE_ONHEAP_HIGHER_MARK;
|
||||
}
|
||||
}
|
||||
return FlushType.NORMAL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if we're above the low watermark
|
||||
*/
|
||||
public FlushType isAboveLowWaterMark() {
|
||||
// for onheap memstore we check if the global memstore size and the
|
||||
// global heap overhead is greater than the global memstore lower mark limit
|
||||
if (memType == MemoryType.HEAP) {
|
||||
if (getGlobalMemstoreDataSize() + getGlobalMemstoreHeapOverhead() >= globalMemStoreLimitLowMark) {
|
||||
return FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||
}
|
||||
} else {
|
||||
if (getGlobalMemstoreDataSize() >= globalMemStoreLimitLowMark) {
|
||||
// Indicates that the offheap memstore's data size is greater than the global memstore
|
||||
// lower limit
|
||||
return FlushType.ABOVE_OFFHEAP_LOWER_MARK;
|
||||
} else if (getGlobalMemstoreHeapOverhead() >= globalOnHeapMemstoreLimitLowMark) {
|
||||
// Indicates that the offheap memstore's heap overhead is greater than the global memstore
|
||||
// onheap lower limit
|
||||
return FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||
}
|
||||
}
|
||||
return FlushType.NORMAL;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the flush pressure of all stores on this regionserver. The value should be greater than
|
||||
* or equal to 0.0, and any value greater than 1.0 means we enter the emergency state that
|
||||
* global memstore size already exceeds lower limit.
|
||||
*/
|
||||
public double getFlushPressure() {
|
||||
if (memType == MemoryType.HEAP) {
|
||||
return (getGlobalMemstoreDataSize() + getGlobalMemstoreHeapOverhead()) * 1.0
|
||||
/ globalMemStoreLimitLowMark;
|
||||
} else {
|
||||
return Math.max(getGlobalMemstoreDataSize() * 1.0 / globalMemStoreLimitLowMark,
|
||||
getGlobalMemstoreHeapOverhead() * 1.0 / globalOnHeapMemstoreLimitLowMark);
|
||||
}
|
||||
}
|
||||
|
||||
/***
|
||||
* Add memStoreSize to replayEditsPerRegion.
|
||||
*
|
||||
|
|
|
@ -264,6 +264,7 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
|
|||
* or equal to 0.0, and any value greater than 1.0 means we enter the emergency state that
|
||||
* global memstore size already exceeds lower limit.
|
||||
*/
|
||||
@Deprecated
|
||||
double getFlushPressure();
|
||||
|
||||
/**
|
||||
|
|
|
@ -22,8 +22,7 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DE
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.MemoryUsage;
|
||||
import java.lang.management.MemoryType;
|
||||
import java.net.URLEncoder;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -285,9 +284,9 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
|||
return Long.parseLong(chompedPath);
|
||||
}
|
||||
|
||||
private int calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize) {
|
||||
MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
|
||||
return Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize);
|
||||
private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
|
||||
Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemstoreSize(conf);
|
||||
return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
|
||||
}
|
||||
|
||||
// must be power of 2
|
||||
|
@ -386,13 +385,12 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
|||
this.logrollsize = (long) (blocksize
|
||||
* conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
|
||||
|
||||
float memstoreRatio = MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false);
|
||||
boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null;
|
||||
if (maxLogsDefined) {
|
||||
LOG.warn("'hbase.regionserver.maxlogs' was deprecated.");
|
||||
}
|
||||
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
|
||||
Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize)));
|
||||
Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
|
||||
|
||||
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
|
||||
+ StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
|
||||
|
|
|
@ -92,7 +92,7 @@ public class TestGlobalMemStoreSize {
|
|||
server.getFromOnlineRegions(regionInfo.getEncodedName()).
|
||||
getMemstoreSize();
|
||||
}
|
||||
assertEquals(server.getRegionServerAccounting().getGlobalMemstoreSize(),
|
||||
assertEquals(server.getRegionServerAccounting().getGlobalMemstoreDataSize(),
|
||||
globalMemStoreSize);
|
||||
}
|
||||
|
||||
|
@ -100,7 +100,7 @@ public class TestGlobalMemStoreSize {
|
|||
int i = 0;
|
||||
for (HRegionServer server : getOnlineRegionServers()) {
|
||||
LOG.info("Starting flushes on " + server.getServerName() +
|
||||
", size=" + server.getRegionServerAccounting().getGlobalMemstoreSize());
|
||||
", size=" + server.getRegionServerAccounting().getGlobalMemstoreDataSize());
|
||||
|
||||
for (HRegionInfo regionInfo :
|
||||
ProtobufUtil.getOnlineRegions(null, server.getRSRpcServices())) {
|
||||
|
@ -110,11 +110,11 @@ public class TestGlobalMemStoreSize {
|
|||
LOG.info("Post flush on " + server.getServerName());
|
||||
long now = System.currentTimeMillis();
|
||||
long timeout = now + 1000;
|
||||
while(server.getRegionServerAccounting().getGlobalMemstoreSize() != 0 &&
|
||||
while(server.getRegionServerAccounting().getGlobalMemstoreDataSize() != 0 &&
|
||||
timeout < System.currentTimeMillis()) {
|
||||
Threads.sleep(10);
|
||||
}
|
||||
long size = server.getRegionServerAccounting().getGlobalMemstoreSize();
|
||||
long size = server.getRegionServerAccounting().getGlobalMemstoreDataSize();
|
||||
if (size > 0) {
|
||||
// If size > 0, see if its because the meta region got edits while
|
||||
// our test was running....
|
||||
|
@ -131,7 +131,7 @@ public class TestGlobalMemStoreSize {
|
|||
}
|
||||
}
|
||||
}
|
||||
size = server.getRegionServerAccounting().getGlobalMemstoreSize();
|
||||
size = server.getRegionServerAccounting().getGlobalMemstoreDataSize();
|
||||
assertEquals("Server=" + server.getServerName() + ", i=" + i++, 0, size);
|
||||
}
|
||||
|
||||
|
@ -149,7 +149,7 @@ public class TestGlobalMemStoreSize {
|
|||
throws IOException {
|
||||
LOG.info("Flush " + r.toString() + " on " + server.getServerName() +
|
||||
", " + r.flush(true) + ", size=" +
|
||||
server.getRegionServerAccounting().getGlobalMemstoreSize());
|
||||
server.getRegionServerAccounting().getGlobalMemstoreDataSize());
|
||||
}
|
||||
|
||||
private List<HRegionServer> getOnlineRegionServers() {
|
||||
|
|
|
@ -170,7 +170,7 @@ public class TestHRegionReplayEvents {
|
|||
rss = mock(RegionServerServices.class);
|
||||
when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
|
||||
when(rss.getConfiguration()).thenReturn(CONF);
|
||||
when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());
|
||||
when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(CONF));
|
||||
String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
|
||||
.toString();
|
||||
ExecutorService es = new ExecutorService(string);
|
||||
|
@ -281,12 +281,12 @@ public class TestHRegionReplayEvents {
|
|||
}
|
||||
}
|
||||
|
||||
assertTrue(rss.getRegionServerAccounting().getGlobalMemstoreSize() > 0);
|
||||
assertTrue(rss.getRegionServerAccounting().getGlobalMemstoreDataSize() > 0);
|
||||
// now close the region which should not cause hold because of un-committed flush
|
||||
secondaryRegion.close();
|
||||
|
||||
// verify that the memstore size is back to what it was
|
||||
assertEquals(0, rss.getRegionServerAccounting().getGlobalMemstoreSize());
|
||||
assertEquals(0, rss.getRegionServerAccounting().getGlobalMemstoreDataSize());
|
||||
}
|
||||
|
||||
static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
|
||||
|
|
|
@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.MemoryType;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -65,8 +66,10 @@ public class TestHeapMemoryManager {
|
|||
conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.02f);
|
||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f);
|
||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.03f);
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf);
|
||||
HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0),
|
||||
new MemstoreFlusherStub(0), new RegionServerStub(conf), new RegionServerAccountingStub());
|
||||
new MemstoreFlusherStub(0), new RegionServerStub(conf),
|
||||
regionServerAccounting);
|
||||
assertFalse(manager.isTunerOn());
|
||||
}
|
||||
|
||||
|
@ -76,21 +79,24 @@ public class TestHeapMemoryManager {
|
|||
conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.02f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.03f);
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf);
|
||||
HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0),
|
||||
new MemstoreFlusherStub(0), new RegionServerStub(conf), new RegionServerAccountingStub());
|
||||
new MemstoreFlusherStub(0), new RegionServerStub(conf),
|
||||
regionServerAccounting);
|
||||
assertFalse(manager.isTunerOn());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWhenMemstoreAndBlockCacheMaxMinChecksFails() throws Exception {
|
||||
BlockCacheStub blockCache = new BlockCacheStub(0);
|
||||
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub(0);
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
|
||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.06f);
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf);
|
||||
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub(0);
|
||||
try {
|
||||
new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub());
|
||||
new RegionServerStub(conf), regionServerAccounting);
|
||||
fail();
|
||||
} catch (RuntimeException e) {
|
||||
}
|
||||
|
@ -99,7 +105,7 @@ public class TestHeapMemoryManager {
|
|||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
|
||||
try {
|
||||
new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub());
|
||||
new RegionServerStub(conf), regionServerAccounting);
|
||||
fail();
|
||||
} catch (RuntimeException e) {
|
||||
}
|
||||
|
@ -107,18 +113,19 @@ public class TestHeapMemoryManager {
|
|||
|
||||
@Test
|
||||
public void testWhenClusterIsWriteHeavyWithEmptyMemstore() throws Exception {
|
||||
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
|
||||
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
|
||||
// Empty block cache and memstore
|
||||
blockCache.setTestBlockSize(0);
|
||||
regionServerAccounting.setTestMemstoreSize(0);
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
|
||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
|
||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
|
||||
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
|
||||
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf);
|
||||
MemstoreFlusherStub memStoreFlusher =
|
||||
new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
// Empty block cache and memstore
|
||||
blockCache.setTestBlockSize(0);
|
||||
regionServerAccounting.setTestMemstoreSize(0);
|
||||
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
|
||||
// Let the system start with default values for memstore heap and block cache size.
|
||||
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
|
@ -127,11 +134,11 @@ public class TestHeapMemoryManager {
|
|||
long oldBlockCacheSize = blockCache.maxSize;
|
||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||
heapMemoryManager.start(choreService);
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
// Allow the tuner to run once and do necessary memory up
|
||||
Thread.sleep(1500);
|
||||
|
@ -140,14 +147,114 @@ public class TestHeapMemoryManager {
|
|||
assertEquals(oldBlockCacheSize, blockCache.maxSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHeapMemoryManagerWhenOffheapFlushesHappenUnderReadHeavyCase() throws Exception {
|
||||
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_LOWER_LIMIT_KEY, 0.7f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
|
||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
|
||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
|
||||
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
|
||||
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf, true);
|
||||
MemstoreFlusherStub memStoreFlusher =
|
||||
new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
// Empty memstore and but nearly filled block cache
|
||||
blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
regionServerAccounting.setTestMemstoreSize(0);
|
||||
// Let the system start with default values for memstore heap and block cache size.
|
||||
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), regionServerAccounting);
|
||||
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
|
||||
long oldBlockCacheSize = blockCache.maxSize;
|
||||
float maxStepValue = DefaultHeapMemoryTuner.DEFAULT_MIN_STEP_VALUE;
|
||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||
heapMemoryManager.start(choreService);
|
||||
blockCache.evictBlock(null);
|
||||
blockCache.evictBlock(null);
|
||||
blockCache.evictBlock(null);
|
||||
// do some offheap flushes also. So there should be decrease in memstore but
|
||||
// not as that when we don't have offheap flushes
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
// Allow the tuner to run once and do necessary memory up
|
||||
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
||||
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
||||
assertHeapSpaceDelta(maxStepValue, oldBlockCacheSize, blockCache.maxSize);
|
||||
oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
|
||||
oldBlockCacheSize = blockCache.maxSize;
|
||||
// Do some more evictions before the next run of HeapMemoryTuner
|
||||
blockCache.evictBlock(null);
|
||||
// Allow the tuner to run once and do necessary memory up
|
||||
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
||||
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
||||
assertHeapSpaceDelta(maxStepValue, oldBlockCacheSize, blockCache.maxSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHeapMemoryManagerWithOffheapMemstoreAndMixedWorkload() throws Exception {
|
||||
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_LOWER_LIMIT_KEY, 0.7f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
|
||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
|
||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
|
||||
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
|
||||
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf, true);
|
||||
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
// Empty memstore and but nearly filled block cache
|
||||
blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
// Let the system start with default values for memstore heap and block cache size.
|
||||
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), regionServerAccounting);
|
||||
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
|
||||
long oldBlockCacheSize = blockCache.maxSize;
|
||||
float maxStepValue = DefaultHeapMemoryTuner.DEFAULT_MIN_STEP_VALUE;
|
||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||
heapMemoryManager.start(choreService);
|
||||
blockCache.evictBlock(null);
|
||||
blockCache.evictBlock(null);
|
||||
blockCache.evictBlock(null);
|
||||
// do some offheap flushes also. So there should be decrease in memstore but
|
||||
// not as that when we don't have offheap flushes
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
// Allow the tuner to run once and do necessary memory up
|
||||
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
||||
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
||||
assertHeapSpaceDelta(maxStepValue, oldBlockCacheSize, blockCache.maxSize);
|
||||
oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
|
||||
oldBlockCacheSize = blockCache.maxSize;
|
||||
// change memstore size
|
||||
// regionServerAccounting.setTestMemstoreSize((long)(maxHeapSize * 0.4 * 0.8));
|
||||
// The memstore size would have decreased. Now again do some flushes and ensure the
|
||||
// flushes are due to onheap overhead. This should once again call for increase in
|
||||
// memstore size but that increase should be to the safe size
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
// Allow the tuner to run once and do necessary memory up
|
||||
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
|
||||
assertHeapSpaceDelta(maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
||||
assertHeapSpaceDelta(-maxStepValue, oldBlockCacheSize, blockCache.maxSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWhenClusterIsReadHeavyWithEmptyBlockCache() throws Exception {
|
||||
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
|
||||
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
|
||||
// Empty block cache and memstore
|
||||
blockCache.setTestBlockSize(0);
|
||||
regionServerAccounting.setTestMemstoreSize(0);
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
|
||||
|
@ -155,6 +262,11 @@ public class TestHeapMemoryManager {
|
|||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
|
||||
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
|
||||
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf);
|
||||
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
// Empty block cache and memstore
|
||||
blockCache.setTestBlockSize(0);
|
||||
regionServerAccounting.setTestMemstoreSize(0);
|
||||
// Let the system start with default values for memstore heap and block cache size.
|
||||
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), regionServerAccounting);
|
||||
|
@ -175,11 +287,6 @@ public class TestHeapMemoryManager {
|
|||
@Test
|
||||
public void testWhenClusterIsWriteHeavy() throws Exception {
|
||||
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
|
||||
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
|
||||
// Empty block cache and but nearly filled memstore
|
||||
blockCache.setTestBlockSize(0);
|
||||
regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
|
||||
|
@ -187,6 +294,12 @@ public class TestHeapMemoryManager {
|
|||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
|
||||
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
|
||||
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf);
|
||||
MemstoreFlusherStub memStoreFlusher =
|
||||
new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
// Empty block cache and but nearly filled memstore
|
||||
blockCache.setTestBlockSize(0);
|
||||
regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
// Let the system start with default values for memstore heap and block cache size.
|
||||
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), regionServerAccounting);
|
||||
|
@ -194,7 +307,7 @@ public class TestHeapMemoryManager {
|
|||
long oldBlockCacheSize = blockCache.maxSize;
|
||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||
heapMemoryManager.start(choreService);
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
|
@ -208,7 +321,7 @@ public class TestHeapMemoryManager {
|
|||
oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
|
||||
oldBlockCacheSize = blockCache.maxSize;
|
||||
// Do some more flushes before the next run of HeapMemoryTuner
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
// Allow the tuner to run once and do necessary memory up
|
||||
|
@ -219,14 +332,45 @@ public class TestHeapMemoryManager {
|
|||
blockCache.maxSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWhenClusterIsWriteHeavyWithOffheapMemstore() throws Exception {
|
||||
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
|
||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
|
||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
|
||||
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
|
||||
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf);
|
||||
MemstoreFlusherStub memStoreFlusher =
|
||||
new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
// Empty block cache and but nearly filled memstore
|
||||
blockCache.setTestBlockSize(0);
|
||||
regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
// Let the system start with default values for memstore heap and block cache size.
|
||||
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), regionServerAccounting);
|
||||
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
|
||||
long oldBlockCacheSize = blockCache.maxSize;
|
||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||
heapMemoryManager.start(choreService);
|
||||
// this should not change anything with onheap memstore
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
// Allow the tuner to run once and do necessary memory up
|
||||
Thread.sleep(1500);
|
||||
// No changes should be made by tuner as we already have lot of empty space
|
||||
assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
||||
assertEquals(oldBlockCacheSize, blockCache.maxSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWhenClusterIsReadHeavy() throws Exception {
|
||||
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
|
||||
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
|
||||
// Empty memstore and but nearly filled block cache
|
||||
blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
regionServerAccounting.setTestMemstoreSize(0);
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_LOWER_LIMIT_KEY, 0.7f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
|
||||
|
@ -235,9 +379,15 @@ public class TestHeapMemoryManager {
|
|||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
|
||||
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
|
||||
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf);
|
||||
MemstoreFlusherStub memStoreFlusher =
|
||||
new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
// Empty memstore and but nearly filled block cache
|
||||
blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
regionServerAccounting.setTestMemstoreSize(0);
|
||||
// Let the system start with default values for memstore heap and block cache size.
|
||||
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub());
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub(conf));
|
||||
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
|
||||
long oldBlockCacheSize = blockCache.maxSize;
|
||||
long oldMemstoreLowerMarkSize = 7 * oldMemstoreHeapSize / 10;
|
||||
|
@ -272,12 +422,6 @@ public class TestHeapMemoryManager {
|
|||
@Test
|
||||
public void testWhenClusterIsHavingMoreWritesThanReads() throws Exception {
|
||||
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
|
||||
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
|
||||
// Both memstore and block cache are nearly filled
|
||||
blockCache.setTestBlockSize(0);
|
||||
regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
|
||||
|
@ -285,6 +429,13 @@ public class TestHeapMemoryManager {
|
|||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
|
||||
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
|
||||
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf);
|
||||
MemstoreFlusherStub memStoreFlusher =
|
||||
new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
// Both memstore and block cache are nearly filled
|
||||
blockCache.setTestBlockSize(0);
|
||||
regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
// Let the system start with default values for memstore heap and block cache size.
|
||||
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), regionServerAccounting);
|
||||
|
@ -292,7 +443,7 @@ public class TestHeapMemoryManager {
|
|||
long oldBlockCacheSize = blockCache.maxSize;
|
||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||
heapMemoryManager.start(choreService);
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
|
@ -303,7 +454,7 @@ public class TestHeapMemoryManager {
|
|||
assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
||||
assertEquals(oldBlockCacheSize, blockCache.maxSize);
|
||||
// Do some more flushes before the next run of HeapMemoryTuner
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
|
@ -319,11 +470,6 @@ public class TestHeapMemoryManager {
|
|||
public void testBlockedFlushesIncreaseMemstoreInSteadyState() throws Exception {
|
||||
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
|
||||
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub();
|
||||
// Both memstore and block cache are nearly filled
|
||||
blockCache.setTestBlockSize(0);
|
||||
regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
|
||||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
|
||||
|
@ -331,6 +477,11 @@ public class TestHeapMemoryManager {
|
|||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
|
||||
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
|
||||
conf.setInt(DefaultHeapMemoryTuner.NUM_PERIODS_TO_IGNORE, 0);
|
||||
RegionServerAccountingStub regionServerAccounting = new RegionServerAccountingStub(conf);
|
||||
// Both memstore and block cache are nearly filled
|
||||
blockCache.setTestBlockSize(0);
|
||||
regionServerAccounting.setTestMemstoreSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
blockCache.setTestBlockSize((long) (maxHeapSize * 0.4 * 0.8));
|
||||
// Let the system start with default values for memstore heap and block cache size.
|
||||
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), regionServerAccounting);
|
||||
|
@ -338,7 +489,7 @@ public class TestHeapMemoryManager {
|
|||
long oldBlockCacheSize = blockCache.maxSize;
|
||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||
heapMemoryManager.start(choreService);
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
|
@ -350,7 +501,7 @@ public class TestHeapMemoryManager {
|
|||
assertEquals(oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
|
||||
assertEquals(oldBlockCacheSize, blockCache.maxSize);
|
||||
// Flushes that block updates
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
|
||||
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
|
||||
memStoreFlusher.requestFlush(null, false);
|
||||
blockCache.evictBlock(null);
|
||||
blockCache.evictBlock(null);
|
||||
|
@ -379,7 +530,7 @@ public class TestHeapMemoryManager {
|
|||
HeapMemoryTuner.class);
|
||||
// Let the system start with default values for memstore heap and block cache size.
|
||||
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub());
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub(conf));
|
||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||
heapMemoryManager.start(choreService);
|
||||
// Now we wants to be in write mode. Set bigger memstore size from CustomHeapMemoryTuner
|
||||
|
@ -412,7 +563,7 @@ public class TestHeapMemoryManager {
|
|||
conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
|
||||
HeapMemoryTuner.class);
|
||||
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub());
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub(conf));
|
||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||
heapMemoryManager.start(choreService);
|
||||
CustomHeapMemoryTuner.memstoreSize = 0.78f;
|
||||
|
@ -438,7 +589,7 @@ public class TestHeapMemoryManager {
|
|||
conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
|
||||
HeapMemoryTuner.class);
|
||||
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub());
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub(conf));
|
||||
long oldMemstoreSize = memStoreFlusher.memstoreSize;
|
||||
long oldBlockCacheSize = blockCache.maxSize;
|
||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||
|
@ -473,7 +624,7 @@ public class TestHeapMemoryManager {
|
|||
|
||||
try {
|
||||
heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub());
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub(conf));
|
||||
fail("Should have failed as the collective heap memory need is above 80%");
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
@ -482,7 +633,7 @@ public class TestHeapMemoryManager {
|
|||
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.6f);
|
||||
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.6f);
|
||||
heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub());
|
||||
new RegionServerStub(conf), new RegionServerAccountingStub(conf));
|
||||
long oldMemstoreSize = memStoreFlusher.memstoreSize;
|
||||
long oldBlockCacheSize = blockCache.maxSize;
|
||||
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
|
||||
|
@ -508,7 +659,8 @@ public class TestHeapMemoryManager {
|
|||
assertEquals(expected, currentHeapSpace);
|
||||
}
|
||||
|
||||
private void assertHeapSpaceDelta(double expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) {
|
||||
private void assertHeapSpaceDelta(double expectedDeltaPercent, long oldHeapSpace,
|
||||
long newHeapSpace) {
|
||||
double expctedMinDelta = (double) (this.maxHeapSize * expectedDeltaPercent);
|
||||
// Tolerable error
|
||||
double error = 0.95;
|
||||
|
@ -757,11 +909,34 @@ public class TestHeapMemoryManager {
|
|||
}
|
||||
|
||||
private static class RegionServerAccountingStub extends RegionServerAccounting {
|
||||
boolean offheap;
|
||||
|
||||
public RegionServerAccountingStub(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
public RegionServerAccountingStub(Configuration conf, boolean offheap) {
|
||||
super(conf);
|
||||
this.offheap = offheap;
|
||||
}
|
||||
|
||||
private long testMemstoreSize = 0;
|
||||
|
||||
@Override
|
||||
public long getGlobalMemstoreSize() {
|
||||
public long getGlobalMemstoreDataSize() {
|
||||
return testMemstoreSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getGlobalMemstoreHeapOverhead() {
|
||||
return testMemstoreSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffheap() {
|
||||
return offheap;
|
||||
}
|
||||
|
||||
public void setTestMemstoreSize(long testMemstoreSize) {
|
||||
this.testMemstoreSize = testMemstoreSize;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestRegionServerAccounting {
|
||||
|
||||
@Test
|
||||
public void testOnheapMemstoreHigherWaterMarkLimits() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f);
|
||||
// try for default cases
|
||||
RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
|
||||
MemstoreSize memstoreSize =
|
||||
new MemstoreSize((long) (3l * 1024l * 1024l * 1024l), (long) (1l * 1024l * 1024l * 1024l));
|
||||
regionServerAccounting.incGlobalMemstoreSize(memstoreSize);
|
||||
assertEquals(FlushType.ABOVE_ONHEAP_HIGHER_MARK,
|
||||
regionServerAccounting.isAboveHighWaterMark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnheapMemstoreLowerWaterMarkLimits() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f);
|
||||
// try for default cases
|
||||
RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
|
||||
MemstoreSize memstoreSize =
|
||||
new MemstoreSize((long) (3l * 1024l * 1024l * 1024l), (long) (1l * 1024l * 1024l * 1024l));
|
||||
regionServerAccounting.incGlobalMemstoreSize(memstoreSize);
|
||||
assertEquals(FlushType.ABOVE_ONHEAP_LOWER_MARK,
|
||||
regionServerAccounting.isAboveLowWaterMark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffheapMemstoreHigherWaterMarkLimitsDueToDataSize() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
// setting 1G as offheap data size
|
||||
conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1l * 1024l));
|
||||
// try for default cases
|
||||
RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
|
||||
// this will breach offheap limit as data size is higher and not due to heap size
|
||||
MemstoreSize memstoreSize =
|
||||
new MemstoreSize((long) (3l * 1024l * 1024l * 1024l), (long) (1l * 1024l * 1024l * 1024l));
|
||||
regionServerAccounting.incGlobalMemstoreSize(memstoreSize);
|
||||
assertEquals(FlushType.ABOVE_OFFHEAP_HIGHER_MARK,
|
||||
regionServerAccounting.isAboveHighWaterMark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffheapMemstoreHigherWaterMarkLimitsDueToHeapSize() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f);
|
||||
// setting 1G as offheap data size
|
||||
conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1l * 1024l));
|
||||
// try for default cases
|
||||
RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
|
||||
// this will breach higher limit as heap size is higher and not due to offheap size
|
||||
MemstoreSize memstoreSize =
|
||||
new MemstoreSize((long) (3l * 1024l * 1024l), (long) (2l * 1024l * 1024l * 1024l));
|
||||
regionServerAccounting.incGlobalMemstoreSize(memstoreSize);
|
||||
assertEquals(FlushType.ABOVE_ONHEAP_HIGHER_MARK,
|
||||
regionServerAccounting.isAboveHighWaterMark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffheapMemstoreLowerWaterMarkLimitsDueToDataSize() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
// setting 1G as offheap data size
|
||||
conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1l * 1024l));
|
||||
// try for default cases
|
||||
RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
|
||||
// this will breach offheap limit as data size is higher and not due to heap size
|
||||
MemstoreSize memstoreSize =
|
||||
new MemstoreSize((long) (3l * 1024l * 1024l * 1024l), (long) (1l * 1024l * 1024l * 1024l));
|
||||
regionServerAccounting.incGlobalMemstoreSize(memstoreSize);
|
||||
assertEquals(FlushType.ABOVE_OFFHEAP_LOWER_MARK,
|
||||
regionServerAccounting.isAboveLowWaterMark());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffheapMemstoreLowerWaterMarkLimitsDueToHeapSize() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setFloat(MemorySizeUtil.MEMSTORE_SIZE_KEY, 0.2f);
|
||||
// setting 1G as offheap data size
|
||||
conf.setLong(MemorySizeUtil.OFFHEAP_MEMSTORE_SIZE_KEY, (1l * 1024l));
|
||||
// try for default cases
|
||||
RegionServerAccounting regionServerAccounting = new RegionServerAccounting(conf);
|
||||
// this will breach higher limit as heap size is higher and not due to offheap size
|
||||
MemstoreSize memstoreSize =
|
||||
new MemstoreSize((long) (3l * 1024l * 1024l), (long) (2l * 1024l * 1024l * 1024l));
|
||||
regionServerAccounting.incGlobalMemstoreSize(memstoreSize);
|
||||
assertEquals(FlushType.ABOVE_ONHEAP_LOWER_MARK,
|
||||
regionServerAccounting.isAboveLowWaterMark());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue