HBASE-26309 Balancer tends to move regions to the server at the end o… (#3812)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
cc701e2c28
commit
0b7630bc1f
|
@ -338,6 +338,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
racks[entry.getValue()] = entry.getKey();
|
||||
}
|
||||
|
||||
LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex);
|
||||
for (Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
|
||||
int serverIndex = serversToIndex.get(entry.getKey().getAddress());
|
||||
regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];
|
||||
|
@ -365,6 +366,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
serversPerHost[i] = new int[serversPerHostList.get(i).size()];
|
||||
for (int j = 0; j < serversPerHost[i].length; j++) {
|
||||
serversPerHost[i][j] = serversPerHostList.get(i).get(j);
|
||||
LOG.debug("server {} is on host {}",serversPerHostList.get(i).get(j), i);
|
||||
}
|
||||
if (serversPerHost[i].length > 1) {
|
||||
multiServersPerHost = true;
|
||||
|
@ -375,6 +377,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
serversPerRack[i] = new int[serversPerRackList.get(i).size()];
|
||||
for (int j = 0; j < serversPerRack[i].length; j++) {
|
||||
serversPerRack[i][j] = serversPerRackList.get(i).get(j);
|
||||
LOG.info("server {} is on rack {}",serversPerRackList.get(i).get(j), i);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -960,6 +963,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
|
||||
private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
|
||||
|
||||
public Comparator<Integer> getNumRegionsComparator() {
|
||||
return numRegionsComparator;
|
||||
}
|
||||
|
||||
int getLowestLocalityRegionOnServer(int serverIndex) {
|
||||
if (regionFinder != null) {
|
||||
float lowestLocality = 1.0f;
|
||||
|
|
|
@ -18,13 +18,13 @@
|
|||
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
class LoadCandidateGenerator extends CandidateGenerator {
|
||||
|
||||
@Override
|
||||
BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
|
||||
@Override BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
|
||||
cluster.sortServersByRegionCount();
|
||||
int thisServer = pickMostLoadedServer(cluster, -1);
|
||||
int otherServer = pickLeastLoadedServer(cluster, thisServer);
|
||||
|
@ -34,27 +34,53 @@ class LoadCandidateGenerator extends CandidateGenerator {
|
|||
private int pickLeastLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) {
|
||||
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
|
||||
|
||||
int index = 0;
|
||||
while (servers[index] == null || servers[index] == thisServer) {
|
||||
index++;
|
||||
if (index == servers.length) {
|
||||
return -1;
|
||||
int selectedIndex = -1;
|
||||
double currentLargestRandom = -1;
|
||||
for (int i = 0; i < servers.length; i++) {
|
||||
if (servers[i] == null || servers[i] == thisServer) {
|
||||
continue;
|
||||
}
|
||||
if (selectedIndex != -1 && cluster.getNumRegionsComparator().compare(servers[i],
|
||||
servers[selectedIndex]) != 0) {
|
||||
// Exhausted servers of the same region count
|
||||
break;
|
||||
}
|
||||
// we don't know how many servers have the same region count, we will randomly select one
|
||||
// using a simplified inline reservoir sampling by assignmening a random number to stream
|
||||
// data and choose the greatest one. (http://gregable.com/2007/10/reservoir-sampling.html)
|
||||
double currentRandom = ThreadLocalRandom.current().nextDouble();
|
||||
if (currentRandom > currentLargestRandom) {
|
||||
selectedIndex = i;
|
||||
currentLargestRandom = currentRandom;
|
||||
}
|
||||
}
|
||||
return servers[index];
|
||||
return selectedIndex == -1 ? -1 : servers[selectedIndex];
|
||||
}
|
||||
|
||||
private int pickMostLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) {
|
||||
Integer[] servers = cluster.serverIndicesSortedByRegionCount;
|
||||
|
||||
int index = servers.length - 1;
|
||||
while (servers[index] == null || servers[index] == thisServer) {
|
||||
index--;
|
||||
if (index < 0) {
|
||||
return -1;
|
||||
int selectedIndex = -1;
|
||||
double currentLargestRandom = -1;
|
||||
for (int i = servers.length - 1; i >= 0; i--) {
|
||||
if (servers[i] == null || servers[i] == thisServer) {
|
||||
continue;
|
||||
}
|
||||
if (selectedIndex != -1
|
||||
&& cluster.getNumRegionsComparator().compare(servers[i], servers[selectedIndex]) != 0) {
|
||||
// Exhausted servers of the same region count
|
||||
break;
|
||||
}
|
||||
// we don't know how many servers have the same region count, we will randomly select one
|
||||
// using a simplified inline reservoir sampling by assignmening a random number to stream
|
||||
// data and choose the greatest one. (http://gregable.com/2007/10/reservoir-sampling.html)
|
||||
double currentRandom = ThreadLocalRandom.current().nextDouble();
|
||||
if (currentRandom > currentLargestRandom) {
|
||||
selectedIndex = i;
|
||||
currentLargestRandom = currentRandom;
|
||||
}
|
||||
}
|
||||
return servers[index];
|
||||
return selectedIndex == -1 ? -1 : servers[selectedIndex];
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -510,9 +510,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
plans = createRegionPlans(cluster);
|
||||
LOG.info("Finished computing new moving plan. Computation took {} ms" +
|
||||
" to try {} different iterations. Found a solution that moves " +
|
||||
"{} regions; Going from a computed imbalance of {}" + " to a new imbalance of {}. ",
|
||||
endTime - startTime, step, plans.size(), initCost / sumMultiplier,
|
||||
currentCost / sumMultiplier);
|
||||
"{} regions; Going from a computed imbalance of {}" +
|
||||
" to a new imbalance of {}. funtionCost={}",
|
||||
endTime - startTime, step, plans.size(),
|
||||
initCost / sumMultiplier, currentCost / sumMultiplier, functionCost());
|
||||
|
||||
sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
|
||||
return plans;
|
||||
|
|
|
@ -571,7 +571,7 @@ public class BalancerTestBase {
|
|||
List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);
|
||||
|
||||
// Print out the cluster loads to make debugging easier.
|
||||
LOG.info("Mock Balance : " + printMock(balancedCluster));
|
||||
LOG.info("Mock after Balance : " + printMock(balancedCluster));
|
||||
|
||||
if (assertFullyBalanced) {
|
||||
assertClusterAsBalanced(balancedCluster);
|
||||
|
@ -587,6 +587,43 @@ public class BalancerTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
protected void testWithClusterWithIteration(Map<ServerName, List<RegionInfo>> serverMap,
|
||||
RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) {
|
||||
List<ServerAndLoad> list = convertToList(serverMap);
|
||||
LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
|
||||
|
||||
loadBalancer.setRackManager(rackManager);
|
||||
// Run the balancer.
|
||||
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
|
||||
(Map) mockClusterServersWithTables(serverMap);
|
||||
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
|
||||
assertNotNull("Initial cluster balance should produce plans.", plans);
|
||||
|
||||
List<ServerAndLoad> balancedCluster = null;
|
||||
// Run through iteration until done. Otherwise will be killed as test time out
|
||||
while (plans != null && (assertFullyBalanced || assertFullyBalancedForReplicas)) {
|
||||
// Apply the plan to the mock cluster.
|
||||
balancedCluster = reconcile(list, plans, serverMap);
|
||||
|
||||
// Print out the cluster loads to make debugging easier.
|
||||
LOG.info("Mock after balance: " + printMock(balancedCluster));
|
||||
|
||||
LoadOfAllTable = (Map) mockClusterServersWithTables(serverMap);
|
||||
plans = loadBalancer.balanceCluster(LoadOfAllTable);
|
||||
}
|
||||
|
||||
// Print out the cluster loads to make debugging easier.
|
||||
LOG.info("Mock Final balance: " + printMock(balancedCluster));
|
||||
|
||||
if (assertFullyBalanced) {
|
||||
assertNull("Given a requirement to be fully balanced, second attempt at plans should " +
|
||||
"produce none.", plans);
|
||||
}
|
||||
if (assertFullyBalancedForReplicas) {
|
||||
assertRegionReplicaPlacement(serverMap, rackManager);
|
||||
}
|
||||
}
|
||||
|
||||
protected Map<ServerName, List<RegionInfo>> createServerMap(int numNodes,
|
||||
int numRegions,
|
||||
int numRegionsPerServer,
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
|
@ -38,6 +39,8 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends BalancerTe
|
|||
|
||||
private static class ForTestRackManager extends RackManager {
|
||||
int numRacks;
|
||||
Map<String, Integer> serverIndexes = new HashMap<String, Integer>();
|
||||
int numServers = 0;
|
||||
|
||||
public ForTestRackManager(int numRacks) {
|
||||
this.numRacks = numRacks;
|
||||
|
@ -45,13 +48,18 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends BalancerTe
|
|||
|
||||
@Override
|
||||
public String getRack(ServerName server) {
|
||||
return "rack_" + (server.hashCode() % numRacks);
|
||||
String key = server.getServerName();
|
||||
if (!serverIndexes.containsKey(key)) {
|
||||
serverIndexes.put(key, numServers++);
|
||||
}
|
||||
return "rack_" + serverIndexes.get(key) % numRacks;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionReplicationOnMidClusterWithRacks() {
|
||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 10000000L);
|
||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 100000000L);
|
||||
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
|
||||
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
|
||||
loadBalancer.setConf(conf);
|
||||
|
@ -65,6 +73,26 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends BalancerTe
|
|||
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
|
||||
RackManager rm = new ForTestRackManager(numRacks);
|
||||
|
||||
testWithCluster(serverMap, rm, false, true);
|
||||
testWithClusterWithIteration(serverMap, rm, true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionReplicationOnLargeClusterWithRacks() {
|
||||
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", false);
|
||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 5000L);
|
||||
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10 * 1000); // 10 sec
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
int numNodes = 100;
|
||||
int numRegions = numNodes * 30;
|
||||
int replication = 3; // 3 replicas per region
|
||||
int numRegionsPerServer = 28;
|
||||
int numTables = 1;
|
||||
int numRacks = 4; // all replicas should be on a different rack
|
||||
Map<ServerName, List<RegionInfo>> serverMap =
|
||||
createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
|
||||
RackManager rm = new ForTestRackManager(numRacks);
|
||||
|
||||
testWithClusterWithIteration(serverMap, rm, true, true);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue