HBASE-8119 Optimize StochasticLoadBalancer

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1467109 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Enis Soztutar 2013-04-11 21:59:31 +00:00
parent 48ee32824f
commit e21a1ee4ae
6 changed files with 469 additions and 281 deletions

View File

@ -18,8 +18,17 @@
*/
package org.apache.hadoop.hbase;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -37,16 +46,8 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.io.DataInputBuffer;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* HRegion information.
@ -182,6 +183,7 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
// Current TableName
private byte[] tableName = null;
private String tableNameAsString = null;
/** HRegionInfo for root region */
public static final HRegionInfo ROOT_REGIONINFO =
@ -532,7 +534,10 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
* @return string representation of current table
*/
public String getTableNameAsString() {
return Bytes.toString(tableName);
if (tableNameAsString == null) {
tableNameAsString = Bytes.toString(tableName);
}
return tableNameAsString;
}
/**
@ -684,7 +689,7 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
}
/**
* @deprecated Use protobuf deserialization instead.
* @deprecated Use protobuf deserialization instead.
* @see #parseFrom(byte[])
*/
@Deprecated

View File

@ -1241,10 +1241,10 @@ Server {
int balancerCutoffTime =
getConfiguration().getInt("hbase.balancer.max.balancing", -1);
if (balancerCutoffTime == -1) {
// No time period set so create one -- do half of balancer period.
// No time period set so create one
int balancerPeriod =
getConfiguration().getInt("hbase.balancer.period", 300000);
balancerCutoffTime = balancerPeriod / 2;
balancerCutoffTime = balancerPeriod;
// If nonsense period, set it to balancerPeriod
if (balancerCutoffTime <= 0) balancerCutoffTime = balancerPeriod;
}
@ -1261,7 +1261,6 @@ Server {
if (!this.loadBalancerTracker.isBalancerOn()) return false;
// Do this call outside of synchronized block.
int maximumBalanceTime = getBalancerCutoffTime();
long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
boolean balancerRan;
synchronized (this.balancer) {
// Only allow one balance run at at time.
@ -1296,6 +1295,7 @@ Server {
List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
if (partialPlans != null) plans.addAll(partialPlans);
}
long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
int rpCount = 0; // number of RegionPlans balanced so far
long totalRegPlanExecTime = 0;
balancerRan = plans != null;
@ -1303,12 +1303,14 @@ Server {
for (RegionPlan plan: plans) {
LOG.info("balance " + plan);
long balStartTime = System.currentTimeMillis();
//TODO: bulk assign
this.assignmentManager.balance(plan);
totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
rpCount++;
if (rpCount < plans.size() &&
// if performing next balance exceeds cutoff time, exit the loop
(System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
//TODO: After balance, there should not be a cutoff time (keeping it as a security net for now)
LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
maximumBalanceTime);
break;

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
@ -29,10 +31,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.base.Joiner;
import com.google.common.collect.ArrayListMultimap;
@ -46,6 +50,213 @@ import com.google.common.collect.Sets;
*/
public abstract class BaseLoadBalancer implements LoadBalancer {
/**
* An efficient array based implementation similar to ClusterState for keeping
* the status of the cluster in terms of region assignment and distribution.
* To be used by LoadBalancers.
*/
protected static class Cluster {
ServerName[] servers;
ArrayList<byte[]> tables;
HRegionInfo[] regions;
List<RegionLoad>[] regionLoads;
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
int[][] regionsPerServer; //serverIndex -> region list
int[] regionIndexToServerIndex; //regionIndex -> serverIndex
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
Map<ServerName, Integer> serversToIndex;
Map<Integer, Integer> tablesToIndex;
int numRegions;
int numServers;
int numTables;
int numMovedRegions = 0; //num moved regions from the initial configuration
int numMovedMetaRegions = 0; //num of moved regions that are META
protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState, Map<String, List<RegionLoad>> loads,
RegionLocationFinder regionFinder) {
serversToIndex = new HashMap<ServerName, Integer>(clusterState.size());
tablesToIndex = new HashMap<Integer, Integer>();
//regionsToIndex = new HashMap<HRegionInfo, Integer>();
//TODO: We should get the list of tables from master
tables = new ArrayList<byte[]>();
numServers = clusterState.size();
numRegions = 0;
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
numRegions += entry.getValue().size();
}
regionsPerServer = new int[clusterState.size()][];
servers = new ServerName[numServers];
regions = new HRegionInfo[numRegions];
regionIndexToServerIndex = new int[numRegions];
initialRegionIndexToServerIndex = new int[numRegions];
regionIndexToTableIndex = new int[numRegions];
regionLoads = new List[numRegions];
regionLocations = new int[numRegions][];
int tableIndex = 0, serverIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
servers[serverIndex] = entry.getKey();
regionsPerServer[serverIndex] = new int[entry.getValue().size()];
serversToIndex.put(servers[serverIndex], Integer.valueOf(serverIndex));
regionPerServerIndex = 0;
for (HRegionInfo region : entry.getValue()) {
byte[] tableName = region.getTableName();
int tableHash = Bytes.mapKey(tableName);
Integer idx = tablesToIndex.get(tableHash);
if (idx == null) {
tables.add(tableName);
idx = tableIndex;
tablesToIndex.put(tableHash, tableIndex++);
}
regions[regionIndex] = region;
regionIndexToServerIndex[regionIndex] = serverIndex;
initialRegionIndexToServerIndex[regionIndex] = serverIndex;
regionIndexToTableIndex[regionIndex] = idx;
regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
//region load
if (loads != null) {
List<RegionLoad> rl = loads.get(region.getRegionNameAsString());
// That could have failed if the RegionLoad is using the other regionName
if (rl == null) {
// Try getting the region load using encoded name.
rl = loads.get(region.getEncodedName());
}
regionLoads[regionIndex] = rl;
}
if (regionFinder != null) {
//region location
List<ServerName> loc = regionFinder.getTopBlockLocations(region);
regionLocations[regionIndex] = new int[loc.size()];
for (int i=0; i < loc.size(); i++) {
regionLocations[regionIndex][i] = serversToIndex.get(loc.get(i));
}
}
regionIndex++;
}
serverIndex++;
}
numTables = tables.size();
numRegionsPerServerPerTable = new int[numServers][numTables];
for (int i = 0; i < numServers; i++) {
for (int j = 0; j < numTables; j++) {
numRegionsPerServerPerTable[i][j] = 0;
}
}
for (int i=0; i < regionIndexToServerIndex.length; i++) {
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
}
numMaxRegionsPerTable = new int[numTables];
for (serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
for (tableIndex = 0 ; tableIndex < numRegionsPerServerPerTable[serverIndex].length; tableIndex++) {
if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
}
}
}
}
public void moveOrSwapRegion(int lServer, int rServer, int lRegion, int rRegion) {
//swap
if (rRegion >= 0 && lRegion >= 0) {
regionMoved(rRegion, rServer, lServer);
regionsPerServer[rServer] = replaceRegion(regionsPerServer[rServer], rRegion, lRegion);
regionMoved(lRegion, lServer, rServer);
regionsPerServer[lServer] = replaceRegion(regionsPerServer[lServer], lRegion, rRegion);
} else if (rRegion >= 0) { //move rRegion
regionMoved(rRegion, rServer, lServer);
regionsPerServer[rServer] = removeRegion(regionsPerServer[rServer], rRegion);
regionsPerServer[lServer] = addRegion(regionsPerServer[lServer], rRegion);
} else if (lRegion >= 0) { //move lRegion
regionMoved(lRegion, lServer, rServer);
regionsPerServer[lServer] = removeRegion(regionsPerServer[lServer], lRegion);
regionsPerServer[rServer] = addRegion(regionsPerServer[rServer], lRegion);
}
}
/** Region moved out of the server */
void regionMoved(int regionIndex, int oldServerIndex, int newServerIndex) {
regionIndexToServerIndex[regionIndex] = newServerIndex;
if (initialRegionIndexToServerIndex[regionIndex] == newServerIndex) {
numMovedRegions--; //region moved back to original location
if (regions[regionIndex].isMetaRegion()) {
numMovedMetaRegions--;
}
} else if (initialRegionIndexToServerIndex[regionIndex] == oldServerIndex) {
numMovedRegions++; //region moved from original location
if (regions[regionIndex].isMetaRegion()) {
numMovedMetaRegions++;
}
}
int tableIndex = regionIndexToTableIndex[regionIndex];
numRegionsPerServerPerTable[oldServerIndex][tableIndex]--;
numRegionsPerServerPerTable[newServerIndex][tableIndex]++;
//check whether this caused maxRegionsPerTable in the new Server to be updated
if (numRegionsPerServerPerTable[newServerIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numRegionsPerServerPerTable[newServerIndex][tableIndex] = numMaxRegionsPerTable[tableIndex];
} else if ((numRegionsPerServerPerTable[oldServerIndex][tableIndex] + 1)
== numMaxRegionsPerTable[tableIndex]) {
//recompute maxRegionsPerTable since the previous value was coming from the old server
for (int serverIndex = 0 ; serverIndex < numRegionsPerServerPerTable.length; serverIndex++) {
if (numRegionsPerServerPerTable[serverIndex][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[serverIndex][tableIndex];
}
}
}
}
int[] removeRegion(int[] regions, int regionIndex) {
//TODO: this maybe costly. Consider using linked lists
int[] newRegions = new int[regions.length - 1];
int i = 0;
for (i = 0; i < regions.length; i++) {
if (regions[i] == regionIndex) {
break;
}
newRegions[i] = regions[i];
}
System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i);
return newRegions;
}
int[] addRegion(int[] regions, int regionIndex) {
int[] newRegions = new int[regions.length + 1];
System.arraycopy(regions, 0, newRegions, 0, regions.length);
newRegions[newRegions.length - 1] = regionIndex;
return newRegions;
}
int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
int i = 0;
for (i = 0; i < regions.length; i++) {
if (regions[i] == regionIndex) {
regions[i] = newRegionIndex;
break;
}
}
return regions;
}
}
// slop for regions
private float slop;
private Configuration config;

View File

@ -17,21 +17,6 @@
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@ -40,6 +25,21 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
* randomly try and mutate the cluster to Cprime. If F(Cprime) < F(C) then the
@ -104,6 +104,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
"hbase.master.balancer.stochastic.stepsPerRegion";
private static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
private static final String MAX_MOVES_KEY = "hbase.master.balancer.stochastic.maxMoveRegions";
private static final String MAX_RUNNING_TIME_KEY = "hbase.master.balancer.stochastic.maxRunningTime";
private static final String KEEP_REGION_LOADS = "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
private static final Random RANDOM = new Random(System.currentTimeMillis());
@ -115,10 +116,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
// values are defaults
private int maxSteps = 15000;
private int stepsPerRegion = 110;
private long maxRunningTime = 1 * 60 * 1000; //5 min
private int maxMoves = 600;
private int numRegionLoadsToRemember = 15;
private float loadMultiplier = 55;
private float moveCostMultiplier = 5;
private float loadMultiplier = 100;
private float moveCostMultiplier = 1;
private float tableMultiplier = 5;
private float localityMultiplier = 5;
private float readRequestMultiplier = 0;
@ -135,6 +137,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
maxMoves = conf.getInt(MAX_MOVES_KEY, maxMoves);
stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
@ -183,86 +186,75 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return null;
}
long startTime = System.currentTimeMillis();
long startTime = EnvironmentEdgeManager.currentTimeMillis();
// Keep track of servers to iterate through them.
List<ServerName> servers = new ArrayList<ServerName>(clusterState.keySet());
Map<HRegionInfo, ServerName> initialRegionMapping = createRegionMapping(clusterState);
double currentCost, newCost, initCost;
currentCost = newCost = initCost = computeCost(initialRegionMapping, clusterState);
Cluster cluster = new Cluster(clusterState, loads, regionFinder);
currentCost = newCost = initCost = computeCost(cluster);
int computedMaxSteps =
Math.min(this.maxSteps, (initialRegionMapping.size() * this.stepsPerRegion));
Math.min(this.maxSteps, (cluster.numRegions * this.stepsPerRegion));
// Perform a stochastic walk to see if we can get a good fit.
for (int step = 0; step < computedMaxSteps; step++) {
int step;
for (step = 0; step < computedMaxSteps; step++) {
// try and perform a mutation
for (ServerName leftServer : servers) {
for (int leftServer = 0; leftServer < cluster.numServers; leftServer++) {
// What server are we going to be swapping regions with ?
ServerName rightServer = pickOtherServer(leftServer, servers);
if (rightServer == null) {
int rightServer = pickOtherServer(leftServer, cluster);
if (rightServer < 0) {
continue;
}
// Get the regions.
List<HRegionInfo> leftRegionList = clusterState.get(leftServer);
List<HRegionInfo> rightRegionList = clusterState.get(rightServer);
// Pick what regions to swap around.
// If we get a null for one then this isn't a swap just a move
HRegionInfo lRegion = pickRandomRegion(leftRegionList, 0);
HRegionInfo rRegion = pickRandomRegion(rightRegionList, 0.5);
int lRegion = pickRandomRegion(cluster, leftServer, 0);
int rRegion = pickRandomRegion(cluster, rightServer, 0.5);
// We randomly picked to do nothing.
if (lRegion == null && rRegion == null) {
if (lRegion < 0 && rRegion < 0) {
continue;
}
if (rRegion != null) {
leftRegionList.add(rRegion);
}
if (lRegion != null) {
rightRegionList.add(lRegion);
}
newCost = computeCost(initialRegionMapping, clusterState);
cluster.moveOrSwapRegion(leftServer, rightServer, lRegion, rRegion);
newCost = computeCost(cluster);
// Should this be kept?
if (newCost < currentCost) {
currentCost = newCost;
} else {
// Put things back the way they were before.
if (rRegion != null) {
leftRegionList.remove(rRegion);
rightRegionList.add(rRegion);
}
if (lRegion != null) {
rightRegionList.remove(lRegion);
leftRegionList.add(lRegion);
}
//TODO: undo by remembering old values, using an UndoAction class
cluster.moveOrSwapRegion(leftServer, rightServer, rRegion, lRegion);
}
}
if (EnvironmentEdgeManager.currentTimeMillis() - startTime > maxRunningTime) {
break;
}
}
long endTime = System.currentTimeMillis();
long endTime = EnvironmentEdgeManager.currentTimeMillis();
if (initCost > currentCost) {
List<RegionPlan> plans = createRegionPlans(initialRegionMapping, clusterState);
List<RegionPlan> plans = createRegionPlans(cluster);
LOG.debug("Finished computing new laod balance plan. Computation took "
+ (endTime - startTime) + "ms to try " + computedMaxSteps
+ " different iterations. Found a solution that moves " + plans.size()
+ " regions; Going from a computed cost of " + initCost + " to a new cost of "
+ currentCost);
if (LOG.isDebugEnabled()) {
LOG.debug("Finished computing new laod balance plan. Computation took "
+ (endTime - startTime) + "ms to try " + step
+ " different iterations. Found a solution that moves " + plans.size()
+ " regions; Going from a computed cost of " + initCost + " to a new cost of "
+ currentCost);
}
return plans;
}
LOG.debug("Could not find a better load balance plan. Tried " + computedMaxSteps
+ " different configurations in " + (endTime - startTime)
+ "ms, and did not find anything with a computed cost less than " + initCost);
if (LOG.isDebugEnabled()) {
LOG.debug("Could not find a better load balance plan. Tried " + step
+ " different configurations in " + (endTime - startTime)
+ "ms, and did not find anything with a computed cost less than " + initCost);
}
return null;
}
@ -274,46 +266,27 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* @param clusterState The desired mapping of ServerName to Regions
* @return List of RegionPlan's that represent the moves needed to get to desired final state.
*/
private List<RegionPlan> createRegionPlans(Map<HRegionInfo, ServerName> initialRegionMapping,
Map<ServerName, List<HRegionInfo>> clusterState) {
private List<RegionPlan> createRegionPlans(Cluster cluster) {
List<RegionPlan> plans = new LinkedList<RegionPlan>();
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
ServerName newServer = entry.getKey();
for (HRegionInfo region : entry.getValue()) {
ServerName initialServer = initialRegionMapping.get(region);
if (!newServer.equals(initialServer)) {
for (int regionIndex = 0; regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
if (initialServerIndex != newServerIndex) {
HRegionInfo region = cluster.regions[regionIndex];
ServerName initialServer = cluster.servers[initialServerIndex];
ServerName newServer = cluster.servers[newServerIndex];
if (LOG.isTraceEnabled()) {
LOG.trace("Moving Region " + region.getEncodedName() + " from server "
+ initialServer.getHostname() + " to " + newServer.getHostname());
RegionPlan rp = new RegionPlan(region, initialServer, newServer);
plans.add(rp);
}
RegionPlan rp = new RegionPlan(region, initialServer, newServer);
plans.add(rp);
}
}
return plans;
}
/**
* Create a map that will represent the initial location of regions on a
* {@link ServerName}
*
* @param clusterState starting state of the cluster and regions.
* @return A map of {@link HRegionInfo} to the {@link ServerName} that is
* currently hosting that region
*/
private Map<HRegionInfo, ServerName> createRegionMapping(
Map<ServerName, List<HRegionInfo>> clusterState) {
Map<HRegionInfo, ServerName> mapping = new HashMap<HRegionInfo, ServerName>();
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
for (HRegionInfo region : entry.getValue()) {
mapping.put(region, entry.getKey());
}
}
return mapping;
}
/** Store the current region loads. */
private synchronized void updateRegionLoad() {
@ -358,32 +331,15 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* @return a random {@link HRegionInfo} or null if an asymmetrical move is
* suggested.
*/
private HRegionInfo pickRandomRegion(List<HRegionInfo> regions, double chanceOfNoSwap) {
private int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
//Check to see if this is just a move.
if (regions.isEmpty() || RANDOM.nextFloat() < chanceOfNoSwap) {
if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
//signal a move only.
return null;
return -1;
}
int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
return cluster.regionsPerServer[server][rand];
int count = 0;
HRegionInfo r = null;
//We will try and find a region up to 10 times. If we always
while (count < 10 && r == null ) {
count++;
r = regions.get(RANDOM.nextInt(regions.size()));
// If this is a special region we always try not to move it.
// so clear out r. try again
if (r.isMetaRegion()) {
r = null;
}
}
if (r != null) {
regions.remove(r);
}
return r;
}
/**
@ -394,16 +350,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* @param allServers list of all server from which to pick
* @return random server. Null if no other servers were found.
*/
private ServerName pickOtherServer(ServerName server, List<ServerName> allServers) {
ServerName s = null;
int count = 0;
while (count < 100 && (s == null || ServerName.isSameHostnameAndPort(s, server))) {
count++;
s = allServers.get(RANDOM.nextInt(allServers.size()));
private int pickOtherServer(int serverIndex, Cluster cluster) {
if (cluster.numServers < 2) {
return -1;
}
while (true) {
int otherServerIndex = RANDOM.nextInt(cluster.numServers);
if (otherServerIndex != serverIndex) {
return otherServerIndex;
}
}
// If nothing but the current server was found return null.
return (s == null || ServerName.isSameHostnameAndPort(s, server)) ? null : s;
}
/**
@ -414,38 +370,39 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* @param clusterState Map of ServerName to list of regions.
* @return a double of a cost associated with the proposed
*/
protected double computeCost(Map<HRegionInfo, ServerName> initialRegionMapping,
Map<ServerName, List<HRegionInfo>> clusterState) {
protected double computeCost(Cluster cluster) {
double moveCost = moveCostMultiplier * computeMoveCost(initialRegionMapping, clusterState);
double moveCost = moveCostMultiplier * computeMoveCost(cluster);
double regionCountSkewCost = loadMultiplier * computeSkewLoadCost(clusterState);
double tableSkewCost = tableMultiplier * computeTableSkewLoadCost(clusterState);
double regionCountSkewCost = loadMultiplier * computeSkewLoadCost(cluster);
double tableSkewCost = tableMultiplier * computeTableSkewLoadCost(cluster);
double localityCost =
localityMultiplier * computeDataLocalityCost(initialRegionMapping, clusterState);
localityMultiplier * computeDataLocalityCost(cluster);
double memstoreSizeCost =
memStoreSizeMultiplier
* computeRegionLoadCost(clusterState, RegionLoadCostType.MEMSTORE_SIZE);
* computeRegionLoadCost(cluster, RegionLoadCostType.MEMSTORE_SIZE);
double storefileSizeCost =
storeFileSizeMultiplier
* computeRegionLoadCost(clusterState, RegionLoadCostType.STOREFILE_SIZE);
* computeRegionLoadCost(cluster, RegionLoadCostType.STOREFILE_SIZE);
double readRequestCost =
readRequestMultiplier
* computeRegionLoadCost(clusterState, RegionLoadCostType.READ_REQUEST);
* computeRegionLoadCost(cluster, RegionLoadCostType.READ_REQUEST);
double writeRequestCost =
writeRequestMultiplier
* computeRegionLoadCost(clusterState, RegionLoadCostType.WRITE_REQUEST);
* computeRegionLoadCost(cluster, RegionLoadCostType.WRITE_REQUEST);
double total =
double total =
moveCost + regionCountSkewCost + tableSkewCost + localityCost + memstoreSizeCost
+ storefileSizeCost + readRequestCost + writeRequestCost;
LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = "
+ moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = "
+ tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = "
+ memstoreSizeCost + " storefileSizeCost = " + storefileSizeCost);
if (LOG.isTraceEnabled()) {
LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = "
+ moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = "
+ tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = "
+ memstoreSizeCost + " storefileSizeCost = " + storefileSizeCost);
}
return total;
}
@ -457,24 +414,21 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* @param clusterState The potential new cluster state.
* @return The cost. Between 0 and 1.
*/
double computeMoveCost(Map<HRegionInfo, ServerName> initialRegionMapping,
Map<ServerName, List<HRegionInfo>> clusterState) {
float moveCost = 0;
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
for (HRegionInfo region : entry.getValue()) {
if (initialRegionMapping.get(region) != entry.getKey()) {
moveCost += 1;
}
}
}
double computeMoveCost(Cluster cluster) {
double moveCost = cluster.numMovedRegions;
//Don't let this single balance move more than the max moves.
//This allows better scaling to accurately represent the actual cost of a move.
if (moveCost > maxMoves) {
return 10000; //return a number much greater than any of the other cost functions
return Double.MAX_VALUE; //return a number much greater than any of the other cost functions
}
return scale(0, Math.min(maxMoves, initialRegionMapping.size()), moveCost);
//META region is special
if (cluster.numMovedMetaRegions > 0) {
maxMoves += 9 * cluster.numMovedMetaRegions; //assume each META region move costs 10 times
}
return scale(0, cluster.numRegions, moveCost);
}
/**
@ -484,11 +438,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* @param clusterState The proposed cluster state
* @return The cost of region load imbalance.
*/
double computeSkewLoadCost(Map<ServerName, List<HRegionInfo>> clusterState) {
double computeSkewLoadCost(Cluster cluster) {
DescriptiveStatistics stats = new DescriptiveStatistics();
for (List<HRegionInfo> regions : clusterState.values()) {
int size = regions.size();
stats.addValue(size);
for (int[] regions : cluster.regionsPerServer) {
stats.addValue(regions.length);
}
return costFromStats(stats);
}
@ -500,68 +453,15 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* @param clusterState Proposed cluster state.
* @return Cost of imbalance in table.
*/
double computeTableSkewLoadCost(Map<ServerName, List<HRegionInfo>> clusterState) {
Map<String, MutableInt> tableRegionsTotal = new HashMap<String, MutableInt>();
Map<String, MutableInt> tableRegionsOnCurrentServer = new HashMap<String, MutableInt>();
Map<String, Integer> tableCostSeenSoFar = new HashMap<String, Integer>();
// Go through everything per server
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
tableRegionsOnCurrentServer.clear();
// For all of the regions count how many are from each table
for (HRegionInfo region : entry.getValue()) {
String tableName = region.getTableNameAsString();
// See if this table already has a count on this server
MutableInt regionsOnServerCount = tableRegionsOnCurrentServer.get(tableName);
// If this is the first time we've seen this table on this server
// create a new mutable int.
if (regionsOnServerCount == null) {
regionsOnServerCount = new MutableInt(0);
tableRegionsOnCurrentServer.put(tableName, regionsOnServerCount);
}
// Increment the count of how many regions from this table are host on
// this server
regionsOnServerCount.increment();
// Now count the number of regions in this table.
MutableInt totalCount = tableRegionsTotal.get(tableName);
// If this is the first region from this table create a new counter for
// this table.
if (totalCount == null) {
totalCount = new MutableInt(0);
tableRegionsTotal.put(tableName, totalCount);
}
totalCount.increment();
}
// Now go through all of the tables we have seen and keep the max number
// of regions of this table a single region server is hosting.
for (Entry<String, MutableInt> currentServerEntry: tableRegionsOnCurrentServer.entrySet()) {
String tableName = currentServerEntry.getKey();
Integer thisCount = currentServerEntry.getValue().toInteger();
Integer maxCountSoFar = tableCostSeenSoFar.get(tableName);
if (maxCountSoFar == null || thisCount.compareTo(maxCountSoFar) > 0) {
tableCostSeenSoFar.put(tableName, thisCount);
}
}
}
double max = 0;
double min = 0;
double computeTableSkewLoadCost(Cluster cluster) {
double max = cluster.numRegions;
double min = cluster.numRegions / cluster.numServers;
double value = 0;
// Compute the min, value, and max.
for (Entry<String, MutableInt> currentEntry : tableRegionsTotal.entrySet()) {
max += tableRegionsTotal.get(currentEntry.getKey()).doubleValue();
min += tableRegionsTotal.get(currentEntry.getKey()).doubleValue() / clusterState.size();
value += tableCostSeenSoFar.get(currentEntry.getKey()).doubleValue();
for (int i = 0 ; i < cluster.numMaxRegionsPerTable.length; i++) {
value += cluster.numMaxRegionsPerTable[i];
}
return scale(min, max, value);
}
@ -574,8 +474,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* @return A cost between 0 and 1. 0 Means all regions are on the sever with
* the most local store files.
*/
double computeDataLocalityCost(Map<HRegionInfo, ServerName> initialRegionMapping,
Map<ServerName, List<HRegionInfo>> clusterState) {
double computeDataLocalityCost(Cluster cluster) {
double max = 0;
double cost = 0;
@ -583,27 +482,29 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
// If there's no master so there's no way anything else works.
if (this.services == null) return cost;
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
ServerName sn = entry.getKey();
for (HRegionInfo region : entry.getValue()) {
for (int i = 0; i < cluster.regionLocations.length; i++) {
max += 1;
int serverIndex = cluster.regionIndexToServerIndex[i];
int[] regionLocations = cluster.regionLocations[i];
max += 1;
// If we can't find where the data is getTopBlock returns null.
// so count that as being the best possible.
if (regionLocations == null) {
continue;
}
List<ServerName> dataOnServers = regionFinder.getTopBlockLocations(region);
// If we can't find where the data is getTopBlock returns null.
// so count that as being the best possible.
if (dataOnServers == null) {
continue;
}
int index = dataOnServers.indexOf(sn);
if (index < 0) {
cost += 1;
} else {
cost += (double) index / (double) dataOnServers.size();
int index = -1;
for (int j = 0; j < regionLocations.length; j++) {
if (regionLocations[j] == serverIndex) {
index = j;
break;
}
}
if (index < 0) {
cost += 1;
} else {
cost += (double) index / (double) regionLocations.length;
}
}
return scale(0, max, cost);
@ -621,31 +522,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* @param costType what type of cost to consider
* @return the scaled cost.
*/
private double computeRegionLoadCost(Map<ServerName, List<HRegionInfo>> clusterState,
RegionLoadCostType costType) {
private double computeRegionLoadCost(Cluster cluster, RegionLoadCostType costType) {
if (this.clusterStatus == null || this.loads == null || this.loads.size() == 0) return 0;
DescriptiveStatistics stats = new DescriptiveStatistics();
// For every server look at the cost of each region
for (List<HRegionInfo> regions : clusterState.values()) {
for (List<RegionLoad> rl : cluster.regionLoads) {
long cost = 0; //Cost this server has from RegionLoad
// For each region
for (HRegionInfo region : regions) {
// Try and get the region using the regionNameAsString
List<RegionLoad> rl = loads.get(region.getRegionNameAsString());
// That could have failed if the RegionLoad is using the other regionName
if (rl == null) {
// Try getting the region load using encoded name.
rl = loads.get(region.getEncodedName());
}
// Now if we found a region load get the type of cost that was requested.
if (rl != null) {
cost += getRegionLoadCost(rl, costType);
}
if (rl != null) {
cost += getRegionLoadCost(rl, costType);
}
// Add the total cost to the stats.
@ -713,10 +600,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
//Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((stats.getN() - 1) * stats.getMean()) + (stats.getSum() - stats.getMean());
//TODO: Should we make this sum of square errors?
double max = ((stats.getN() - 1) * mean) + (stats.getSum() - mean);
for (double n : stats.getValues()) {
totalCost += Math.abs(mean - n);
double diff = Math.abs(mean - n);
totalCost += diff;
}
return scale(0, max, totalCost);

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
@ -154,12 +153,20 @@ public class BalancerTestBase {
}
protected Map<ServerName, List<HRegionInfo>> mockClusterServers(int[] mockCluster) {
return mockClusterServers(mockCluster, -1);
}
protected BaseLoadBalancer.Cluster mockCluster(int[] mockCluster) {
return new BaseLoadBalancer.Cluster(mockClusterServers(mockCluster, -1), null, null);
}
protected Map<ServerName, List<HRegionInfo>> mockClusterServers(int[] mockCluster, int numTables) {
int numServers = mockCluster.length;
Map<ServerName, List<HRegionInfo>> servers = new TreeMap<ServerName, List<HRegionInfo>>();
for (int i = 0; i < numServers; i++) {
int numRegions = mockCluster[i];
ServerAndLoad sal = randomServer(0);
List<HRegionInfo> regions = randomRegions(numRegions);
List<HRegionInfo> regions = randomRegions(numRegions, numTables);
servers.put(sal.getServerName(), regions);
}
return servers;
@ -168,6 +175,10 @@ public class BalancerTestBase {
private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
protected List<HRegionInfo> randomRegions(int numRegions) {
return randomRegions(numRegions, -1);
}
protected List<HRegionInfo> randomRegions(int numRegions, int numTables) {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
byte[] start = new byte[16];
byte[] end = new byte[16];
@ -180,7 +191,8 @@ public class BalancerTestBase {
}
Bytes.putInt(start, 0, numRegions << 1);
Bytes.putInt(end, 0, (numRegions << 1) + 1);
HRegionInfo hri = new HRegionInfo(Bytes.toBytes("table" + i), start, end, false, regionId++);
byte[] tableName = Bytes.toBytes("table" + (numTables > 0 ? rand.nextInt(numTables) : i));
HRegionInfo hri = new HRegionInfo(tableName, start, end, false, regionId++);
regions.add(hri);
}
return regions;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.List;
@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -131,28 +133,29 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
@Test
public void testSkewCost() {
for (int[] mockCluster : clusterStateMocks) {
double cost = loadBalancer.computeSkewLoadCost(mockClusterServers(mockCluster));
double cost = loadBalancer.computeSkewLoadCost(mockCluster(mockCluster));
assertTrue(cost >= 0);
assertTrue(cost <= 1.01);
}
assertEquals(1,
loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 0, 0, 0, 1 })), 0.01);
loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 0, 0, 1 })), 0.01);
assertEquals(.75,
loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 0, 0, 1, 1 })), 0.01);
loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 0, 1, 1 })), 0.01);
assertEquals(.5,
loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 0, 1, 1, 1 })), 0.01);
loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 1, 1, 1 })), 0.01);
assertEquals(.25,
loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 0, 1, 1, 1, 1 })), 0.01);
loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 1, 1, 1, 1 })), 0.01);
assertEquals(0,
loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 1, 1, 1, 1, 1 })), 0.01);
loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 1, 1, 1, 1, 1 })), 0.01);
assertEquals(0,
loadBalancer.computeSkewLoadCost(mockClusterServers(new int[] { 10, 10, 10, 10, 10 })), 0.01);
loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 10, 10, 10, 10, 10 })), 0.01);
}
@Test
public void testTableSkewCost() {
for (int[] mockCluster : clusterStateMocks) {
double cost = loadBalancer.computeTableSkewLoadCost(mockClusterServers(mockCluster));
BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
double cost = loadBalancer.computeTableSkewLoadCost(cluster);
assertTrue(cost >= 0);
assertTrue(cost <= 1.01);
}
@ -180,4 +183,71 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
}
assertEquals(0.5, loadBalancer.costFromStats(statThree), 0.01);
}
@Test (timeout = 20000)
public void testSmallCluster() {
int numNodes = 10;
int numRegions = 1000;
int numRegionsPerServer = 40; //all servers except one
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
}
@Test (timeout = 20000)
public void testSmallCluster2() {
int numNodes = 20;
int numRegions = 2000;
int numRegionsPerServer = 40; //all servers except one
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
}
@Test (timeout = 40000)
public void testMidCluster() {
int numNodes = 100;
int numRegions = 10000;
int numRegionsPerServer = 60; //all servers except one
int numTables = 40;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
}
@Test (timeout = 1200000)
public void testMidCluster2() {
int numNodes = 200;
int numRegions = 100000;
int numRegionsPerServer = 40; //all servers except one
int numTables = 400;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
}
@Test
@Ignore
//TODO: This still does not finish, making the LoadBalancer unusable at this scale. We should solve this.
//There are two reasons so far;
// - It takes too long for iterating for all servers
// - Moving one region out of the loaded server only costs a slight decrease in the cost of regionCountSkewCost
// but also a slight increase on the moveCost. loadMultiplier / moveCostMultiplier is not high enough to bring down
// the total cost, so that the eager selection cannot continue. This can be solved by smt like
// http://en.wikipedia.org/wiki/Simulated_annealing instead of random walk with eager selection
public void testLargeCluster() {
int numNodes = 1000;
int numRegions = 100000; //100 regions per RS
int numRegionsPerServer = 80; //all servers except one
int numTables = 100;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
}
protected void testWithCluster(int numNodes, int numRegions, int numRegionsPerServer, int numTables) {
//construct a cluster of numNodes, having a total of numRegions. Each RS will hold
//numRegionsPerServer many regions except for the last one, which will host all the
//remaining regions
int[] cluster = new int[numNodes];
for (int i =0; i < numNodes; i++) {
cluster[i] = numRegionsPerServer;
}
cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer);
assertNotNull(loadBalancer.balanceCluster(mockClusterServers(cluster, numTables)));
}
}