HBASE-5349 Automagically tweak global memstore and block cache sizes based on workload

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1550059 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
anoopsamjohn 2013-12-11 07:03:40 +00:00
parent 33a753c766
commit c56d41dc3d
16 changed files with 1229 additions and 48 deletions

View File

@ -752,7 +752,7 @@ public final class HConstants {
public static final String HFILE_BLOCK_CACHE_SIZE_KEY =
"hfile.block.cache.size";
public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.25f;
public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.4f;
/*
* Minimum percentage of free heap necessary for a successful cluster startup.

View File

@ -243,21 +243,20 @@ possible configurations would overwhelm and obscure the important.
<description>The HLog file writer implementation.</description>
</property>
<property>
<name>hbase.regionserver.global.memstore.upperLimit</name>
<name>hbase.regionserver.global.memstore.size</name>
<value>0.4</value>
<description>Maximum size of all memstores in a region server before new
updates are blocked and flushes are forced. Defaults to 40% of heap.
Updates are blocked and flushes are forced until size of all memstores
in a region server hits hbase.regionserver.global.memstore.lowerLimit.</description>
in a region server hits hbase.regionserver.global.memstore.size.lower.limit.</description>
</property>
<property>
<name>hbase.regionserver.global.memstore.lowerLimit</name>
<value>0.38</value>
<description>Maximum size of all memstores in a region server before
flushes are forced. Defaults to 38% of heap.
This value equal to hbase.regionserver.global.memstore.upperLimit causes
the minimum possible flushing to occur when updates are blocked due to
memstore limiting.</description>
<name>hbase.regionserver.global.memstore.size.lower.limit</name>
<value>0.95</value>
<description>Maximum size of all memstores in a region server before flushes are forced.
Defaults to 95% of hbase.regionserver.global.memstore.size.
A 100% value for this value causes the minimum possible flushing to occur when updates are
blocked due to memstore limiting.</description>
</property>
<property>
<name>hbase.regionserver.optionalcacheflushinterval</name>

View File

@ -347,7 +347,7 @@ public class CacheConfig {
* @param conf The current configuration.
* @return The block cache or <code>null</code>.
*/
private static synchronized BlockCache instantiateBlockCache(Configuration conf) {
public static synchronized BlockCache instantiateBlockCache(Configuration conf) {
if (globalBlockCache != null) return globalBlockCache;
if (blockCacheDisabled) return null;

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.util.StringUtils;
*
**/
@InterfaceAudience.Private
public class DoubleBlockCache implements BlockCache, HeapSize {
public class DoubleBlockCache implements ResizableBlockCache, HeapSize {
static final Log LOG = LogFactory.getLog(DoubleBlockCache.class.getName());
@ -172,4 +172,8 @@ public class DoubleBlockCache implements BlockCache, HeapSize {
return onHeapCache.getBlockCount() + offHeapCache.getBlockCount();
}
@Override
public void setMaxSize(long size) {
this.onHeapCache.setMaxSize(size);
}
}

View File

@ -95,7 +95,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* to the relative sizes and usage.
*/
@InterfaceAudience.Private
public class LruBlockCache implements BlockCache, HeapSize {
public class LruBlockCache implements ResizableBlockCache, HeapSize {
static final Log LOG = LogFactory.getLog(LruBlockCache.class);
@ -272,6 +272,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
}
@Override
public void setMaxSize(long maxSize) {
this.maxSize = maxSize;
if(this.size.get() > acceptableSize() && !evictionInProgress) {

View File

@ -0,0 +1,34 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* BlockCache which is resizable.
*/
@InterfaceAudience.Private
public interface ResizableBlockCache extends BlockCache {
/**
* Sets the max heap size that can be used by the BlockCache.
* @param size The max heap size.
*/
void setMaxSize(long size);
}

View File

@ -0,0 +1,114 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY;
import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult;
/**
* The default implementation for the HeapMemoryTuner. This will do simple checks to decide
* whether there should be changes in the heap size of memstore/block cache. When there is no block
* cache eviction at all but there are flushes because of global heap pressure, it will increase the
* memstore heap size and decrease block cache size. The step value for this heap size change can be
* specified using the config <i>hbase.regionserver.heapmemory.autotuner.step</i>. When there is no
* memstore flushes because of heap pressure but there is block cache evictions it will increase the
* block cache heap.
*/
@InterfaceAudience.Private
class DefaultHeapMemoryTuner implements HeapMemoryTuner {
public static final String STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step";
public static final float DEFAULT_STEP_VALUE = 0.02f; // 2%
private Configuration conf;
private float step = DEFAULT_STEP_VALUE;
private float globalMemStorePercentMinRange;
private float globalMemStorePercentMaxRange;
private float blockCachePercentMinRange;
private float blockCachePercentMaxRange;
@Override
public TunerResult tune(TunerContext context) {
long blockedFlushCount = context.getBlockedFlushCount();
long unblockedFlushCount = context.getUnblockedFlushCount();
long evictCount = context.getEvictCount();
boolean memstoreSufficient = blockedFlushCount == 0 && unblockedFlushCount == 0;
boolean blockCacheSufficient = evictCount == 0;
if (memstoreSufficient && blockCacheSufficient) {
return new TunerResult(false);
}
TunerResult result = new TunerResult(true);
float newMemstoreSize;
float newBlockCacheSize;
if (memstoreSufficient) {
// Increase the block cache size and corresponding decrease in memstore size
newBlockCacheSize = context.getCurBlockCacheSize() + step;
newMemstoreSize = context.getCurMemStoreSize() - step;
} else if (blockCacheSufficient) {
// Increase the memstore size and corresponding decrease in block cache size
newBlockCacheSize = context.getCurBlockCacheSize() - step;
newMemstoreSize = context.getCurMemStoreSize() + step;
} else {
return new TunerResult(false);
// As of now not making any tuning in write/read heavy scenario.
}
if (newMemstoreSize > globalMemStorePercentMaxRange) {
newMemstoreSize = globalMemStorePercentMaxRange;
} else if (newMemstoreSize < globalMemStorePercentMinRange) {
newMemstoreSize = globalMemStorePercentMinRange;
}
if (newBlockCacheSize > blockCachePercentMaxRange) {
newBlockCacheSize = blockCachePercentMaxRange;
} else if (newBlockCacheSize < blockCachePercentMinRange) {
newBlockCacheSize = blockCachePercentMinRange;
}
result.setBlockCacheSize(newBlockCacheSize);
result.setMemstoreSize(newMemstoreSize);
return result;
}
@Override
public Configuration getConf() {
return this.conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.step = conf.getFloat(STEP_KEY, DEFAULT_STEP_VALUE);
this.blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY,
conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT));
this.blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY,
conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT));
this.globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
MemStoreFlusher.getGlobalMemStorePercent(conf));
this.globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
MemStoreFlusher.getGlobalMemStorePercent(conf));
}
}

View File

@ -0,0 +1,36 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Listener which will get notified regarding flush requests of regions.
*/
@InterfaceAudience.Private
public interface FlushRequestListener {
/**
* Callback which will get called when a flush request is made for a region.
*
* @param type The type of flush. (ie. Whether a normal flush or flush because of global heap preassure)
* @param region The region for which flush is requested
*/
void flushRequested(FlushType type, HRegion region);
}

View File

@ -39,4 +39,26 @@ public interface FlushRequester {
* @param delay after how much time should the flush happen
*/
void requestDelayedFlush(HRegion region, long delay);
/**
* Register a FlushRequestListener
*
* @param listener
*/
void registerFlushRequestListener(final FlushRequestListener listener);
/**
* Unregister the given FlushRequestListener
*
* @param listener
* @return true when passed listener is unregistered successfully.
*/
public boolean unregisterFlushRequestListener(final FlushRequestListener listener);
/**
* Sets the global memstore limit to a new size.
*
* @param globalMemStoreSize
*/
public void setGlobalMemstoreLimit(long globalMemStoreSize);
}

View File

@ -274,6 +274,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
// Cache flushing
protected MemStoreFlusher cacheFlusher;
protected HeapMemoryManager hMemManager;
// catalog tracker
protected CatalogTracker catalogTracker;
@ -921,6 +923,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
if(this.hMemManager != null) this.hMemManager.stop();
if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
@ -1240,6 +1243,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
startServiceThreads();
startHeapMemoryManager();
LOG.info("Serving as " + this.serverNameFromMasterPOV +
", RpcServer on " + this.isa +
", sessionid=0x" +
@ -1255,6 +1259,13 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
}
}
private void startHeapMemoryManager() {
this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
if (this.hMemManager != null) {
this.hMemManager.start();
}
}
private void createMyEphemeralNode() throws KeeperException {
ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(),
HConstants.EMPTY_BYTE_ARRAY);

