HBASE-25767 CandidateGenerator.getRandomIterationOrder is too slow on large cluster (#3149)
Signed-off-by: XinSun <ddupgs@gmail.com> Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
parent
ae3daa5c0b
commit
15659f0e85
|
@ -18,13 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -135,17 +130,4 @@ abstract class CandidateGenerator {
|
|||
return BaseLoadBalancer.Cluster.NullAction;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a random iteration order of indexes of an array with size length
|
||||
*/
|
||||
List<Integer> getRandomIterationOrder(int length) {
|
||||
ArrayList<Integer> order = new ArrayList<>(length);
|
||||
for (int i = 0; i < length; i++) {
|
||||
order.add(i);
|
||||
}
|
||||
Collections.shuffle(order);
|
||||
return order;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,42 +18,30 @@
|
|||
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Optional;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
class LocalityBasedCandidateGenerator extends CandidateGenerator {
|
||||
|
||||
private MasterServices masterServices;
|
||||
|
||||
LocalityBasedCandidateGenerator(MasterServices masterServices) {
|
||||
this.masterServices = masterServices;
|
||||
}
|
||||
|
||||
@Override
|
||||
BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
|
||||
if (this.masterServices == null) {
|
||||
int thisServer = pickRandomServer(cluster);
|
||||
// Pick the other server
|
||||
int otherServer = pickOtherRandomServer(cluster, thisServer);
|
||||
return pickRandomRegions(cluster, thisServer, otherServer);
|
||||
}
|
||||
|
||||
// Randomly iterate through regions until you find one that is not on ideal host
|
||||
for (int region : getRandomIterationOrder(cluster.numRegions)) {
|
||||
int currentServer = cluster.regionIndexToServerIndex[region];
|
||||
if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(
|
||||
BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]) {
|
||||
Optional<BaseLoadBalancer.Cluster.Action> potential = tryMoveOrSwap(cluster,
|
||||
currentServer, region,
|
||||
cluster.getOrComputeRegionsToMostLocalEntities(
|
||||
BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]
|
||||
);
|
||||
if (potential.isPresent()) {
|
||||
return potential.get();
|
||||
// iterate through regions until you find one that is not on ideal host
|
||||
// start from a random point to avoid always balance the regions in front
|
||||
if (cluster.numRegions > 0) {
|
||||
int startIndex = ThreadLocalRandom.current().nextInt(cluster.numRegions);
|
||||
for (int i = 0; i < cluster.numRegions; i++) {
|
||||
int region = (startIndex + i) % cluster.numRegions;
|
||||
int currentServer = cluster.regionIndexToServerIndex[region];
|
||||
if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(
|
||||
BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]) {
|
||||
Optional<BaseLoadBalancer.Cluster.Action> potential = tryMoveOrSwap(cluster,
|
||||
currentServer, region, cluster.getOrComputeRegionsToMostLocalEntities(
|
||||
BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]);
|
||||
if (potential.isPresent()) {
|
||||
return potential.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -69,25 +57,25 @@ class LocalityBasedCandidateGenerator extends CandidateGenerator {
|
|||
// Compare locality gain/loss from swapping fromRegion with regions on toServer
|
||||
double fromRegionLocalityDelta = getWeightedLocality(cluster, fromRegion, toServer)
|
||||
- getWeightedLocality(cluster, fromRegion, fromServer);
|
||||
for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
|
||||
int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
|
||||
double toRegionLocalityDelta = getWeightedLocality(cluster, toRegion, fromServer)
|
||||
- getWeightedLocality(cluster, toRegion, toServer);
|
||||
// If locality would remain neutral or improve, attempt the swap
|
||||
if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
|
||||
return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
|
||||
int toServertotalRegions = cluster.regionsPerServer[toServer].length;
|
||||
if (toServertotalRegions > 0) {
|
||||
int startIndex = ThreadLocalRandom.current().nextInt(toServertotalRegions);
|
||||
for (int i = 0; i < toServertotalRegions; i++) {
|
||||
int toRegionIndex = (startIndex + i) % toServertotalRegions;
|
||||
int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
|
||||
double toRegionLocalityDelta = getWeightedLocality(cluster, toRegion, fromServer) -
|
||||
getWeightedLocality(cluster, toRegion, toServer);
|
||||
// If locality would remain neutral or improve, attempt the swap
|
||||
if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
|
||||
return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
|
||||
}
|
||||
}
|
||||
}
|
||||
return Optional.absent();
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
private double getWeightedLocality(BaseLoadBalancer.Cluster cluster, int region, int server) {
|
||||
return cluster.getOrComputeWeightedLocality(region, server,
|
||||
BaseLoadBalancer.Cluster.LocalityType.SERVER);
|
||||
}
|
||||
|
||||
void setServices(MasterServices services) {
|
||||
this.masterServices = services;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.BalancerDecision;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
|
||||
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
|
||||
|
@ -187,7 +186,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
|
||||
minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
|
||||
if (localityCandidateGenerator == null) {
|
||||
localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
|
||||
localityCandidateGenerator = new LocalityBasedCandidateGenerator();
|
||||
}
|
||||
localityCost = new ServerLocalityCostFunction(conf);
|
||||
rackLocalityCost = new RackLocalityCostFunction(conf);
|
||||
|
@ -307,18 +306,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setMasterServices(MasterServices masterServices) {
|
||||
super.setMasterServices(masterServices);
|
||||
this.localityCandidateGenerator.setServices(masterServices);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
|
||||
regionReplicaHostCostFunction.init(c);
|
||||
if (regionReplicaHostCostFunction.cost() > 0) return true;
|
||||
if (regionReplicaHostCostFunction.cost() > 0) {
|
||||
return true;
|
||||
}
|
||||
regionReplicaRackCostFunction.init(c);
|
||||
if (regionReplicaRackCostFunction.cost() > 0) return true;
|
||||
if (regionReplicaRackCostFunction.cost() > 0) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue