HBASE-18164 Fast locality computation in balancer
-Added new LocalityCostFunction and LocalityCandidateGenerator that cache localities of every region/rack combination and mappings of every region to its most local server and to its most local rack. -Made LocalityCostFunction incremental so that it only computes locality based on most recent region moves/swaps, rather than recomputing the locality of every region in the cluster at every iteration of the balancer -Changed locality cost function to reflect the ratio of: (Current locality) / (Best locality possible given current cluster) Signed-off-by: Sean Busbey <busbey@apache.org> Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
deb47d677d
commit
3a79590bb4
|
@ -164,6 +164,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
Map<ServerName, List<HRegionInfo>> clusterState;
|
Map<ServerName, List<HRegionInfo>> clusterState;
|
||||||
|
|
||||||
protected final RackManager rackManager;
|
protected final RackManager rackManager;
|
||||||
|
// Maps region -> rackIndex -> locality of region on rack
|
||||||
|
private float[][] rackLocalities;
|
||||||
|
// Maps localityType -> region -> [server|rack]Index with highest locality
|
||||||
|
private int[][] regionsToMostLocalEntities;
|
||||||
|
|
||||||
protected Cluster(
|
protected Cluster(
|
||||||
Map<ServerName, List<HRegionInfo>> clusterState,
|
Map<ServerName, List<HRegionInfo>> clusterState,
|
||||||
|
@ -482,6 +486,126 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true iff a given server has less regions than the balanced amount
|
||||||
|
*/
|
||||||
|
public boolean serverHasTooFewRegions(int server) {
|
||||||
|
int minLoad = this.numRegions / numServers;
|
||||||
|
int numRegions = getNumRegions(server);
|
||||||
|
return numRegions < minLoad;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieves and lazily initializes a field storing the locality of
|
||||||
|
* every region/server combination
|
||||||
|
*/
|
||||||
|
public float[][] getOrComputeRackLocalities() {
|
||||||
|
if (rackLocalities == null || regionsToMostLocalEntities == null) {
|
||||||
|
computeCachedLocalities();
|
||||||
|
}
|
||||||
|
return rackLocalities;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lazily initializes and retrieves a mapping of region -> server for which region has
|
||||||
|
* the highest the locality
|
||||||
|
*/
|
||||||
|
public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) {
|
||||||
|
if (rackLocalities == null || regionsToMostLocalEntities == null) {
|
||||||
|
computeCachedLocalities();
|
||||||
|
}
|
||||||
|
return regionsToMostLocalEntities[type.ordinal()];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Looks up locality from cache of localities. Will create cache if it does
|
||||||
|
* not already exist.
|
||||||
|
*/
|
||||||
|
public float getOrComputeLocality(int region, int entity, LocalityType type) {
|
||||||
|
switch (type) {
|
||||||
|
case SERVER:
|
||||||
|
return getLocalityOfRegion(region, entity);
|
||||||
|
case RACK:
|
||||||
|
return getOrComputeRackLocalities()[region][entity];
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("Unsupported LocalityType: " + type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns locality weighted by region size in MB. Will create locality cache
|
||||||
|
* if it does not already exist.
|
||||||
|
*/
|
||||||
|
public double getOrComputeWeightedLocality(int region, int server, LocalityType type) {
|
||||||
|
return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the size in MB from the most recent RegionLoad for region
|
||||||
|
*/
|
||||||
|
public int getRegionSizeMB(int region) {
|
||||||
|
Deque<BalancerRegionLoad> load = regionLoads[region];
|
||||||
|
// This means regions have no actual data on disk
|
||||||
|
if (load == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return regionLoads[region].getLast().getStorefileSizeMB();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes and caches the locality for each region/rack combinations,
|
||||||
|
* as well as storing a mapping of region -> server and region -> rack such that server
|
||||||
|
* and rack have the highest locality for region
|
||||||
|
*/
|
||||||
|
private void computeCachedLocalities() {
|
||||||
|
rackLocalities = new float[numRegions][numServers];
|
||||||
|
regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
|
||||||
|
|
||||||
|
// Compute localities and find most local server per region
|
||||||
|
for (int region = 0; region < numRegions; region++) {
|
||||||
|
int serverWithBestLocality = 0;
|
||||||
|
float bestLocalityForRegion = 0;
|
||||||
|
for (int server = 0; server < numServers; server++) {
|
||||||
|
// Aggregate per-rack locality
|
||||||
|
float locality = getLocalityOfRegion(region, server);
|
||||||
|
int rack = serverIndexToRackIndex[server];
|
||||||
|
int numServersInRack = serversPerRack[rack].length;
|
||||||
|
rackLocalities[region][rack] += locality / numServersInRack;
|
||||||
|
|
||||||
|
if (locality > bestLocalityForRegion) {
|
||||||
|
serverWithBestLocality = server;
|
||||||
|
bestLocalityForRegion = locality;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
|
||||||
|
|
||||||
|
// Find most local rack per region
|
||||||
|
int rackWithBestLocality = 0;
|
||||||
|
float bestRackLocalityForRegion = 0.0f;
|
||||||
|
for (int rack = 0; rack < numRacks; rack++) {
|
||||||
|
float rackLocality = rackLocalities[region][rack];
|
||||||
|
if (rackLocality > bestRackLocalityForRegion) {
|
||||||
|
bestRackLocalityForRegion = rackLocality;
|
||||||
|
rackWithBestLocality = rack;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maps region index to rack index
|
||||||
|
*/
|
||||||
|
public int getRackForRegion(int region) {
|
||||||
|
return serverIndexToRackIndex[regionIndexToServerIndex[region]];
|
||||||
|
}
|
||||||
|
|
||||||
|
enum LocalityType {
|
||||||
|
SERVER,
|
||||||
|
RACK
|
||||||
|
}
|
||||||
|
|
||||||
/** An action to move or swap a region */
|
/** An action to move or swap a region */
|
||||||
public static class Action {
|
public static class Action {
|
||||||
public static enum Type {
|
public static enum Type {
|
||||||
|
|
|
@ -18,8 +18,10 @@
|
||||||
package org.apache.hadoop.hbase.master.balancer;
|
package org.apache.hadoop.hbase.master.balancer;
|
||||||
|
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Deque;
|
import java.util.Deque;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -45,11 +47,13 @@ import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
|
||||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
|
||||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction;
|
||||||
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
|
||||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
|
||||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
|
||||||
|
import com.google.common.base.Optional;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -139,7 +143,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
// Keep locality based picker and cost function to alert them
|
// Keep locality based picker and cost function to alert them
|
||||||
// when new services are offered
|
// when new services are offered
|
||||||
private LocalityBasedCandidateGenerator localityCandidateGenerator;
|
private LocalityBasedCandidateGenerator localityCandidateGenerator;
|
||||||
private LocalityCostFunction localityCost;
|
private ServerLocalityCostFunction localityCost;
|
||||||
|
private RackLocalityCostFunction rackLocalityCost;
|
||||||
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
|
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
|
||||||
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
|
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
|
||||||
private boolean isByTable = false;
|
private boolean isByTable = false;
|
||||||
|
@ -172,7 +177,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
if (localityCandidateGenerator == null) {
|
if (localityCandidateGenerator == null) {
|
||||||
localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
|
localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
|
||||||
}
|
}
|
||||||
localityCost = new LocalityCostFunction(conf, services);
|
localityCost = new ServerLocalityCostFunction(conf, services);
|
||||||
|
rackLocalityCost = new RackLocalityCostFunction(conf, services);
|
||||||
|
|
||||||
if (this.candidateGenerators == null) {
|
if (this.candidateGenerators == null) {
|
||||||
candidateGenerators = Lists.newArrayList();
|
candidateGenerators = Lists.newArrayList();
|
||||||
|
@ -194,6 +200,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
new PrimaryRegionCountSkewCostFunction(conf),
|
new PrimaryRegionCountSkewCostFunction(conf),
|
||||||
new MoveCostFunction(conf),
|
new MoveCostFunction(conf),
|
||||||
localityCost,
|
localityCost,
|
||||||
|
rackLocalityCost,
|
||||||
new TableSkewCostFunction(conf),
|
new TableSkewCostFunction(conf),
|
||||||
regionReplicaHostCostFunction,
|
regionReplicaHostCostFunction,
|
||||||
regionReplicaRackCostFunction,
|
regionReplicaRackCostFunction,
|
||||||
|
@ -250,8 +257,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
public synchronized void setMasterServices(MasterServices masterServices) {
|
public synchronized void setMasterServices(MasterServices masterServices) {
|
||||||
super.setMasterServices(masterServices);
|
super.setMasterServices(masterServices);
|
||||||
this.localityCost.setServices(masterServices);
|
this.localityCost.setServices(masterServices);
|
||||||
|
this.rackLocalityCost.setServices(masterServices);
|
||||||
this.localityCandidateGenerator.setServices(masterServices);
|
this.localityCandidateGenerator.setServices(masterServices);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -336,7 +343,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
// Allow turning this feature off if the locality cost is not going to
|
// Allow turning this feature off if the locality cost is not going to
|
||||||
// be used in any computations.
|
// be used in any computations.
|
||||||
RegionLocationFinder finder = null;
|
RegionLocationFinder finder = null;
|
||||||
if (this.localityCost != null && this.localityCost.getMultiplier() > 0) {
|
if (this.localityCost != null && this.localityCost.getMultiplier() > 0
|
||||||
|
|| this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0) {
|
||||||
finder = this.regionFinder;
|
finder = this.regionFinder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -700,6 +708,18 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
return Cluster.NullAction;
|
return Cluster.NullAction;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a random iteration order of indexes of an array with size length
|
||||||
|
*/
|
||||||
|
protected List<Integer> getRandomIterationOrder(int length) {
|
||||||
|
ArrayList<Integer> order = new ArrayList<>(length);
|
||||||
|
for (int i = 0; i < length; i++) {
|
||||||
|
order.add(i);
|
||||||
|
}
|
||||||
|
Collections.shuffle(order);
|
||||||
|
return order;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class RandomCandidateGenerator extends CandidateGenerator {
|
static class RandomCandidateGenerator extends CandidateGenerator {
|
||||||
|
@ -771,39 +791,55 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
return pickRandomRegions(cluster, thisServer, otherServer);
|
return pickRandomRegions(cluster, thisServer, otherServer);
|
||||||
}
|
}
|
||||||
|
|
||||||
int thisServer = pickRandomServer(cluster);
|
// Randomly iterate through regions until you find one that is not on ideal host
|
||||||
int thisRegion;
|
for (int region : getRandomIterationOrder(cluster.numRegions)) {
|
||||||
if (thisServer == -1) {
|
int currentServer = cluster.regionIndexToServerIndex[region];
|
||||||
LOG.warn("Could not pick lowest locality region server");
|
if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]) {
|
||||||
return Cluster.NullAction;
|
Optional<Action> potential = tryMoveOrSwap(
|
||||||
} else {
|
cluster,
|
||||||
// Pick lowest locality region on this server
|
currentServer,
|
||||||
thisRegion = pickLowestLocalityRegionOnServer(cluster, thisServer);
|
region,
|
||||||
|
cluster.getOrComputeRegionsToMostLocalEntities(LocalityType.SERVER)[region]
|
||||||
|
);
|
||||||
|
if (potential.isPresent()) {
|
||||||
|
return potential.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (thisRegion == -1) {
|
|
||||||
return Cluster.NullAction;
|
return Cluster.NullAction;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pick the least loaded server with good locality for the region
|
/**
|
||||||
int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion, thisServer);
|
* Try to generate a move/swap fromRegion between fromServer and toServer such that locality is improved.
|
||||||
|
* Returns empty optional if no move can be found
|
||||||
if (otherServer == -1) {
|
*/
|
||||||
return Cluster.NullAction;
|
private Optional<Action> tryMoveOrSwap(Cluster cluster,
|
||||||
|
int fromServer,
|
||||||
|
int fromRegion,
|
||||||
|
int toServer) {
|
||||||
|
// Try move first. We know apriori fromRegion has the highest locality on toServer
|
||||||
|
if (cluster.serverHasTooFewRegions(toServer)) {
|
||||||
|
return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let the candidate region be moved to its highest locality server.
|
// Compare locality gain/loss from swapping fromRegion with regions on toServer
|
||||||
int otherRegion = -1;
|
double fromRegionLocalityDelta =
|
||||||
|
getWeightedLocality(cluster, fromRegion, toServer) - getWeightedLocality(cluster, fromRegion, fromServer);
|
||||||
return getAction(thisServer, thisRegion, otherServer, otherRegion);
|
for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
|
||||||
|
int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
|
||||||
|
double toRegionLocalityDelta =
|
||||||
|
getWeightedLocality(cluster, toRegion, fromServer) - getWeightedLocality(cluster, toRegion, toServer);
|
||||||
|
// If locality would remain neutral or improve, attempt the swap
|
||||||
|
if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
|
||||||
|
return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private int pickLowestLocalityServer(Cluster cluster) {
|
return Optional.absent();
|
||||||
return cluster.getLowestLocalityRegionServer();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private int pickLowestLocalityRegionOnServer(Cluster cluster, int server) {
|
private double getWeightedLocality(Cluster cluster, int region, int server) {
|
||||||
return cluster.getLowestLocalityRegionOnServer(server);
|
return cluster.getOrComputeWeightedLocality(region, server, LocalityType.SERVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
void setServices(MasterServices services) {
|
void setServices(MasterServices services) {
|
||||||
|
@ -1202,59 +1238,124 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||||
* Compute a cost of a potential cluster configuration based upon where
|
* Compute a cost of a potential cluster configuration based upon where
|
||||||
* {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
|
* {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
|
||||||
*/
|
*/
|
||||||
static class LocalityCostFunction extends CostFunction {
|
static abstract class LocalityBasedCostFunction extends CostFunction {
|
||||||
|
|
||||||
private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
|
private final LocalityType type;
|
||||||
private static final float DEFAULT_LOCALITY_COST = 25;
|
|
||||||
|
private double bestLocality; // best case locality across cluster weighted by local data size
|
||||||
|
private double locality; // current locality across cluster weighted by local data size
|
||||||
|
|
||||||
private MasterServices services;
|
private MasterServices services;
|
||||||
|
|
||||||
LocalityCostFunction(Configuration conf, MasterServices srv) {
|
LocalityBasedCostFunction(Configuration conf,
|
||||||
|
MasterServices srv,
|
||||||
|
LocalityType type,
|
||||||
|
String localityCostKey,
|
||||||
|
float defaultLocalityCost) {
|
||||||
super(conf);
|
super(conf);
|
||||||
this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
|
this.type = type;
|
||||||
|
this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
|
||||||
this.services = srv;
|
this.services = srv;
|
||||||
|
this.locality = 0.0;
|
||||||
|
this.bestLocality = 0.0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setServices(MasterServices srvc) {
|
/**
|
||||||
|
* Maps region to the current entity (server or rack) on which it is stored
|
||||||
|
*/
|
||||||
|
abstract int regionIndexToEntityIndex(int region);
|
||||||
|
|
||||||
|
public void setServices(MasterServices srvc) {
|
||||||
this.services = srvc;
|
this.services = srvc;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
double cost() {
|
void init(Cluster cluster) {
|
||||||
double max = 0;
|
super.init(cluster);
|
||||||
double cost = 0;
|
locality = 0.0;
|
||||||
|
bestLocality = 0.0;
|
||||||
|
|
||||||
// If there's no master so there's no way anything else works.
|
// If no master, no computation will work, so assume 0 cost
|
||||||
if (this.services == null) {
|
if (this.services == null) {
|
||||||
return cost;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < cluster.regionLocations.length; i++) {
|
for (int region = 0; region < cluster.numRegions; region++) {
|
||||||
max += 1;
|
locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
|
||||||
int serverIndex = cluster.regionIndexToServerIndex[i];
|
bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
|
||||||
int[] regionLocations = cluster.regionLocations[i];
|
|
||||||
|
|
||||||
// If we can't find where the data is getTopBlock returns null.
|
|
||||||
// so count that as being the best possible.
|
|
||||||
if (regionLocations == null) {
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int index = -1;
|
// We normalize locality to be a score between 0 and 1.0 representing how good it
|
||||||
for (int j = 0; j < regionLocations.length; j++) {
|
// is compared to how good it could be
|
||||||
if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
|
locality /= bestLocality;
|
||||||
index = j;
|
}
|
||||||
break;
|
|
||||||
|
@Override
|
||||||
|
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||||
|
int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
|
||||||
|
int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
|
||||||
|
if (this.services == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
|
||||||
|
double normalizedDelta = localityDelta / bestLocality;
|
||||||
|
locality += normalizedDelta;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
double cost() {
|
||||||
|
return 1 - locality;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getMostLocalEntityForRegion(int region) {
|
||||||
|
return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
|
||||||
|
}
|
||||||
|
|
||||||
|
private double getWeightedLocality(int region, int entity) {
|
||||||
|
return cluster.getOrComputeWeightedLocality(region, entity, type);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
|
||||||
|
|
||||||
|
private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
|
||||||
|
private static final float DEFAULT_LOCALITY_COST = 25;
|
||||||
|
|
||||||
|
ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
|
||||||
|
super(
|
||||||
|
conf,
|
||||||
|
srv,
|
||||||
|
LocalityType.SERVER,
|
||||||
|
LOCALITY_COST_KEY,
|
||||||
|
DEFAULT_LOCALITY_COST
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
int regionIndexToEntityIndex(int region) {
|
||||||
|
return cluster.regionIndexToServerIndex[region];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index < 0) {
|
static class RackLocalityCostFunction extends LocalityBasedCostFunction {
|
||||||
cost += 1;
|
|
||||||
} else {
|
private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
|
||||||
cost += (1 - cluster.getLocalityOfRegion(i, serverIndex));
|
private static final float DEFAULT_RACK_LOCALITY_COST = 15;
|
||||||
|
|
||||||
|
public RackLocalityCostFunction(Configuration conf, MasterServices services) {
|
||||||
|
super(
|
||||||
|
conf,
|
||||||
|
services,
|
||||||
|
LocalityType.RACK,
|
||||||
|
RACK_LOCALITY_COST_KEY,
|
||||||
|
DEFAULT_RACK_LOCALITY_COST
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return scale(0, max, cost);
|
@Override
|
||||||
|
int regionIndexToEntityIndex(int region) {
|
||||||
|
return cluster.getRackForRegion(region);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,6 @@ import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -153,84 +152,6 @@ public class BalancerTestBase {
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
|
||||||
* Data set for testLocalityCost:
|
|
||||||
*
|
|
||||||
* [test][regions][0] = [serverIndex] -> number of regions
|
|
||||||
* [test][regions][regionIndex+1] = {server hosting region, locality percentage, datanodes}
|
|
||||||
*
|
|
||||||
* For each [test], there is a list of cluster config information grouped by [regions].
|
|
||||||
* - [0] - the first element of the [regions] list is a list of servers with the value
|
|
||||||
* indicating the number of regions it hosts.
|
|
||||||
* - [regionIndex+1] - the remaining elements of the array are regions, where the index value
|
|
||||||
* is 1 greater than the regionIndex. This element holds an array that identifies:
|
|
||||||
* [0] - the serverIndex of the server hosting this region
|
|
||||||
* [1] - the locality percentage returned by getLocalityOfRegion(region, server) when the
|
|
||||||
* server is hosting both region and the hdfs blocks.
|
|
||||||
* [.] - the serverIndex of servers hosting the hdfs blocks, where a value of -1 indicates
|
|
||||||
* a dfs server not in the list of region servers.
|
|
||||||
*/
|
|
||||||
protected int[][][] clusterRegionLocationMocks = new int[][][]{
|
|
||||||
// Test 1: Basic region placement with 1 region server not hosting dfs block
|
|
||||||
// Locality Calculation:
|
|
||||||
// region[0] = 1 - 80/100 = (.2) - server[2] hosts both the region and dfs blocks
|
|
||||||
// region[1] = 1.0 - server[0] only hosts the region, not dfs blocks
|
|
||||||
// region[2] = 1 - 70/100 = (.3) - server[1] hosts both the region and dfs blocks
|
|
||||||
//
|
|
||||||
// RESULT = 0.2 + 1.0 + 0.3 / 3.0 (3.0 is max value)
|
|
||||||
// = 1.5 / 3.0
|
|
||||||
// = 0.5
|
|
||||||
new int[][]{
|
|
||||||
new int[]{1, 1, 1}, // 3 region servers with 1 region each
|
|
||||||
new int[]{2, 80, 1, 2, 0}, // region[0] on server[2] w/ 80% locality
|
|
||||||
new int[]{0, 50, 1, 2}, // region[1] on server[0] w/ 50% , but no local dfs blocks
|
|
||||||
new int[]{1, 70, 2, 0, 1}, // region[2] on server[1] w/ 70% locality
|
|
||||||
},
|
|
||||||
|
|
||||||
// Test 2: Sames as Test 1, but the last region has a datanode that isn't a region server
|
|
||||||
new int[][]{
|
|
||||||
new int[]{1, 1, 1},
|
|
||||||
new int[]{2, 80, 1, 2, 0},
|
|
||||||
new int[]{0, 50, 1, 2},
|
|
||||||
new int[]{1, 70, -1, 2, 0, 1}, // the first region location is not on a region server
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
// This mock allows us to test the LocalityCostFunction
|
|
||||||
protected class MockCluster extends BaseLoadBalancer.Cluster {
|
|
||||||
|
|
||||||
protected int[][] localityValue = null; // [region][server] = percent of blocks
|
|
||||||
|
|
||||||
protected MockCluster(int[][] regions) {
|
|
||||||
|
|
||||||
// regions[0] is an array where index = serverIndex an value = number of regions
|
|
||||||
super(mockClusterServers(regions[0], 1), null, null, null);
|
|
||||||
|
|
||||||
localityValue = new int[regions.length-1][];
|
|
||||||
// the remaining elements in the regions array contain values for:
|
|
||||||
// [0] - the serverIndex of the server hosting this region
|
|
||||||
// [1] - the locality percentage (in whole numbers) for the hosting region server
|
|
||||||
// [.] - a list of servers hosting dfs blocks for the region (-1 means its not one
|
|
||||||
// of our region servers.
|
|
||||||
for (int i = 1; i < regions.length; i++){
|
|
||||||
int regionIndex = i - 1;
|
|
||||||
int serverIndex = regions[i][0];
|
|
||||||
int locality = regions[i][1];
|
|
||||||
int[] locations = Arrays.copyOfRange(regions[i], 2, regions[i].length);
|
|
||||||
|
|
||||||
regionIndexToServerIndex[regionIndex] = serverIndex;
|
|
||||||
localityValue[regionIndex] = new int[servers.length];
|
|
||||||
localityValue[regionIndex][serverIndex] = (locality > 100)? locality % 100 : locality;
|
|
||||||
regionLocations[regionIndex] = locations;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
float getLocalityOfRegion(int region, int server) {
|
|
||||||
// convert the locality percentage to a fraction
|
|
||||||
return localityValue[region][server] / 100.0f;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This class is introduced because IP to rack resolution can be lengthy.
|
// This class is introduced because IP to rack resolution can be lengthy.
|
||||||
public static class MockMapping implements DNSToSwitchMapping {
|
public static class MockMapping implements DNSToSwitchMapping {
|
||||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.master.MockNoopMasterServices;
|
||||||
import org.apache.hadoop.hbase.master.RackManager;
|
import org.apache.hadoop.hbase.master.RackManager;
|
||||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
|
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
|
||||||
|
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.ServerLocalityCostFunction;
|
||||||
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
|
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -60,6 +61,65 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
||||||
public static final String REGION_KEY = "testRegion";
|
public static final String REGION_KEY = "testRegion";
|
||||||
private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class);
|
private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class);
|
||||||
|
|
||||||
|
// Mapping of locality test -> expected locality
|
||||||
|
private float[] expectedLocalities = {1.0f, 0.0f, 0.50f, 0.25f, 1.0f};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Data set for testLocalityCost:
|
||||||
|
* [test][0][0] = mapping of server to number of regions it hosts
|
||||||
|
* [test][region + 1][0] = server that region is hosted on
|
||||||
|
* [test][region + 1][server + 1] = locality for region on server
|
||||||
|
*/
|
||||||
|
|
||||||
|
private int[][][] clusterRegionLocationMocks = new int[][][]{
|
||||||
|
|
||||||
|
// Test 1: each region is entirely on server that hosts it
|
||||||
|
new int[][]{
|
||||||
|
new int[]{2, 1, 1},
|
||||||
|
new int[]{2, 0, 0, 100}, // region 0 is hosted and entirely local on server 2
|
||||||
|
new int[]{0, 100, 0, 0}, // region 1 is hosted and entirely on server 0
|
||||||
|
new int[]{0, 100, 0, 0}, // region 2 is hosted and entirely on server 0
|
||||||
|
new int[]{1, 0, 100, 0}, // region 1 is hosted and entirely on server 1
|
||||||
|
},
|
||||||
|
|
||||||
|
// Test 2: each region is 0% local on the server that hosts it
|
||||||
|
new int[][]{
|
||||||
|
new int[]{1, 2, 1},
|
||||||
|
new int[]{0, 0, 0, 100}, // region 0 is hosted and entirely local on server 2
|
||||||
|
new int[]{1, 100, 0, 0}, // region 1 is hosted and entirely on server 0
|
||||||
|
new int[]{1, 100, 0, 0}, // region 2 is hosted and entirely on server 0
|
||||||
|
new int[]{2, 0, 100, 0}, // region 1 is hosted and entirely on server 1
|
||||||
|
},
|
||||||
|
|
||||||
|
// Test 3: each region is 25% local on the server that hosts it (and 50% locality is possible)
|
||||||
|
new int[][]{
|
||||||
|
new int[]{1, 2, 1},
|
||||||
|
new int[]{0, 25, 0, 50}, // region 0 is hosted and entirely local on server 2
|
||||||
|
new int[]{1, 50, 25, 0}, // region 1 is hosted and entirely on server 0
|
||||||
|
new int[]{1, 50, 25, 0}, // region 2 is hosted and entirely on server 0
|
||||||
|
new int[]{2, 0, 50, 25}, // region 1 is hosted and entirely on server 1
|
||||||
|
},
|
||||||
|
|
||||||
|
// Test 4: each region is 25% local on the server that hosts it (and 100% locality is possible)
|
||||||
|
new int[][]{
|
||||||
|
new int[]{1, 2, 1},
|
||||||
|
new int[]{0, 25, 0, 100}, // region 0 is hosted and entirely local on server 2
|
||||||
|
new int[]{1, 100, 25, 0}, // region 1 is hosted and entirely on server 0
|
||||||
|
new int[]{1, 100, 25, 0}, // region 2 is hosted and entirely on server 0
|
||||||
|
new int[]{2, 0, 100, 25}, // region 1 is hosted and entirely on server 1
|
||||||
|
},
|
||||||
|
|
||||||
|
// Test 5: each region is 75% local on the server that hosts it (and 75% locality is possible everywhere)
|
||||||
|
new int[][]{
|
||||||
|
new int[]{1, 2, 1},
|
||||||
|
new int[]{0, 75, 75, 75}, // region 0 is hosted and entirely local on server 2
|
||||||
|
new int[]{1, 75, 75, 75}, // region 1 is hosted and entirely on server 0
|
||||||
|
new int[]{1, 75, 75, 75}, // region 2 is hosted and entirely on server 0
|
||||||
|
new int[]{2, 75, 75, 75}, // region 1 is hosted and entirely on server 1
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testKeepRegionLoad() throws Exception {
|
public void testKeepRegionLoad() throws Exception {
|
||||||
|
|
||||||
|
@ -144,14 +204,15 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
MockNoopMasterServices master = new MockNoopMasterServices();
|
MockNoopMasterServices master = new MockNoopMasterServices();
|
||||||
StochasticLoadBalancer.CostFunction
|
StochasticLoadBalancer.CostFunction
|
||||||
costFunction = new StochasticLoadBalancer.LocalityCostFunction(conf, master);
|
costFunction = new ServerLocalityCostFunction(conf, master);
|
||||||
|
|
||||||
for (int[][] clusterRegionLocations : clusterRegionLocationMocks) {
|
for (int test = 0; test < clusterRegionLocationMocks.length; test++) {
|
||||||
|
int[][] clusterRegionLocations = clusterRegionLocationMocks[test];
|
||||||
MockCluster cluster = new MockCluster(clusterRegionLocations);
|
MockCluster cluster = new MockCluster(clusterRegionLocations);
|
||||||
costFunction.init(cluster);
|
costFunction.init(cluster);
|
||||||
double cost = costFunction.cost();
|
double cost = costFunction.cost();
|
||||||
|
double expected = 1 - expectedLocalities[test];
|
||||||
assertEquals(0.5f, cost, 0.001);
|
assertEquals(expected, cost, 0.001);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -587,4 +648,39 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
||||||
testWithCluster(serverMap, rm, false, true);
|
testWithCluster(serverMap, rm, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This mock allows us to test the LocalityCostFunction
|
||||||
|
private class MockCluster extends BaseLoadBalancer.Cluster {
|
||||||
|
|
||||||
|
private int[][] localities = null; // [region][server] = percent of blocks
|
||||||
|
|
||||||
|
public MockCluster(int[][] regions) {
|
||||||
|
|
||||||
|
// regions[0] is an array where index = serverIndex an value = number of regions
|
||||||
|
super(mockClusterServers(regions[0], 1), null, null, null);
|
||||||
|
|
||||||
|
localities = new int[regions.length - 1][];
|
||||||
|
for (int i = 1; i < regions.length; i++) {
|
||||||
|
int regionIndex = i - 1;
|
||||||
|
localities[regionIndex] = new int[regions[i].length - 1];
|
||||||
|
regionIndexToServerIndex[regionIndex] = regions[i][0];
|
||||||
|
for (int j = 1; j < regions[i].length; j++) {
|
||||||
|
int serverIndex = j - 1;
|
||||||
|
localities[regionIndex][serverIndex] = regions[i][j] > 100 ? regions[i][j] % 100 : regions[i][j];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
float getLocalityOfRegion(int region, int server) {
|
||||||
|
// convert the locality percentage to a fraction
|
||||||
|
return localities[region][server] / 100.0f;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getRegionSizeMB(int region) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue