HBASE-6730 Enable rolling averages in StochasticLoadBalancer
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1387865 13f79535-47bb-0310-9956-ffa450edef68
commit eddc746457 (parent 6e3d76183c)
HMaster.java

@@ -85,6 +85,8 @@ import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.master.balancer.BalancerChore;
+import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
@@ -284,6 +286,7 @@ Server {

  private LoadBalancer balancer;
  private Thread balancerChore;
+  private Thread clusterStatusChore;

  private CatalogJanitor catalogJanitorChore;
  private LogCleaner logCleaner;
@@ -309,6 +312,7 @@ Server {
  private final boolean masterCheckCompression;

  private SpanReceiverHost spanReceiverHost;

  /**
   * Initializes the HMaster. The steps are as follows:
   * <p>
@@ -700,6 +704,7 @@ Server {
    // Start balancer and meta catalog janitor after meta and regions have
    // been assigned.
    status.setStatus("Starting balancer and catalog janitor");
+    this.clusterStatusChore = getAndStartClusterStatusChore(this);
    this.balancerChore = getAndStartBalancerChore(this);
    this.catalogJanitorChore = new CatalogJanitor(this, this);
    startCatalogJanitorChore();
@@ -1083,17 +1088,17 @@ Server {
    if (this.executorService != null) this.executorService.shutdown();
  }

+  private static Thread getAndStartClusterStatusChore(HMaster master) {
+    if (master == null || master.balancer == null) {
+      return null;
+    }
+    Chore chore = new ClusterStatusChore(master, master.balancer);
+    return Threads.setDaemonThreadRunning(chore.getThread());
+  }
+
  private static Thread getAndStartBalancerChore(final HMaster master) {
-    String name = master.getServerName() + "-BalancerChore";
-    int balancerPeriod =
-      master.getConfiguration().getInt("hbase.balancer.period", 300000);
-    // Start up the load balancer chore
-    Chore chore = new Chore(name, balancerPeriod, master) {
-      @Override
-      protected void chore() {
-        master.balance();
-      }
-    };
+    Chore chore = new BalancerChore(master);
    return Threads.setDaemonThreadRunning(chore.getThread());
  }
@@ -1101,6 +1106,9 @@ Server {
    if (this.balancerChore != null) {
      this.balancerChore.interrupt();
    }
+    if (this.clusterStatusChore != null) {
+      this.clusterStatusChore.interrupt();
+    }
    if (this.catalogJanitorChore != null) {
      this.catalogJanitorChore.interrupt();
    }
BalancerChore.java (new file)

@@ -0,0 +1,45 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.master.HMaster;

/**
 * Chore that will call {@link org.apache.hadoop.hbase.master.HMaster#balance()}
 * when needed.
 */
@InterfaceAudience.Private
public class BalancerChore extends Chore {

  private final HMaster master;

  public BalancerChore(HMaster master) {
    super(master.getServerName() + "-BalancerChore",
        master.getConfiguration().getInt("hbase.balancer.period", 300000),
        master);
    this.master = master;
  }

  @Override
  protected void chore() {
    master.balance();
  }
}
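For illustration only, a minimal sketch (not part of this commit) of overriding the chore period read in the constructor above; the "hbase.balancer.period" key and its 300000 ms default come straight from the patch, the class name and the 60000 value are invented for the example. In a real deployment the key would normally go in hbase-site.xml rather than be set programmatically.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class BalancerPeriodSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Run the balancer chore every minute instead of the default 300000 ms (5 minutes).
    conf.setInt("hbase.balancer.period", 60000);
    System.out.println(conf.getInt("hbase.balancer.period", 300000)); // 60000
  }
}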
ClusterStatusChore.java (new file)

@@ -0,0 +1,47 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;

/**
 * Chore that will feed the balancer the cluster status.
 */
@InterfaceAudience.Private
public class ClusterStatusChore extends Chore {

  private final HMaster master;
  private final LoadBalancer balancer;

  public ClusterStatusChore(HMaster master, LoadBalancer balancer) {
    super(master.getServerName() + "-ClusterStatusChore",
        master.getConfiguration().getInt("hbase.balancer.statusPeriod", 60000),
        master);
    this.master = master;
    this.balancer = balancer;
  }

  @Override
  protected void chore() {
    balancer.setClusterStatus(master.getClusterStatus());
  }
}
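A quick back-of-the-envelope sketch (editor's illustration, not in the patch) of how the two chore periods relate, using the defaults visible above and assuming the balancer records one RegionLoad sample per region each time it receives a ClusterStatus:

public class ChorePeriodsSketch {
  public static void main(String[] args) {
    int balancerPeriodMs = 300000; // hbase.balancer.period default (BalancerChore)
    int statusPeriodMs = 60000;    // hbase.balancer.statusPeriod default (ClusterStatusChore)
    // With the defaults, roughly five fresh RegionLoad samples arrive between balancer runs,
    // feeding the per-region history that is capped at 15 entries (numRegionLoadsToRemember).
    System.out.println(balancerPeriodMs / statusPeriodMs); // 5
  }
}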
StochasticLoadBalancer.java

@@ -104,17 +104,19 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
      "hbase.master.balancer.stochastic.stepsPerRegion";
  private static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
  private static final String MAX_MOVES_KEY = "hbase.master.balancer.stochastic.maxMoveRegions";
+  private static final String KEEP_REGION_LOADS = "hbase.master.balancer.stochastic.numRegionLoadsToRemember";

  private static final Random RANDOM = new Random(System.currentTimeMillis());
  private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
  private final RegionLocationFinder regionFinder = new RegionLocationFinder();
  private ClusterStatus clusterStatus = null;
-  private Map<String, RegionLoad> loads = new HashMap<String, RegionLoad>();
+  private Map<String, List<RegionLoad>> loads = new HashMap<String, List<RegionLoad>>();

  // values are defaults
  private int maxSteps = 15000;
  private int stepsPerRegion = 110;
  private int maxMoves = 600;
+  private int numRegionLoadsToRemember = 15;
  private float loadMultiplier = 55;
  private float moveCostMultiplier = 5;
  private float tableMultiplier = 5;

@@ -124,6 +126,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
  private float memStoreSizeMultiplier = 5;
  private float storeFileSizeMultiplier = 5;

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);

@@ -133,6 +136,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    maxMoves = conf.getInt(MAX_MOVES_KEY, maxMoves);
    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);

+    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
+
    // Load multiplier should be the greatest as it is the most general way to balance data.
    loadMultiplier = conf.getFloat(REGION_LOAD_COST_KEY, loadMultiplier);

@@ -146,9 +151,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    localityMultiplier = conf.getFloat(LOCALITY_COST_KEY, localityMultiplier);
    memStoreSizeMultiplier = conf.getFloat(MEMSTORE_SIZE_COST_KEY, memStoreSizeMultiplier);
    storeFileSizeMultiplier = conf.getFloat(STOREFILE_SIZE_COST_KEY, storeFileSizeMultiplier);
-
-    // These are not used currently.
-    // TODO: Start using these once rolling averages are implemented for read/write load.
    readRequestMultiplier = conf.getFloat(READ_REQUEST_COST_KEY, readRequestMultiplier);
    writeRequestMultiplier = conf.getFloat(WRITE_REQUEST_COST_KEY, writeRequestMultiplier);
  }
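A hedged configuration sketch (editor's note, not part of the commit) of tuning the keys read in setConf above; the key names and the defaults in the comments are taken from this patch, while the concrete values and the class name are invented for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class StochasticBalancerConfSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Remember more RegionLoad samples per region than the default of 15.
    conf.setInt("hbase.master.balancer.stochastic.numRegionLoadsToRemember", 30);
    // Pre-existing knobs read in the same setConf(); defaults shown in the patch are
    // maxSteps = 15000, maxMoves = 600, stepsPerRegion = 110.
    conf.setInt("hbase.master.balancer.stochastic.maxSteps", 20000);
    conf.setInt("hbase.master.balancer.stochastic.maxMoveRegions", 600);
    conf.setInt("hbase.master.balancer.stochastic.stepsPerRegion", 110);
  }
}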
@@ -313,13 +315,33 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
  }

  /** Store the current region loads. */
-  private void updateRegionLoad() {
-    loads.clear();
+  private synchronized void updateRegionLoad() {
+
+    // We create a new map so that regions that are no longer there are removed.
+    // However, we temporarily need the old loads so we can use them to keep the rolling average.
+    Map<String, List<RegionLoad>> oldLoads = loads;
+    loads = new HashMap<String, List<RegionLoad>>();
+
    for (ServerName sn : clusterStatus.getServers()) {
      ServerLoad sl = clusterStatus.getLoad(sn);
      if (sl == null) continue;
      for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
-        loads.put(Bytes.toString(entry.getKey()), entry.getValue());
+        List<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
+        if (rLoads != null) {
+          // We only keep numRegionLoadsToRemember loads per region; if we are already at the
+          // limit, drop the oldest entries so there is room for the newest one.
+          if (rLoads.size() >= numRegionLoadsToRemember) {
+            int numToRemove = 1 + (rLoads.size() - numRegionLoadsToRemember);
+            rLoads = rLoads.subList(numToRemove, rLoads.size());
+          }
+        } else {
+          // No previous loads were recorded for this region.
+          rLoads = new ArrayList<RegionLoad>();
+        }
+        rLoads.add(entry.getValue());
+        loads.put(Bytes.toString(entry.getKey()), rLoads);
      }
    }
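For reference, a small standalone sketch (editor's illustration, not part of the commit) of the bounded-history bookkeeping above, using integers in place of RegionLoad; keepLast plays the role of numRegionLoadsToRemember.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RollingHistorySketch {
  // Mirrors the trimming in updateRegionLoad(): keep at most 'keepLast' entries,
  // dropping the oldest ones before appending the newest observation.
  static List<Integer> append(List<Integer> history, int newest, int keepLast) {
    if (history == null) {
      history = new ArrayList<Integer>();
    } else if (history.size() >= keepLast) {
      int numToRemove = 1 + (history.size() - keepLast);
      history = new ArrayList<Integer>(history.subList(numToRemove, history.size()));
    }
    history.add(newest);
    return history;
  }

  public static void main(String[] args) {
    List<Integer> h = new ArrayList<Integer>(Arrays.asList(1, 2, 3));
    // With keepLast = 3 the oldest value (1) is dropped before 4 is appended.
    System.out.println(append(h, 4, 3)); // [2, 3, 4]
  }
}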
@@ -402,17 +424,24 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    double localityCost =
        localityMultiplier * computeDataLocalityCost(initialRegionMapping, clusterState);

-    // TODO: Add Read and Write requests back in here after keeping a running average on per
-    // region load metrics.
    double memstoreSizeCost =
        memStoreSizeMultiplier
            * computeRegionLoadCost(clusterState, RegionLoadCostType.MEMSTORE_SIZE);
    double storefileSizeCost =
        storeFileSizeMultiplier
            * computeRegionLoadCost(clusterState, RegionLoadCostType.STOREFILE_SIZE);
-    double total =
+
+    double readRequestCost =
+        readRequestMultiplier
+            * computeRegionLoadCost(clusterState, RegionLoadCostType.READ_REQUEST);
+    double writeRequestCost =
+        writeRequestMultiplier
+            * computeRegionLoadCost(clusterState, RegionLoadCostType.WRITE_REQUEST);
+
+    double total =
        moveCost + regionCountSkewCost + tableSkewCost + localityCost + memstoreSizeCost
-            + storefileSizeCost;
+            + storefileSizeCost + readRequestCost + writeRequestCost;
    LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = "
        + moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = "
        + tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = "
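For orientation only, a hypothetical worked example (editor's sketch, with invented component costs) of how the multipliers weight the pieces summed into total above. The 55 and 5 values mirror the default multiplier fields shown earlier in this patch; the read/write multiplier defaults and the pairing of loadMultiplier with the region-count skew are assumptions, and each component is assumed to be scaled into [0, 1] before its multiplier is applied.

public class CostTotalSketch {
  public static void main(String[] args) {
    // Invented normalized component costs multiplied by the (assumed) default multipliers.
    double regionCountSkewCost = 55 * 0.25;  // loadMultiplier (pairing assumed)
    double moveCost            = 5 * 0.125;  // moveCostMultiplier
    double tableSkewCost       = 5 * 0.0625; // tableMultiplier
    double localityCost        = 5 * 0.5;    // localityMultiplier
    double memstoreSizeCost    = 5 * 0.25;   // memStoreSizeMultiplier
    double storefileSizeCost   = 5 * 0.375;  // storeFileSizeMultiplier
    double readRequestCost     = 5 * 0.5;    // readRequestMultiplier (default of 5 assumed)
    double writeRequestCost    = 5 * 0.25;   // writeRequestMultiplier (default of 5 assumed)

    double total = moveCost + regionCountSkewCost + tableSkewCost + localityCost
        + memstoreSizeCost + storefileSizeCost + readRequestCost + writeRequestCost;
    // Prints 24.0625; the stochastic balancer prefers the candidate layout with the lower total.
    System.out.println(total);
  }
}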
@@ -606,7 +635,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    // For each region
    for (HRegionInfo region : regions) {
      // Try and get the region using the regionNameAsString
-      RegionLoad rl = loads.get(region.getRegionNameAsString());
+      List<RegionLoad> rl = loads.get(region.getRegionNameAsString());

      // That could have failed if the RegionLoad is using the other regionName
      if (rl == null) {
@@ -630,24 +659,44 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
  /**
   * Get the un-scaled cost from a RegionLoad
   *
-   * @param rl the Region load
+   * @param regionLoadList the Region load List
   * @param type The type of cost to extract
   * @return the double representing the cost
   */
-  private double getRegionLoadCost(RegionLoad rl, RegionLoadCostType type) {
-    switch (type) {
-      case READ_REQUEST:
-        return rl.getReadRequestsCount();
-      case WRITE_REQUEST:
-        return rl.getWriteRequestsCount();
-      case MEMSTORE_SIZE:
-        return rl.getMemStoreSizeMB();
-      case STOREFILE_SIZE:
-        return rl.getStorefileSizeMB();
-      default:
-        assert false : "RegionLoad cost type not supported.";
-        return 0;
+  private double getRegionLoadCost(List<RegionLoad> regionLoadList, RegionLoadCostType type) {
+    double cost = 0;
+
+    int size = regionLoadList.size();
+    for (int i = 0; i < size; i++) {
+      RegionLoad rl = regionLoadList.get(i);
+      double toAdd = 0;
+      switch (type) {
+        case READ_REQUEST:
+          toAdd = rl.getReadRequestsCount();
+          break;
+        case WRITE_REQUEST:
+          toAdd = rl.getWriteRequestsCount();
+          break;
+        case MEMSTORE_SIZE:
+          toAdd = rl.getMemStoreSizeMB();
+          break;
+        case STOREFILE_SIZE:
+          toAdd = rl.getStorefileSizeMB();
+          break;
+        default:
+          assert false : "RegionLoad cost type not supported.";
+          return 0;
+      }
+
+      if (cost == 0) {
+        cost = toAdd;
+      } else {
+        cost = (.5 * cost) + (.5 * toAdd);
+      }
    }
+
+    return cost;
  }
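A small standalone sketch (editor's note, not part of the patch) of the recurrence used above, cost = (.5 * cost) + (.5 * toAdd), showing that the most recent loads dominate the rolling average.

public class RollingAverageSketch {
  // Same recurrence as getRegionLoadCost: the first non-zero observation seeds the cost,
  // then each newer observation is blended in with weight 0.5.
  static double rollingCost(double[] observations) {
    double cost = 0;
    for (double toAdd : observations) {
      cost = (cost == 0) ? toAdd : (.5 * cost) + (.5 * toAdd);
    }
    return cost;
  }

  public static void main(String[] args) {
    // Older loads decay geometrically: the newest value 80 carries weight 0.5,
    // 50 carries 0.25, and the two oldest values carry 0.125 each.
    System.out.println(rollingCost(new double[] {100, 0, 50, 80})); // 65.0
  }
}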