View File

@ -0,0 +1,384 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY;
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.annotations.VisibleForTesting;
/**
* Manages tuning of Heap memory using <code>HeapMemoryTuner</code>.
*/
@InterfaceAudience.Private
public class HeapMemoryManager {
private static final Log LOG = LogFactory.getLog(HeapMemoryManager.class);
private static final int CONVERT_TO_PERCENTAGE = 100;
private static final int CLUSTER_MINIMUM_MEMORY_THRESHOLD =
(int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD);
public static final String BLOCK_CACHE_SIZE_MAX_RANGE_KEY = "hfile.block.cache.size.max.range";
public static final String BLOCK_CACHE_SIZE_MIN_RANGE_KEY = "hfile.block.cache.size.min.range";
public static final String MEMSTORE_SIZE_MAX_RANGE_KEY =
"hbase.regionserver.global.memstore.size.max.range";
public static final String MEMSTORE_SIZE_MIN_RANGE_KEY =
"hbase.regionserver.global.memstore.size.min.range";
public static final String HBASE_RS_HEAP_MEMORY_TUNER_PERIOD =
"hbase.regionserver.heapmemory.tuner.period";
public static final int HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD = 5 * 60 * 1000;
public static final String HBASE_RS_HEAP_MEMORY_TUNER_CLASS =
"hbase.regionserver.heapmemory.tuner.class";
private float globalMemStorePercent;
private float globalMemStorePercentMinRange;
private float globalMemStorePercentMaxRange;
private float blockCachePercent;
private float blockCachePercentMinRange;
private float blockCachePercentMaxRange;
private final ResizableBlockCache blockCache;
private final FlushRequester memStoreFlusher;
private final Server server;
private HeapMemoryTunerChore heapMemTunerChore = null;
private final boolean tunerOn;
private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
Server server) {
BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
if (blockCache instanceof ResizableBlockCache) {
return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server);
}
return null;
}
@VisibleForTesting
HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher,
Server server) {
this.blockCache = blockCache;
this.memStoreFlusher = memStoreFlusher;
this.server = server;
this.tunerOn = doInit(server.getConfiguration());
}
private boolean doInit(Configuration conf) {
globalMemStorePercent = MemStoreFlusher.getGlobalMemStorePercent(conf);
blockCachePercent = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY,
HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
int gml = (int) (globalMemStorePercent * CONVERT_TO_PERCENTAGE);
int bcul = (int) (blockCachePercent * CONVERT_TO_PERCENTAGE);
if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
+ "the threshold required for successful cluster operation. "
+ "The combined value cannot exceed 0.8. Please check " + "the settings for "
+ MemStoreFlusher.MEMSTORE_SIZE_KEY + " and " + HFILE_BLOCK_CACHE_SIZE_KEY
+ " in your configuration. " + MemStoreFlusher.MEMSTORE_SIZE_KEY + " is "
+ globalMemStorePercent + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is "
+ blockCachePercent);
}
// Initialize max and min range for memstore heap space
globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY,
globalMemStorePercent);
globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY,
globalMemStorePercent);
if (globalMemStorePercent < globalMemStorePercentMinRange) {
LOG.warn("Setting " + MEMSTORE_SIZE_MIN_RANGE_KEY + " to " + globalMemStorePercent
+ ", same value as " + MemStoreFlusher.MEMSTORE_SIZE_KEY
+ " because supplied value greater than initial memstore size value.");
globalMemStorePercentMinRange = globalMemStorePercent;
conf.setFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, globalMemStorePercentMinRange);
}
if (globalMemStorePercent > globalMemStorePercentMaxRange) {
LOG.warn("Setting " + MEMSTORE_SIZE_MAX_RANGE_KEY + " to " + globalMemStorePercent
+ ", same value as " + MemStoreFlusher.MEMSTORE_SIZE_KEY
+ " because supplied value less than initial memstore size value.");
globalMemStorePercentMaxRange = globalMemStorePercent;
conf.setFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, globalMemStorePercentMaxRange);
}
if (globalMemStorePercent == globalMemStorePercentMinRange
&& globalMemStorePercent == globalMemStorePercentMaxRange) {
return false;
}
// Initialize max and min range for block cache
blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercent);
blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercent);
if (blockCachePercent < blockCachePercentMinRange) {
LOG.warn("Setting " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " to " + blockCachePercent
+ ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY
+ " because supplied value greater than initial block cache size.");
blockCachePercentMinRange = blockCachePercent;
conf.setFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercentMinRange);
}
if (blockCachePercent > blockCachePercentMaxRange) {
LOG.warn("Setting " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " to " + blockCachePercent
+ ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY
+ " because supplied value less than initial block cache size.");
blockCachePercentMaxRange = blockCachePercent;
conf.setFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercentMaxRange);
}
if (blockCachePercent == blockCachePercentMinRange
&& blockCachePercent == blockCachePercentMaxRange) {
return false;
}
gml = (int) (globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE);
bcul = (int) (blockCachePercentMinRange * CONVERT_TO_PERCENTAGE);
if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
+ "the threshold required for successful cluster operation. "
+ "The combined value cannot exceed 0.8. Please check the settings for "
+ MEMSTORE_SIZE_MAX_RANGE_KEY + " and " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY
+ " in your configuration. " + MEMSTORE_SIZE_MAX_RANGE_KEY + " is "
+ globalMemStorePercentMaxRange + " and " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " is "
+ blockCachePercentMinRange);
}
gml = (int) (globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE);
bcul = (int) (blockCachePercentMaxRange * CONVERT_TO_PERCENTAGE);
if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds "
+ "the threshold required for successful cluster operation. "
+ "The combined value cannot exceed 0.8. Please check the settings for "
+ MEMSTORE_SIZE_MIN_RANGE_KEY + " and " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY
+ " in your configuration. " + MEMSTORE_SIZE_MIN_RANGE_KEY + " is "
+ globalMemStorePercentMinRange + " and " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " is "
+ blockCachePercentMaxRange);
}
return true;
}
public void start() {
if (tunerOn) {
LOG.info("Starting HeapMemoryTuner chore.");
this.heapMemTunerChore = new HeapMemoryTunerChore();
Threads.setDaemonThreadRunning(heapMemTunerChore.getThread());
// Register HeapMemoryTuner as a memstore flush listener
memStoreFlusher.registerFlushRequestListener(heapMemTunerChore);
}
}
public void stop() {
// The thread is Daemon. Just interrupting the ongoing process.
if (tunerOn) {
LOG.info("Stoping HeapMemoryTuner chore.");
this.heapMemTunerChore.interrupt();
}
}
// Used by the test cases.
boolean isTunerOn() {
return this.tunerOn;
}
private class HeapMemoryTunerChore extends Chore implements FlushRequestListener {
private HeapMemoryTuner heapMemTuner;
private AtomicLong blockedFlushCount = new AtomicLong();
private AtomicLong unblockedFlushCount = new AtomicLong();
private long evictCount = 0L;
private TunerContext tunerContext = new TunerContext();
public HeapMemoryTunerChore() {
super(server.getServerName() + "-HeapMemoryTunerChore", server.getConfiguration().getInt(
HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD), server);
Class<? extends HeapMemoryTuner> tunerKlass = server.getConfiguration().getClass(
HBASE_RS_HEAP_MEMORY_TUNER_CLASS, DefaultHeapMemoryTuner.class, HeapMemoryTuner.class);
heapMemTuner = ReflectionUtils.newInstance(tunerKlass, server.getConfiguration());
}
@Override
protected void chore() {
evictCount = blockCache.getStats().getEvictedCount() - evictCount;
tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0));
tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0));
tunerContext.setEvictCount(evictCount);
tunerContext.setCurBlockCacheSize(blockCachePercent);
tunerContext.setCurMemStoreSize(globalMemStorePercent);
TunerResult result = null;
try {
result = this.heapMemTuner.tune(tunerContext);
} catch (Throwable t) {
LOG.error("Exception thrown from the HeapMemoryTuner implementation", t);
}
if (result != null && result.needsTuning()) {
float memstoreSize = result.getMemstoreSize();
float blockCacheSize = result.getBlockCacheSize();
LOG.debug("From HeapMemoryTuner new memstoreSize: " + memstoreSize
+ ". new blockCacheSize: " + blockCacheSize);
if (memstoreSize < globalMemStorePercentMinRange) {
LOG.info("New memstoreSize from HeapMemoryTuner " + memstoreSize + " is below min level "
+ globalMemStorePercentMinRange + ". Resetting memstoreSize to min size");
memstoreSize = globalMemStorePercentMinRange;
} else if (memstoreSize > globalMemStorePercentMaxRange) {
LOG.info("New memstoreSize from HeapMemoryTuner " + memstoreSize + " is above max level "
+ globalMemStorePercentMaxRange + ". Resetting memstoreSize to max size");
memstoreSize = globalMemStorePercentMaxRange;
}
if (blockCacheSize < blockCachePercentMinRange) {
LOG.info("New blockCacheSize from HeapMemoryTuner " + blockCacheSize
+ " is below min level " + blockCachePercentMinRange
+ ". Resetting blockCacheSize to min size");
blockCacheSize = blockCachePercentMinRange;
} else if (blockCacheSize > blockCachePercentMaxRange) {
LOG.info("New blockCacheSize from HeapMemoryTuner " + blockCacheSize
+ " is above max level " + blockCachePercentMaxRange
+ ". Resetting blockCacheSize to min size");
blockCacheSize = blockCachePercentMaxRange;
}
int gml = (int) (memstoreSize * CONVERT_TO_PERCENTAGE);
int bcul = (int) (blockCacheSize * CONVERT_TO_PERCENTAGE);
if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) {
LOG.info("Current heap configuration from HeapMemoryTuner exceeds "
+ "the threshold required for successful cluster operation. "
+ "The combined value cannot exceed 0.8. " + MemStoreFlusher.MEMSTORE_SIZE_KEY
+ " is " + memstoreSize + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is "
+ blockCacheSize);
// TODO can adjust the value so as not exceed 80%. Is that correct? may be.
} else {
long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize);
long newMemstoreSize = (long) (maxHeapSize * memstoreSize);
LOG.info("Setting block cache heap size to " + newBlockCacheSize
+ " and memstore heap size to " + newMemstoreSize);
blockCachePercent = blockCacheSize;
blockCache.setMaxSize(newBlockCacheSize);
globalMemStorePercent = memstoreSize;
memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
}
}
}
@Override
public void flushRequested(FlushType type, HRegion region) {
switch (type) {
case ABOVE_HIGHER_MARK:
blockedFlushCount.incrementAndGet();
break;
case ABOVE_LOWER_MARK:
unblockedFlushCount.incrementAndGet();
break;
default:
// In case of normal flush don't do any action.
break;
}
}
}
/**
* POJO to pass all the relevant information required to do the heap memory tuning. It holds the
* flush counts and block cache evictions happened within the interval. Also holds the current
* heap percentage allocated for memstore and block cache.
*/
public static final class TunerContext {
private long blockedFlushCount;
private long unblockedFlushCount;
private long evictCount;
private float curMemStoreSize;
private float curBlockCacheSize;
public long getBlockedFlushCount() {
return blockedFlushCount;
}
public void setBlockedFlushCount(long blockedFlushCount) {
this.blockedFlushCount = blockedFlushCount;
}
public long getUnblockedFlushCount() {
return unblockedFlushCount;
}
public void setUnblockedFlushCount(long unblockedFlushCount) {
this.unblockedFlushCount = unblockedFlushCount;
}
public long getEvictCount() {
return evictCount;
}
public void setEvictCount(long evictCount) {
this.evictCount = evictCount;
}
public float getCurMemStoreSize() {
return curMemStoreSize;
}
public void setCurMemStoreSize(float curMemStoreSize) {
this.curMemStoreSize = curMemStoreSize;
}
public float getCurBlockCacheSize() {
return curBlockCacheSize;
}
public void setCurBlockCacheSize(float curBlockCacheSize) {
this.curBlockCacheSize = curBlockCacheSize;
}
}
/**
* POJO which holds the result of memory tuning done by HeapMemoryTuner implementation.
* It includes the new heap percentage for memstore and block cache.
*/
public static final class TunerResult {
private float memstoreSize;
private float blockCacheSize;
private final boolean needsTuning;
public TunerResult(boolean needsTuning) {
this.needsTuning = needsTuning;
}
public float getMemstoreSize() {
return memstoreSize;
}
public void setMemstoreSize(float memstoreSize) {
this.memstoreSize = memstoreSize;
}
public float getBlockCacheSize() {
return blockCacheSize;
}
public void setBlockCacheSize(float blockCacheSize) {
this.blockCacheSize = blockCacheSize;
}
public boolean needsTuning() {
return needsTuning;
}
}
}

