HBASE-10351 LoadBalancer changes for supporting region replicas

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1572298 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Enis Soztutar 2014-02-26 22:16:32 +00:00
parent 87b2b923e2
commit a98f52953a
12 changed files with 1673 additions and 332 deletions

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -3916,7 +3917,8 @@ public class AssignmentManager extends ZooKeeperListener {
return this.balancer; return this.balancer;
} }
public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment(List<HRegionInfo> infos) { public Map<ServerName, List<HRegionInfo>>
getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
return getRegionStates().getRegionAssignments(infos); return getRegionStates().getRegionAssignments(infos);
} }
} }

View File

@ -41,6 +41,9 @@ public class RackManager {
private DNSToSwitchMapping switchMapping; private DNSToSwitchMapping switchMapping;
public RackManager() {
}
public RackManager(Configuration conf) { public RackManager(Configuration conf) {
switchMapping = ReflectionUtils.instantiateWithCustomCtor( switchMapping = ReflectionUtils.instantiateWithCustomCtor(
conf.getClass("hbase.util.ip.to.rack.determiner", ScriptBasedMapping.class, conf.getClass("hbase.util.ip.to.rack.determiner", ScriptBasedMapping.class,

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
@ -153,7 +154,8 @@ public class RegionStates {
* @param regions * @param regions
* @return a pair containing the groupings as a map * @return a pair containing the groupings as a map
*/ */
synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(List<HRegionInfo> regions) { synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(
Collection<HRegionInfo> regions) {
Map<ServerName, List<HRegionInfo>> map = new HashMap<ServerName, List<HRegionInfo>>(); Map<ServerName, List<HRegionInfo>> map = new HashMap<ServerName, List<HRegionInfo>>();
for (HRegionInfo region : regions) { for (HRegionInfo region : regions) {
HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region); HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
@ -900,6 +902,19 @@ public class RegionStates {
return getRegionState(hri.getEncodedName()); return getRegionState(hri.getEncodedName());
} }
/**
* Returns a clone of region assignments per server
* @return a Map of ServerName to a List of HRegionInfo's
*/
protected synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer() {
Map<ServerName, List<HRegionInfo>> regionsByServer =
new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
regionsByServer.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
}
return regionsByServer;
}
protected synchronized RegionState getRegionState(final String encodedName) { protected synchronized RegionState getRegionState(final String encodedName) {
return regionStates.get(encodedName); return regionStates.get(encodedName);
} }

View File

@ -62,6 +62,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
super.setConf(conf);
globalFavoredNodesAssignmentPlan = new FavoredNodesPlan(); globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();
this.rackManager = new RackManager(conf); this.rackManager = new RackManager(conf);
super.setConf(conf); super.setConf(conf);
@ -80,7 +81,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
LOG.warn("Not running balancer since exception was thrown " + ie); LOG.warn("Not running balancer since exception was thrown " + ie);
return plans; return plans;
} }
globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan(); globalFavoredNodesAssignmentPlan = snaphotOfRegionAssignment.getExistingAssignmentPlan();
Map<ServerName, ServerName> serverNameToServerNameWithoutCode = Map<ServerName, ServerName> serverNameToServerNameWithoutCode =
new HashMap<ServerName, ServerName>(); new HashMap<ServerName, ServerName>();
Map<ServerName, ServerName> serverNameWithoutCodeToServerName = Map<ServerName, ServerName> serverNameWithoutCodeToServerName =
@ -133,7 +134,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2)); destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
} }
} }
if (destination != null) { if (destination != null) {
RegionPlan plan = new RegionPlan(region, currentServer, destination); RegionPlan plan = new RegionPlan(region, currentServer, destination);
plans.add(plan); plans.add(plan);
@ -159,7 +160,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
// one of the favored node is still alive. In this case, try to adhere // one of the favored node is still alive. In this case, try to adhere
// to the current favored nodes assignment as much as possible - i.e., // to the current favored nodes assignment as much as possible - i.e.,
// if the current primary is gone, then make the secondary or tertiary // if the current primary is gone, then make the secondary or tertiary
// as the new host for the region (based on their current load). // as the new host for the region (based on their current load).
// Note that we don't change the favored // Note that we don't change the favored
// node assignments here (even though one or more favored node is currently // node assignments here (even though one or more favored node is currently
// down). It is up to the balanceCluster to do this hard work. The HDFS // down). It is up to the balanceCluster to do this hard work. The HDFS
@ -222,7 +223,7 @@ public class FavoredNodeLoadBalancer extends BaseLoadBalancer {
} }
} }
private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>> private Pair<Map<ServerName, List<HRegionInfo>>, List<HRegionInfo>>
segregateRegionsAndAssignRegionsWithFavoredNodes(List<HRegionInfo> regions, segregateRegionsAndAssignRegionsWithFavoredNodes(List<HRegionInfo> regions,
List<ServerName> availableServers) { List<ServerName> availableServers) {
Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes = Map<ServerName, List<HRegionInfo>> assignmentMapForFavoredNodes =

View File

@ -147,7 +147,7 @@ class RegionLocationFinder {
protected HTableDescriptor getTableDescriptor(TableName tableName) throws IOException { protected HTableDescriptor getTableDescriptor(TableName tableName) throws IOException {
HTableDescriptor tableDescriptor = null; HTableDescriptor tableDescriptor = null;
try { try {
if (this.services != null) { if (this.services != null && this.services.getTableDescriptors() != null) {
tableDescriptor = this.services.getTableDescriptors().get(tableName); tableDescriptor = this.services.getTableDescriptors().get(tableName);
} }
} catch (FileNotFoundException fnfe) { } catch (FileNotFoundException fnfe) {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.balancer; package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Deque; import java.util.Deque;
import java.util.HashMap; import java.util.HashMap;
@ -37,11 +38,16 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
/** /**
* <p>This is a best effort load balancer. Given a Cost function F(C) => x It will * <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
@ -89,19 +95,18 @@ import org.apache.hadoop.hbase.util.Pair;
@InterfaceAudience.Private @InterfaceAudience.Private
public class StochasticLoadBalancer extends BaseLoadBalancer { public class StochasticLoadBalancer extends BaseLoadBalancer {
private static final String STEPS_PER_REGION_KEY = protected static final String STEPS_PER_REGION_KEY =
"hbase.master.balancer.stochastic.stepsPerRegion"; "hbase.master.balancer.stochastic.stepsPerRegion";
private static final String MAX_STEPS_KEY = protected static final String MAX_STEPS_KEY =
"hbase.master.balancer.stochastic.maxSteps"; "hbase.master.balancer.stochastic.maxSteps";
private static final String MAX_RUNNING_TIME_KEY = protected static final String MAX_RUNNING_TIME_KEY =
"hbase.master.balancer.stochastic.maxRunningTime"; "hbase.master.balancer.stochastic.maxRunningTime";
private static final String KEEP_REGION_LOADS = protected static final String KEEP_REGION_LOADS =
"hbase.master.balancer.stochastic.numRegionLoadsToRemember"; "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class); private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
private final RegionLocationFinder regionFinder = new RegionLocationFinder();
Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>(); Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>();
// values are defaults // values are defaults
@ -110,20 +115,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private long maxRunningTime = 30 * 1000 * 1; // 30 seconds. private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
private int numRegionLoadsToRemember = 15; private int numRegionLoadsToRemember = 15;
private RegionPicker[] pickers; private CandidateGenerator[] candidateGenerators;
private CostFromRegionLoadFunction[] regionLoadFunctions; private CostFromRegionLoadFunction[] regionLoadFunctions;
private CostFunction[] costFunctions; private CostFunction[] costFunctions;
// Keep locality based picker and cost function to alert them // Keep locality based picker and cost function to alert them
// when new services are offered // when new services are offered
private LocalityBasedPicker localityPicker; private LocalityBasedCandidateGenerator localityCandidateGenerator;
private LocalityCostFunction localityCost; private LocalityCostFunction localityCost;
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
super.setConf(conf); super.setConf(conf);
regionFinder.setConf(conf);
maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps); maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion); stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
@ -131,13 +134,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember); numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
localityPicker = new LocalityBasedPicker(services); localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
localityCost = new LocalityCostFunction(conf, services); localityCost = new LocalityCostFunction(conf, services);
pickers = new RegionPicker[] { candidateGenerators = new CandidateGenerator[] {
new RandomRegionPicker(), new RandomCandidateGenerator(),
new LoadPicker(), new LoadCandidateGenerator(),
localityPicker localityCandidateGenerator,
new RegionReplicaCandidateGenerator(),
}; };
regionLoadFunctions = new CostFromRegionLoadFunction[] { regionLoadFunctions = new CostFromRegionLoadFunction[] {
@ -152,6 +156,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
new MoveCostFunction(conf), new MoveCostFunction(conf),
localityCost, localityCost,
new TableSkewCostFunction(conf), new TableSkewCostFunction(conf),
new RegionReplicaHostCostFunction(conf),
new RegionReplicaRackCostFunction(conf),
regionLoadFunctions[0], regionLoadFunctions[0],
regionLoadFunctions[1], regionLoadFunctions[1],
regionLoadFunctions[2], regionLoadFunctions[2],
@ -167,7 +173,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
@Override @Override
public void setClusterStatus(ClusterStatus st) { public void setClusterStatus(ClusterStatus st) {
super.setClusterStatus(st); super.setClusterStatus(st);
regionFinder.setClusterStatus(st);
updateRegionLoad(); updateRegionLoad();
for(CostFromRegionLoadFunction cost : regionLoadFunctions) { for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
cost.setClusterStatus(st); cost.setClusterStatus(st);
@ -177,9 +182,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
@Override @Override
public void setMasterServices(MasterServices masterServices) { public void setMasterServices(MasterServices masterServices) {
super.setMasterServices(masterServices); super.setMasterServices(masterServices);
this.regionFinder.setServices(masterServices);
this.localityCost.setServices(masterServices); this.localityCost.setServices(masterServices);
this.localityPicker.setServices(masterServices); this.localityCandidateGenerator.setServices(masterServices);
} }
@ -202,8 +206,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
long startTime = EnvironmentEdgeManager.currentTimeMillis(); long startTime = EnvironmentEdgeManager.currentTimeMillis();
// Keep track of servers to iterate through them. // Keep track of servers to iterate through them.
Cluster cluster = new Cluster(masterServerName, clusterState, Cluster cluster = new Cluster(masterServerName,
loads, regionFinder, getBackupMasters(), tablesOnMaster); clusterState, loads, regionFinder, getBackupMasters(), tablesOnMaster, rackManager);
initCosts(cluster);
double currentCost = computeCost(cluster, Double.MAX_VALUE); double currentCost = computeCost(cluster, Double.MAX_VALUE);
double initCost = currentCost; double initCost = currentCost;
@ -213,42 +219,30 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers)); ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
// Perform a stochastic walk to see if we can get a good fit. // Perform a stochastic walk to see if we can get a good fit.
long step; long step;
for (step = 0; step < computedMaxSteps; step++) { for (step = 0; step < computedMaxSteps; step++) {
int pickerIdx = RANDOM.nextInt(pickers.length); int generatorIdx = RANDOM.nextInt(candidateGenerators.length);
RegionPicker p = pickers[pickerIdx]; CandidateGenerator p = candidateGenerators[generatorIdx];
Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster); Cluster.Action action = p.generate(cluster);
int leftServer = picks.getFirst().getFirst(); if (action.type == Type.NULL) {
int leftRegion = picks.getFirst().getSecond();
int rightServer = picks.getSecond().getFirst();
int rightRegion = picks.getSecond().getSecond();
// We couldn't find a server
if (rightServer < 0 || leftServer < 0) {
continue; continue;
} }
// We randomly picked to do nothing. cluster.doAction(action);
if (leftRegion < 0 && rightRegion < 0) { updateCostsWithAction(cluster, action);
continue;
}
cluster.moveOrSwapRegion(leftServer,
rightServer,
leftRegion,
rightRegion);
newCost = computeCost(cluster, currentCost); newCost = computeCost(cluster, currentCost);
// Should this be kept? // Should this be kept?
if (newCost < currentCost) { if (newCost < currentCost) {
currentCost = newCost; currentCost = newCost;
} else { } else {
// Put things back the way they were before. // Put things back the way they were before.
// TODO: undo by remembering old values, using an UndoAction class // TODO: undo by remembering old values
cluster.moveOrSwapRegion(leftServer, Action undoAction = action.undoAction();
rightServer, cluster.doAction(undoAction);
rightRegion, updateCostsWithAction(cluster, undoAction);
leftRegion);
} }
if (EnvironmentEdgeManager.currentTimeMillis() - startTime > if (EnvironmentEdgeManager.currentTimeMillis() - startTime >
@ -343,6 +337,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
} }
protected void initCosts(Cluster cluster) {
for (CostFunction c:costFunctions) {
c.init(cluster);
}
}
protected void updateCostsWithAction(Cluster cluster, Action action) {
for (CostFunction c : costFunctions) {
c.postAction(action);
}
}
/** /**
* This is the main cost function. It will compute a cost associated with a proposed cluster * This is the main cost function. It will compute a cost associated with a proposed cluster
@ -361,7 +366,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
continue; continue;
} }
total += c.getMultiplier() * c.cost(cluster); total += c.getMultiplier() * c.cost();
if (total > previousCost) { if (total > previousCost) {
return total; return total;
@ -370,8 +375,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return total; return total;
} }
abstract static class RegionPicker { /** Generates a candidate action to be applied to the cluster for cost function search */
abstract Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster); abstract static class CandidateGenerator {
abstract Cluster.Action generate(Cluster cluster);
/** /**
* From a list of regions pick a random one. Null can be returned which * From a list of regions pick a random one. Null can be returned which
@ -402,6 +408,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return RANDOM.nextInt(cluster.numServers); return RANDOM.nextInt(cluster.numServers);
} }
protected int pickOtherRandomServer(Cluster cluster, int serverIndex) { protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
if (cluster.numServers < 2) { if (cluster.numServers < 2) {
return -1; return -1;
@ -414,11 +421,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
} }
protected Pair<Integer, Integer> pickRandomRegions(Cluster cluster, protected Cluster.Action pickRandomRegions(Cluster cluster,
int thisServer, int thisServer,
int otherServer) { int otherServer) {
if (thisServer < 0 || otherServer < 0) { if (thisServer < 0 || otherServer < 0) {
return new Pair<Integer, Integer>(-1, -1); return Cluster.NullAction;
} }
// Decide who is most likely to need another region // Decide who is most likely to need another region
@ -432,45 +439,50 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
int thisRegion = pickRandomRegion(cluster, thisServer, thisChance); int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
int otherRegion = pickRandomRegion(cluster, otherServer, otherChance); int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
return new Pair<Integer, Integer>(thisRegion, otherRegion); return getAction(thisServer, thisRegion, otherServer, otherRegion);
}
protected Cluster.Action getAction (int fromServer, int fromRegion,
int toServer, int toRegion) {
if (fromServer < 0 || toServer < 0) {
return Cluster.NullAction;
}
if (fromRegion > 0 && toRegion > 0) {
return new Cluster.SwapRegionsAction(fromServer, fromRegion,
toServer, toRegion);
} else if (fromRegion > 0) {
return new Cluster.MoveRegionAction(fromRegion, fromServer, toServer);
} else if (toRegion > 0) {
return new Cluster.MoveRegionAction(toRegion, toServer, fromServer);
} else {
return Cluster.NullAction;
}
} }
} }
static class RandomRegionPicker extends RegionPicker { static class RandomCandidateGenerator extends CandidateGenerator {
@Override @Override
Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) { Cluster.Action generate(Cluster cluster) {
int thisServer = pickRandomServer(cluster); int thisServer = pickRandomServer(cluster);
// Pick the other server // Pick the other server
int otherServer = pickOtherRandomServer(cluster, thisServer); int otherServer = pickOtherRandomServer(cluster, thisServer);
Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer); return pickRandomRegions(cluster, thisServer, otherServer);
return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
new Pair<Integer, Integer>(thisServer, regions.getFirst()),
new Pair<Integer, Integer>(otherServer, regions.getSecond())
);
} }
} }
public static class LoadPicker extends RegionPicker { public static class LoadCandidateGenerator extends CandidateGenerator {
@Override @Override
Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) { Cluster.Action generate(Cluster cluster) {
cluster.sortServersByRegionCount(); cluster.sortServersByRegionCount();
int thisServer = pickMostLoadedServer(cluster, -1); int thisServer = pickMostLoadedServer(cluster, -1);
int otherServer = pickLeastLoadedServer(cluster, thisServer); int otherServer = pickLeastLoadedServer(cluster, thisServer);
Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer); return pickRandomRegions(cluster, thisServer, otherServer);
return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
new Pair<Integer, Integer>(thisServer, regions.getFirst()),
new Pair<Integer, Integer>(otherServer, regions.getSecond())
);
} }
private int pickLeastLoadedServer(final Cluster cluster, int thisServer) { private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
@ -500,21 +512,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
} }
static class LocalityBasedPicker extends RegionPicker { static class LocalityBasedCandidateGenerator extends CandidateGenerator {
private MasterServices masterServices; private MasterServices masterServices;
LocalityBasedPicker(MasterServices masterServices) { LocalityBasedCandidateGenerator(MasterServices masterServices) {
this.masterServices = masterServices; this.masterServices = masterServices;
} }
@Override @Override
Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) { Cluster.Action generate(Cluster cluster) {
if (this.masterServices == null) { if (this.masterServices == null) {
return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>( return Cluster.NullAction;
new Pair<Integer, Integer>(-1,-1),
new Pair<Integer, Integer>(-1,-1)
);
} }
// Pick a random region server // Pick a random region server
int thisServer = pickRandomServer(cluster); int thisServer = pickRandomServer(cluster);
@ -523,10 +532,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f); int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
if (thisRegion == -1) { if (thisRegion == -1) {
return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>( return Cluster.NullAction;
new Pair<Integer, Integer>(-1,-1),
new Pair<Integer, Integer>(-1,-1)
);
} }
// Pick the server with the highest locality // Pick the server with the highest locality
@ -535,10 +541,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
// pick an region on the other server to potentially swap // pick an region on the other server to potentially swap
int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f); int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>( return getAction(thisServer, thisRegion, otherServer, otherRegion);
new Pair<Integer, Integer>(thisServer,thisRegion),
new Pair<Integer, Integer>(otherServer,otherRegion)
);
} }
private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) { private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) {
@ -563,6 +566,79 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
} }
/**
* Generates candidates which moves the replicas out of the region server for
* co-hosted region replicas
*/
public static class RegionReplicaCandidateGenerator extends CandidateGenerator {
RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
@Override
Cluster.Action generate(Cluster cluster) {
int serverIndex = pickRandomServer(cluster);
if (cluster.numServers <= 1 || serverIndex == -1) {
return Cluster.NullAction;
}
// randomly select one primaryIndex out of all region replicas in the same server
// we don't know how many region replicas are co-hosted, we will randomly select one
// using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
int currentPrimary = -1;
int currentPrimaryIndex = -1;
int primaryIndex = -1;
double currentLargestRandom = -1;
// regionsByPrimaryPerServer is a sorted array. Since it contains the primary region
// ids for the regions hosted in server, a consecutive repetition means that replicas
// are co-hosted
for (int j = 0; j <= cluster.primariesOfRegionsPerServer[serverIndex].length; j++) {
int primary = j < cluster.primariesOfRegionsPerServer[serverIndex].length
? cluster.primariesOfRegionsPerServer[serverIndex][j] : -1;
if (primary != currentPrimary) { // check for whether we see a new primary
int numReplicas = j - currentPrimaryIndex;
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
// decide to select this primary region id or not
double currentRandom = RANDOM.nextDouble();
if (currentRandom > currentLargestRandom) {
primaryIndex = currentPrimary; // select this primary
currentLargestRandom = currentRandom;
}
}
currentPrimary = primary;
currentPrimaryIndex = j;
}
}
// if there are no pairs of region replicas co-hosted, default to random generator
if (primaryIndex == -1) {
// default to randompicker
return randomGenerator.generate(cluster);
}
// we have found the primary id for the region to move. Now find the actual regionIndex
// with the given primary, prefer to move the secondary region.
int regionIndex = -1;
for (int k = 0; k < cluster.regionsPerServer[serverIndex].length; k++) {
int region = cluster.regionsPerServer[serverIndex][k];
if (primaryIndex == cluster.regionIndexToPrimaryIndex[region]) {
// always move the secondary, not the primary
if (!RegionReplicaUtil.isDefaultReplica(cluster.regions[region])) {
regionIndex = region;
break;
}
}
}
int toServerIndex = pickOtherRandomServer(cluster, serverIndex);
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
return getAction (serverIndex, regionIndex, toServerIndex, toRegionIndex);
}
}
/** /**
* Base class of StochasticLoadBalancer's Cost Functions. * Base class of StochasticLoadBalancer's Cost Functions.
*/ */
@ -570,6 +646,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private float multiplier = 0; private float multiplier = 0;
protected Cluster cluster;
CostFunction(Configuration c) { CostFunction(Configuration c) {
} }
@ -582,7 +660,42 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
this.multiplier = m; this.multiplier = m;
} }
abstract double cost(Cluster cluster); /** Called once per LB invocation to give the cost function
* to initialize it's state, and perform any costly calculation.
*/
void init(Cluster cluster) {
this.cluster = cluster;
}
/** Called once per cluster Action to give the cost function
* an opportunity to update it's state. postAction() is always
* called at least once before cost() is called with the cluster
* that this action is performed on. */
void postAction(Action action) {
switch (action.type) {
case NULL: break;
case ASSIGN_REGION:
AssignRegionAction ar = (AssignRegionAction) action;
regionMoved(ar.region, -1, ar.server);
break;
case MOVE_REGION:
MoveRegionAction mra = (MoveRegionAction) action;
regionMoved(mra.region, mra.fromServer, mra.toServer);
break;
case SWAP_REGIONS:
SwapRegionsAction a = (SwapRegionsAction) action;
regionMoved(a.fromRegion, a.fromServer, a.toServer);
regionMoved(a.toRegion, a.toServer, a.fromServer);
break;
default:
throw new RuntimeException("Uknown action:" + action.type);
}
}
protected void regionMoved(int region, int oldServer, int newServer) {
}
abstract double cost();
/** /**
* Function to compute a scaled cost using {@link DescriptiveStatistics}. It * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
@ -611,8 +724,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return scaled; return scaled;
} }
private double getSum(double[] stats) { private double getSum(double[] stats) {
double total = 0; double total = 0;
for(double s:stats) { for(double s:stats) {
@ -663,7 +774,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
@Override @Override
double cost(Cluster cluster) { double cost() {
// Try and size the max number of Moves, but always be prepared to move some. // Try and size the max number of Moves, but always be prepared to move some.
int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent), int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
DEFAULT_MAX_MOVES); DEFAULT_MAX_MOVES);
@ -705,7 +816,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
@Override @Override
double cost(Cluster cluster) { double cost() {
if (stats == null || stats.length != cluster.numServers) { if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers]; stats = new double[cluster.numServers];
} }
@ -740,7 +851,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
@Override @Override
double cost(Cluster cluster) { double cost() {
double max = cluster.numRegions; double max = cluster.numRegions;
double min = ((double) cluster.numRegions) / cluster.numServers; double min = ((double) cluster.numRegions) / cluster.numServers;
double value = 0; double value = 0;
@ -775,7 +886,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
@Override @Override
double cost(Cluster cluster) { double cost() {
double max = 0; double max = 0;
double cost = 0; double cost = 0;
@ -834,9 +945,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
this.loads = l; this.loads = l;
} }
@Override @Override
double cost(Cluster cluster) { double cost() {
if (clusterStatus == null || loads == null) { if (clusterStatus == null || loads == null) {
return 0; return 0;
} }
@ -930,6 +1040,165 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
} }
/**
* A cost function for region replicas. We give a very high cost to hosting
* replicas of the same region in the same host. We do not prevent the case
* though, since if numReplicas > numRegionServers, we still want to keep the
* replica open.
*/
public static class RegionReplicaHostCostFunction extends CostFunction {
private static final String REGION_REPLICA_HOST_COST_KEY =
"hbase.master.balancer.stochastic.regionReplicaHostCostKey";
private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
long maxCost = 0;
long[] costsPerGroup; // group is either server, host or rack
int[][] primariesOfRegionsPerGroup;
public RegionReplicaHostCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
DEFAULT_REGION_REPLICA_HOST_COST_KEY));
}
@Override
void init(Cluster cluster) {
super.init(cluster);
// max cost is the case where every region replica is hosted together regardless of host
maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
costsPerGroup = new long[cluster.numHosts];
primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
? cluster.primariesOfRegionsPerHost
: cluster.primariesOfRegionsPerServer;
for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
}
}
long getMaxCost(Cluster cluster) {
if (!cluster.hasRegionReplicas) {
return 0; // short circuit
}
// max cost is the case where every region replica is hosted together regardless of host
int[] primariesOfRegions = new int[cluster.numRegions];
for (int i = 0; i < cluster.regions.length; i++) {
// assume all regions are hosted by only one server
int primaryIndex = cluster.regionIndexToPrimaryIndex[i];
primariesOfRegions[i] = primaryIndex;
}
Arrays.sort(primariesOfRegions);
// compute numReplicas from the sorted array
return costPerGroup(primariesOfRegions);
}
@Override
double cost() {
if (maxCost <= 0) {
return 0;
}
long totalCost = 0;
for (int i = 0 ; i < costsPerGroup.length; i++) {
totalCost += costsPerGroup[i];
}
return scale(0, maxCost, totalCost);
}
/**
* For each primary region, it computes the total number of replicas in the array (numReplicas)
* and returns a sum of numReplicas-1 squared. For example, if the server hosts
* regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
* returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
* @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
* @return a sum of numReplicas-1 squared for each primary region in the group.
*/
protected long costPerGroup(int[] primariesOfRegions) {
long cost = 0;
int currentPrimary = -1;
int currentPrimaryIndex = -1;
// primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
// sharing the same primary will have consecutive numbers in the array.
for (int j = 0 ; j <= primariesOfRegions.length; j++) {
int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
if (primary != currentPrimary) { // we see a new primary
int numReplicas = j - currentPrimaryIndex;
// square the cost
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
cost += (numReplicas - 1) * (numReplicas - 1);
}
currentPrimary = primary;
currentPrimaryIndex = j;
}
}
return cost;
}
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
if (maxCost <= 0) {
return; // no need to compute
}
if (cluster.multiServersPerHost) {
int oldHost = cluster.serverIndexToHostIndex[oldServer];
int newHost = cluster.serverIndexToHostIndex[newServer];
if (newHost != oldHost) {
costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
}
} else {
costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
}
}
}
/**
* A cost function for region replicas for the rack distribution. We give a relatively high
* cost to hosting replicas of the same region in the same rack. We do not prevent the case
* though.
*/
public static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
private static final String REGION_REPLICA_RACK_COST_KEY =
"hbase.master.balancer.stochastic.regionReplicaRackCostKey";
private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
public RegionReplicaRackCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
}
@Override
void init(Cluster cluster) {
this.cluster = cluster;
if (cluster.numRacks <= 1) {
maxCost = 0;
return; // disabled for 1 rack
}
// max cost is the case where every region replica is hosted together regardless of rack
maxCost = getMaxCost(cluster);
costsPerGroup = new long[cluster.numRacks];
for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
}
}
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
if (maxCost <= 0) {
return; // no need to compute
}
int oldRack = cluster.serverIndexToRackIndex[oldServer];
int newRack = cluster.serverIndexToRackIndex[newServer];
if (newRack != oldRack) {
costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
}
}
}
/** /**
* Compute the cost of total memstore size. The more unbalanced the higher the * Compute the cost of total memstore size. The more unbalanced the higher the
* computed cost will be. This uses a rolling average of regionload. * computed cost will be. This uses a rolling average of regionload.

View File

@ -36,8 +36,10 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.CatalogTracker;
@ -123,60 +125,60 @@ public class TestMasterOperationsForRegionReplicas {
assert (state != null); assert (state != null);
} }
} }
// TODO: HBASE-10351 should uncomment the following tests (since the tests assume region placements are handled)
// List<Result> metaRows = MetaReader.fullScan(ct); List<Result> metaRows = MetaReader.fullScan(ct);
// int numRows = 0; int numRows = 0;
// for (Result result : metaRows) { for (Result result : metaRows) {
// RegionLocations locations = MetaReader.getRegionLocations(result); RegionLocations locations = MetaReader.getRegionLocations(result);
// HRegionInfo hri = locations.getRegionLocation().getRegionInfo(); HRegionInfo hri = locations.getRegionLocation().getRegionInfo();
// if (!hri.getTable().equals(table)) continue; if (!hri.getTable().equals(table)) continue;
// numRows += 1; numRows += 1;
// HRegionLocation[] servers = locations.getRegionLocations(); HRegionLocation[] servers = locations.getRegionLocations();
// // have two locations for the replicas of a region, and the locations should be different // have two locations for the replicas of a region, and the locations should be different
// assert(servers.length == 2); assert(servers.length == 2);
// assert(!servers[0].equals(servers[1])); assert(!servers[0].equals(servers[1]));
// } }
// assert(numRows == numRegions); assert(numRows == numRegions);
//
// // The same verification of the meta as above but with the SnapshotOfRegionAssignmentFromMeta // The same verification of the meta as above but with the SnapshotOfRegionAssignmentFromMeta
// // class // class
// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct); validateFromSnapshotFromMeta(TEST_UTIL, table, numRegions, numReplica, ct);
//
// // Now kill the master, restart it and see if the assignments are kept // Now kill the master, restart it and see if the assignments are kept
// ServerName master = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster(); ServerName master = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster();
// TEST_UTIL.getHBaseClusterInterface().stopMaster(master); TEST_UTIL.getHBaseClusterInterface().stopMaster(master);
// TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(master, 30000); TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(master, 30000);
// TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname()); TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname());
// TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster(); TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster();
// for (int i = 0; i < numRegions; i++) { for (int i = 0; i < numRegions; i++) {
// for (int j = 0; j < numReplica; j++) { for (int j = 0; j < numReplica; j++) {
// HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j); HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
// RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
// .getRegionStates().getRegionState(replica); .getRegionStates().getRegionState(replica);
// assert (state != null); assert (state != null);
// } }
// } }
// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct); validateFromSnapshotFromMeta(TEST_UTIL, table, numRegions, numReplica, ct);
//
// // Now shut the whole cluster down, and verify the assignments are kept so that the // Now shut the whole cluster down, and verify the assignments are kept so that the
// // availability constraints are met. // availability constraints are met.
// TEST_UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", true); TEST_UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", true);
// TEST_UTIL.shutdownMiniHBaseCluster(); TEST_UTIL.shutdownMiniHBaseCluster();
// TEST_UTIL.startMiniHBaseCluster(1, numSlaves); TEST_UTIL.startMiniHBaseCluster(1, numSlaves);
// TEST_UTIL.waitTableEnabled(table.getName()); TEST_UTIL.waitTableEnabled(table.getName());
// ct = new CatalogTracker(TEST_UTIL.getConfiguration()); ct = new CatalogTracker(TEST_UTIL.getConfiguration());
// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct); validateFromSnapshotFromMeta(TEST_UTIL, table, numRegions, numReplica, ct);
//
// // Now shut the whole cluster down, and verify regions are assigned even if there is only // Now shut the whole cluster down, and verify regions are assigned even if there is only
// // one server running // one server running
// TEST_UTIL.shutdownMiniHBaseCluster(); TEST_UTIL.shutdownMiniHBaseCluster();
// TEST_UTIL.startMiniHBaseCluster(1, 1); TEST_UTIL.startMiniHBaseCluster(1, 1);
// TEST_UTIL.waitTableEnabled(table.getName()); TEST_UTIL.waitTableEnabled(table.getName());
// ct = new CatalogTracker(TEST_UTIL.getConfiguration()); ct = new CatalogTracker(TEST_UTIL.getConfiguration());
// validateSingleRegionServerAssignment(ct, numRegions, numReplica); validateSingleRegionServerAssignment(ct, numRegions, numReplica);
// for (int i = 1; i < numSlaves; i++) { //restore the cluster for (int i = 1; i < numSlaves; i++) { //restore the cluster
// TEST_UTIL.getMiniHBaseCluster().startRegionServer(); TEST_UTIL.getMiniHBaseCluster().startRegionServer();
// } }
//check on alter table //check on alter table
admin.disableTable(table); admin.disableTable(table);
@ -288,7 +290,7 @@ public class TestMasterOperationsForRegionReplicas {
assert(count.get() == numRegions); assert(count.get() == numRegions);
} }
private void validateFromSnapshotFromMeta(TableName table, int numRegions, private void validateFromSnapshotFromMeta(HBaseTestingUtility util, TableName table, int numRegions,
int numReplica, CatalogTracker ct) throws IOException { int numReplica, CatalogTracker ct) throws IOException {
SnapshotOfRegionAssignmentFromMeta snapshot = new SnapshotOfRegionAssignmentFromMeta(ct); SnapshotOfRegionAssignmentFromMeta snapshot = new SnapshotOfRegionAssignmentFromMeta(ct);
snapshot.initialize(); snapshot.initialize();
@ -296,6 +298,9 @@ public class TestMasterOperationsForRegionReplicas {
assert(regionToServerMap.size() == numRegions * numReplica + 1); //'1' for the namespace assert(regionToServerMap.size() == numRegions * numReplica + 1); //'1' for the namespace
Map<ServerName, List<HRegionInfo>> serverToRegionMap = snapshot.getRegionServerToRegionMap(); Map<ServerName, List<HRegionInfo>> serverToRegionMap = snapshot.getRegionServerToRegionMap();
for (Map.Entry<ServerName, List<HRegionInfo>> entry : serverToRegionMap.entrySet()) { for (Map.Entry<ServerName, List<HRegionInfo>> entry : serverToRegionMap.entrySet()) {
if (entry.getKey().equals(util.getHBaseCluster().getMaster().getServerName())) {
continue;
}
List<HRegionInfo> regions = entry.getValue(); List<HRegionInfo> regions = entry.getValue();
Set<byte[]> setOfStartKeys = new HashSet<byte[]>(); Set<byte[]> setOfStartKeys = new HashSet<byte[]>();
for (HRegionInfo region : regions) { for (HRegionInfo region : regions) {
@ -307,7 +312,7 @@ public class TestMasterOperationsForRegionReplicas {
} }
// the number of startkeys will be equal to the number of regions hosted in each server // the number of startkeys will be equal to the number of regions hosted in each server
// (each server will be hosting one replica of a region) // (each server will be hosting one replica of a region)
assertEquals(setOfStartKeys.size() , numRegions); assertEquals(numRegions, setOfStartKeys.size());
} }
} }
@ -316,9 +321,14 @@ public class TestMasterOperationsForRegionReplicas {
SnapshotOfRegionAssignmentFromMeta snapshot = new SnapshotOfRegionAssignmentFromMeta(ct); SnapshotOfRegionAssignmentFromMeta snapshot = new SnapshotOfRegionAssignmentFromMeta(ct);
snapshot.initialize(); snapshot.initialize();
Map<HRegionInfo, ServerName> regionToServerMap = snapshot.getRegionToRegionServerMap(); Map<HRegionInfo, ServerName> regionToServerMap = snapshot.getRegionToRegionServerMap();
assert(regionToServerMap.size() == numRegions * numReplica + 1); //'1' for the namespace assertEquals(regionToServerMap.size(), numRegions * numReplica + 1); //'1' for the namespace
Map<ServerName, List<HRegionInfo>> serverToRegionMap = snapshot.getRegionServerToRegionMap(); Map<ServerName, List<HRegionInfo>> serverToRegionMap = snapshot.getRegionServerToRegionMap();
assert(serverToRegionMap.keySet().size() == 1); assertEquals(serverToRegionMap.keySet().size(), 2); // 1 rs + 1 master
assert(serverToRegionMap.values().iterator().next().size() == numRegions * numReplica + 1); for (Map.Entry<ServerName, List<HRegionInfo>> entry : serverToRegionMap.entrySet()) {
if (entry.getKey().equals(TEST_UTIL.getHBaseCluster().getMaster().getServerName())) {
continue;
}
assertEquals(entry.getValue().size(), numRegions * numReplica);
}
} }
} }

View File

@ -21,20 +21,26 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue; import java.util.Queue;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
/** /**
* Class used to be the base of unit tests on load balancers. It gives helper * Class used to be the base of unit tests on load balancers. It gives helper
@ -80,6 +86,50 @@ public class BalancerTestBase {
} }
} }
/**
* Checks whether region replicas are not hosted on the same host.
*/
public void assertRegionReplicaPlacement(Map<ServerName, List<HRegionInfo>> serverMap, RackManager rackManager) {
TreeMap<String, Set<HRegionInfo>> regionsPerHost = new TreeMap<String, Set<HRegionInfo>>();
TreeMap<String, Set<HRegionInfo>> regionsPerRack = new TreeMap<String, Set<HRegionInfo>>();
for (Entry<ServerName, List<HRegionInfo>> entry : serverMap.entrySet()) {
String hostname = entry.getKey().getHostname();
Set<HRegionInfo> infos = regionsPerHost.get(hostname);
if (infos == null) {
infos = new HashSet<HRegionInfo>();
regionsPerHost.put(hostname, infos);
}
for (HRegionInfo info : entry.getValue()) {
HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
if (!infos.add(primaryInfo)) {
Assert.fail("Two or more region replicas are hosted on the same host after balance");
}
}
}
if (rackManager == null) {
return;
}
for (Entry<ServerName, List<HRegionInfo>> entry : serverMap.entrySet()) {
String rack = rackManager.getRack(entry.getKey());
Set<HRegionInfo> infos = regionsPerRack.get(rack);
if (infos == null) {
infos = new HashSet<HRegionInfo>();
regionsPerRack.put(rack, infos);
}
for (HRegionInfo info : entry.getValue()) {
HRegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
if (!infos.add(primaryInfo)) {
Assert.fail("Two or more region replicas are hosted on the same rack after balance");
}
}
}
}
protected String printStats(List<ServerAndLoad> servers) { protected String printStats(List<ServerAndLoad> servers) {
int numServers = servers.size(); int numServers = servers.size();
int totalRegions = 0; int totalRegions = 0;
@ -159,18 +209,18 @@ public class BalancerTestBase {
map.put(sn, sal); map.put(sn, sal);
} }
protected Map<ServerName, List<HRegionInfo>> mockClusterServers(int[] mockCluster) { protected TreeMap<ServerName, List<HRegionInfo>> mockClusterServers(int[] mockCluster) {
return mockClusterServers(mockCluster, -1); return mockClusterServers(mockCluster, -1);
} }
protected BaseLoadBalancer.Cluster mockCluster(int[] mockCluster) { protected BaseLoadBalancer.Cluster mockCluster(int[] mockCluster) {
return new BaseLoadBalancer.Cluster(null, return new BaseLoadBalancer.Cluster(null,
mockClusterServers(mockCluster, -1), null, null, null, null); mockClusterServers(mockCluster, -1), null, null, null, null, null);
} }
protected Map<ServerName, List<HRegionInfo>> mockClusterServers(int[] mockCluster, int numTables) { protected TreeMap<ServerName, List<HRegionInfo>> mockClusterServers(int[] mockCluster, int numTables) {
int numServers = mockCluster.length; int numServers = mockCluster.length;
Map<ServerName, List<HRegionInfo>> servers = new TreeMap<ServerName, List<HRegionInfo>>(); TreeMap<ServerName, List<HRegionInfo>> servers = new TreeMap<ServerName, List<HRegionInfo>>();
for (int i = 0; i < numServers; i++) { for (int i = 0; i < numServers; i++) {
int numRegions = mockCluster[i]; int numRegions = mockCluster[i];
ServerAndLoad sal = randomServer(0); ServerAndLoad sal = randomServer(0);
@ -218,7 +268,7 @@ public class BalancerTestBase {
ServerName sn = this.serverQueue.poll(); ServerName sn = this.serverQueue.poll();
return new ServerAndLoad(sn, numRegionsPerServer); return new ServerAndLoad(sn, numRegionsPerServer);
} }
String host = "srv" + rand.nextInt(100000); String host = "srv" + rand.nextInt(Integer.MAX_VALUE);
int port = rand.nextInt(60000); int port = rand.nextInt(60000);
long startCode = rand.nextLong(); long startCode = rand.nextLong();
ServerName sn = ServerName.valueOf(host, port, startCode); ServerName sn = ServerName.valueOf(host, port, startCode);

View File

@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -46,6 +47,10 @@ import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -57,8 +62,11 @@ import com.google.common.collect.Lists;
public class TestBaseLoadBalancer extends BalancerTestBase { public class TestBaseLoadBalancer extends BalancerTestBase {
private static LoadBalancer loadBalancer; private static LoadBalancer loadBalancer;
private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class); private static final Log LOG = LogFactory.getLog(TestBaseLoadBalancer.class);
private static final ServerName master = ServerName.valueOf("fake-master", 0, 1L); private static final ServerName master = ServerName.valueOf("fake-master", 0, 1L);
private static RackManager rackManager;
private static final int NUM_SERVERS = 15;
private static ServerName[] servers = new ServerName[NUM_SERVERS];
int[][] regionsAndServersMocks = new int[][] { int[][] regionsAndServersMocks = new int[][] {
// { num regions, num servers } // { num regions, num servers }
@ -75,6 +83,21 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
MasterServices st = Mockito.mock(MasterServices.class); MasterServices st = Mockito.mock(MasterServices.class);
Mockito.when(st.getServerName()).thenReturn(master); Mockito.when(st.getServerName()).thenReturn(master);
loadBalancer.setMasterServices(st); loadBalancer.setMasterServices(st);
// Set up the rack topologies (5 machines per rack)
rackManager = Mockito.mock(RackManager.class);
for (int i = 0; i < NUM_SERVERS; i++) {
servers[i] = ServerName.valueOf("foo"+i+":1234",-1);
if (i < 5) {
Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack1");
}
if (i >= 5 && i < 10) {
Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack2");
}
if (i >= 10) {
Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack3");
}
}
} }
public static class MockBalancer extends BaseLoadBalancer { public static class MockBalancer extends BaseLoadBalancer {
@ -214,6 +237,138 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
assertRetainedAssignment(existing, listOfServerNames, assignment); assertRetainedAssignment(existing, listOfServerNames, assignment);
} }
@Test
public void testRegionAvailability() throws Exception {
// Create a cluster with a few servers, assign them to specific racks
// then assign some regions. The tests should check whether moving a
// replica from one node to a specific other node or rack lowers the
// availability of the region or not
List<HRegionInfo> list0 = new ArrayList<HRegionInfo>();
List<HRegionInfo> list1 = new ArrayList<HRegionInfo>();
List<HRegionInfo> list2 = new ArrayList<HRegionInfo>();
// create a region (region1)
HRegionInfo hri1 = new HRegionInfo(
TableName.valueOf("table"), "key1".getBytes(), "key2".getBytes(),
false, 100);
// create a replica of the region (replica_of_region1)
HRegionInfo hri2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1);
// create a second region (region2)
HRegionInfo hri3 = new HRegionInfo(
TableName.valueOf("table"), "key2".getBytes(), "key3".getBytes(),
false, 101);
list0.add(hri1); //only region1
list1.add(hri2); //only replica_of_region1
list2.add(hri3); //only region2
Map<ServerName, List<HRegionInfo>> clusterState =
new LinkedHashMap<ServerName, List<HRegionInfo>>();
clusterState.put(servers[0], list0); //servers[0] hosts region1
clusterState.put(servers[1], list1); //servers[1] hosts replica_of_region1
clusterState.put(servers[2], list2); //servers[2] hosts region2
// create a cluster with the above clusterState. The way in which the
// cluster is created (constructor code) would make sure the indices of
// the servers are in the order in which it is inserted in the clusterState
// map (linkedhashmap is important). A similar thing applies to the region lists
Cluster cluster = new Cluster(master, clusterState, null, null, null, null, rackManager);
// check whether a move of region1 from servers[0] to servers[1] would lower
// the availability of region1
assertTrue(cluster.wouldLowerAvailability(hri1, servers[1]));
// check whether a move of region1 from servers[0] to servers[2] would lower
// the availability of region1
assertTrue(!cluster.wouldLowerAvailability(hri1, servers[2]));
// check whether a move of replica_of_region1 from servers[0] to servers[2] would lower
// the availability of replica_of_region1
assertTrue(!cluster.wouldLowerAvailability(hri2, servers[2]));
// check whether a move of region2 from servers[0] to servers[1] would lower
// the availability of region2
assertTrue(!cluster.wouldLowerAvailability(hri3, servers[1]));
// now lets have servers[1] host replica_of_region2
list1.add(RegionReplicaUtil.getRegionInfoForReplica(hri3, 1));
// create a new clusterState with the above change
cluster = new Cluster(master, clusterState, null, null, null, null, rackManager);
// now check whether a move of a replica from servers[0] to servers[1] would lower
// the availability of region2
assertTrue(cluster.wouldLowerAvailability(hri3, servers[1]));
// start over again
clusterState.clear();
clusterState.put(servers[0], list0); //servers[0], rack1 hosts region1
clusterState.put(servers[5], list1); //servers[5], rack2 hosts replica_of_region1 and replica_of_region2
clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2
clusterState.put(servers[10], new ArrayList<HRegionInfo>()); //servers[10], rack3 hosts no region
// create a cluster with the above clusterState
cluster = new Cluster(master, clusterState, null, null, null, null, rackManager);
// check whether a move of region1 from servers[0],rack1 to servers[6],rack2 would
// lower the availability
assertTrue(cluster.wouldLowerAvailability(hri1, servers[0]));
// now create a cluster without the rack manager
cluster = new Cluster(master, clusterState, null, null, null, null, null);
// now repeat check whether a move of region1 from servers[0] to servers[6] would
// lower the availability
assertTrue(!cluster.wouldLowerAvailability(hri1, servers[6]));
}
@Test
public void testRegionAvailabilityWithRegionMoves() throws Exception {
List<HRegionInfo> list0 = new ArrayList<HRegionInfo>();
List<HRegionInfo> list1 = new ArrayList<HRegionInfo>();
List<HRegionInfo> list2 = new ArrayList<HRegionInfo>();
// create a region (region1)
HRegionInfo hri1 = new HRegionInfo(
TableName.valueOf("table"), "key1".getBytes(), "key2".getBytes(),
false, 100);
// create a replica of the region (replica_of_region1)
HRegionInfo hri2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1);
// create a second region (region2)
HRegionInfo hri3 = new HRegionInfo(
TableName.valueOf("table"), "key2".getBytes(), "key3".getBytes(),
false, 101);
list0.add(hri1); //only region1
list1.add(hri2); //only replica_of_region1
list2.add(hri3); //only region2
Map<ServerName, List<HRegionInfo>> clusterState =
new LinkedHashMap<ServerName, List<HRegionInfo>>();
clusterState.put(servers[0], list0); //servers[0] hosts region1
clusterState.put(servers[1], list1); //servers[1] hosts replica_of_region1
clusterState.put(servers[2], list2); //servers[2] hosts region2
// create a cluster with the above clusterState. The way in which the
// cluster is created (constructor code) would make sure the indices of
// the servers are in the order in which it is inserted in the clusterState
// map (linkedhashmap is important).
Cluster cluster = new Cluster(master, clusterState, null, null, null, null, rackManager);
// check whether moving region1 from servers[1] to servers[2] would lower availability
assertTrue(!cluster.wouldLowerAvailability(hri1, servers[2]));
// now move region1 from servers[0] to servers[2]
cluster.doAction(new MoveRegionAction(0, 0, 2));
// now repeat check whether moving region1 from servers[1] to servers[2]
// would lower availability
assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));
// start over again
clusterState.clear();
List<HRegionInfo> list3 = new ArrayList<HRegionInfo>();
HRegionInfo hri4 = RegionReplicaUtil.getRegionInfoForReplica(hri3, 1);
list3.add(hri4);
clusterState.put(servers[0], list0); //servers[0], rack1 hosts region1
clusterState.put(servers[5], list1); //servers[5], rack2 hosts replica_of_region1
clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2
clusterState.put(servers[12], list3); //servers[12], rack3 hosts replica_of_region2
// create a cluster with the above clusterState
cluster = new Cluster(master, clusterState, null, null, null, null, rackManager);
// check whether a move of replica_of_region2 from servers[12],rack3 to servers[0],rack1 would
// lower the availability
assertTrue(!cluster.wouldLowerAvailability(hri4, servers[0]));
// now move region2 from servers[6],rack2 to servers[0],rack1
cluster.doAction(new MoveRegionAction(2, 2, 0));
// now repeat check if replica_of_region2 from servers[12],rack3 to servers[0],rack1 would
// lower the availability
assertTrue(cluster.wouldLowerAvailability(hri3, servers[0]));
}
private List<ServerName> getListOfServerNames(final List<ServerAndLoad> sals) { private List<ServerName> getListOfServerNames(final List<ServerAndLoad> sals) {
List<ServerName> list = new ArrayList<ServerName>(); List<ServerName> list = new ArrayList<ServerName>();
for (ServerAndLoad e : sals) { for (ServerAndLoad e : sals) {
@ -289,7 +444,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
assignRegions(regions, oldServers, clusterState); assignRegions(regions, oldServers, clusterState);
// should not throw exception: // should not throw exception:
BaseLoadBalancer.Cluster cluster = new Cluster(null, clusterState, null, null, null, null); BaseLoadBalancer.Cluster cluster = new Cluster(null, clusterState, null, null, null, null, null);
assertEquals(101 + 9, cluster.numRegions); assertEquals(101 + 9, cluster.numRegions);
assertEquals(10, cluster.numServers); // only 10 servers because they share the same host + port assertEquals(10, cluster.numServers); // only 10 servers because they share the same host + port
} }
@ -331,7 +486,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn( when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn(
Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus
BaseLoadBalancer.Cluster cluster = new Cluster(null, clusterState, null, locationFinder, null, null); BaseLoadBalancer.Cluster cluster = new Cluster(null, clusterState, null, locationFinder, null, null, null);
int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0)); // this is ok, it is just a test int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0)); // this is ok, it is just a test
int r1 = ArrayUtils.indexOf(cluster.regions, regions.get(1)); int r1 = ArrayUtils.indexOf(cluster.regions, regions.get(1));

View File

@ -17,10 +17,19 @@
*/ */
package org.apache.hadoop.hbase.master.balancer; package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue; import java.util.Queue;
import java.util.TreeMap; import java.util.TreeMap;
@ -34,29 +43,30 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Category(MediumTests.class) @Category(MediumTests.class)
public class TestStochasticLoadBalancer extends BalancerTestBase { public class TestStochasticLoadBalancer extends BalancerTestBase {
public static final String REGION_KEY = "testRegion"; public static final String REGION_KEY = "testRegion";
private static StochasticLoadBalancer loadBalancer; private static StochasticLoadBalancer loadBalancer;
private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class); private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class);
private static Configuration conf;
private static final ServerName master = ServerName.valueOf("fake-master", 0, 1L);
@BeforeClass @BeforeClass
public static void beforeAllTests() throws Exception { public static void beforeAllTests() throws Exception {
Configuration conf = HBaseConfiguration.create(); conf = HBaseConfiguration.create();
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f); conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
conf.setClass("hbase.util.ip.to.rack.determiner",
MyRackResolver.class, DNSToSwitchMapping.class);
loadBalancer = new StochasticLoadBalancer(); loadBalancer = new StochasticLoadBalancer();
loadBalancer.setConf(conf); loadBalancer.setConf(conf);
} }
@ -187,22 +197,29 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
StochasticLoadBalancer.CostFunction StochasticLoadBalancer.CostFunction
costFunction = new StochasticLoadBalancer.RegionCountSkewCostFunction(conf, 1, 1); costFunction = new StochasticLoadBalancer.RegionCountSkewCostFunction(conf, 1, 1);
for (int[] mockCluster : clusterStateMocks) { for (int[] mockCluster : clusterStateMocks) {
double cost = costFunction.cost(mockCluster(mockCluster)); costFunction.init(mockCluster(mockCluster));
double cost = costFunction.cost();
assertTrue(cost >= 0); assertTrue(cost >= 0);
assertTrue(cost <= 1.01); assertTrue(cost <= 1.01);
} }
costFunction.init(mockCluster(new int[]{0, 0, 0, 0, 1}));
assertEquals(1, assertEquals(1,
costFunction.cost(mockCluster(new int[]{0, 0, 0, 0, 1})), 0.01); costFunction.cost(), 0.01);
costFunction.init(mockCluster(new int[]{0, 0, 0, 1, 1}));
assertEquals(.75, assertEquals(.75,
costFunction.cost(mockCluster(new int[]{0, 0, 0, 1, 1})), 0.01); costFunction.cost(), 0.01);
costFunction.init(mockCluster(new int[]{0, 0, 1, 1, 1}));
assertEquals(.5, assertEquals(.5,
costFunction.cost(mockCluster(new int[]{0, 0, 1, 1, 1})), 0.01); costFunction.cost(), 0.01);
costFunction.init(mockCluster(new int[]{0, 1, 1, 1, 1}));
assertEquals(.25, assertEquals(.25,
costFunction.cost(mockCluster(new int[]{0, 1, 1, 1, 1})), 0.01); costFunction.cost(), 0.01);
costFunction.init(mockCluster(new int[]{1, 1, 1, 1, 1}));
assertEquals(0, assertEquals(0,
costFunction.cost(mockCluster(new int[]{1, 1, 1, 1, 1})), 0.01); costFunction.cost(), 0.01);
costFunction.init(mockCluster(new int[]{10, 10, 10, 10, 10}));
assertEquals(0, assertEquals(0,
costFunction.cost(mockCluster(new int[]{10, 10, 10, 10, 10})), 0.01); costFunction.cost(), 0.01);
} }
@Test @Test
@ -212,7 +229,8 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf); costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) { for (int[] mockCluster : clusterStateMocks) {
BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster); BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
double cost = costFunction.cost(cluster); costFunction.init(cluster);
double cost = costFunction.cost();
assertTrue(cost >= 0); assertTrue(cost >= 0);
assertTrue(cost <= 1.01); assertTrue(cost <= 1.01);
} }
@ -250,10 +268,11 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
int numNodes = 3; int numNodes = 3;
int numRegions = 20; int numRegions = 20;
int numRegionsPerServer = 3; //all servers except one int numRegionsPerServer = 3; //all servers except one
int replication = 1;
int numTables = 2; int numTables = 2;
Map<ServerName, List<HRegionInfo>> serverMap = Map<ServerName, List<HRegionInfo>> serverMap =
createServerMap(numNodes, numRegions, numRegionsPerServer, numTables); createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
List<ServerAndLoad> list = convertToList(serverMap); List<ServerAndLoad> list = convertToList(serverMap);
@ -275,13 +294,103 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
assertNull(plans); assertNull(plans);
} }
@Test
public void testReplicaCost() {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFunction
costFunction = new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) {
BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
costFunction.init(cluster);
double cost = costFunction.cost();
assertTrue(cost >= 0);
assertTrue(cost <= 1.01);
}
}
@Test
public void testReplicaCostForReplicas() {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFunction
costFunction = new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf);
int [] servers = new int[] {3,3,3,3,3};
TreeMap<ServerName, List<HRegionInfo>> clusterState = mockClusterServers(servers);
BaseLoadBalancer.Cluster cluster;
cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null, null);
costFunction.init(cluster);
double costWithoutReplicas = costFunction.cost();
assertEquals(0, costWithoutReplicas, 0);
// replicate the region from first server to the last server
HRegionInfo replica1 = RegionReplicaUtil.getRegionInfoForReplica(
clusterState.firstEntry().getValue().get(0),1);
clusterState.lastEntry().getValue().add(replica1);
cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null, null);
costFunction.init(cluster);
double costWith1ReplicaDifferentServer = costFunction.cost();
assertEquals(0, costWith1ReplicaDifferentServer, 0);
// add a third replica to the last server
HRegionInfo replica2 = RegionReplicaUtil.getRegionInfoForReplica(replica1, 2);
clusterState.lastEntry().getValue().add(replica2);
cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null, null);
costFunction.init(cluster);
double costWith1ReplicaSameServer = costFunction.cost();
assertTrue(costWith1ReplicaDifferentServer < costWith1ReplicaSameServer);
// test with replication = 4 for following:
HRegionInfo replica3;
Iterator<Entry<ServerName, List<HRegionInfo>>> it;
Entry<ServerName, List<HRegionInfo>> entry;
clusterState = mockClusterServers(servers);
it = clusterState.entrySet().iterator();
entry = it.next(); //first server
HRegionInfo hri = entry.getValue().get(0);
replica1 = RegionReplicaUtil.getRegionInfoForReplica(hri, 1);
replica2 = RegionReplicaUtil.getRegionInfoForReplica(hri, 2);
replica3 = RegionReplicaUtil.getRegionInfoForReplica(hri, 3);
entry.getValue().add(replica1);
entry.getValue().add(replica2);
it.next().getValue().add(replica3); //2nd server
cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null, null);
costFunction.init(cluster);
double costWith3ReplicasSameServer = costFunction.cost();
clusterState = mockClusterServers(servers);
hri = clusterState.firstEntry().getValue().get(0);
replica1 = RegionReplicaUtil.getRegionInfoForReplica(hri, 1);
replica2 = RegionReplicaUtil.getRegionInfoForReplica(hri, 2);
replica3 = RegionReplicaUtil.getRegionInfoForReplica(hri, 3);
clusterState.firstEntry().getValue().add(replica1);
clusterState.lastEntry().getValue().add(replica2);
clusterState.lastEntry().getValue().add(replica3);
cluster = new BaseLoadBalancer.Cluster(master, clusterState, null, null, null, null, null);
costFunction.init(cluster);
double costWith2ReplicasOnTwoServers = costFunction.cost();
assertTrue(costWith2ReplicasOnTwoServers < costWith3ReplicasSameServer);
}
@Test (timeout = 60000) @Test (timeout = 60000)
public void testSmallCluster() { public void testSmallCluster() {
int numNodes = 10; int numNodes = 10;
int numRegions = 1000; int numRegions = 1000;
int numRegionsPerServer = 40; //all servers except one int numRegionsPerServer = 40; //all servers except one
int replication = 1;
int numTables = 10; int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
} }
@Test (timeout = 60000) @Test (timeout = 60000)
@ -289,8 +398,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
int numNodes = 20; int numNodes = 20;
int numRegions = 2000; int numRegions = 2000;
int numRegionsPerServer = 40; //all servers except one int numRegionsPerServer = 40; //all servers except one
int replication = 1;
int numTables = 10; int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
} }
@Test (timeout = 60000) @Test (timeout = 60000)
@ -298,8 +408,10 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
int numNodes = 20; int numNodes = 20;
int numRegions = 2000; int numRegions = 2000;
int numRegionsPerServer = 1; // all servers except one int numRegionsPerServer = 1; // all servers except one
int replication = 1;
int numTables = 10; int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, false /* max moves */); /* fails because of max moves */
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, false);
} }
@Test (timeout = 800000) @Test (timeout = 800000)
@ -307,8 +419,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
int numNodes = 100; int numNodes = 100;
int numRegions = 10000; int numRegions = 10000;
int numRegionsPerServer = 60; // all servers except one int numRegionsPerServer = 60; // all servers except one
int replication = 1;
int numTables = 40; int numTables = 40;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
} }
@Test (timeout = 800000) @Test (timeout = 800000)
@ -316,12 +429,15 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
int numNodes = 200; int numNodes = 200;
int numRegions = 100000; int numRegions = 100000;
int numRegionsPerServer = 40; // all servers except one int numRegionsPerServer = 40; // all servers except one
int replication = 1;
int numTables = 400; int numTables = 400;
testWithCluster(numNodes, testWithCluster(numNodes,
numRegions, numRegions,
numRegionsPerServer, numRegionsPerServer,
replication,
numTables, numTables,
false /* num large num regions means may not always get to best balance with one run */); false, /* num large num regions means may not always get to best balance with one run */
false);
} }
@ -330,8 +446,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
int numNodes = 100; int numNodes = 100;
int numRegions = 2000; int numRegions = 2000;
int numRegionsPerServer = 9; // all servers except one int numRegionsPerServer = 9; // all servers except one
int replication = 1;
int numTables = 110; int numTables = 110;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
// TODO(eclark): Make sure that the tables are well distributed. // TODO(eclark): Make sure that the tables are well distributed.
} }
@ -341,20 +458,145 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
int numRegions = 100000; //100 regions per RS int numRegions = 100000; //100 regions per RS
int numRegionsPerServer = 80; //all servers except one int numRegionsPerServer = 80; //all servers except one
int numTables = 100; int numTables = 100;
testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true); int replication = 1;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}
@Test (timeout = 60000)
public void testRegionReplicasOnSmallCluster() {
int numNodes = 10;
int numRegions = 1000;
int replication = 3; // 3 replicas per region
int numRegionsPerServer = 80; //all regions are mostly balanced
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}
@Test (timeout = 60000)
public void testRegionReplicasOnMidCluster() {
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
loadBalancer.setConf(conf);
int numNodes = 200;
int numRegions = 40 * 200;
int replication = 3; // 3 replicas per region
int numRegionsPerServer = 30; //all regions are mostly balanced
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}
@Test (timeout = 60000)
public void testRegionReplicasOnLargeCluster() {
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
loadBalancer.setConf(conf);
int numNodes = 1000;
int numRegions = 40 * numNodes; //40 regions per RS
int numRegionsPerServer = 30; //all servers except one
int numTables = 100;
int replication = 3;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}
@Test (timeout = 60000)
public void testRegionReplicasOnMidClusterHighReplication() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
loadBalancer.setConf(conf);
int numNodes = 100;
int numRegions = 6 * 100;
int replication = 100; // 100 replicas per region, one for each server
int numRegionsPerServer = 5;
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
}
@Test (timeout = 60000)
public void testRegionReplicationOnMidClusterSameHosts() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
loadBalancer.setConf(conf);
int numHosts = 100;
int numRegions = 100 * 100;
int replication = 3; // 3 replicas per region
int numRegionsPerServer = 5;
int numTables = 10;
Map<ServerName, List<HRegionInfo>> serverMap =
createServerMap(numHosts, numRegions, numRegionsPerServer, replication, numTables);
int numNodesPerHost = 4;
// create a new map with 4 RS per host.
Map<ServerName, List<HRegionInfo>> newServerMap = new TreeMap<ServerName, List<HRegionInfo>>(serverMap);
for (Map.Entry<ServerName, List<HRegionInfo>> entry : serverMap.entrySet()) {
for (int i=1; i < numNodesPerHost; i++) {
ServerName s1 = entry.getKey();
ServerName s2 = ServerName.valueOf(s1.getHostname(), s1.getPort() + i, 1); // create an RS for the same host
newServerMap.put(s2, new ArrayList<HRegionInfo>());
}
}
testWithCluster(newServerMap, null, true, true);
}
private static class ForTestRackManager extends RackManager {
int numRacks;
public ForTestRackManager(int numRacks) {
this.numRacks = numRacks;
}
@Override
public String getRack(ServerName server) {
return "rack_" + (server.hashCode() % numRacks);
}
}
@Test (timeout = 120000)
public void testRegionReplicationOnMidClusterWithRacks() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 60 * 1000); // 60 sec
loadBalancer.setConf(conf);
int numNodes = 50;
int numRegions = numNodes * 30;
int replication = 3; // 3 replicas per region
int numRegionsPerServer = 25;
int numTables = 10;
int numRacks = 4; // all replicas should be on a different rack
Map<ServerName, List<HRegionInfo>> serverMap =
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
RackManager rm = new ForTestRackManager(numRacks);
testWithCluster(serverMap, rm, true, true);
}
@Test (timeout = 60000)
public void testRegionReplicationOnMidClusterReplicationGreaterThanNumNodes() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
loadBalancer.setConf(conf);
int numNodes = 80;
int numRegions = 6 * 100;
int replication = 100; // 100 replicas per region, more than numNodes
int numRegionsPerServer = 5;
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, false);
} }
protected void testWithCluster(int numNodes, protected void testWithCluster(int numNodes,
int numRegions, int numRegions,
int numRegionsPerServer, int numRegionsPerServer,
int numTables, int replication,
boolean assertFullyBalanced) { int numTables,
boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) {
Map<ServerName, List<HRegionInfo>> serverMap = Map<ServerName, List<HRegionInfo>> serverMap =
createServerMap(numNodes, numRegions, numRegionsPerServer, numTables); createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
testWithCluster(serverMap, null, assertFullyBalanced, assertFullyBalancedForReplicas);
}
protected void testWithCluster(Map<ServerName, List<HRegionInfo>> serverMap,
RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) {
List<ServerAndLoad> list = convertToList(serverMap); List<ServerAndLoad> list = convertToList(serverMap);
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
loadBalancer.setRackManager(rackManager);
// Run the balancer. // Run the balancer.
List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap); List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
assertNotNull(plans); assertNotNull(plans);
@ -369,12 +611,16 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
assertClusterAsBalanced(balancedCluster); assertClusterAsBalanced(balancedCluster);
List<RegionPlan> secondPlans = loadBalancer.balanceCluster(serverMap); List<RegionPlan> secondPlans = loadBalancer.balanceCluster(serverMap);
assertNull(secondPlans); assertNull(secondPlans);
if (assertFullyBalancedForReplicas) {
assertRegionReplicaPlacement(serverMap, rackManager);
}
} }
} }
private Map<ServerName, List<HRegionInfo>> createServerMap(int numNodes, private Map<ServerName, List<HRegionInfo>> createServerMap(int numNodes,
int numRegions, int numRegions,
int numRegionsPerServer, int numRegionsPerServer,
int replication,
int numTables) { int numTables) {
//construct a cluster of numNodes, having a total of numRegions. Each RS will hold //construct a cluster of numNodes, having a total of numRegions. Each RS will hold
//numRegionsPerServer many regions except for the last one, which will host all the //numRegionsPerServer many regions except for the last one, which will host all the
@ -384,6 +630,40 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
cluster[i] = numRegionsPerServer; cluster[i] = numRegionsPerServer;
} }
cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer); cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer);
return mockClusterServers(cluster, numTables); Map<ServerName, List<HRegionInfo>> clusterState = mockClusterServers(cluster, numTables);
if (replication > 0) {
// replicate the regions to the same servers
for (List<HRegionInfo> regions : clusterState.values()) {
int length = regions.size();
for (int i = 0; i < length; i++) {
for (int r = 1; r < replication ; r++) {
regions.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), r));
}
}
}
}
return clusterState;
}
public static class MyRackResolver implements DNSToSwitchMapping {
public MyRackResolver(Configuration conf) {}
@Override
public List<String> resolve(List<String> names) {
List<String> racks = new ArrayList<String>(names.size());
for (int i = 0; i < names.size(); i++) {
racks.add(i, NetworkTopology.DEFAULT_RACK);
}
return racks;
}
@Override
public void reloadCachedMappings() {}
@Override
public void reloadCachedMappings(List<String> names) {
}
} }
} }

View File

@ -105,7 +105,7 @@ public class TestRegionReplicas {
private void openRegion(HRegionInfo hri) throws Exception { private void openRegion(HRegionInfo hri) throws Exception {
ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName()); ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
// first version is '0' // first version is '0'
AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null); AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null, null);
AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr); AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
Assert.assertTrue(responseOpen.getOpeningStateCount() == 1); Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
Assert.assertTrue(responseOpen.getOpeningState(0). Assert.assertTrue(responseOpen.getOpeningState(0).