From c56d41dc3d5dfa009269c25b48105f886f2a7447 Mon Sep 17 00:00:00 2001 From: anoopsamjohn Date: Wed, 11 Dec 2013 07:03:40 +0000 Subject: [PATCH] HBASE-5349 Automagically tweak global memstore and block cache sizes based on workload git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1550059 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/hadoop/hbase/HConstants.java | 2 +- .../src/main/resources/hbase-default.xml | 17 +- .../hadoop/hbase/io/hfile/CacheConfig.java | 2 +- .../hbase/io/hfile/DoubleBlockCache.java | 6 +- .../hadoop/hbase/io/hfile/LruBlockCache.java | 3 +- .../hbase/io/hfile/ResizableBlockCache.java | 34 ++ .../regionserver/DefaultHeapMemoryTuner.java | 114 +++++ .../regionserver/FlushRequestListener.java | 36 ++ .../hbase/regionserver/FlushRequester.java | 22 + .../hbase/regionserver/HRegionServer.java | 11 + .../hbase/regionserver/HeapMemoryManager.java | 384 +++++++++++++++ .../hbase/regionserver/HeapMemoryTuner.java | 42 ++ .../hbase/regionserver/MemStoreChunkPool.java | 3 +- .../hbase/regionserver/MemStoreFlusher.java | 132 +++-- .../regionserver/TestHeapMemoryManager.java | 453 ++++++++++++++++++ .../hbase/regionserver/wal/TestWALReplay.java | 16 + 16 files changed, 1229 insertions(+), 48 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequestListener.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryTuner.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index e07e6f8d3f7..a4f8c07aeaa 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -752,7 +752,7 @@ public final class HConstants { public static final String HFILE_BLOCK_CACHE_SIZE_KEY = "hfile.block.cache.size"; - public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.25f; + public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.4f; /* * Minimum percentage of free heap necessary for a successful cluster startup. diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index e5ffa7d7702..542b393ea2e 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -243,21 +243,20 @@ possible configurations would overwhelm and obscure the important. The HLog file writer implementation. - hbase.regionserver.global.memstore.upperLimit + hbase.regionserver.global.memstore.size 0.4 Maximum size of all memstores in a region server before new updates are blocked and flushes are forced. Defaults to 40% of heap. Updates are blocked and flushes are forced until size of all memstores - in a region server hits hbase.regionserver.global.memstore.lowerLimit. + in a region server hits hbase.regionserver.global.memstore.size.lower.limit. - hbase.regionserver.global.memstore.lowerLimit - 0.38 - Maximum size of all memstores in a region server before - flushes are forced. Defaults to 38% of heap. - This value equal to hbase.regionserver.global.memstore.upperLimit causes - the minimum possible flushing to occur when updates are blocked due to - memstore limiting. + hbase.regionserver.global.memstore.size.lower.limit + 0.95 + Maximum size of all memstores in a region server before flushes are forced. + Defaults to 95% of hbase.regionserver.global.memstore.size. + A 100% value for this value causes the minimum possible flushing to occur when updates are + blocked due to memstore limiting. hbase.regionserver.optionalcacheflushinterval diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index fb6b1a686c3..17988508c09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -347,7 +347,7 @@ public class CacheConfig { * @param conf The current configuration. * @return The block cache or null. */ - private static synchronized BlockCache instantiateBlockCache(Configuration conf) { + public static synchronized BlockCache instantiateBlockCache(Configuration conf) { if (globalBlockCache != null) return globalBlockCache; if (blockCacheDisabled) return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java index f89c364cbb9..0e32b4b0f24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java @@ -38,7 +38,7 @@ import org.apache.hadoop.util.StringUtils; * **/ @InterfaceAudience.Private -public class DoubleBlockCache implements BlockCache, HeapSize { +public class DoubleBlockCache implements ResizableBlockCache, HeapSize { static final Log LOG = LogFactory.getLog(DoubleBlockCache.class.getName()); @@ -172,4 +172,8 @@ public class DoubleBlockCache implements BlockCache, HeapSize { return onHeapCache.getBlockCount() + offHeapCache.getBlockCount(); } + @Override + public void setMaxSize(long size) { + this.onHeapCache.setMaxSize(size); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 7f7b1722228..a316c0d38f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -95,7 +95,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * to the relative sizes and usage. */ @InterfaceAudience.Private -public class LruBlockCache implements BlockCache, HeapSize { +public class LruBlockCache implements ResizableBlockCache, HeapSize { static final Log LOG = LogFactory.getLog(LruBlockCache.class); @@ -272,6 +272,7 @@ public class LruBlockCache implements BlockCache, HeapSize { statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); } + @Override public void setMaxSize(long maxSize) { this.maxSize = maxSize; if(this.size.get() > acceptableSize() && !evictionInProgress) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java new file mode 100644 index 00000000000..cb27bddb3e3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ResizableBlockCache.java @@ -0,0 +1,34 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * BlockCache which is resizable. + */ +@InterfaceAudience.Private +public interface ResizableBlockCache extends BlockCache { + + /** + * Sets the max heap size that can be used by the BlockCache. + * @param size The max heap size. + */ + void setMaxSize(long size); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java new file mode 100644 index 00000000000..27edbbb7f94 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultHeapMemoryTuner.java @@ -0,0 +1,114 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY; +import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY; +import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY; +import static org.apache.hadoop.hbase.regionserver.HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult; + +/** + * The default implementation for the HeapMemoryTuner. This will do simple checks to decide + * whether there should be changes in the heap size of memstore/block cache. When there is no block + * cache eviction at all but there are flushes because of global heap pressure, it will increase the + * memstore heap size and decrease block cache size. The step value for this heap size change can be + * specified using the config hbase.regionserver.heapmemory.autotuner.step. When there is no + * memstore flushes because of heap pressure but there is block cache evictions it will increase the + * block cache heap. + */ +@InterfaceAudience.Private +class DefaultHeapMemoryTuner implements HeapMemoryTuner { + + public static final String STEP_KEY = "hbase.regionserver.heapmemory.autotuner.step"; + public static final float DEFAULT_STEP_VALUE = 0.02f; // 2% + + private Configuration conf; + private float step = DEFAULT_STEP_VALUE; + + private float globalMemStorePercentMinRange; + private float globalMemStorePercentMaxRange; + private float blockCachePercentMinRange; + private float blockCachePercentMaxRange; + + @Override + public TunerResult tune(TunerContext context) { + long blockedFlushCount = context.getBlockedFlushCount(); + long unblockedFlushCount = context.getUnblockedFlushCount(); + long evictCount = context.getEvictCount(); + boolean memstoreSufficient = blockedFlushCount == 0 && unblockedFlushCount == 0; + boolean blockCacheSufficient = evictCount == 0; + if (memstoreSufficient && blockCacheSufficient) { + return new TunerResult(false); + } + TunerResult result = new TunerResult(true); + float newMemstoreSize; + float newBlockCacheSize; + if (memstoreSufficient) { + // Increase the block cache size and corresponding decrease in memstore size + newBlockCacheSize = context.getCurBlockCacheSize() + step; + newMemstoreSize = context.getCurMemStoreSize() - step; + } else if (blockCacheSufficient) { + // Increase the memstore size and corresponding decrease in block cache size + newBlockCacheSize = context.getCurBlockCacheSize() - step; + newMemstoreSize = context.getCurMemStoreSize() + step; + } else { + return new TunerResult(false); + // As of now not making any tuning in write/read heavy scenario. + } + if (newMemstoreSize > globalMemStorePercentMaxRange) { + newMemstoreSize = globalMemStorePercentMaxRange; + } else if (newMemstoreSize < globalMemStorePercentMinRange) { + newMemstoreSize = globalMemStorePercentMinRange; + } + if (newBlockCacheSize > blockCachePercentMaxRange) { + newBlockCacheSize = blockCachePercentMaxRange; + } else if (newBlockCacheSize < blockCachePercentMinRange) { + newBlockCacheSize = blockCachePercentMinRange; + } + result.setBlockCacheSize(newBlockCacheSize); + result.setMemstoreSize(newMemstoreSize); + return result; + } + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + this.step = conf.getFloat(STEP_KEY, DEFAULT_STEP_VALUE); + this.blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, + conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT)); + this.blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, + conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT)); + this.globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, + MemStoreFlusher.getGlobalMemStorePercent(conf)); + this.globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, + MemStoreFlusher.getGlobalMemStorePercent(conf)); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequestListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequestListener.java new file mode 100644 index 00000000000..72d8573fb4c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequestListener.java @@ -0,0 +1,36 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Listener which will get notified regarding flush requests of regions. + */ +@InterfaceAudience.Private +public interface FlushRequestListener { + + /** + * Callback which will get called when a flush request is made for a region. + * + * @param type The type of flush. (ie. Whether a normal flush or flush because of global heap preassure) + * @param region The region for which flush is requested + */ + void flushRequested(FlushType type, HRegion region); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java index 6e6039db1f5..a29a9c54de9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java @@ -39,4 +39,26 @@ public interface FlushRequester { * @param delay after how much time should the flush happen */ void requestDelayedFlush(HRegion region, long delay); + + /** + * Register a FlushRequestListener + * + * @param listener + */ + void registerFlushRequestListener(final FlushRequestListener listener); + + /** + * Unregister the given FlushRequestListener + * + * @param listener + * @return true when passed listener is unregistered successfully. + */ + public boolean unregisterFlushRequestListener(final FlushRequestListener listener); + + /** + * Sets the global memstore limit to a new size. + * + * @param globalMemStoreSize + */ + public void setGlobalMemstoreLimit(long globalMemStoreSize); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 00b73044520..860b539a198 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -274,6 +274,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // Cache flushing protected MemStoreFlusher cacheFlusher; + protected HeapMemoryManager hMemManager; + // catalog tracker protected CatalogTracker catalogTracker; @@ -921,6 +923,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // Send interrupts to wake up threads if sleeping so they notice shutdown. // TODO: Should we check they are alive? If OOME could have exited already + if(this.hMemManager != null) this.hMemManager.stop(); if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary(); @@ -1240,6 +1243,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration()); startServiceThreads(); + startHeapMemoryManager(); LOG.info("Serving as " + this.serverNameFromMasterPOV + ", RpcServer on " + this.isa + ", sessionid=0x" + @@ -1255,6 +1259,13 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } } + private void startHeapMemoryManager() { + this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this); + if (this.hMemManager != null) { + this.hMemManager.start(); + } + } + private void createMyEphemeralNode() throws KeeperException { ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), HConstants.EMPTY_BYTE_ARRAY); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java new file mode 100644 index 00000000000..3869fad4eb2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -0,0 +1,384 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HConstants.HFILE_BLOCK_CACHE_SIZE_KEY; + +import java.lang.management.ManagementFactory; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.util.ReflectionUtils; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Manages tuning of Heap memory using HeapMemoryTuner. + */ +@InterfaceAudience.Private +public class HeapMemoryManager { + private static final Log LOG = LogFactory.getLog(HeapMemoryManager.class); + private static final int CONVERT_TO_PERCENTAGE = 100; + private static final int CLUSTER_MINIMUM_MEMORY_THRESHOLD = + (int) (CONVERT_TO_PERCENTAGE * HConstants.HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD); + + public static final String BLOCK_CACHE_SIZE_MAX_RANGE_KEY = "hfile.block.cache.size.max.range"; + public static final String BLOCK_CACHE_SIZE_MIN_RANGE_KEY = "hfile.block.cache.size.min.range"; + public static final String MEMSTORE_SIZE_MAX_RANGE_KEY = + "hbase.regionserver.global.memstore.size.max.range"; + public static final String MEMSTORE_SIZE_MIN_RANGE_KEY = + "hbase.regionserver.global.memstore.size.min.range"; + public static final String HBASE_RS_HEAP_MEMORY_TUNER_PERIOD = + "hbase.regionserver.heapmemory.tuner.period"; + public static final int HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD = 5 * 60 * 1000; + public static final String HBASE_RS_HEAP_MEMORY_TUNER_CLASS = + "hbase.regionserver.heapmemory.tuner.class"; + + private float globalMemStorePercent; + private float globalMemStorePercentMinRange; + private float globalMemStorePercentMaxRange; + + private float blockCachePercent; + private float blockCachePercentMinRange; + private float blockCachePercentMaxRange; + + private final ResizableBlockCache blockCache; + private final FlushRequester memStoreFlusher; + private final Server server; + + private HeapMemoryTunerChore heapMemTunerChore = null; + private final boolean tunerOn; + + private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); + + public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher, + Server server) { + BlockCache blockCache = CacheConfig.instantiateBlockCache(conf); + if (blockCache instanceof ResizableBlockCache) { + return new HeapMemoryManager((ResizableBlockCache) blockCache, memStoreFlusher, server); + } + return null; + } + + @VisibleForTesting + HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher, + Server server) { + this.blockCache = blockCache; + this.memStoreFlusher = memStoreFlusher; + this.server = server; + this.tunerOn = doInit(server.getConfiguration()); + } + + private boolean doInit(Configuration conf) { + globalMemStorePercent = MemStoreFlusher.getGlobalMemStorePercent(conf); + blockCachePercent = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, + HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); + int gml = (int) (globalMemStorePercent * CONVERT_TO_PERCENTAGE); + int bcul = (int) (blockCachePercent * CONVERT_TO_PERCENTAGE); + if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) { + throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds " + + "the threshold required for successful cluster operation. " + + "The combined value cannot exceed 0.8. Please check " + "the settings for " + + MemStoreFlusher.MEMSTORE_SIZE_KEY + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + + " in your configuration. " + MemStoreFlusher.MEMSTORE_SIZE_KEY + " is " + + globalMemStorePercent + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is " + + blockCachePercent); + } + // Initialize max and min range for memstore heap space + globalMemStorePercentMinRange = conf.getFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, + globalMemStorePercent); + globalMemStorePercentMaxRange = conf.getFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, + globalMemStorePercent); + if (globalMemStorePercent < globalMemStorePercentMinRange) { + LOG.warn("Setting " + MEMSTORE_SIZE_MIN_RANGE_KEY + " to " + globalMemStorePercent + + ", same value as " + MemStoreFlusher.MEMSTORE_SIZE_KEY + + " because supplied value greater than initial memstore size value."); + globalMemStorePercentMinRange = globalMemStorePercent; + conf.setFloat(MEMSTORE_SIZE_MIN_RANGE_KEY, globalMemStorePercentMinRange); + } + if (globalMemStorePercent > globalMemStorePercentMaxRange) { + LOG.warn("Setting " + MEMSTORE_SIZE_MAX_RANGE_KEY + " to " + globalMemStorePercent + + ", same value as " + MemStoreFlusher.MEMSTORE_SIZE_KEY + + " because supplied value less than initial memstore size value."); + globalMemStorePercentMaxRange = globalMemStorePercent; + conf.setFloat(MEMSTORE_SIZE_MAX_RANGE_KEY, globalMemStorePercentMaxRange); + } + if (globalMemStorePercent == globalMemStorePercentMinRange + && globalMemStorePercent == globalMemStorePercentMaxRange) { + return false; + } + // Initialize max and min range for block cache + blockCachePercentMinRange = conf.getFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercent); + blockCachePercentMaxRange = conf.getFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercent); + if (blockCachePercent < blockCachePercentMinRange) { + LOG.warn("Setting " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " to " + blockCachePercent + + ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY + + " because supplied value greater than initial block cache size."); + blockCachePercentMinRange = blockCachePercent; + conf.setFloat(BLOCK_CACHE_SIZE_MIN_RANGE_KEY, blockCachePercentMinRange); + } + if (blockCachePercent > blockCachePercentMaxRange) { + LOG.warn("Setting " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " to " + blockCachePercent + + ", same value as " + HFILE_BLOCK_CACHE_SIZE_KEY + + " because supplied value less than initial block cache size."); + blockCachePercentMaxRange = blockCachePercent; + conf.setFloat(BLOCK_CACHE_SIZE_MAX_RANGE_KEY, blockCachePercentMaxRange); + } + if (blockCachePercent == blockCachePercentMinRange + && blockCachePercent == blockCachePercentMaxRange) { + return false; + } + + gml = (int) (globalMemStorePercentMaxRange * CONVERT_TO_PERCENTAGE); + bcul = (int) (blockCachePercentMinRange * CONVERT_TO_PERCENTAGE); + if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) { + throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds " + + "the threshold required for successful cluster operation. " + + "The combined value cannot exceed 0.8. Please check the settings for " + + MEMSTORE_SIZE_MAX_RANGE_KEY + " and " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + + " in your configuration. " + MEMSTORE_SIZE_MAX_RANGE_KEY + " is " + + globalMemStorePercentMaxRange + " and " + BLOCK_CACHE_SIZE_MIN_RANGE_KEY + " is " + + blockCachePercentMinRange); + } + gml = (int) (globalMemStorePercentMinRange * CONVERT_TO_PERCENTAGE); + bcul = (int) (blockCachePercentMaxRange * CONVERT_TO_PERCENTAGE); + if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) { + throw new RuntimeException("Current heap configuration for MemStore and BlockCache exceeds " + + "the threshold required for successful cluster operation. " + + "The combined value cannot exceed 0.8. Please check the settings for " + + MEMSTORE_SIZE_MIN_RANGE_KEY + " and " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + + " in your configuration. " + MEMSTORE_SIZE_MIN_RANGE_KEY + " is " + + globalMemStorePercentMinRange + " and " + BLOCK_CACHE_SIZE_MAX_RANGE_KEY + " is " + + blockCachePercentMaxRange); + } + return true; + } + + public void start() { + if (tunerOn) { + LOG.info("Starting HeapMemoryTuner chore."); + this.heapMemTunerChore = new HeapMemoryTunerChore(); + Threads.setDaemonThreadRunning(heapMemTunerChore.getThread()); + // Register HeapMemoryTuner as a memstore flush listener + memStoreFlusher.registerFlushRequestListener(heapMemTunerChore); + } + } + + public void stop() { + // The thread is Daemon. Just interrupting the ongoing process. + if (tunerOn) { + LOG.info("Stoping HeapMemoryTuner chore."); + this.heapMemTunerChore.interrupt(); + } + } + + // Used by the test cases. + boolean isTunerOn() { + return this.tunerOn; + } + + private class HeapMemoryTunerChore extends Chore implements FlushRequestListener { + private HeapMemoryTuner heapMemTuner; + private AtomicLong blockedFlushCount = new AtomicLong(); + private AtomicLong unblockedFlushCount = new AtomicLong(); + private long evictCount = 0L; + private TunerContext tunerContext = new TunerContext(); + + public HeapMemoryTunerChore() { + super(server.getServerName() + "-HeapMemoryTunerChore", server.getConfiguration().getInt( + HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD), server); + Class tunerKlass = server.getConfiguration().getClass( + HBASE_RS_HEAP_MEMORY_TUNER_CLASS, DefaultHeapMemoryTuner.class, HeapMemoryTuner.class); + heapMemTuner = ReflectionUtils.newInstance(tunerKlass, server.getConfiguration()); + } + + @Override + protected void chore() { + evictCount = blockCache.getStats().getEvictedCount() - evictCount; + tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0)); + tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0)); + tunerContext.setEvictCount(evictCount); + tunerContext.setCurBlockCacheSize(blockCachePercent); + tunerContext.setCurMemStoreSize(globalMemStorePercent); + TunerResult result = null; + try { + result = this.heapMemTuner.tune(tunerContext); + } catch (Throwable t) { + LOG.error("Exception thrown from the HeapMemoryTuner implementation", t); + } + if (result != null && result.needsTuning()) { + float memstoreSize = result.getMemstoreSize(); + float blockCacheSize = result.getBlockCacheSize(); + LOG.debug("From HeapMemoryTuner new memstoreSize: " + memstoreSize + + ". new blockCacheSize: " + blockCacheSize); + if (memstoreSize < globalMemStorePercentMinRange) { + LOG.info("New memstoreSize from HeapMemoryTuner " + memstoreSize + " is below min level " + + globalMemStorePercentMinRange + ". Resetting memstoreSize to min size"); + memstoreSize = globalMemStorePercentMinRange; + } else if (memstoreSize > globalMemStorePercentMaxRange) { + LOG.info("New memstoreSize from HeapMemoryTuner " + memstoreSize + " is above max level " + + globalMemStorePercentMaxRange + ". Resetting memstoreSize to max size"); + memstoreSize = globalMemStorePercentMaxRange; + } + if (blockCacheSize < blockCachePercentMinRange) { + LOG.info("New blockCacheSize from HeapMemoryTuner " + blockCacheSize + + " is below min level " + blockCachePercentMinRange + + ". Resetting blockCacheSize to min size"); + blockCacheSize = blockCachePercentMinRange; + } else if (blockCacheSize > blockCachePercentMaxRange) { + LOG.info("New blockCacheSize from HeapMemoryTuner " + blockCacheSize + + " is above max level " + blockCachePercentMaxRange + + ". Resetting blockCacheSize to min size"); + blockCacheSize = blockCachePercentMaxRange; + } + int gml = (int) (memstoreSize * CONVERT_TO_PERCENTAGE); + int bcul = (int) (blockCacheSize * CONVERT_TO_PERCENTAGE); + if (CONVERT_TO_PERCENTAGE - (gml + bcul) < CLUSTER_MINIMUM_MEMORY_THRESHOLD) { + LOG.info("Current heap configuration from HeapMemoryTuner exceeds " + + "the threshold required for successful cluster operation. " + + "The combined value cannot exceed 0.8. " + MemStoreFlusher.MEMSTORE_SIZE_KEY + + " is " + memstoreSize + " and " + HFILE_BLOCK_CACHE_SIZE_KEY + " is " + + blockCacheSize); + // TODO can adjust the value so as not exceed 80%. Is that correct? may be. + } else { + long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize); + long newMemstoreSize = (long) (maxHeapSize * memstoreSize); + LOG.info("Setting block cache heap size to " + newBlockCacheSize + + " and memstore heap size to " + newMemstoreSize); + blockCachePercent = blockCacheSize; + blockCache.setMaxSize(newBlockCacheSize); + globalMemStorePercent = memstoreSize; + memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize); + } + } + } + + @Override + public void flushRequested(FlushType type, HRegion region) { + switch (type) { + case ABOVE_HIGHER_MARK: + blockedFlushCount.incrementAndGet(); + break; + case ABOVE_LOWER_MARK: + unblockedFlushCount.incrementAndGet(); + break; + default: + // In case of normal flush don't do any action. + break; + } + } + } + + /** + * POJO to pass all the relevant information required to do the heap memory tuning. It holds the + * flush counts and block cache evictions happened within the interval. Also holds the current + * heap percentage allocated for memstore and block cache. + */ + public static final class TunerContext { + private long blockedFlushCount; + private long unblockedFlushCount; + private long evictCount; + private float curMemStoreSize; + private float curBlockCacheSize; + + public long getBlockedFlushCount() { + return blockedFlushCount; + } + + public void setBlockedFlushCount(long blockedFlushCount) { + this.blockedFlushCount = blockedFlushCount; + } + + public long getUnblockedFlushCount() { + return unblockedFlushCount; + } + + public void setUnblockedFlushCount(long unblockedFlushCount) { + this.unblockedFlushCount = unblockedFlushCount; + } + + public long getEvictCount() { + return evictCount; + } + + public void setEvictCount(long evictCount) { + this.evictCount = evictCount; + } + + public float getCurMemStoreSize() { + return curMemStoreSize; + } + + public void setCurMemStoreSize(float curMemStoreSize) { + this.curMemStoreSize = curMemStoreSize; + } + + public float getCurBlockCacheSize() { + return curBlockCacheSize; + } + + public void setCurBlockCacheSize(float curBlockCacheSize) { + this.curBlockCacheSize = curBlockCacheSize; + } + } + + /** + * POJO which holds the result of memory tuning done by HeapMemoryTuner implementation. + * It includes the new heap percentage for memstore and block cache. + */ + public static final class TunerResult { + private float memstoreSize; + private float blockCacheSize; + private final boolean needsTuning; + + public TunerResult(boolean needsTuning) { + this.needsTuning = needsTuning; + } + + public float getMemstoreSize() { + return memstoreSize; + } + + public void setMemstoreSize(float memstoreSize) { + this.memstoreSize = memstoreSize; + } + + public float getBlockCacheSize() { + return blockCacheSize; + } + + public void setBlockCacheSize(float blockCacheSize) { + this.blockCacheSize = blockCacheSize; + } + + public boolean needsTuning() { + return needsTuning; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryTuner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryTuner.java new file mode 100644 index 00000000000..9dc65b5f358 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryTuner.java @@ -0,0 +1,42 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult; + +/** + * Makes the decision regarding proper sizing of the heap memory. Decides what percentage of heap + * memory should be allocated for global memstore and BlockCache. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface HeapMemoryTuner extends Configurable { + + /** + * Perform the heap memory tuning operation. + * + * @param context + * @return TunerResult including the heap percentage for memstore and block cache + */ + TunerResult tune(TunerContext context); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java index f3feb663520..1230802f4a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java @@ -194,8 +194,7 @@ public class MemStoreChunkPool { } long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax(); - long globalMemStoreLimit = MemStoreFlusher.globalMemStoreLimit(heapMax, - MemStoreFlusher.DEFAULT_UPPER, MemStoreFlusher.UPPER_KEY, conf); + long globalMemStoreLimit = (long) (heapMax * MemStoreFlusher.getGlobalMemStorePercent(conf)); int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index ab2781522f6..a8457c696c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -21,9 +21,11 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.ManagementFactory; +import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; @@ -65,6 +67,18 @@ import org.cloudera.htrace.TraceScope; @InterfaceAudience.Private class MemStoreFlusher implements FlushRequester { static final Log LOG = LogFactory.getLog(MemStoreFlusher.class); + static final String MEMSTORE_SIZE_KEY = "hbase.regionserver.global.memstore.size"; + private static final String MEMSTORE_SIZE_OLD_KEY = + "hbase.regionserver.global.memstore.upperLimit"; + private static final String MEMSTORE_SIZE_LOWER_LIMIT_KEY = + "hbase.regionserver.global.memstore.size.lower.limit"; + private static final String MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY = + "hbase.regionserver.global.memstore.lowerLimit"; + + private static final float DEFAULT_MEMSTORE_SIZE = 0.4f; + // Default lower water mark limit is 95% size of memstore size. + private static final float DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT = 0.95f; + // These two data members go together. Any entry in the one must have // a corresponding entry in the other. private final BlockingQueue flushQueue = @@ -78,19 +92,15 @@ class MemStoreFlusher implements FlushRequester { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Object blockSignal = new Object(); - protected final long globalMemStoreLimit; - protected final long globalMemStoreLimitLowMark; + protected long globalMemStoreLimit; + protected float globalMemStoreLimitLowMarkPercent; + protected long globalMemStoreLimitLowMark; - static final float DEFAULT_UPPER = 0.4f; - private static final float DEFAULT_LOWER = 0.35f; - static final String UPPER_KEY = - "hbase.regionserver.global.memstore.upperLimit"; - private static final String LOWER_KEY = - "hbase.regionserver.global.memstore.lowerLimit"; private long blockingWaitTime; private final Counter updatesBlockedMsHighWater = new Counter(); private final FlushHandler[] flushHandlers; + private List flushRequestListeners = new ArrayList(1); /** * @param conf @@ -103,15 +113,13 @@ class MemStoreFlusher implements FlushRequester { this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); - this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER, - UPPER_KEY, conf); - long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf); - if (lower > this.globalMemStoreLimit) { - lower = this.globalMemStoreLimit; - LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " + - "because supplied " + LOWER_KEY + " was > " + UPPER_KEY); - } - this.globalMemStoreLimitLowMark = lower; + float globalMemStorePercent = getGlobalMemStorePercent(conf); + this.globalMemStoreLimit = (long) (max * globalMemStorePercent); + this.globalMemStoreLimitLowMarkPercent = + getGlobalMemStoreLowerMark(conf, globalMemStorePercent); + this.globalMemStoreLimitLowMark = + (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent); + this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); int handlerCount = conf.getInt("hbase.hstore.flusher.count", 1); @@ -124,29 +132,40 @@ class MemStoreFlusher implements FlushRequester { } /** - * Calculate size using passed key for configured - * percentage of max. + * Calculate global memstore size for configured percentage of max. * @param max - * @param defaultLimit - * @param key * @param c * @return Limit. */ - static long globalMemStoreLimit(final long max, - final float defaultLimit, final String key, final Configuration c) { - float limit = c.getFloat(key, defaultLimit); - return getMemStoreLimit(max, limit, defaultLimit); + static float getGlobalMemStorePercent(final Configuration c) { + float limit = c.getFloat(MEMSTORE_SIZE_KEY, + c.getFloat(MEMSTORE_SIZE_OLD_KEY, DEFAULT_MEMSTORE_SIZE)); + if (limit > 0.8f || limit < 0.05f) { + LOG.warn("Setting global memstore limit to default of " + DEFAULT_MEMSTORE_SIZE + + " because supplied value outside allowed range of 0.05 -> 0.8"); + limit = DEFAULT_MEMSTORE_SIZE; + } + return limit; } - static long getMemStoreLimit(final long max, final float limit, - final float defaultLimit) { - float effectiveLimit = limit; - if (limit >= 0.9f || limit < 0.1f) { - LOG.warn("Setting global memstore limit to default of " + defaultLimit + - " because supplied value outside allowed range of 0.1 -> 0.9"); - effectiveLimit = defaultLimit; + private static float getGlobalMemStoreLowerMark(final Configuration c, float globalMemStorePercent) { + String lowMarkPercentStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_KEY); + if (lowMarkPercentStr != null) { + return Float.parseFloat(lowMarkPercentStr); } - return (long)(max * effectiveLimit); + String lowerWaterMarkOldValStr = c.get(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY); + if (lowerWaterMarkOldValStr != null) { + LOG.warn(MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " is deprecated. Instead use " + + MEMSTORE_SIZE_LOWER_LIMIT_KEY); + float lowerWaterMarkOldVal = Float.parseFloat(lowerWaterMarkOldValStr); + if (lowerWaterMarkOldVal > globalMemStorePercent) { + lowerWaterMarkOldVal = globalMemStorePercent; + LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " + "because supplied " + + MEMSTORE_SIZE_LOWER_LIMIT_OLD_KEY + " was > " + MEMSTORE_SIZE_OLD_KEY); + } + return lowerWaterMarkOldVal / globalMemStorePercent; + } + return DEFAULT_MEMSTORE_SIZE_LOWER_LIMIT; } public Counter getUpdatesBlockedMsHighWater() { @@ -453,6 +472,7 @@ class MemStoreFlusher implements FlushRequester { } lock.readLock().lock(); try { + notifyFlushRequest(region, emergencyFlush); boolean shouldCompact = region.flushcache(); // We just want to check the size boolean shouldSplit = region.checkSplit() != null; @@ -485,6 +505,16 @@ class MemStoreFlusher implements FlushRequester { return true; } + private void notifyFlushRequest(HRegion region, boolean emergencyFlush) { + FlushType type = FlushType.NORMAL; + if (emergencyFlush) { + type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK; + } + for (FlushRequestListener listener : flushRequestListeners) { + listener.flushRequested(type, region); + } + } + private void wakeUpIfBlocking() { synchronized (blockSignal) { blockSignal.notifyAll(); @@ -570,6 +600,38 @@ class MemStoreFlusher implements FlushRequester { return queueList.toString(); } + /** + * Register a MemstoreFlushListener + * @param listener + */ + public void registerFlushRequestListener(final FlushRequestListener listener) { + this.flushRequestListeners.add(listener); + } + + /** + * Unregister the listener from MemstoreFlushListeners + * @param listener + * @return true when passed listener is unregistered successfully. + */ + public boolean unregisterFlushRequestListener(final FlushRequestListener listener) { + return this.flushRequestListeners.remove(listener); + } + + /** + * Sets the global memstore limit to a new size. + * @param globalMemStoreSize + */ + public void setGlobalMemstoreLimit(long globalMemStoreSize) { + this.globalMemStoreLimit = globalMemStoreSize; + this.globalMemStoreLimitLowMark = + (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize); + reclaimMemStoreMemory(); + } + + public long getMemoryLimit() { + return this.globalMemStoreLimit; + } + interface FlushQueueEntry extends Delayed {} /** @@ -672,3 +734,7 @@ class MemStoreFlusher implements FlushRequester { } } } + +enum FlushType { + NORMAL, ABOVE_LOWER_MARK, ABOVE_HIGHER_MARK; +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java new file mode 100644 index 00000000000..6dfe7c62df1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -0,0 +1,453 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.CacheStats; +import org.apache.hadoop.hbase.io.hfile.Cacheable; +import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerContext; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.TunerResult; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestHeapMemoryManager { + + private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); + + @Test + public void testAutoTunerShouldBeOffWhenMaxMinRangesForMemstoreIsNotGiven() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); + HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0), + new MemstoreFlusherStub(0), new RegionServerStub(conf)); + assertFalse(manager.isTunerOn()); + } + + @Test + public void testAutoTunerShouldBeOffWhenMaxMinRangesForBlockCacheIsNotGiven() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.05f); + HeapMemoryManager manager = new HeapMemoryManager(new BlockCacheStub(0), + new MemstoreFlusherStub(0), new RegionServerStub(conf)); + assertFalse(manager.isTunerOn()); + } + + @Test + public void testWhenMemstoreAndBlockCacheMaxMinChecksFails() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub(0); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub(0); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.06f); + try { + new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf)); + fail(); + } catch (RuntimeException e) { + } + conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.2f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + try { + new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf)); + fail(); + } catch (RuntimeException e) { + } + } + + @Test + public void testWhenClusterIsWriteHeavy() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + // Let the system start with default values for memstore heap and block cache size. + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf)); + long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + long oldBlockCacheSize = blockCache.maxSize; + heapMemoryManager.start(); + memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; + memStoreFlusher.requestFlush(null); + memStoreFlusher.requestFlush(null); + memStoreFlusher.requestFlush(null); + memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; + memStoreFlusher.requestFlush(null); + Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up + assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize, + memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldBlockCacheSize, + blockCache.maxSize); + oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + oldBlockCacheSize = blockCache.maxSize; + // Do some more flushes before the next run of HeapMemoryTuner + memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; + memStoreFlusher.requestFlush(null); + memStoreFlusher.requestFlush(null); + Thread.sleep(1500); + assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize, + memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldBlockCacheSize, + blockCache.maxSize); + } + + @Test + public void testWhenClusterIsReadHeavy() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.10f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.05f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + // Let the system start with default values for memstore heap and block cache size. + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf)); + long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + long oldBlockCacheSize = blockCache.maxSize; + heapMemoryManager.start(); + blockCache.evictBlock(null); + blockCache.evictBlock(null); + blockCache.evictBlock(null); + Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up + assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldMemstoreHeapSize, + memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldBlockCacheSize, + blockCache.maxSize); + oldMemstoreHeapSize = memStoreFlusher.memstoreSize; + oldBlockCacheSize = blockCache.maxSize; + // Do some more evictions before the next run of HeapMemoryTuner + blockCache.evictBlock(null); + Thread.sleep(1500); + assertHeapSpaceDelta(-(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE), oldMemstoreHeapSize, + memStoreFlusher.memstoreSize); + assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldBlockCacheSize, + blockCache.maxSize); + } + + @Test + public void testPluggingInHeapMemoryTuner() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.78f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.05f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.75f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.02f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class, + HeapMemoryTuner.class); + // Let the system start with default values for memstore heap and block cache size. + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf)); + heapMemoryManager.start(); + // Now we wants to be in write mode. Set bigger memstore size from CustomHeapMemoryTuner + CustomHeapMemoryTuner.memstoreSize = 0.78f; + CustomHeapMemoryTuner.blockCacheSize = 0.02f; + Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up + assertHeapSpace(0.78f, memStoreFlusher.memstoreSize);// Memstore + assertHeapSpace(0.02f, blockCache.maxSize);// BlockCache + // Now we wants to be in read mode. Set bigger memstore size from CustomHeapMemoryTuner + CustomHeapMemoryTuner.blockCacheSize = 0.75f; + CustomHeapMemoryTuner.memstoreSize = 0.05f; + Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up + assertHeapSpace(0.75f, blockCache.maxSize);// BlockCache + assertHeapSpace(0.05f, memStoreFlusher.memstoreSize);// Memstore + } + + @Test + public void testWhenSizeGivenByHeapTunerGoesOutsideRange() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.1f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class, + HeapMemoryTuner.class); + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf)); + heapMemoryManager.start(); + CustomHeapMemoryTuner.memstoreSize = 0.78f; + CustomHeapMemoryTuner.blockCacheSize = 0.02f; + Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up + // Even if the tuner says to set the memstore to 78%, HBase makes it as 70% as that is the + // upper bound. Same with block cache as 10% is the lower bound. + assertHeapSpace(0.7f, memStoreFlusher.memstoreSize); + assertHeapSpace(0.1f, blockCache.maxSize); + } + + @Test + public void testWhenCombinedHeapSizesFromTunerGoesOutSideMaxLimit() throws Exception { + BlockCacheStub blockCache = new BlockCacheStub((long) (maxHeapSize * 0.4)); + MemstoreFlusherStub memStoreFlusher = new MemstoreFlusherStub((long) (maxHeapSize * 0.4)); + Configuration conf = HBaseConfiguration.create(); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.MEMSTORE_SIZE_MIN_RANGE_KEY, 0.1f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MAX_RANGE_KEY, 0.7f); + conf.setFloat(HeapMemoryManager.BLOCK_CACHE_SIZE_MIN_RANGE_KEY, 0.1f); + conf.setLong(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_PERIOD, 1000); + conf.setClass(HeapMemoryManager.HBASE_RS_HEAP_MEMORY_TUNER_CLASS, CustomHeapMemoryTuner.class, + HeapMemoryTuner.class); + HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, + new RegionServerStub(conf)); + long oldMemstoreSize = memStoreFlusher.memstoreSize; + long oldBlockCacheSize = blockCache.maxSize; + heapMemoryManager.start(); + CustomHeapMemoryTuner.memstoreSize = 0.7f; + CustomHeapMemoryTuner.blockCacheSize = 0.3f; + Thread.sleep(1500); + assertEquals(oldMemstoreSize, memStoreFlusher.memstoreSize); + assertEquals(oldBlockCacheSize, blockCache.maxSize); + } + + private void assertHeapSpace(float expectedHeapPercentage, long currentHeapSpace) { + long expected = (long) (this.maxHeapSize * expectedHeapPercentage); + assertEquals(expected, currentHeapSpace); + } + + private void assertHeapSpaceDelta(float expectedDeltaPercent, long oldHeapSpace, long newHeapSpace) { + long expctedMinDelta = (long) (this.maxHeapSize * expectedDeltaPercent); + if (expectedDeltaPercent > 0) { + assertTrue(expctedMinDelta <= (newHeapSpace - oldHeapSpace)); + } else { + assertTrue(expctedMinDelta <= (oldHeapSpace - newHeapSpace)); + } + } + + private static class BlockCacheStub implements ResizableBlockCache { + + CacheStats stats = new CacheStats(); + long maxSize = 0; + + public BlockCacheStub(long size){ + this.maxSize = size; + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { + + } + + @Override + public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { + + } + + @Override + public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat) { + return null; + } + + @Override + public boolean evictBlock(BlockCacheKey cacheKey) { + stats.evicted(); + return false; + } + + @Override + public int evictBlocksByHfileName(String hfileName) { + stats.evicted(); // Just assuming only one block for file here. + return 0; + } + + @Override + public CacheStats getStats() { + return this.stats; + } + + @Override + public void shutdown() { + + } + + @Override + public long size() { + return 0; + } + + @Override + public long getFreeSize() { + return 0; + } + + @Override + public long getCurrentSize() { + return 0; + } + + @Override + public long getEvictedCount() { + return 0; + } + + @Override + public long getBlockCount() { + return 0; + } + + @Override + public List getBlockCacheColumnFamilySummaries(Configuration conf) + throws IOException { + return null; + } + + @Override + public void setMaxSize(long size) { + this.maxSize = size; + } + } + + private static class MemstoreFlusherStub implements FlushRequester { + + long memstoreSize; + + FlushRequestListener listener; + + FlushType flushType = FlushType.NORMAL; + + public MemstoreFlusherStub(long memstoreSize) { + this.memstoreSize = memstoreSize; + } + + @Override + public void requestFlush(HRegion region) { + this.listener.flushRequested(flushType, region); + } + + @Override + public void requestDelayedFlush(HRegion region, long delay) { + + } + + @Override + public void registerFlushRequestListener(FlushRequestListener listener) { + this.listener = listener; + } + + @Override + public boolean unregisterFlushRequestListener(FlushRequestListener listener) { + return false; + } + + @Override + public void setGlobalMemstoreLimit(long globalMemStoreSize) { + this.memstoreSize = globalMemStoreSize; + } + } + + private static class RegionServerStub implements Server { + private Configuration conf; + private boolean stopped = false; + + public RegionServerStub(Configuration conf) { + this.conf = conf; + } + + @Override + public void abort(String why, Throwable e) { + + } + + @Override + public boolean isAborted() { + return false; + } + + @Override + public void stop(String why) { + this.stopped = true; + } + + @Override + public boolean isStopped() { + return this.stopped; + } + + @Override + public Configuration getConfiguration() { + return this.conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return null; + } + + @Override + public CatalogTracker getCatalogTracker() { + return null; + } + + @Override + public ServerName getServerName() { + return ServerName.valueOf("server1",4000,12345); + } + } + + static class CustomHeapMemoryTuner implements HeapMemoryTuner { + static float blockCacheSize = 0.4f; + static float memstoreSize = 0.4f; + + @Override + public Configuration getConf() { + return null; + } + + @Override + public void setConf(Configuration arg0) { + + } + + @Override + public TunerResult tune(TunerContext context) { + TunerResult result = new TunerResult(true); + result.setBlockCacheSize(blockCacheSize); + result.setMemstoreSize(memstoreSize); + return result; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 5762ef48621..5aeb31b994d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; +import org.apache.hadoop.hbase.regionserver.FlushRequestListener; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -867,6 +868,21 @@ public class TestWALReplay { // TODO Auto-generated method stub } + + @Override + public void registerFlushRequestListener(FlushRequestListener listener) { + + } + + @Override + public boolean unregisterFlushRequestListener(FlushRequestListener listener) { + return false; + } + + @Override + public void setGlobalMemstoreLimit(long globalMemStoreSize) { + + } } private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,