HBASE-15429 Add split policy for busy regions

Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
Ashu Pachauri 2016-04-11 07:43:06 -07:00 committed by Elliott Clark
parent 6d0e0e3721
commit 3abd52bdc6
3 changed files with 209 additions and 1 deletions

View File

@ -322,7 +322,7 @@ possible configurations would overwhelm and obscure the important.
<value>org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy</value>
<description>
A split policy determines when a region should be split. The various other split policies that
are available currently are ConstantSizeRegionSplitPolicy, DisabledRegionSplitPolicy,
are available currently are BusyRegionSplitPolicy, ConstantSizeRegionSplitPolicy, DisabledRegionSplitPolicy,
DelimitedKeyPrefixRegionSplitPolicy, and KeyPrefixRegionSplitPolicy.
DisabledRegionSplitPolicy blocks manual region splitting.
</description>

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
 * This class represents a split policy which makes the split decision based
 * on how busy a region is. The metric that is used here is the fraction of
 * total write requests that are blocked due to high memstore utilization.
 * This fractional rate is calculated over a running window of
 * "hbase.busy.policy.aggWindow" milliseconds. The rate is a time-weighted
 * aggregated average of the rate in the current window and the
 * true average rate in the previous window.
 *
 * <p>Configuration keys read by this policy:
 * <ul>
 * <li>"hbase.busy.policy.blockedRequests": blocked-request fraction at or above which the
 * region is split; values outside [0.00001, 0.99999] (or NaN) fall back to
 * {@link #DEFAULT_MAX_BLOCKED_REQUESTS}.</li>
 * <li>"hbase.busy.policy.minAge": time in milliseconds that must pass after this policy was
 * configured for the region before a busy-based split is considered.</li>
 * <li>"hbase.busy.policy.aggWindow": aggregation window in milliseconds; non-positive values
 * fall back to {@link #DEFAULT_AGGREGATION_WINDOW}.</li>
 * </ul>
 *
 * <p>The size-based decision of the parent policy is still honoured: if the parent says the
 * region should split, this policy splits regardless of the blocked-request rate.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class BusyRegionSplitPolicy extends IncreasingToUpperBoundRegionSplitPolicy {

  private static final Log LOG = LogFactory.getLog(BusyRegionSplitPolicy.class);

  // Maximum fraction blocked write requests before region is considered for split
  private float maxBlockedRequests;
  public static final float DEFAULT_MAX_BLOCKED_REQUESTS = 0.2f;

  // Minimum age of the region in milliseconds before it is considered for split
  private long minAge = -1;
  public static final long DEFAULT_MIN_AGE_MS = 600000; // 10 minutes

  // The window time in milliseconds over which the blocked requests rate is calculated
  private long aggregationWindow;
  public static final long DEFAULT_AGGREGATION_WINDOW = 300000; // 5 minutes

  private HRegion region;
  // Time at which the current aggregation window's counter snapshot was taken
  private long prevTime;
  // Time at which this policy was initialised for the region
  private long startTime;
  // Counter snapshots taken at prevTime; deltas against them give the in-window rate
  private long writeRequestCount;
  private long blockedRequestCount;
  // Blocked-request rate measured over the most recently completed window
  private float blockedRate;

  /**
   * Reads the busy-policy settings from the configuration, replaces out-of-range values with
   * their defaults, and snapshots the region's request counters.
   *
   * @param region the region this policy instance decides splits for
   */
  @Override
  protected void configureForRegion(final HRegion region) {
    super.configureForRegion(region);
    this.region = region;
    Configuration conf = getConf();

    maxBlockedRequests = conf.getFloat("hbase.busy.policy.blockedRequests",
        DEFAULT_MAX_BLOCKED_REQUESTS);
    minAge = conf.getLong("hbase.busy.policy.minAge", DEFAULT_MIN_AGE_MS);
    aggregationWindow = conf.getLong("hbase.busy.policy.aggWindow",
        DEFAULT_AGGREGATION_WINDOW);

    // NaN compares false against both bounds, so it must be rejected explicitly; otherwise
    // the ">= maxBlockedRequests" test in shouldSplit() could never succeed.
    if (Float.isNaN(maxBlockedRequests)
        || maxBlockedRequests < 0.00001f || maxBlockedRequests > 0.99999f) {
      LOG.warn("Threshold for maximum blocked requests is set too low or too high, "
          + " resetting to default of " + DEFAULT_MAX_BLOCKED_REQUESTS);
      maxBlockedRequests = DEFAULT_MAX_BLOCKED_REQUESTS;
    }

    if (aggregationWindow <= 0) {
      LOG.warn("Aggregation window size is too low: " + aggregationWindow
          + ". Resetting it to default of " + DEFAULT_AGGREGATION_WINDOW);
      aggregationWindow = DEFAULT_AGGREGATION_WINDOW;
    }

    init();
  }

  /**
   * Starts the first aggregation window: records the current time as both the policy start
   * time and the window start, and snapshots the region's blocked/write request counters.
   */
  private synchronized void init() {
    startTime = EnvironmentEdgeManager.currentTime();
    prevTime = startTime;
    blockedRequestCount = region.getBlockedRequestsCount();
    writeRequestCount = region.getWriteRequestsCount();
  }

  /**
   * Decides whether the region should be split.
   *
   * @return {@code true} if the parent policy requests a split, or if the minimum age has
   *         passed, every store can be split, and the aggregated blocked-request rate is at
   *         or above the configured threshold; {@code false} otherwise
   */
  @Override
  protected boolean shouldSplit() {
    // Refresh the rate before any early return so the window bookkeeping advances on
    // every call, including calls that are answered by the parent policy.
    float blockedReqRate = updateRate();
    if (super.shouldSplit()) {
      return true;
    }

    if (EnvironmentEdgeManager.currentTime() < startTime + minAge) {
      return false;
    }

    for (Store store: region.getStores()) {
      if (!store.canSplit()) {
        return false;
      }
    }

    if (blockedReqRate >= maxBlockedRequests) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Going to split region " + region.getRegionInfo().getRegionNameAsString()
            + " because it's too busy. Blocked Request rate: " + blockedReqRate);
      }
      return true;
    }

    return false;
  }

  /**
   * Update the blocked request rate based on number of blocked and total write requests in the
   * last aggregation window, or since last call to this method, whichever is farthest in time.
   * Uses weighted rate calculation based on the previous rate and new data.
   *
   * <p>Three cases, checked in order:
   * <ol>
   * <li>A full window has elapsed since the last snapshot: the rate since that snapshot
   * becomes the stored rate, a new snapshot is taken, and that rate is returned.</li>
   * <li>Otherwise, if a full window has elapsed since the policy started: the stored rate
   * and the rate so far in this window are blended, weighted by the fraction of the window
   * already elapsed. No state is modified.</li>
   * <li>Otherwise (still inside the very first window): 0 is returned.</li>
   * </ol>
   *
   * @return Updated blocked request rate.
   */
  private synchronized float updateRate() {
    float aggBlockedRate;
    long curTime = EnvironmentEdgeManager.currentTime();

    long newBlockedReqs = region.getBlockedRequestsCount();
    long newWriteReqs = region.getWriteRequestsCount();

    // The small constant keeps the denominator non-zero when no writes arrived.
    aggBlockedRate =
        (newBlockedReqs - blockedRequestCount) / (newWriteReqs - writeRequestCount + 0.00001f);

    if (curTime - prevTime >= aggregationWindow) {
      blockedRate = aggBlockedRate;
      prevTime = curTime;
      blockedRequestCount = newBlockedReqs;
      writeRequestCount = newWriteReqs;
    } else if (curTime - startTime >= aggregationWindow) {
      // Calculate the aggregate blocked rate as the weighted sum of
      // previous window's average blocked rate and blocked rate in this window so far.
      float timeSlice = (curTime - prevTime) / (aggregationWindow + 0.0f);
      aggBlockedRate = (1 - timeSlice) * blockedRate + timeSlice * aggBlockedRate;
    } else {
      aggBlockedRate = 0.0f;
    }
    return aggBlockedRate;
  }
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -148,6 +149,56 @@ public class TestRegionSplitPolicy {
assertWithinJitter(maxSplitSize, policy.getSizeToCheck(0));
}
/**
 * Checks the split decision of BusyRegionSplitPolicy against a mocked region whose
 * blocked/write request counters are stubbed to fixed values.
 *
 * Two policy instances are exercised: one with a large minimum age (must not split), and
 * one with zero minimum age and a 500 ms aggregation window (must split only once the
 * window has elapsed and the blocked fraction reaches the configured 0.1 threshold).
 *
 * NOTE(review): this test depends on real elapsed time via Thread.sleep, so the statement
 * order and sleep durations below are significant.
 */
