HBASE-10620. LoadBalancer.needsBalance() should check for co-located region replicas as well

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1575097 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 2014-03-06 23:34:44 +00:00 committed by Enis Soztutar
parent b0480db1ae
commit e34dae0c9a
6 changed files with 97 additions and 25 deletions
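The gist of the change: needsBalance() previously looked only at region-count sloppiness, so a cluster whose servers carried identical region counts was never rebalanced even if several replicas of the same region sat on one host, which defeats the availability purpose of replicas. As a rough standalone illustration of the host-level check (a toy model only: replicas are "region:replicaId" strings and hosts are plain strings, whereas the patch itself works on BaseLoadBalancer.Cluster and, in StochasticLoadBalancer, on its region-replica cost functions):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class ColocationSketch {

      // True if any host carries two replicas of the same region; a toy
      // stand-in for what areSomeRegionReplicasColocated() decides below.
      static boolean someReplicasColocatedOnHost(Map<String, List<String>> clusterState) {
        for (List<String> hosted : clusterState.values()) {
          Set<String> primaries = new HashSet<String>();
          for (String replica : hosted) {
            if (!primaries.add(replica.split(":")[0])) {
              return true; // second replica of one region on this host
            }
          }
        }
        return false;
      }

      public static void main(String[] args) {
        Map<String, List<String>> state = new HashMap<String, List<String>>();
        // Region counts are perfectly even (2 vs 2), so a pure sloppiness
        // check reports "balanced" -- yet both replicas of r1 share host1.
        state.put("host1", Arrays.asList("r1:0", "r1:1"));
        state.put("host2", Arrays.asList("r2:0", "r3:0"));
        System.out.println(someReplicasColocatedOnHost(state)); // prints true
      }
    }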


@@ -998,7 +998,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
       long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
       int rpCount = 0; // number of RegionPlans balanced so far
       long totalRegPlanExecTime = 0;
-      balancerRan = plans != null;
+      balancerRan = plans.size() != 0;
       if (plans != null && !plans.isEmpty()) {
         for (RegionPlan plan: plans) {
           LOG.info("balance " + plan);
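A side effect worth noting: balancerRan feeds the boolean that HMaster.balance() returns, so the method now reports whether any RegionPlans were actually produced; the TestRegionRebalancing changes further down assert directly on that return value.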


@@ -143,6 +143,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     int numMovedRegions = 0; //num moved regions from the initial configuration
     // num of moved regions away from master that should be on the master
     int numMovedMasterHostedRegions = 0;
+    int numMovedMetaRegions = 0; //num of moved regions that are META
+    Map<ServerName, List<HRegionInfo>> clusterState;

     protected final RackManager rackManager;
@@ -188,6 +190,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     List<List<Integer>> serversPerHostList = new ArrayList<List<Integer>>();
     List<List<Integer>> serversPerRackList = new ArrayList<List<Integer>>();
+    this.clusterState = clusterState;

     // Use servername and port as there can be dead servers in this list. We want everything with
     // a matching hostname and port to have the same index.
@@ -1031,7 +1034,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     return clusterStatus == null ? null : clusterStatus.getBackupMasters();
   }

-  protected boolean needsBalance(ClusterLoadState cs) {
+  protected boolean needsBalance(Cluster c) {
+    ClusterLoadState cs = new ClusterLoadState(
+        masterServerName, getBackupMasters(), backupMasterWeight, c.clusterState);
     if (cs.getNumServers() < MIN_SERVER_BALANCE) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Not running balancer because only " + cs.getNumServers()
@@ -1039,8 +1044,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       }
       return false;
     }
-    // TODO: check for co-located region replicas as well
+    if (areSomeRegionReplicasColocated(c)) return true;
     // Check if we even need to do any load balancing
     // HBASE-3681 check sloppiness first
     float average = cs.getLoadAverage(); // for logging
@@ -1061,6 +1065,17 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     return true;
   }

+  /**
+   * Subclasses should implement this to return true if the cluster has nodes that host
+   * multiple replicas of the same region, or if there are multiple racks and the same
+   * rack hosts replicas of the same region.
+   * @param c the cluster to inspect for co-located replicas
+   * @return true if any host or rack carries more than one replica of some region
+   */
+  protected boolean areSomeRegionReplicasColocated(Cluster c) {
+    return false;
+  }

   /**
    * Generates a bulk assignment plan to be used on cluster startup using a
    * simple round-robin assignment.
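The javadoc above names two co-location cases: same host, and same rack when the cluster spans multiple racks. A standalone sketch of the rack-level case, in the same toy model as the host-level sketch near the top of this page (rack lookup here is a plain function standing in for RackManager.getRack()):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Function;

    public class RackColocationSketch {

      // True if two replicas of one region land in the same rack; only
      // meaningful when the cluster actually spans more than one rack.
      static boolean someReplicasColocatedOnRack(Map<String, List<String>> clusterState,
          Function<String, String> rackOf) {
        Map<String, Set<String>> primariesPerRack = new HashMap<String, Set<String>>();
        for (Map.Entry<String, List<String>> host : clusterState.entrySet()) {
          String rack = rackOf.apply(host.getKey());
          Set<String> primaries = primariesPerRack.get(rack);
          if (primaries == null) {
            primaries = new HashSet<String>();
            primariesPerRack.put(rack, primaries);
          }
          for (String replica : host.getValue()) {
            if (!primaries.add(replica.split(":")[0])) {
              return true; // same region seen twice within one rack
            }
          }
        }
        return false;
      }

      public static void main(String[] args) {
        Map<String, List<String>> state = new HashMap<String, List<String>>();
        state.put("host1", Arrays.asList("r1:0"));
        state.put("host11", Arrays.asList("r1:1")); // different host, same rack
        state.put("host2", Arrays.asList("r2:0"));
        // Same convention as ForTestRackManagerOne in the test below:
        // hostnames ending in "1" map to rack1, everything else to rack2.
        System.out.println(someReplicasColocatedOnRack(state,
            h -> h.endsWith("1") ? "rack1" : "rack2")); // prints true
      }
    }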


@@ -104,21 +104,21 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
    * have either floor(average) or ceiling(average) regions.
    *
    * HBASE-3609 Modeled regionsToMove using Guava's MinMaxPriorityQueue so that
    * we can fetch from both ends of the queue.
    * At the beginning, we check whether there was empty region server
    * just discovered by Master. If so, we alternately choose new / old
    * regions from head / tail of regionsToMove, respectively. This alternation
    * avoids clustering young regions on the newly discovered region server.
    * Otherwise, we choose new regions from head of regionsToMove.
    *
    * Another improvement from HBASE-3609 is that we assign regions from
    * regionsToMove to underloaded servers in round-robin fashion.
    * Previously one underloaded server would be filled before we move onto
    * the next underloaded server, leading to clustering of young regions.
    *
    * Finally, we randomly shuffle underloaded servers so that they receive
    * offloaded regions relatively evenly across calls to balanceCluster().
    *
    * The algorithm is currently implemented as such:
    *
    * <ol>
@@ -179,6 +179,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
    * @return a list of regions to be moved, including source and destination,
    *         or null if cluster is already balanced
    */
+  @Override
   public List<RegionPlan> balanceCluster(
       Map<ServerName, List<HRegionInfo>> clusterMap) {
     List<RegionPlan> regionsToReturn = balanceMasterRegions(clusterMap);
@@ -192,9 +193,12 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
     Collection<ServerName> backupMasters = getBackupMasters();
     ClusterLoadState cs = new ClusterLoadState(masterServerName,
         backupMasters, backupMasterWeight, clusterMap);
-    if (!this.needsBalance(cs)) return null;
+    // construct a Cluster object with clusterMap and the rest of the
+    // arguments as defaults
+    Cluster c = new Cluster(masterServerName, clusterMap, null, this.regionFinder,
+        getBackupMasters(), tablesOnMaster, this.rackManager);
+    if (!this.needsBalance(c)) return null;

     int numServers = cs.getNumServers();
     NavigableMap<ServerAndLoad, List<HRegionInfo>> serversByLoad = cs.getServersByLoad();
     int numRegions = cs.getNumRegions();
@@ -239,7 +243,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
       }
       int numToOffload = Math.min((load - max) / w, regions.size());
       // account for the out-of-band regions which were assigned to this server
       // after some other region server crashed
       Collections.sort(regions, riComparator);
       int numTaken = 0;
       for (int i = 0; i <= numToOffload; ) {
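In the balanceCluster() change above, the Cluster passed to needsBalance() is built from clusterMap with null/default values for the remaining constructor arguments. That is sufficient here: the base needsBalance() only reads c.clusterState to build its ClusterLoadState, and since SimpleLoadBalancer does not override areSomeRegionReplicasColocated(), the replica-related state is never consulted.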


@@ -122,6 +122,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   // when new services are offered
   private LocalityBasedCandidateGenerator localityCandidateGenerator;
   private LocalityCostFunction localityCost;
+  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
+  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;

   @Override
   public void setConf(Configuration conf) {
@@ -151,13 +153,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       new StoreFileCostFunction(conf)
     };

+    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
+    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
     costFunctions = new CostFunction[]{
       new RegionCountSkewCostFunction(conf, activeMasterWeight, backupMasterWeight),
       new MoveCostFunction(conf),
       localityCost,
       new TableSkewCostFunction(conf),
-      new RegionReplicaHostCostFunction(conf),
-      new RegionReplicaRackCostFunction(conf),
+      regionReplicaHostCostFunction,
+      regionReplicaRackCostFunction,
       regionLoadFunctions[0],
       regionLoadFunctions[1],
       regionLoadFunctions[2],
@@ -187,6 +192,15 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   }

+  @Override
+  protected boolean areSomeRegionReplicasColocated(Cluster c) {
+    regionReplicaHostCostFunction.init(c);
+    if (regionReplicaHostCostFunction.cost() > 0) return true;
+    regionReplicaRackCostFunction.init(c);
+    if (regionReplicaRackCostFunction.cost() > 0) return true;
+    return false;
+  }

   /**
    * Given the cluster state this will try and approach an optimal balance. This
    * should always approach the optimal state given enough steps.
@@ -198,16 +212,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       return plans;
     }
     filterExcludedServers(clusterState);
-    if (!needsBalance(new ClusterLoadState(masterServerName,
-        getBackupMasters(), backupMasterWeight, clusterState))) {
+    // The clusterState that is given to this method contains the state
+    // of all the regions in the table(s) (that's true today)
+    // Keep track of servers to iterate through them.
+    Cluster cluster = new Cluster(masterServerName,
+        clusterState, loads, regionFinder, getBackupMasters(), tablesOnMaster, rackManager);
+    if (!needsBalance(cluster)) {
       return null;
     }
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
-    // Keep track of servers to iterate through them.
-    Cluster cluster = new Cluster(masterServerName,
-        clusterState, loads, regionFinder, getBackupMasters(), tablesOnMaster, rackManager);
     initCosts(cluster);
     double currentCost = computeCost(cluster, Double.MAX_VALUE);
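Two details tie this file together. Promoting the two region-replica cost functions from locals to fields is what enables the areSomeRegionReplicasColocated() override: it re-initializes the very instances the balancer scores with, and treats any positive cost as evidence of co-location, both functions being zero exactly when every replica sits on its own host (respectively rack). And the Cluster construction is hoisted above the needsBalance() call so a single object serves both the new check and the subsequent cost computation.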


@@ -118,10 +118,14 @@ public class TestRegionRebalancing {
     UTIL.getHBaseCluster().getMaster().balance();
     assertRegionsAreBalanced();

+    // On a balanced cluster, calling balance() should return false
+    assert(UTIL.getHBaseCluster().getMaster().balance() == false);
+
+    // However, if we add a server, the balance() call should return true
     // add a region server - total of 3
     LOG.info("Started third server=" +
         UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName());
-    UTIL.getHBaseCluster().getMaster().balance();
+    assert(UTIL.getHBaseCluster().getMaster().balance() == true);
     assertRegionsAreBalanced();

     // kill a region server - total of 2
@@ -135,14 +139,14 @@ public class TestRegionRebalancing {
         UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName());
     LOG.info("Added fourth server=" +
         UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName());
-    UTIL.getHBaseCluster().getMaster().balance();
+    assert(UTIL.getHBaseCluster().getMaster().balance() == true);
     assertRegionsAreBalanced();

     for (int i = 0; i < 6; i++){
       LOG.info("Adding " + (i + 5) + "th region server");
       UTIL.getHBaseCluster().startRegionServer();
     }
-    UTIL.getHBaseCluster().getMaster().balance();
+    assert(UTIL.getHBaseCluster().getMaster().balance() == true);
     assertRegionsAreBalanced();
     table.close();
   }
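One caveat on the assertions introduced here: a bare Java assert is silently skipped unless the JVM runs with -ea (Maven Surefire enables assertions by default, so this passes in a normal build, but JUnit's assertTrue/assertFalse fail unconditionally). Equivalent, flag-independent forms of the checks above would be:

    import static org.junit.Assert.assertFalse;
    import static org.junit.Assert.assertTrue;

    // drop-in replacements for the assert(...) lines in this test
    assertFalse(UTIL.getHBaseCluster().getMaster().balance()); // already balanced
    assertTrue(UTIL.getHBaseCluster().getMaster().balance());  // after adding a server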


@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;

 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
@@ -383,6 +385,34 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     assertTrue(costWith2ReplicasOnTwoServers < costWith3ReplicasSameServer);
   }

+  @Test
+  public void testNeedsBalanceForColocatedReplicas() {
+    // check for the case where there are two hosts on one rack, and
+    // both replicas of a region are hosted on the same server
+    List<HRegionInfo> regions = randomRegions(1);
+    ServerName s1 = ServerName.valueOf("host1", 1000, 11111);
+    ServerName s2 = ServerName.valueOf("host11", 1000, 11111);
+    Map<ServerName, List<HRegionInfo>> map = new HashMap<ServerName, List<HRegionInfo>>();
+    map.put(s1, regions);
+    regions.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(0), 1));
+    // after the step above, s1 holds two replicas of a region
+    regions = randomRegions(1);
+    map.put(s2, regions);
+    assertTrue(loadBalancer.needsBalance(new Cluster(master, map, null, null, null, null, null)));
+    // check for the case where there are two hosts on the same rack and there are two racks
+    // and both the replicas are on the same rack
+    map.clear();
+    regions = randomRegions(1);
+    List<HRegionInfo> regionsOnS2 = new ArrayList<HRegionInfo>(1);
+    regionsOnS2.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(0), 1));
+    map.put(s1, regions);
+    map.put(s2, regionsOnS2);
+    // add another server so that the cluster has some host on another rack
+    map.put(ServerName.valueOf("host2", 1000, 11111), randomRegions(1));
+    assertTrue(loadBalancer.needsBalance(new Cluster(master, map, null, null, null, null,
+        new ForTestRackManagerOne())));
+  }

   @Test (timeout = 60000)
   public void testSmallCluster() {
     int numNodes = 10;
@@ -547,6 +577,13 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     }
   }

+  private static class ForTestRackManagerOne extends RackManager {
+    @Override
+    public String getRack(ServerName server) {
+      return server.getHostname().endsWith("1") ? "rack1" : "rack2";
+    }
+  }
@Test (timeout = 120000) @Test (timeout = 120000)
public void testRegionReplicationOnMidClusterWithRacks() { public void testRegionReplicationOnMidClusterWithRacks() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L); conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
@ -650,7 +687,6 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
public MyRackResolver(Configuration conf) {} public MyRackResolver(Configuration conf) {}
@Override
public List<String> resolve(List<String> names) { public List<String> resolve(List<String> names) {
List<String> racks = new ArrayList<String>(names.size()); List<String> racks = new ArrayList<String>(names.size());
for (int i = 0; i < names.size(); i++) { for (int i = 0; i < names.size(); i++) {
@ -659,10 +695,8 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
return racks; return racks;
} }
@Override
public void reloadCachedMappings() {} public void reloadCachedMappings() {}
@Override
public void reloadCachedMappings(List<String> names) { public void reloadCachedMappings(List<String> names) {
} }
} }