HBASE-13376 Improvements to Stochastic load balancer (Vandana Ayyalasomayajula)
parent 9c69bf766f
commit 54028140f4
@@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerName;
@@ -114,6 +115,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     ArrayList<String> tables;
     HRegionInfo[] regions;
     Deque<RegionLoad>[] regionLoads;
+    private RegionLocationFinder regionFinder;

     int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
@@ -138,12 +140,14 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     boolean hasRegionReplicas = false; //whether there is regions with replicas

     Integer[] serverIndicesSortedByRegionCount;
+    Integer[] serverIndicesSortedByLocality;

     Map<String, Integer> serversToIndex;
     Map<String, Integer> hostsToIndex;
     Map<String, Integer> racksToIndex;
     Map<String, Integer> tablesToIndex;
     Map<HRegionInfo, Integer> regionsToIndex;
+    float[] localityPerServer;

     int numServers;
     int numHosts;
@@ -191,6 +195,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       List<List<Integer>> serversPerHostList = new ArrayList<List<Integer>>();
       List<List<Integer>> serversPerRackList = new ArrayList<List<Integer>>();
       this.clusterState = clusterState;
+      this.regionFinder = regionFinder;

       // Use servername and port as there can be dead servers in this list. We want everything with
       // a matching hostname and port to have the same index.
@@ -234,6 +239,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       regionLoads = new Deque[numRegions];
       regionLocations = new int[numRegions][];
       serverIndicesSortedByRegionCount = new Integer[numServers];
+      serverIndicesSortedByLocality = new Integer[numServers];
+      localityPerServer = new float[numServers];

       serverIndexToHostIndex = new int[numServers];
       serverIndexToRackIndex = new int[numServers];
@@ -265,6 +272,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         }
         primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length];
         serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
+        serverIndicesSortedByLocality[serverIndex] = serverIndex;
       }

       hosts = new String[numHosts];
@@ -767,6 +775,123 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       }
     };

+    void sortServersByLocality() {
+      Arrays.sort(serverIndicesSortedByLocality, localityComparator);
+    }
+
+    float getLocality(int server) {
+      return localityPerServer[server];
+    }
+
+    private Comparator<Integer> localityComparator = new Comparator<Integer>() {
+      @Override
+      public int compare(Integer integer, Integer integer2) {
+        float locality1 = getLocality(integer);
+        float locality2 = getLocality(integer2);
+        if (locality1 < locality2) {
+          return -1;
+        } else if (locality1 > locality2) {
+          return 1;
+        } else {
+          return 0;
+        }
+      }
+    };
+
+    int getLowestLocalityRegionServer() {
+      if (regionFinder == null) {
+        return -1;
+      } else {
+        sortServersByLocality();
+        // We want to find server with non zero regions having lowest locality.
+        int i = 0;
+        int lowestLocalityServerIndex = serverIndicesSortedByLocality[i];
+        while (localityPerServer[lowestLocalityServerIndex] == 0
+            && (regionsPerServer[lowestLocalityServerIndex].length == 0)) {
+          i++;
+          lowestLocalityServerIndex = serverIndicesSortedByLocality[i];
+        }
+        LOG.debug("Lowest locality region server with non zero regions is "
+            + servers[lowestLocalityServerIndex].getHostname() + " with locality "
+            + localityPerServer[lowestLocalityServerIndex]);
+        return lowestLocalityServerIndex;
+      }
+    }
+
+    int getLowestLocalityRegionOnServer(int serverIndex) {
+      if (regionFinder != null) {
+        float lowestLocality = 1.0f;
+        int lowestLocalityRegionIndex = 0;
+        if (regionsPerServer[serverIndex].length == 0) {
+          // No regions on that region server
+          return -1;
+        }
+        for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
+          int regionIndex = regionsPerServer[serverIndex][j];
+          HDFSBlocksDistribution distribution = regionFinder
+              .getBlockDistribution(regions[regionIndex]);
+          float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
+          if (locality < lowestLocality) {
+            lowestLocality = locality;
+            lowestLocalityRegionIndex = j;
+          }
+        }
+        LOG.debug(" Lowest locality region index is " + lowestLocalityRegionIndex
+            + " and its region server contains " + regionsPerServer[serverIndex].length
+            + " regions");
+        return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
+      } else {
+        return -1;
+      }
+    }
+
+    float getLocalityOfRegion(int region, int server) {
+      if (regionFinder != null) {
+        HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
+        return distribution.getBlockLocalityIndex(servers[server].getHostname());
+      } else {
+        return 0f;
+      }
+    }
+
+    int getLeastLoadedTopServerForRegion(int region) {
+      if (regionFinder != null) {
+        List<ServerName> topLocalServers = regionFinder.getTopBlockLocations(regions[region]);
+        int leastLoadedServerIndex = -1;
+        int load = Integer.MAX_VALUE;
+        for (ServerName sn : topLocalServers) {
+          int index = serversToIndex.get(sn);
+          int tempLoad = regionsPerServer[index].length;
+          if (tempLoad <= load) {
+            leastLoadedServerIndex = index;
+            load = tempLoad;
+          }
+        }
+        return leastLoadedServerIndex;
+      } else {
+        return -1;
+      }
+    }
+
+    void calculateRegionServerLocalities() {
+      if (regionFinder == null) {
+        LOG.warn("Region location finder found null, skipping locality calculations.");
+        return;
+      }
+      for (int i = 0; i < regionsPerServer.length; i++) {
+        HDFSBlocksDistribution distribution = new HDFSBlocksDistribution();
+        if (regionsPerServer[i].length > 0) {
+          for (int j = 0; j < regionsPerServer[i].length; j++) {
+            int regionIndex = regionsPerServer[i][j];
+            distribution.add(regionFinder.getBlockDistribution(regions[regionIndex]));
+          }
+        } else {
+          LOG.debug("Server " + servers[i].getHostname() + " had 0 regions.");
+        }
+        localityPerServer[i] = distribution.getBlockLocalityIndex(servers[i].getHostname());
+      }
+    }
+
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION",
       justification="Not important but should be fixed")
     @Override
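The localityComparator above sorts server indices in ascending order of per-server locality, so after sortServersByLocality() the first entry of serverIndicesSortedByLocality points at the least-local server. A minimal standalone sketch of the same ordering (hypothetical scores, with Float.compare in place of the patch's explicit if/else chain):

import java.util.Arrays;
import java.util.Comparator;

public class LocalitySortSketch {
  public static void main(String[] args) {
    final float[] localityPerServer = {0.9f, 0.1f, 0.5f}; // hypothetical scores
    Integer[] indices = {0, 1, 2};
    Arrays.sort(indices, new Comparator<Integer>() {
      @Override
      public int compare(Integer a, Integer b) {
        // same ordering as the patch's <, >, == comparisons
        return Float.compare(localityPerServer[a], localityPerServer[b]);
      }
    });
    System.out.println(Arrays.toString(indices)); // [1, 2, 0]: lowest locality first
  }
}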
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -31,17 +30,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.regionserver.HRegion;

 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;

 /**
  * This will find where data for a region is located in HDFS. It ranks
@@ -54,31 +54,27 @@ class RegionLocationFinder {
   private static final Log LOG = LogFactory.getLog(RegionLocationFinder.class);

   private Configuration conf;
-  private ClusterStatus status;
+  private volatile ClusterStatus status;
   private MasterServices services;

-  private CacheLoader<HRegionInfo, List<ServerName>> loader =
-      new CacheLoader<HRegionInfo, List<ServerName>>() {
+  private CacheLoader<HRegionInfo, HDFSBlocksDistribution> loader =
+      new CacheLoader<HRegionInfo, HDFSBlocksDistribution>() {

-        @Override
-        public List<ServerName> load(HRegionInfo key) throws Exception {
-          List<ServerName> servers = internalGetTopBlockLocation(key);
-          if (servers == null) {
-            return new LinkedList<ServerName>();
-          }
-          return servers;
-        }
-      };
+        @Override
+        public HDFSBlocksDistribution load(HRegionInfo key) throws Exception {
+          return internalGetTopBlockLocation(key);
+        }
+      };

   // The cache for where regions are located.
-  private LoadingCache<HRegionInfo, List<ServerName>> cache = null;
+  private LoadingCache<HRegionInfo, HDFSBlocksDistribution> cache = null;

   /**
    * Create a cache for region to list of servers
    * @param mins Number of mins to cache
    * @return A new Cache.
    */
-  private LoadingCache<HRegionInfo, List<ServerName>> createCache(int mins) {
+  private LoadingCache<HRegionInfo, HDFSBlocksDistribution> createCache(int mins) {
     return CacheBuilder.newBuilder().expireAfterAccess(mins, TimeUnit.MINUTES).build(loader);
   }
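The cache here is a Guava LoadingCache: on a miss, get() invokes the CacheLoader and stores the result, and expireAfterAccess drops entries that sit idle. A self-contained sketch of that pattern, assuming Guava on the classpath (String keys stand in for HRegionInfo, the loader for internalGetTopBlockLocation):

import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class CacheSketch {
  public static void main(String[] args) throws Exception {
    LoadingCache<String, Integer> cache = CacheBuilder.newBuilder()
        .expireAfterAccess(5, TimeUnit.MINUTES)   // entries expire after 5 idle minutes
        .build(new CacheLoader<String, Integer>() {
          @Override
          public Integer load(String key) {       // invoked only on a cache miss
            return key.length();                  // stand-in for an expensive HDFS scan
          }
        });
    System.out.println(cache.get("region-a"));    // miss: runs load(), caches 8
    System.out.println(cache.get("region-a"));    // hit: served from the cache
  }
}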
@@ -100,14 +96,9 @@ class RegionLocationFinder {
   }

   protected List<ServerName> getTopBlockLocations(HRegionInfo region) {
-    List<ServerName> servers = null;
-    try {
-      servers = cache.get(region);
-    } catch (ExecutionException ex) {
-      servers = new LinkedList<ServerName>();
-    }
-    return servers;
+    HDFSBlocksDistribution blocksDistribution = getBlockDistribution(region);
+    List<String> topHosts = blocksDistribution.getTopHosts();
+    return mapHostNameToServerName(topHosts);
   }

   /**
@@ -119,22 +110,20 @@ class RegionLocationFinder {
    * @param region region
    * @return ordered list of hosts holding blocks of the specified region
    */
-  protected List<ServerName> internalGetTopBlockLocation(HRegionInfo region) {
-    List<ServerName> topServerNames = null;
+  protected HDFSBlocksDistribution internalGetTopBlockLocation(HRegionInfo region) {
     try {
       HTableDescriptor tableDescriptor = getTableDescriptor(region.getTable());
       if (tableDescriptor != null) {
         HDFSBlocksDistribution blocksDistribution =
             HRegion.computeHDFSBlocksDistribution(getConf(), tableDescriptor, region);
-        List<String> topHosts = blocksDistribution.getTopHosts();
-        topServerNames = mapHostNameToServerName(topHosts);
+        return blocksDistribution;
       }
     } catch (IOException ioe) {
-      LOG.debug("IOException during HDFSBlocksDistribution computation. for " + "region = "
+      LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = "
           + region.getEncodedName(), ioe);
     }

-    return topServerNames;
+    return new HDFSBlocksDistribution();
   }

   /**
@@ -167,7 +156,10 @@ class RegionLocationFinder {
    */
   protected List<ServerName> mapHostNameToServerName(List<String> hosts) {
     if (hosts == null || status == null) {
-      return null;
+      if (hosts == null) {
+        LOG.warn("RegionLocationFinder top hosts is null");
+      }
+      return Lists.newArrayList();
     }

     List<ServerName> topServerNames = new ArrayList<ServerName>();
@@ -189,4 +181,25 @@ class RegionLocationFinder {
     }
     return topServerNames;
   }
+
+  public HDFSBlocksDistribution getBlockDistribution(HRegionInfo hri) {
+    HDFSBlocksDistribution blockDistbn = null;
+    try {
+      if (cache.asMap().containsKey(hri)) {
+        blockDistbn = cache.get(hri);
+        return blockDistbn;
+      } else {
+        LOG.debug("HDFSBlocksDistribution not found in cache for region "
+            + hri.getRegionNameAsString());
+        blockDistbn = internalGetTopBlockLocation(hri);
+        cache.put(hri, blockDistbn);
+        return blockDistbn;
+      }
+    } catch (ExecutionException e) {
+      LOG.warn("Error while fetching cache entry ", e);
+      blockDistbn = internalGetTopBlockLocation(hri);
+      cache.put(hri, blockDistbn);
+      return blockDistbn;
+    }
+  }
 }
@@ -355,7 +355,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
         break;
       }
     }

     long endTime = EnvironmentEdgeManager.currentTime();

     metricsBalancer.balanceCluster(endTime - startTime);
@@ -699,46 +698,47 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     @Override
     Cluster.Action generate(Cluster cluster) {
       if (this.masterServices == null) {
-        return Cluster.NullAction;
+        int thisServer = pickRandomServer(cluster);
+        // Pick the other server
+        int otherServer = pickOtherRandomServer(cluster, thisServer);
+        return pickRandomRegions(cluster, thisServer, otherServer);
       }
-      // Pick a random region server
-      int thisServer = pickRandomServer(cluster);
-
-      // Pick a random region on this server
-      int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
+      cluster.calculateRegionServerLocalities();
+      // Pick server with lowest locality
+      int thisServer = pickLowestLocalityServer(cluster);
+      int thisRegion;
+      if (thisServer == -1) {
+        LOG.warn("Could not pick lowest locality region server");
+        return Cluster.NullAction;
+      } else {
+        // Pick lowest locality region on this server
+        thisRegion = pickLowestLocalityRegionOnServer(cluster, thisServer);
+      }

       if (thisRegion == -1) {
         return Cluster.NullAction;
       }

-      // Pick the server with the highest locality
-      int otherServer = pickHighestLocalityServer(cluster, thisServer, thisRegion);
+      // Pick the least loaded server with good locality for the region
+      int otherServer = cluster.getLeastLoadedTopServerForRegion(thisRegion);

       if (otherServer == -1) {
         return Cluster.NullAction;
       }

-      // pick an region on the other server to potentially swap
-      int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
+      // Let the candidate region be moved to its highest locality server.
+      int otherRegion = -1;

       return getAction(thisServer, thisRegion, otherServer, otherRegion);
     }

-    private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) {
-      int[] regionLocations = cluster.regionLocations[thisRegion];
-
-      if (regionLocations == null || regionLocations.length <= 1) {
-        return pickOtherRandomServer(cluster, thisServer);
-      }
-
-      for (int loc : regionLocations) {
-        if (loc >= 0 && loc != thisServer) { // find the first suitable server
-          return loc;
-        }
-      }
-
-      // no location found
-      return pickOtherRandomServer(cluster, thisServer);
-    }
+    private int pickLowestLocalityServer(Cluster cluster) {
+      return cluster.getLowestLocalityRegionServer();
+    }
+
+    private int pickLowestLocalityRegionOnServer(Cluster cluster, int server) {
+      return cluster.getLowestLocalityRegionOnServer(server);
+    }

     void setServices(MasterServices services) {
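Restated, the new generator walks from the worst-placed data to the best destination: refresh per-server localities, take the lowest-locality server, take its least-local region, and move that region to the least loaded of its top block locations; otherRegion stays -1 since, per the patch comment, the candidate region is moved rather than swapped. A toy sketch of that selection order over plain arrays (hypothetical data, not the Cluster state):

public class LocalityCandidateSketch {
  public static void main(String[] args) {
    float[] localityPerServer = {0.95f, 0.20f, 0.80f};        // hypothetical per-server scores
    int[][] regionsPerServer = {{0, 1}, {2, 3}, {4}};         // region indices hosted per server
    float[] regionLocality = {0.9f, 1.0f, 0.1f, 0.3f, 0.8f};  // locality of each region on its host

    // 1. pick the server with the lowest locality
    int worstServer = 0;
    for (int s = 1; s < localityPerServer.length; s++) {
      if (localityPerServer[s] < localityPerServer[worstServer]) {
        worstServer = s;
      }
    }
    // 2. pick the least-local region on that server
    int worstRegion = regionsPerServer[worstServer][0];
    for (int r : regionsPerServer[worstServer]) {
      if (regionLocality[r] < regionLocality[worstRegion]) {
        worstRegion = r;
      }
    }
    // 3. the real code would now ask for the least loaded of the region's
    //    top block locations and emit a move action
    System.out.println("move region " + worstRegion + " off server " + worstServer);
  }
}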
@@ -1182,11 +1182,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       }

       if (index < 0) {
-        if (regionLocations.length > 0) {
-          cost += 1;
-        }
+        cost += 1;
       } else {
-        cost += (double) index / (double) regionLocations.length;
+        cost += (1 - cluster.getLocalityOfRegion(i, index));
       }
     }
     return scale(0, max, cost);
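This cost-function change is the crux of the patch: previously a region was scored by the rank of its current server in the sorted location list (index / regionLocations.length); now it is scored by 1 minus its actual block locality, so a fully local region contributes 0 and a fully remote one contributes 1. A small worked example of the new per-region sum before scaling (hypothetical locality values):

public class LocalityCostSketch {
  public static void main(String[] args) {
    // hypothetical locality of three regions on their current servers
    double[] locality = {1.0, 0.8, 0.0};
    double cost = 0;
    for (double l : locality) {
      cost += (1 - l);              // fully local adds 0, fully remote adds 1
    }
    double max = locality.length;   // worst case: every region fully remote
    System.out.println(cost / max); // ~0.4, i.e. what scale(0, max, cost) yields
  }
}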
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.master.balancer;

+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;

 import java.util.ArrayList;
@@ -33,7 +35,10 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -43,6 +48,7 @@ import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.junit.Assert;
+import org.junit.BeforeClass;

 /**
  * Class used to be the base of unit tests on load balancers. It gives helper
@@ -51,9 +57,97 @@ import org.junit.Assert;
  *
  */
 public class BalancerTestBase {
+  private static final Log LOG = LogFactory.getLog(BalancerTestBase.class);
   protected static Random rand = new Random();
   static int regionId = 0;
+  protected static Configuration conf;
+  protected static StochasticLoadBalancer loadBalancer;
+
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    conf = HBaseConfiguration.create();
+    conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
+    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
+    conf.setFloat("hbase.regions.slop", 0.0f);
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
+    loadBalancer = new StochasticLoadBalancer();
+    loadBalancer.setConf(conf);
+  }
+
+  protected int[] largeCluster = new int[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 56 };
+
+  // int[testnum][servernumber] -> numregions
+  protected int[][] clusterStateMocks = new int[][]{
+      // 1 node
+      new int[]{0},
+      new int[]{1},
+      new int[]{10},
+      // 2 node
+      new int[]{0, 0},
+      new int[]{2, 0},
+      new int[]{2, 1},
+      new int[]{2, 2},
+      new int[]{2, 3},
+      new int[]{2, 4},
+      new int[]{1, 1},
+      new int[]{0, 1},
+      new int[]{10, 1},
+      new int[]{514, 1432},
+      new int[]{48, 53},
+      // 3 node
+      new int[]{0, 1, 2},
+      new int[]{1, 2, 3},
+      new int[]{0, 2, 2},
+      new int[]{0, 3, 0},
+      new int[]{0, 4, 0},
+      new int[]{20, 20, 0},
+      // 4 node
+      new int[]{0, 1, 2, 3},
+      new int[]{4, 0, 0, 0},
+      new int[]{5, 0, 0, 0},
+      new int[]{6, 6, 0, 0},
+      new int[]{6, 2, 0, 0},
+      new int[]{6, 1, 0, 0},
+      new int[]{6, 0, 0, 0},
+      new int[]{4, 4, 4, 7},
+      new int[]{4, 4, 4, 8},
+      new int[]{0, 0, 0, 7},
+      // 5 node
+      new int[]{1, 1, 1, 1, 4},
+      // more nodes
+      new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+      new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 10},
+      new int[]{6, 6, 5, 6, 6, 6, 6, 6, 6, 1},
+      new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 54},
+      new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 55},
+      new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 56},
+      new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 16},
+      new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 8},
+      new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 9},
+      new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 10},
+      new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 123},
+      new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 155},
+      new int[]{10, 7, 12, 8, 11, 10, 9, 14},
+      new int[]{13, 14, 6, 10, 10, 10, 8, 10},
+      new int[]{130, 14, 60, 10, 100, 10, 80, 10},
+      new int[]{130, 140, 60, 100, 100, 100, 80, 100},
+      new int[]{0, 5 , 5, 5, 5},
+      largeCluster,
+
+  };

   // This class is introduced because IP to rack resolution can be lengthy.
   public static class MockMapping implements DNSToSwitchMapping {
@@ -317,4 +411,74 @@ public class BalancerTestBase {
     this.serverQueue.addAll(servers);
   }

+  protected void testWithCluster(int numNodes,
+      int numRegions,
+      int numRegionsPerServer,
+      int replication,
+      int numTables,
+      boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) {
+    Map<ServerName, List<HRegionInfo>> serverMap =
+        createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
+    testWithCluster(serverMap, null, assertFullyBalanced, assertFullyBalancedForReplicas);
+  }
+
+  protected void testWithCluster(Map<ServerName, List<HRegionInfo>> serverMap,
+      RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) {
+    List<ServerAndLoad> list = convertToList(serverMap);
+    LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
+
+    loadBalancer.setRackManager(rackManager);
+    // Run the balancer.
+    List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
+    assertNotNull(plans);
+
+    // Check to see that this actually got to a stable place.
+    if (assertFullyBalanced || assertFullyBalancedForReplicas) {
+      // Apply the plan to the mock cluster.
+      List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);
+
+      // Print out the cluster loads to make debugging easier.
+      LOG.info("Mock Balance : " + printMock(balancedCluster));
+
+      if (assertFullyBalanced) {
+        assertClusterAsBalanced(balancedCluster);
+        List<RegionPlan> secondPlans = loadBalancer.balanceCluster(serverMap);
+        assertNull(secondPlans);
+      }
+
+      if (assertFullyBalancedForReplicas) {
+        assertRegionReplicaPlacement(serverMap, rackManager);
+      }
+    }
+  }
+
+  protected Map<ServerName, List<HRegionInfo>> createServerMap(int numNodes,
+      int numRegions,
+      int numRegionsPerServer,
+      int replication,
+      int numTables) {
+    //construct a cluster of numNodes, having a total of numRegions. Each RS will hold
+    //numRegionsPerServer many regions except for the last one, which will host all the
+    //remaining regions
+    int[] cluster = new int[numNodes];
+    for (int i =0; i < numNodes; i++) {
+      cluster[i] = numRegionsPerServer;
+    }
+    cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer);
+    Map<ServerName, List<HRegionInfo>> clusterState = mockClusterServers(cluster, numTables);
+    if (replication > 0) {
+      // replicate the regions to the same servers
+      for (List<HRegionInfo> regions : clusterState.values()) {
+        int length = regions.size();
+        for (int i = 0; i < length; i++) {
+          for (int r = 1; r < replication ; r++) {
+            regions.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), r));
+          }
+        }
+      }
+    }
+
+    return clusterState;
+  }
+
 }
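createServerMap spreads numRegionsPerServer regions over every node and parks the remainder on the last one, which is what lets the tests model a single overloaded server. A quick standalone check of that arithmetic with hypothetical sizes:

import java.util.Arrays;

public class ClusterShapeSketch {
  public static void main(String[] args) {
    int numNodes = 5, numRegions = 20, numRegionsPerServer = 3;
    int[] cluster = new int[numNodes];
    Arrays.fill(cluster, numRegionsPerServer);
    // the last server absorbs the remainder: 20 - 4 * 3 = 8 regions
    cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer);
    System.out.println(Arrays.toString(cluster)); // [3, 3, 3, 3, 8]
  }
}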
@@ -50,101 +50,13 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

 @Category({FlakeyTests.class, MediumTests.class})
 public class TestStochasticLoadBalancer extends BalancerTestBase {
   public static final String REGION_KEY = "testRegion";
-  static StochasticLoadBalancer loadBalancer;
   private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer.class);
-  static Configuration conf;
-
-  @BeforeClass
-  public static void beforeAllTests() throws Exception {
-    conf = HBaseConfiguration.create();
-    conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
-    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
-    conf.setFloat("hbase.regions.slop", 0.0f);
-    loadBalancer = new StochasticLoadBalancer();
-    loadBalancer.setConf(conf);
-  }
-
-  int[] largeCluster = new int[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-      0, 0, 0, 0, 0, 56 };
-
-  // int[testnum][servernumber] -> numregions
-  int[][] clusterStateMocks = new int[][]{
-      // 1 node
-      new int[]{0},
-      new int[]{1},
-      new int[]{10},
-      // 2 node
-      new int[]{0, 0},
-      new int[]{2, 0},
-      new int[]{2, 1},
-      new int[]{2, 2},
-      new int[]{2, 3},
-      new int[]{2, 4},
-      new int[]{1, 1},
-      new int[]{0, 1},
-      new int[]{10, 1},
-      new int[]{514, 1432},
-      new int[]{48, 53},
-      // 3 node
-      new int[]{0, 1, 2},
-      new int[]{1, 2, 3},
-      new int[]{0, 2, 2},
-      new int[]{0, 3, 0},
-      new int[]{0, 4, 0},
-      new int[]{20, 20, 0},
-      // 4 node
-      new int[]{0, 1, 2, 3},
-      new int[]{4, 0, 0, 0},
-      new int[]{5, 0, 0, 0},
-      new int[]{6, 6, 0, 0},
-      new int[]{6, 2, 0, 0},
-      new int[]{6, 1, 0, 0},
-      new int[]{6, 0, 0, 0},
-      new int[]{4, 4, 4, 7},
-      new int[]{4, 4, 4, 8},
-      new int[]{0, 0, 0, 7},
-      // 5 node
-      new int[]{1, 1, 1, 1, 4},
-      // more nodes
-      new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
-      new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 10},
-      new int[]{6, 6, 5, 6, 6, 6, 6, 6, 6, 1},
-      new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 54},
-      new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 55},
-      new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 56},
-      new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 16},
-      new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 8},
-      new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 9},
-      new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 10},
-      new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 123},
-      new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 155},
-      new int[]{10, 7, 12, 8, 11, 10, 9, 14},
-      new int[]{13, 14, 6, 10, 10, 10, 8, 10},
-      new int[]{130, 14, 60, 10, 100, 10, 80, 10},
-      new int[]{130, 140, 60, 100, 100, 100, 80, 100},
-      largeCluster,
-
-  };

   @Test
   public void testKeepRegionLoad() throws Exception {
@@ -579,74 +491,4 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     testWithCluster(serverMap, rm, false, true);
   }
-
-  protected void testWithCluster(int numNodes,
-      int numRegions,
-      int numRegionsPerServer,
-      int replication,
-      int numTables,
-      boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) {
-    Map<ServerName, List<HRegionInfo>> serverMap =
-        createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
-    testWithCluster(serverMap, null, assertFullyBalanced, assertFullyBalancedForReplicas);
-  }
-
-
-  protected void testWithCluster(Map<ServerName, List<HRegionInfo>> serverMap,
-      RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) {
-    List<ServerAndLoad> list = convertToList(serverMap);
-    LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
-
-    loadBalancer.setRackManager(rackManager);
-    // Run the balancer.
-    List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
-    assertNotNull(plans);
-
-    // Check to see that this actually got to a stable place.
-    if (assertFullyBalanced || assertFullyBalancedForReplicas) {
-      // Apply the plan to the mock cluster.
-      List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);
-
-      // Print out the cluster loads to make debugging easier.
-      LOG.info("Mock Balance : " + printMock(balancedCluster));
-
-      if (assertFullyBalanced) {
-        assertClusterAsBalanced(balancedCluster);
-        List<RegionPlan> secondPlans = loadBalancer.balanceCluster(serverMap);
-        assertNull(secondPlans);
-      }
-
-      if (assertFullyBalancedForReplicas) {
-        assertRegionReplicaPlacement(serverMap, rackManager);
-      }
-    }
-  }
-
-  private Map<ServerName, List<HRegionInfo>> createServerMap(int numNodes,
-      int numRegions,
-      int numRegionsPerServer,
-      int replication,
-      int numTables) {
-    //construct a cluster of numNodes, having a total of numRegions. Each RS will hold
-    //numRegionsPerServer many regions except for the last one, which will host all the
-    //remaining regions
-    int[] cluster = new int[numNodes];
-    for (int i =0; i < numNodes; i++) {
-      cluster[i] = numRegionsPerServer;
-    }
-    cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer);
-    Map<ServerName, List<HRegionInfo>> clusterState = mockClusterServers(cluster, numTables);
-    if (replication > 0) {
-      // replicate the regions to the same servers
-      for (List<HRegionInfo> regions : clusterState.values()) {
-        int length = regions.size();
-        for (int i = 0; i < length; i++) {
-          for (int r = 1; r < replication ; r++) {
-            regions.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), r));
-          }
-        }
-      }
-    }
-
-    return clusterState;
-  }
 }
@@ -25,7 +25,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;

 @Category({FlakeyTests.class, MediumTests.class})
-public class TestStochasticLoadBalancer2 extends TestStochasticLoadBalancer {
+public class TestStochasticLoadBalancer2 extends BalancerTestBase {
   private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer2.class);

   @Test (timeout = 800000)
@@ -33,6 +33,7 @@ public class TestStochasticLoadBalancer2 extends TestStochasticLoadBalancer {
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
     conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
     conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
     TestStochasticLoadBalancer.loadBalancer.setConf(conf);
     int numNodes = 200;
     int numRegions = 40 * 200;
@@ -47,6 +48,7 @@ public class TestStochasticLoadBalancer2 extends TestStochasticLoadBalancer {
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
     conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
     conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
     loadBalancer.setConf(conf);
     int numNodes = 1000;
     int numRegions = 20 * numNodes; // 20 * replication regions per RS
@@ -60,6 +62,7 @@ public class TestStochasticLoadBalancer2 extends TestStochasticLoadBalancer {
   public void testRegionReplicasOnMidClusterHighReplication() {
     conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
     conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
     loadBalancer.setConf(conf);
     int numNodes = 80;
@@ -74,6 +77,7 @@ public class TestStochasticLoadBalancer2 extends TestStochasticLoadBalancer {
   public void testRegionReplicationOnMidClusterReplicationGreaterThanNumNodes() {
     conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
     conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
     loadBalancer.setConf(conf);
     int numNodes = 40;
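Each replica-placement test now zeroes the locality cost so the mock clusters, which have no real HDFS blocks behind them, are not penalized by the new cost function. A minimal sketch of that configuration step, assuming the HBase and Hadoop conf jars on the classpath (key strings as in the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class BalancerConfSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // weight of the locality cost function; 0 disables it
    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
    System.out.println(conf.getFloat("hbase.master.balancer.stochastic.localityCost", -1f));
  }
}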