View File

@ -0,0 +1,42 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult;
/**
* Makes the decision regarding proper sizing of the heap memory. Decides what percentage of heap
* memory should be allocated for global memstore and BlockCache.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface HeapMemoryTuner extends Configurable {
/**
* Perform the heap memory tuning operation.
*
* @param context
* @return <code>TunerResult</code> including the heap percentage for memstore and block cache
*/
TunerResult tune(TunerContext context);
}

View File

@ -194,8 +194,7 @@ public class MemStoreChunkPool {
}
long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax();
long globalMemStoreLimit = MemStoreFlusher.globalMemStoreLimit(heapMax,
MemStoreFlusher.DEFAULT_UPPER, MemStoreFlusher.UPPER_KEY, conf);
long globalMemStoreLimit = (long) (heapMax * MemStoreFlusher.getGlobalMemStorePercent(conf));
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY,
MemStoreLAB.CHUNK_SIZE_DEFAULT);
int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);

View File

@ -21,9 +21,11 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
@ -65,6 +67,18 @@ import org.cloudera.htrace.TraceScope;
@InterfaceAudience.Private
class MemStoreFlusher implements FlushRequester {
static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
static final String MEMSTORE_SIZE_KEY = "hbase.regionserver.global.memstore.size";
private static final String MEMSTORE_SIZE_OLD_KEY =
"hbase.regionserver.global.memstore.upperLimit";
private static final String MEMSTORE_SIZE_LOWER_LIMIT_KEY =
"hbase.regionserver.global.memstore.size.lower.limit";
private static final String MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY =
"hbase.regionserver.global.memstore.lowerLimit";
private static final float DEFAULT_MEMSTORE_SIZE = 0.4f;
// Default lower water mark limit is 95% size of memstore size.
private static final float DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT = 0.95f;
// These two data members go together. Any entry in the one must have
// a corresponding entry in the other.
private final BlockingQueue<FlushQueueEntry> flushQueue =
@ -78,19 +92,15 @@ class MemStoreFlusher implements FlushRequester {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Object blockSignal = new Object();
protected final long globalMemStoreLimit;
protected final long globalMemStoreLimitLowMark;
protected long globalMemStoreLimit;
protected float globalMemStoreLimitLowMarkPercent;
protected long globalMemStoreLimitLowMark;
static final float DEFAULT_UPPER = 0.4f;
private static final float DEFAULT_LOWER = 0.35f;
static final String UPPER_KEY =
"hbase.regionserver.global.memstore.upperLimit";
private static final String LOWER_KEY =
"hbase.regionserver.global.memstore.lowerLimit";
private long blockingWaitTime;
private final Counter updatesBlockedMsHighWater = new Counter();
private final FlushHandler[] flushHandlers;
private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);
/**
* @param conf
@ -103,15 +113,13 @@ class MemStoreFlusher implements FlushRequester {
this.threadWakeFrequency =
conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
UPPER_KEY, conf);
long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf);
if (lower > this.globalMemStoreLimit) {
lower = this.globalMemStoreLimit;
LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " +
"because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
}
this.globalMemStoreLimitLowMark = lower;
float globalMemStorePercent = getGlobalMemStorePercent(conf);
this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
this.globalMemStoreLimitLowMarkPercent =
getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
this.globalMemStoreLimitLowMark =
(long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
90000);
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
@ -124,29 +132,40 @@ class MemStoreFlusher implements FlushRequester {
}
/**
* Calculate size using passed <code>key</code> for configured
* percentage of <code>max</code>.
* Calculate global memstore size for configured percentage of <code>max</code>.
* @param max
* @param defaultLimit
* @param key
* @param c
* @return Limit.
*/
static long globalMemStoreLimit(final long max,
final float defaultLimit, final String key, final Configuration c) {
float limit = c.getFloat(key, defaultLimit);
return getMemStoreLimit(max, limit, defaultLimit);
static float getGlobalMemStorePercent(final Configuration c) {
float limit = c.getFloat(MEMSTORE_SIZE_KEY,
c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE));
if (limit > 0.8f || limit < 0.05f) {
LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE
+ " because supplied value outside allowed range of 0.05 -> 0.8");
limit = DEFAULT_MEMSTORE_SIZE;
}
return limit;
}
static long getMemStoreLimit(final long max, final float limit,
final float defaultLimit) {
float effectiveLimit = limit;
if (limit >= 0.9f || limit < 0.1f) {
LOG.warn("Setting global memstore limit to default of " + defaultLimit +
" because supplied value outside allowed range of 0.1 -> 0.9");
effectiveLimit = defaultLimit;
private static float getGlobalMemStoreLowerMark(final Configuration c, float globalMemStorePercent) {
String lowMarkPercentStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_KEY);
if (lowMarkPercentStr != null) {
return Float.parseFloat(lowMarkPercentStr);
}
return (long)(max * effectiveLimit);
String lowerWaterMarkOldValStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY);
if (lowerWaterMarkOldValStr != null) {
LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use "
+ MEMSTORE_SIZE_LOWER_LIMIT_KEY);
float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr);
if (lowerWaterMarkOldVal > globalMemStorePercent) {
lowerWaterMarkOldVal = globalMemStorePercent;
LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " + "because supplied "
+ MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " was > " + MEMSTORE_SIZE_OLD_KEY);
}
return lowerWaterMarkOldVal / globalMemStorePercent;
}
return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT;
}
public Counter getUpdatesBlockedMsHighWater() {
@ -453,6 +472,7 @@ class MemStoreFlusher implements FlushRequester {
}
lock.readLock().lock();
try {
notifyFlushRequest(region, emergencyFlush);
boolean shouldCompact = region.flushcache();
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
@ -485,6 +505,16 @@ class MemStoreFlusher implements FlushRequester {
return true;
}
private void notifyFlushRequest(HRegion region, boolean emergencyFlush) {
FlushType type = FlushType.NORMAL;
if (emergencyFlush) {
type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
}
for (FlushRequestListener listener : flushRequestListeners) {
listener.flushRequested(type, region);
}
}
private void wakeUpIfBlocking() {
synchronized (blockSignal) {
blockSignal.notifyAll();
@ -570,6 +600,38 @@ class MemStoreFlusher implements FlushRequester {
return queueList.toString();
}
/**
* Register a MemstoreFlushListener
* @param listener
*/
public void registerFlushRequestListener(final FlushRequestListener listener) {
this.flushRequestListeners.add(listener);
}
/**
* Unregister the listener from MemstoreFlushListeners
* @param listener
* @return true when passed listener is unregistered successfully.
*/
public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
return this.flushRequestListeners.remove(listener);
}
/**
* Sets the global memstore limit to a new size.
* @param globalMemStoreSize
*/
public void setGlobalMemstoreLimit(long globalMemStoreSize) {
this.globalMemStoreLimit = globalMemStoreSize;
this.globalMemStoreLimitLowMark =
(long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
reclaimMemStoreMemory();
}
public long getMemoryLimit() {
return this.globalMemStoreLimit;
}
interface FlushQueueEntry extends Delayed {}
/**
@ -672,3 +734,7 @@ class MemStoreFlusher implements FlushRequester {
}
}
}
enum FlushType {
NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK;
}

View File

@ -0,0 +1,453 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestHeapMemoryManager {
private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
@Test
public void testAutoTunerShouldBeOffWhenMaxMinRangesForMemstoreIsNotGiven() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0),
new MemstoreFlusherStub(0), new RegionServerStub(conf));
assertFalse(manager.isTunerOn());
}
@Test
public void testAutoTunerShouldBeOffWhenMaxMinRangesForBlockCacheIsNotGiven() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.05f);
HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0),
new MemstoreFlusherStub(0), new RegionServerStub(conf));
assertFalse(manager.isTunerOn());
}
@Test
public void testWhenMemstoreAndBlockCacheMaxMinChecksFails() throws Exception {
BlockCacheStub blockCache = new BlockCacheStub(0);
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub(0);
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.06f);
try {
new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf));
fail();
} catch (RuntimeException e) {
}
conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.2f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
try {
new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf));
fail();
} catch (RuntimeException e) {
}
}
@Test
public void testWhenClusterIsWriteHeavy() throws Exception {
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
heapMemoryManager.start();
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
memStoreFlusher.requestFlush(null);
memStoreFlusher.requestFlush(null);
memStoreFlusher.requestFlush(null);
memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK;
memStoreFlusher.requestFlush(null);
Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldBlockCacheSize,
blockCache.maxSize);
oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
oldBlockCacheSize = blockCache.maxSize;
// Do some more flushes before the next run of HeapMemoryTuner
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK;
memStoreFlusher.requestFlush(null);
memStoreFlusher.requestFlush(null);
Thread.sleep(1500);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldBlockCacheSize,
blockCache.maxSize);
}
@Test
public void testWhenClusterIsReadHeavy() throws Exception {
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
long oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
heapMemoryManager.start();
blockCache.evictBlock(null);
blockCache.evictBlock(null);
blockCache.evictBlock(null);
Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldBlockCacheSize,
blockCache.maxSize);
oldMemstoreHeapSize = memStoreFlusher.memstoreSize;
oldBlockCacheSize = blockCache.maxSize;
// Do some more evictions before the next run of HeapMemoryTuner
blockCache.evictBlock(null);
Thread.sleep(1500);
assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldMemstoreHeapSize,
memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldBlockCacheSize,
blockCache.maxSize);
}
@Test
public void testPluggingInHeapMemoryTuner() throws Exception {
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.78f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.05f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.02f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
HeapMemoryTuner.class);
// Let the system start with default values for memstore heap and block cache size.
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
heapMemoryManager.start();
// Now we wants to be in write mode. Set bigger memstore size from CustomHeapMemoryTuner
CustomHeapMemoryTuner.memstoreSize = 0.78f;
CustomHeapMemoryTuner.blockCacheSize = 0.02f;
Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
assertHeapSpace(0.78f, memStoreFlusher.memstoreSize);// Memstore
assertHeapSpace(0.02f, blockCache.maxSize);// BlockCache
// Now we wants to be in read mode. Set bigger memstore size from CustomHeapMemoryTuner
CustomHeapMemoryTuner.blockCacheSize = 0.75f;
CustomHeapMemoryTuner.memstoreSize = 0.05f;
Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
assertHeapSpace(0.75f, blockCache.maxSize);// BlockCache
assertHeapSpace(0.05f, memStoreFlusher.memstoreSize);// Memstore
}
@Test
public void testWhenSizeGivenByHeapTunerGoesOutsideRange() throws Exception {
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.1f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
HeapMemoryTuner.class);
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
heapMemoryManager.start();
CustomHeapMemoryTuner.memstoreSize = 0.78f;
CustomHeapMemoryTuner.blockCacheSize = 0.02f;
Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up
// Even if the tuner says to set the memstore to 78%, HBase makes it as 70% as that is the
// upper bound. Same with block cache as 10% is the lower bound.
assertHeapSpace(0.7f, memStoreFlusher.memstoreSize);
assertHeapSpace(0.1f, blockCache.maxSize);
}
@Test
public void testWhenCombinedHeapSizesFromTunerGoesOutSideMaxLimit() throws Exception {
BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4));
MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4));
Configuration conf = HBaseConfiguration.create();
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.1f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f);
conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f);
conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000);
conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class,
HeapMemoryTuner.class);
HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher,
new RegionServerStub(conf));
long oldMemstoreSize = memStoreFlusher.memstoreSize;
long oldBlockCacheSize = blockCache.maxSize;
heapMemoryManager.start();
CustomHeapMemoryTuner.memstoreSize = 0.7f;
CustomHeapMemoryTuner.blockCacheSize = 0.3f;
Thread.sleep(1500);
assertEquals(oldMemstoreSize, memStoreFlusher.memstoreSize);
assertEquals(oldBlockCacheSize, blockCache.maxSize);
}
private void assertHeapSpace(float expectedHeapPercentage, long currentHeapSpace) {
long expected = (long) (this.maxHeapSize * expectedHeapPercentage);
assertEquals(expected, currentHeapSpace);
}
private void assertHeapSpaceDelta(float expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) {
long expctedMinDelta = (long) (this.maxHeapSize * expectedDeltaPercent);
if (expectedDeltaPercent > 0) {
assertTrue(expctedMinDelta <= (newHeapSpace - oldHeapSpace));
} else {
assertTrue(expctedMinDelta <= (oldHeapSpace - newHeapSpace));
}
}
private static class BlockCacheStub implements ResizableBlockCache {
CacheStats stats = new CacheStats();
long maxSize = 0;
public BlockCacheStub(long size){
this.maxSize = size;
}
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
}
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
}
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) {
return null;
}
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
stats.evicted();
return false;
}
@Override
public int evictBlocksByHfileName(String hfileName) {
stats.evicted(); // Just assuming only one block for file here.
return 0;
}
@Override
public CacheStats getStats() {
return this.stats;
}
@Override
public void shutdown() {
}
@Override
public long size() {
return 0;
}
@Override
public long getFreeSize() {
return 0;
}
@Override
public long getCurrentSize() {
return 0;
}
@Override
public long getEvictedCount() {
return 0;
}
@Override
public long getBlockCount() {
return 0;
}
@Override
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(Configuration conf)
throws IOException {
return null;
}
@Override
public void setMaxSize(long size) {
this.maxSize = size;
}
}
private static class MemstoreFlusherStub implements FlushRequester {
long memstoreSize;
FlushRequestListener listener;
FlushType flushType = FlushType.NORMAL;
public MemstoreFlusherStub(long memstoreSize) {
this.memstoreSize = memstoreSize;
}
@Override
public void requestFlush(HRegion region) {
this.listener.flushRequested(flushType, region);
}
@Override
public void requestDelayedFlush(HRegion region, long delay) {
}
@Override
public void registerFlushRequestListener(FlushRequestListener listener) {
this.listener = listener;
}
@Override
public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
return false;
}
@Override
public void setGlobalMemstoreLimit(long globalMemStoreSize) {
this.memstoreSize = globalMemStoreSize;
}
}
private static class RegionServerStub implements Server {
private Configuration conf;
private boolean stopped = false;
public RegionServerStub(Configuration conf) {
this.conf = conf;
}
@Override
public void abort(String why, Throwable e) {
}
@Override
public boolean isAborted() {
return false;
}
@Override
public void stop(String why) {
this.stopped = true;
}
@Override
public boolean isStopped() {
return this.stopped;
}
@Override
public Configuration getConfiguration() {
return this.conf;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return null;
}
@Override
public CatalogTracker getCatalogTracker() {
return null;
}
@Override
public ServerName getServerName() {
return ServerName.valueOf("server1",4000,12345);
}
}
static class CustomHeapMemoryTuner implements HeapMemoryTuner {
static float blockCacheSize = 0.4f;
static float memstoreSize = 0.4f;
@Override
public Configuration getConf() {
return null;
}
@Override
public void setConf(Configuration arg0) {
}
@Override
public TunerResult tune(TunerContext context) {
TunerResult result = new TunerResult(true);
result.setBlockCacheSize(blockCacheSize);
result.setMemstoreSize(memstoreSize);
return result;
}
}
}

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -867,6 +868,21 @@ public class TestWALReplay {
// TODO Auto-generated method stub
}
@Override
public void registerFlushRequestListener(FlushRequestListener listener) {
}
@Override
public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
return false;
}
@Override
public void setGlobalMemstoreLimit(long globalMemStoreSize) {
}
}
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,