@Test
public void testBusyRegionSplitPolicy() throws Exception {
// Select the busy policy, with a large minimum age and a 10% blocked-request threshold.
conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
BusyRegionSplitPolicy.class.getName());
conf.setLong("hbase.busy.policy.minAge", 1000000L);
conf.setFloat("hbase.busy.policy.blockedRequests", 0.1f);
// Region server mock that reports an empty list of online regions for the table.
RegionServerServices rss = Mockito.mock(RegionServerServices.class);
final List<Region> regions = new ArrayList<Region>();
Mockito.when(rss.getOnlineRegions(TABLENAME)).thenReturn(regions);
Mockito.when(mockRegion.getRegionServerServices()).thenReturn(rss);
// Counters start at zero before the policy is created.
Mockito.when(mockRegion.getBlockedRequestsCount()).thenReturn(0L);
Mockito.when(mockRegion.getWriteRequestsCount()).thenReturn(0L);
BusyRegionSplitPolicy policy =
(BusyRegionSplitPolicy)RegionSplitPolicy.create(mockRegion, conf);
// Counters now report 10 blocked out of 10 writes.
Mockito.when(mockRegion.getBlockedRequestsCount()).thenReturn(10L);
Mockito.when(mockRegion.getWriteRequestsCount()).thenReturn(10L);
// Not enough time since region came online
assertFalse(policy.shouldSplit());
// Reset min age for split to zero
conf.setLong("hbase.busy.policy.minAge", 0L);
// Aggregate over 500 ms periods
conf.setLong("hbase.busy.policy.aggWindow", 500L);
// Create a fresh policy so the changed settings are read; the mock still reports 10/10 here.
policy =
(BusyRegionSplitPolicy)RegionSplitPolicy.create(mockRegion, conf);
// Reference point for the elapsed-time guard further down.
long start = EnvironmentEdgeManager.currentTime();
// Writes advance to 20 while the blocked count stays at 10.
Mockito.when(mockRegion.getBlockedRequestsCount()).thenReturn(10L);
Mockito.when(mockRegion.getWriteRequestsCount()).thenReturn(20L);
Thread.sleep(300);
assertFalse(policy.shouldSplit());
// Blocked count advances to 12 and writes to 30.
Mockito.when(mockRegion.getBlockedRequestsCount()).thenReturn(12L);
Mockito.when(mockRegion.getWriteRequestsCount()).thenReturn(30L);
Thread.sleep(2);
// Enough blocked requests since last time, but aggregate blocked request
// rate over last 500 ms is still low, because major portion of the window is constituted
// by the previous zero blocked request period which lasted at least 300 ms off last 500 ms.
// The assertion is guarded so it only runs while fewer than 500 ms have elapsed since start.
if (EnvironmentEdgeManager.currentTime() - start < 500) {
assertFalse(policy.shouldSplit());
}
// Blocked count advances to 14 and writes to 40.
Mockito.when(mockRegion.getBlockedRequestsCount()).thenReturn(14L);
Mockito.when(mockRegion.getWriteRequestsCount()).thenReturn(40L);
// Sleeps now total at least 502 ms, more than the 500 ms aggregation window set above.
Thread.sleep(200);
assertTrue(policy.shouldSplit());
}
private void assertWithinJitter(long maxSplitSize, long sizeToCheck) {
assertTrue("Size greater than lower bound of jitter",
(long)(maxSplitSize * 0.75) <= sizeToCheck);