diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java index c69f17c5263..dbe23ef1835 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java @@ -179,6 +179,7 @@ class BalancerClusterState { serversPerHostList.get(hostIndex).add(serverIndex); String rack = this.rackManager.getRack(sn); + if (!racksToIndex.containsKey(rack)) { racksToIndex.put(rack, numRacks++); serversPerRackList.add(new ArrayList<>()); @@ -187,6 +188,7 @@ class BalancerClusterState { serversPerRackList.get(rackIndex).add(serverIndex); } + LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex); // Count how many regions there are. for (Map.Entry> entry : clusterState.entrySet()) { numRegions += entry.getValue().size(); @@ -285,6 +287,7 @@ class BalancerClusterState { serversPerHost[i] = new int[serversPerHostList.get(i).size()]; for (int j = 0; j < serversPerHost[i].length; j++) { serversPerHost[i][j] = serversPerHostList.get(i).get(j); + LOG.debug("server {} is on host {}",serversPerHostList.get(i).get(j), i); } if (serversPerHost[i].length > 1) { multiServersPerHost = true; @@ -295,6 +298,7 @@ class BalancerClusterState { serversPerRack[i] = new int[serversPerRackList.get(i).size()]; for (int j = 0; j < serversPerRack[i].length; j++) { serversPerRack[i][j] = serversPerRackList.get(i).get(j); + LOG.info("server {} is on rack {}",serversPerRackList.get(i).get(j), i); } } @@ -792,6 +796,10 @@ class BalancerClusterState { private Comparator numRegionsComparator = Comparator.comparingInt(this::getNumRegions); + public Comparator getNumRegionsComparator() { + return numRegionsComparator; + } + int getLowestLocalityRegionOnServer(int serverIndex) { if (regionFinder != null) { float 
lowestLocality = 1.0f; diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java index 595e1857e25..8604f4a47f7 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.balancer; +import java.util.concurrent.ThreadLocalRandom; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private @@ -34,27 +35,53 @@ class LoadCandidateGenerator extends CandidateGenerator { private int pickLeastLoadedServer(final BalancerClusterState cluster, int thisServer) { Integer[] servers = cluster.serverIndicesSortedByRegionCount; - int index = 0; - while (servers[index] == null || servers[index] == thisServer) { - index++; - if (index == servers.length) { - return -1; + int selectedIndex = -1; + double currentLargestRandom = -1; + for (int i = 0; i < servers.length; i++) { + if (servers[i] == null || servers[i] == thisServer) { + continue; + } + if (selectedIndex != -1 + && cluster.getNumRegionsComparator().compare(servers[i], servers[selectedIndex]) != 0) { + // Exhausted servers of the same region count + break; + } + // we don't know how many servers have the same region count, we will randomly select one + // using a simplified inline reservoir sampling by assigning a random number to stream + // data and choose the greatest one. (http://gregable.com/2007/10/reservoir-sampling.html) + double currentRandom = ThreadLocalRandom.current().nextDouble(); + if (currentRandom > currentLargestRandom) { + selectedIndex = i; + currentLargestRandom = currentRandom; } } - return servers[index]; + return selectedIndex == -1 ? 
-1 : servers[selectedIndex]; } private int pickMostLoadedServer(final BalancerClusterState cluster, int thisServer) { Integer[] servers = cluster.serverIndicesSortedByRegionCount; - int index = servers.length - 1; - while (servers[index] == null || servers[index] == thisServer) { - index--; - if (index < 0) { - return -1; + int selectedIndex = -1; + double currentLargestRandom = -1; + for (int i = servers.length - 1; i >= 0; i--) { + if (servers[i] == null || servers[i] == thisServer) { + continue; + } + if (selectedIndex != -1 && cluster.getNumRegionsComparator().compare(servers[i], + servers[selectedIndex]) != 0) { + // Exhausted servers of the same region count + break; + } + // we don't know how many servers have the same region count, we will randomly select one + // using a simplified inline reservoir sampling by assigning a random number to stream + // data and choose the greatest one. (http://gregable.com/2007/10/reservoir-sampling.html) + double currentRandom = ThreadLocalRandom.current().nextDouble(); + if (currentRandom > currentLargestRandom) { + selectedIndex = i; + currentLargestRandom = currentRandom; } } - return servers[index]; + return selectedIndex == -1? -1 : servers[selectedIndex]; } } diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 11eb85bce02..a796085145f 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -345,8 +345,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { } if (idleRegionServerExist(cluster)){ - LOG.info("Running balancer because at least one server hosts replicas of the same region." 
+ "regionReplicaRackCostFunction={}", regionReplicaRackCostFunction.cost()); LOG.info("Running balancer because cluster has idle server(s)."+ " function cost={}", functionCost()); return true; } @@ -510,9 +508,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { LOG.info("Finished computing new moving plan. Computation took {} ms" + " to try {} different iterations. Found a solution that moves " + "{} regions; Going from a computed imbalance of {}" + - " to a new imbalance of {}. ", + " to a new imbalance of {}. functionCost={}", endTime - startTime, step, plans.size(), - initCost / sumMultiplier, currentCost / sumMultiplier); + initCost / sumMultiplier, currentCost / sumMultiplier, functionCost()); sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step); return plans; } diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/StochasticBalancerTestBase.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/StochasticBalancerTestBase.java index 56d5f10f6b7..9237968dff6 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/StochasticBalancerTestBase.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/StochasticBalancerTestBase.java @@ -80,7 +80,7 @@ public class StochasticBalancerTestBase extends BalancerTestBase { List balancedCluster = reconcile(list, plans, serverMap); // Print out the cluster loads to make debugging easier. 
- LOG.info("Mock Balance : " + printMock(balancedCluster)); + LOG.info("Mock after Balance : " + printMock(balancedCluster)); if (assertFullyBalanced) { assertClusterAsBalanced(balancedCluster); @@ -95,4 +95,40 @@ public class StochasticBalancerTestBase extends BalancerTestBase { } } } + + protected void testWithClusterWithIteration(Map> serverMap, + RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) { + List list = convertToList(serverMap); + LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list)); + + loadBalancer.setRackManager(rackManager); + // Run the balancer. + Map>> LoadOfAllTable = (Map) mockClusterServersWithTables(serverMap); + List plans = loadBalancer.balanceCluster(LoadOfAllTable); + assertNotNull("Initial cluster balance should produce plans.", plans); + + List balancedCluster = null; + // Run through iteration until done. Otherwise will be killed as test time out + while (plans != null && (assertFullyBalanced || assertFullyBalancedForReplicas)) { + // Apply the plan to the mock cluster. + balancedCluster = reconcile(list, plans, serverMap); + + // Print out the cluster loads to make debugging easier. + LOG.info("Mock after balance: " + printMock(balancedCluster)); + + LoadOfAllTable = (Map) mockClusterServersWithTables(serverMap); + plans = loadBalancer.balanceCluster(LoadOfAllTable); + } + + // Print out the cluster loads to make debugging easier. 
+ LOG.info("Mock Final balance: " + printMock(balancedCluster)); + + if (assertFullyBalanced) { + assertNull("Given a requirement to be fully balanced, second attempt at plans should " + + "produce none.", plans); + } + if (assertFullyBalancedForReplicas) { + assertRegionReplicaPlacement(serverMap, rackManager); + } + } } diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java index 3b2c847c545..0e5ecd33f9f 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.master.balancer; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -37,25 +38,36 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends Stochastic HBaseClassTestRule.forClass(TestStochasticLoadBalancerRegionReplicaWithRacks.class); private static class ForTestRackManager extends RackManager { + int numRacks; + Map serverIndexes = new HashMap(); + int numServers = 0; public ForTestRackManager(int numRacks) { this.numRacks = numRacks; } + @Override public String getRack(ServerName server) { - return "rack_" + (server.hashCode() % numRacks); + String key = server.getServerName(); + if (!serverIndexes.containsKey(key)) { + serverIndexes.put(key, numServers++); + } + return "rack_" + serverIndexes.get(key) % numRacks; } } @Test public void testRegionReplicationOnMidClusterWithRacks() { - conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 10000000L); + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 100000000L); + 
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true); conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec + // for full balance +// conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.001f); loadBalancer.onConfigurationChange(conf); - int numNodes = 4; + int numNodes = 5; int numRegions = numNodes * 1; int replication = 3; // 3 replicas per region int numRegionsPerServer = 1; @@ -65,6 +77,26 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends Stochastic createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables); RackManager rm = new ForTestRackManager(numRacks); - testWithCluster(serverMap, rm, false, true); + testWithClusterWithIteration(serverMap, rm, true, true); + } + + @Test + public void testRegionReplicationOnLargeClusterWithRacks() { + conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", false); + conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 5000L); + conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f); + conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10 * 1000); // 10 sec + loadBalancer.onConfigurationChange(conf); + int numNodes = 100; + int numRegions = numNodes * 30; + int replication = 3; // 3 replicas per region + int numRegionsPerServer = 28; + int numTables = 1; + int numRacks = 4; // all replicas should be on a different rack + Map> serverMap = + createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables); + RackManager rm = new ForTestRackManager(numRacks); + + testWithClusterWithIteration(serverMap, rm, true, true); } }