HBASE-8517 Stochastic Loadbalancer isn't finding steady state on real clusters

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1486258 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
eclark 2013-05-25 00:22:59 +00:00
parent c19b7ba9d9
commit 4ff4038b79
5 changed files with 895 additions and 403 deletions

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -58,7 +60,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
*/
protected static class Cluster {
ServerName[] servers;
ArrayList<byte[]> tables;
ArrayList<String> tables;
HRegionInfo[] regions;
List<RegionLoad>[] regionLoads;
int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
@ -70,8 +72,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
Map<ServerName, Integer> serversToIndex;
Map<Integer, Integer> tablesToIndex;
Integer[] serverIndicesSortedByRegionCount;
Map<String, Integer> serversToIndex;
Map<String, Integer> tablesToIndex;
int numRegions;
int numServers;
@ -82,21 +86,35 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState, Map<String, List<RegionLoad>> loads,
RegionLocationFinder regionFinder) {
serversToIndex = new HashMap<ServerName, Integer>(clusterState.size());
tablesToIndex = new HashMap<Integer, Integer>();
serversToIndex = new HashMap<String, Integer>();
tablesToIndex = new HashMap<String, Integer>();
//regionsToIndex = new HashMap<HRegionInfo, Integer>();
//TODO: We should get the list of tables from master
tables = new ArrayList<byte[]>();
tables = new ArrayList<String>();
numServers = clusterState.size();
numRegions = 0;
int serverIndex = 0;
// Use servername and port as there can be dead servers in this list. We want everything with
// a matching hostname and port to have the same index.
for (ServerName sn:clusterState.keySet()) {
if (serversToIndex.get(sn.getHostAndPort()) == null) {
serversToIndex.put(sn.getHostAndPort(), serverIndex++);
}
}
// Count how many regions there are.
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
numRegions += entry.getValue().size();
}
regionsPerServer = new int[clusterState.size()][];
numServers = serversToIndex.size();
regionsPerServer = new int[serversToIndex.size()][];
servers = new ServerName[numServers];
regions = new HRegionInfo[numRegions];
regionIndexToServerIndex = new int[numRegions];
@ -104,26 +122,35 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
regionIndexToTableIndex = new int[numRegions];
regionLoads = new List[numRegions];
regionLocations = new int[numRegions][];
serverIndicesSortedByRegionCount = new Integer[numServers];
int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
int tableIndex = 0, serverIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
// populate serversToIndex first
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
servers[serverIndex] = entry.getKey();
serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
// keep the servername if this is the first server name for this hostname
// or this servername has the newest startcode.
if (servers[serverIndex] == null ||
servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
servers[serverIndex] = entry.getKey();
}
regionsPerServer[serverIndex] = new int[entry.getValue().size()];
serversToIndex.put(servers[serverIndex], Integer.valueOf(serverIndex));
serverIndex++;
serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
}
serverIndex = 0;
for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
regionPerServerIndex = 0;
for (HRegionInfo region : entry.getValue()) {
byte[] tableName = region.getTableName();
int tableHash = Bytes.mapKey(tableName);
Integer idx = tablesToIndex.get(tableHash);
String tableName = region.getTableNameAsString();
Integer idx = tablesToIndex.get(tableName);
if (idx == null) {
tables.add(tableName);
idx = tableIndex;
tablesToIndex.put(tableHash, tableIndex++);
tablesToIndex.put(tableName, tableIndex++);
}
regions[regionIndex] = region;
@ -132,7 +159,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
regionIndexToTableIndex[regionIndex] = idx;
regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
//region load
// region load
if (loads != null) {
List<RegionLoad> rl = loads.get(region.getRegionNameAsString());
// That could have failed if the RegionLoad is using the other regionName
@ -156,7 +183,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
regionIndex++;
}
serverIndex++;
}
numTables = tables.size();
@ -263,6 +289,53 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
return regions;
}
void sortServersByRegionCount() {
Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
}
int getNumRegions(int server) {
return regionsPerServer[server].length;
}
private Comparator<Integer> numRegionsComparator = new Comparator<Integer>() {
@Override
public int compare(Integer integer, Integer integer2) {
return Integer.valueOf(getNumRegions(integer)).compareTo(getNumRegions(integer2));
}
};
@Override
public String toString() {
String desc = "Cluster{" +
"servers=[";
for(ServerName sn:servers) {
desc += sn.getHostAndPort() + ", ";
}
desc +=
", serverIndicesSortedByRegionCount="+
Arrays.toString(serverIndicesSortedByRegionCount) +
", regionsPerServer=[";
for (int[]r:regionsPerServer) {
desc += Arrays.toString(r);
}
desc += "]" +
", numMaxRegionsPerTable=" +
Arrays.toString(numMaxRegionsPerTable) +
", numRegions=" +
numRegions +
", numServers=" +
numServers +
", numTables=" +
numTables +
", numMovedRegions=" +
numMovedRegions +
", numMovedMetaRegions=" +
numMovedMetaRegions +
'}';
return desc;
}
}
// slop for regions
@ -270,7 +343,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
private Configuration config;
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
protected MasterServices services;
@Override

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.Bytes;
*/
public class BalancerTestBase {
private static Random rand = new Random();
protected static Random rand = new Random();
static int regionId = 0;
/**
@ -125,7 +125,9 @@ public class BalancerTestBase {
* @param plans
* @return
*/
protected List<ServerAndLoad> reconcile(List<ServerAndLoad> list, List<RegionPlan> plans) {
protected List<ServerAndLoad> reconcile(List<ServerAndLoad> list,
List<RegionPlan> plans,
Map<ServerName, List<HRegionInfo>> servers) {
List<ServerAndLoad> result = new ArrayList<ServerAndLoad>(list.size());
if (plans == null) return result;
Map<ServerName, ServerAndLoad> map = new HashMap<ServerName, ServerAndLoad>(list.size());
@ -134,9 +136,13 @@ public class BalancerTestBase {
}
for (RegionPlan plan : plans) {
ServerName source = plan.getSource();
updateLoad(map, source, -1);
ServerName destination = plan.getDestination();
updateLoad(map, destination, +1);
servers.get(source).remove(plan.getRegionInfo());
servers.get(destination).add(plan.getRegionInfo());
}
result.clear();
result.addAll(map.values());

View File

@ -116,7 +116,7 @@ public class TestDefaultLoadBalancer extends BalancerTestBase {
List<ServerAndLoad> list = convertToList(servers);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
List<ServerAndLoad> balancedCluster = reconcile(list, plans);
List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
LOG.info("Mock Balance : " + printMock(balancedCluster));
assertClusterAsBalanced(balancedCluster);
for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {

View File

@ -19,10 +19,14 @@ package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,7 +38,6 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -46,6 +49,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
@BeforeClass
public static void beforeAllTests() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
loadBalancer = new StochasticLoadBalancer();
loadBalancer.setConf(conf);
}
@ -101,6 +105,10 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 10},
new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 123},
new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 155},
new int[]{10, 7, 12, 8, 11, 10, 9, 14},
new int[]{13, 14, 6, 10, 10, 10, 8, 10},
new int[]{130, 14, 60, 10, 100, 10, 80, 10},
new int[]{130, 140, 60, 100, 100, 100, 80, 100}
};
/**
@ -119,9 +127,11 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
List<ServerAndLoad> list = convertToList(servers);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
List<ServerAndLoad> balancedCluster = reconcile(list, plans);
List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
LOG.info("Mock Balance : " + printMock(balancedCluster));
assertClusterAsBalanced(balancedCluster);
List<RegionPlan> secondPlans = loadBalancer.balanceCluster(servers);
assertNull(secondPlans);
for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
returnRegions(entry.getValue());
returnServer(entry.getKey());
@ -132,56 +142,96 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
@Test
public void testSkewCost() {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFunction
costFunction = new StochasticLoadBalancer.RegionCountSkewCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) {
double cost = loadBalancer.computeSkewLoadCost(mockCluster(mockCluster));
double cost = costFunction.cost(mockCluster(mockCluster));
assertTrue(cost >= 0);
assertTrue(cost <= 1.01);
}
assertEquals(1,
loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 0, 0, 1 })), 0.01);
costFunction.cost(mockCluster(new int[]{0, 0, 0, 0, 1})), 0.01);
assertEquals(.75,
loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 0, 1, 1 })), 0.01);
costFunction.cost(mockCluster(new int[]{0, 0, 0, 1, 1})), 0.01);
assertEquals(.5,
loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 1, 1, 1 })), 0.01);
costFunction.cost(mockCluster(new int[]{0, 0, 1, 1, 1})), 0.01);
assertEquals(.25,
loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 1, 1, 1, 1 })), 0.01);
costFunction.cost(mockCluster(new int[]{0, 1, 1, 1, 1})), 0.01);
assertEquals(0,
loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 1, 1, 1, 1, 1 })), 0.01);
costFunction.cost(mockCluster(new int[]{1, 1, 1, 1, 1})), 0.01);
assertEquals(0,
loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 10, 10, 10, 10, 10 })), 0.01);
costFunction.cost(mockCluster(new int[]{10, 10, 10, 10, 10})), 0.01);
}
@Test
public void testTableSkewCost() {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFunction
costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) {
BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
double cost = loadBalancer.computeTableSkewLoadCost(cluster);
double cost = costFunction.cost(cluster);
assertTrue(cost >= 0);
assertTrue(cost <= 1.01);
}
}
@Test
public void testCostFromStats() {
DescriptiveStatistics statOne = new DescriptiveStatistics();
for (int i =0; i < 100; i++) {
statOne.addValue(10);
}
assertEquals(0, loadBalancer.costFromStats(statOne), 0.01);
public void testCostFromArray() {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFromRegionLoadFunction
costFunction = new StochasticLoadBalancer.MemstoreSizeCostFunction(conf);
DescriptiveStatistics statTwo = new DescriptiveStatistics();
double[] statOne = new double[100];
for (int i =0; i < 100; i++) {
statTwo.addValue(0);
statOne[i] = 10;
}
statTwo.addValue(100);
assertEquals(1, loadBalancer.costFromStats(statTwo), 0.01);
assertEquals(0, costFunction.costFromArray(statOne), 0.01);
DescriptiveStatistics statThree = new DescriptiveStatistics();
double[] statTwo= new double[101];
for (int i =0; i < 100; i++) {
statThree.addValue(0);
statThree.addValue(100);
statTwo[i] = 0;
}
assertEquals(0.5, loadBalancer.costFromStats(statThree), 0.01);
statTwo[100] = 100;
assertEquals(1, costFunction.costFromArray(statTwo), 0.01);
double[] statThree = new double[200];
for (int i =0; i < 100; i++) {
statThree[i] = (0);
statThree[i+100] = 100;
}
assertEquals(0.5, costFunction.costFromArray(statThree), 0.01);
}
@Test(timeout = 30000)
public void testLosingRs() throws Exception {
int numNodes = 3;
int numRegions = 20;
int numRegionsPerServer = 3; //all servers except one
int numTables = 2;
Map<ServerName, List<HRegionInfo>> serverMap =
createServerMap(numNodes, numRegions, numRegionsPerServer, numTables);
List<ServerAndLoad> list = convertToList(serverMap);
List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
assertNotNull(plans);
// Apply the plan to the mock cluster.
List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);
assertClusterAsBalanced(balancedCluster);
ServerName sn = serverMap.keySet().toArray(new ServerName[serverMap.size()])[0];
ServerName deadSn = new ServerName(sn.getHostname(), sn.getPort(), sn.getStartcode() -100);
serverMap.put(deadSn, new ArrayList<HRegionInfo>(0));
plans = loadBalancer.balanceCluster(serverMap);
assertNull(plans);
}
@Test (timeout = 20000)
@ -190,7 +240,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
int numRegions = 1000;
int numRegionsPerServer = 40; //all servers except one
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
}
@Test (timeout = 20000)
@ -199,45 +249,92 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
int numRegions = 2000;
int numRegionsPerServer = 40; //all servers except one
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
}
@Test (timeout = 40000)
@Test (timeout = 20000)
public void testSmallCluster3() {
int numNodes = 20;
int numRegions = 2000;
int numRegionsPerServer = 1; // all servers except one
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, false /* max moves */);
}
@Test (timeout = 800000)
public void testMidCluster() {
int numNodes = 100;
int numRegions = 10000;
int numRegionsPerServer = 60; //all servers except one
int numRegionsPerServer = 60; // all servers except one
int numTables = 40;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
}
@Test (timeout = 1200000)
@Test (timeout = 800000)
public void testMidCluster2() {
int numNodes = 200;
int numRegions = 100000;
int numRegionsPerServer = 40; //all servers except one
int numRegionsPerServer = 40; // all servers except one
int numTables = 400;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
testWithCluster(numNodes,
numRegions,
numRegionsPerServer,
numTables,
false /* num large num regions means may not always get to best balance with one run */);
}
@Test (timeout = 800000)
public void testMidCluster3() {
int numNodes = 100;
int numRegions = 2000;
int numRegionsPerServer = 9; // all servers except one
int numTables = 110;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
// TODO(eclark): Make sure that the tables are well distributed.
}
@Test
@Ignore
//TODO: This still does not finish, making the LoadBalancer unusable at this scale. We should solve this.
//There are two reasons so far;
// - It takes too long for iterating for all servers
// - Moving one region out of the loaded server only costs a slight decrease in the cost of regionCountSkewCost
// but also a slight increase on the moveCost. loadMultiplier / moveCostMultiplier is not high enough to bring down
// the total cost, so that the eager selection cannot continue. This can be solved by smt like
// http://en.wikipedia.org/wiki/Simulated_annealing instead of random walk with eager selection
public void testLargeCluster() {
int numNodes = 1000;
int numRegions = 100000; //100 regions per RS
int numRegionsPerServer = 80; //all servers except one
int numTables = 100;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
}
protected void testWithCluster(int numNodes, int numRegions, int numRegionsPerServer, int numTables) {
protected void testWithCluster(int numNodes,
int numRegions,
int numRegionsPerServer,
int numTables,
boolean assertFullyBalanced) {
Map<ServerName, List<HRegionInfo>> serverMap =
createServerMap(numNodes, numRegions, numRegionsPerServer, numTables);
List<ServerAndLoad> list = convertToList(serverMap);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
// Run the balancer.
List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
assertNotNull(plans);
// Check to see that this actually got to a stable place.
if (assertFullyBalanced) {
// Apply the plan to the mock cluster.
List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);
// Print out the cluster loads to make debugging easier.
LOG.info("Mock Balance : " + printMock(balancedCluster));
assertClusterAsBalanced(balancedCluster);
List<RegionPlan> secondPlans = loadBalancer.balanceCluster(serverMap);
assertNull(secondPlans);
}
}
private Map<ServerName, List<HRegionInfo>> createServerMap(int numNodes,
int numRegions,
int numRegionsPerServer,
int numTables) {
//construct a cluster of numNodes, having a total of numRegions. Each RS will hold
//numRegionsPerServer many regions except for the last one, which will host all the
//remaining regions
@ -246,8 +343,6 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
cluster[i] = numRegionsPerServer;
}
cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer);
assertNotNull(loadBalancer.balanceCluster(mockClusterServers(cluster, numTables)));
return mockClusterServers(cluster, numTables);
}
}