HBASE-15515 Improve LocalityBasedCandidateGenerator in Balancer (Guanghao Zhang)

This commit is contained in:
tedyu 2016-03-24 11:22:34 -07:00
parent 1361a25e6d
commit ff261efbf7
3 changed files with 42 additions and 10 deletions

View File

@ -826,7 +826,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int getLowestLocalityRegionOnServer(int serverIndex) { int getLowestLocalityRegionOnServer(int serverIndex) {
if (regionFinder != null) { if (regionFinder != null) {
float lowestLocality = 1.0f; float lowestLocality = 1.0f;
int lowestLocalityRegionIndex = 0; int lowestLocalityRegionIndex = -1;
if (regionsPerServer[serverIndex].length == 0) { if (regionsPerServer[serverIndex].length == 0) {
// No regions on that region server // No regions on that region server
return -1; return -1;
@ -836,13 +836,22 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
HDFSBlocksDistribution distribution = regionFinder HDFSBlocksDistribution distribution = regionFinder
.getBlockDistribution(regions[regionIndex]); .getBlockDistribution(regions[regionIndex]);
float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
// skip empty region
if (distribution.getUniqueBlocksTotalWeight() == 0) {
continue;
}
if (locality < lowestLocality) { if (locality < lowestLocality) {
lowestLocality = locality; lowestLocality = locality;
lowestLocalityRegionIndex = j; lowestLocalityRegionIndex = j;
} }
} }
if (lowestLocalityRegionIndex == -1) {
return -1;
}
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(" Lowest locality region index is " + lowestLocalityRegionIndex LOG.trace("Lowest locality region is "
+ regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
.getRegionNameAsString() + " with locality " + lowestLocality
+ " and its region server contains " + regionsPerServer[serverIndex].length + " and its region server contains " + regionsPerServer[serverIndex].length
+ " regions"); + " regions");
} }
@ -861,9 +870,14 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
} }
} }
int getLeastLoadedTopServerForRegion(int region) { /**
* Returns a least loaded server which has better locality for this region
* than the current server.
*/
int getLeastLoadedTopServerForRegion(int region, int currentServer) {
if (regionFinder != null) { if (regionFinder != null) {
List<ServerName> topLocalServers = regionFinder.getTopBlockLocations(regions[region]); List<ServerName> topLocalServers = regionFinder.getTopBlockLocations(regions[region],
servers[currentServer].getHostname());
int leastLoadedServerIndex = -1; int leastLoadedServerIndex = -1;
int load = Integer.MAX_VALUE; int load = Integer.MAX_VALUE;
for (ServerName sn : topLocalServers) { for (ServerName sn : topLocalServers) {
@ -880,6 +894,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
load = tempLoad; load = tempLoad;
} }
} }
if (leastLoadedServerIndex != -1) {
LOG.debug("Pick the least loaded server " + servers[leastLoadedServerIndex].getHostname()
+ " with better locality for region " + regions[region]);
}
return leastLoadedServerIndex; return leastLoadedServerIndex;
} else { } else {
return -1; return -1;

View File

@ -170,6 +170,22 @@ class RegionLocationFinder {
return mapHostNameToServerName(topHosts); return mapHostNameToServerName(topHosts);
} }
/**
* Returns an ordered list of hosts which have better locality for this region
* than the current host.
*/
protected List<ServerName> getTopBlockLocations(HRegionInfo region, String currentHost) {
HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
List<String> topHosts = new ArrayList<String>();
for (String host : blocksDistribution.getTopHosts()) {
if (host.equals(currentHost)) {
break;
}
topHosts.add(host);
}
return mapHostNameToServerName(topHosts);
}
/** /**
* Returns an ordered list of hosts that are hosting the blocks for this * Returns an ordered list of hosts that are hosting the blocks for this
* region. The weight of each host is the sum of the block lengths of all * region. The weight of each host is the sum of the block lengths of all

View File

@ -705,9 +705,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return pickRandomRegions(cluster, thisServer, otherServer); return pickRandomRegions(cluster, thisServer, otherServer);
} }
cluster.calculateRegionServerLocalities(); int thisServer = pickRandomServer(cluster);
// Pick server with lowest locality
int thisServer = pickLowestLocalityServer(cluster);
int thisRegion; int thisRegion;
if (thisServer == -1) { if (thisServer == -1) {
LOG.warn("Could not pick lowest locality region server"); LOG.warn("Could not pick lowest locality region server");
@ -722,7 +720,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
} }
// Pick the least loaded server with good locality for the region // Pick the least loaded server with good locality for the region
int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion); int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion, thisServer);
if (otherServer == -1) { if (otherServer == -1) {
return Cluster.NullAction; return Cluster.NullAction;