HBASE-25802 Miscellaneous style improvements for load balancer related classes (#3192)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
Duo Zhang 2021-04-23 15:20:27 +08:00 committed by GitHub
parent 996862c1cc
commit 96fefce9c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 228 additions and 172 deletions

View File

@ -33,7 +33,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerMetrics;
@ -64,6 +63,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
@InterfaceAudience.Private
public abstract class BaseLoadBalancer implements LoadBalancer {
private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
public static final String BALANCER_DECISION_BUFFER_ENABLED =
"hbase.master.balancer.decision.buffer.enabled";
public static final boolean DEFAULT_BALANCER_DECISION_BUFFER_ENABLED = false;
@ -71,21 +72,28 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
protected static final int MIN_SERVER_BALANCE = 2;
private volatile boolean stopped = false;
static final List<RegionInfo> EMPTY_REGION_LIST = Collections.emptyList();
static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
private static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
= load -> load.getRegionMetrics().isEmpty();
protected RegionHDFSBlockLocationFinder regionFinder;
protected boolean useRegionFinder;
protected boolean isByTable = false;
// slop for regions
protected float slop;
// overallSlop to control simpleLoadBalancer's cluster level threshold
protected float overallSlop;
protected Configuration config;
protected RackManager rackManager;
protected MetricsBalancer metricsBalancer = null;
protected ClusterMetrics clusterStatus = null;
protected ServerName masterServerName;
protected MasterServices services;
/**
* The constructor that uses the basic MetricsBalancer
*/
protected BaseLoadBalancer() {
metricsBalancer = new MetricsBalancer();
createRegionFinder();
this(null);
}
/**
@ -94,28 +102,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
*/
protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
createRegionFinder();
}
private void createRegionFinder() {
useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
if (useRegionFinder) {
regionFinder = new RegionHDFSBlockLocationFinder();
}
}
// slop for regions
protected float slop;
// overallSlop to control simpleLoadBalancer's cluster level threshold
protected float overallSlop;
protected Configuration config = HBaseConfiguration.create();
protected RackManager rackManager;
static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
protected MetricsBalancer metricsBalancer = null;
protected ClusterMetrics clusterStatus = null;
protected ServerName masterServerName;
protected MasterServices services;
@Override
public void setConf(Configuration conf) {
this.config = conf;
@ -133,7 +121,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
this.rackManager = new RackManager(getConf());
useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
if (useRegionFinder) {
regionFinder = new RegionHDFSBlockLocationFinder();
regionFinder.setConf(conf);
}
this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
@ -195,7 +185,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
return false;
}
if(areSomeRegionReplicasColocated(c)) return true;
if (areSomeRegionReplicasColocated(c)) {
return true;
}
if(idleRegionServerExist(c)) {
return true;
}
@ -248,10 +240,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
/**
* Generates a bulk assignment plan to be used on cluster startup using a
* simple round-robin assignment.
* <p>
* <p/>
* Takes a list of all the regions and all the servers in the cluster and
* returns a map of each server to the regions that it should be assigned.
* <p>
* <p/>
* Currently implemented as a round-robin assignment. Same invariant as load
* balancing, all servers holding floor(avg) or ceiling(avg).
*
@ -288,7 +280,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
return assignments;
}
protected BalancerClusterState createCluster(List<ServerName> servers,
private BalancerClusterState createCluster(List<ServerName> servers,
Collection<RegionInfo> regions) throws HBaseIOException {
boolean hasRegionReplica = false;
try {
@ -320,7 +312,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
for (ServerName server : servers) {
if (!clusterState.containsKey(server)) {
clusterState.put(server, EMPTY_REGION_LIST);
clusterState.put(server, Collections.emptyList());
}
}
return new BalancerClusterState(regions, clusterState, null, this.regionFinder,
@ -615,7 +607,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
}
protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
private Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
Collection<RegionInfo> regions) {
if (this.services != null && this.services.getAssignmentManager() != null) {
return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.balancer;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
@ -46,11 +47,11 @@ abstract class CandidateGenerator {
double chanceOfNoSwap) {
// Check to see if this is just a move.
if (cluster.regionsPerServer[server].length == 0
|| StochasticLoadBalancer.RANDOM.nextFloat() < chanceOfNoSwap) {
|| ThreadLocalRandom.current().nextFloat() < chanceOfNoSwap) {
// signal a move only.
return -1;
}
int rand = StochasticLoadBalancer.RANDOM.nextInt(cluster.regionsPerServer[server].length);
int rand = ThreadLocalRandom.current().nextInt(cluster.regionsPerServer[server].length);
return cluster.regionsPerServer[server][rand];
}
@ -59,7 +60,7 @@ abstract class CandidateGenerator {
return -1;
}
return StochasticLoadBalancer.RANDOM.nextInt(cluster.numServers);
return ThreadLocalRandom.current().nextInt(cluster.numServers);
}
int pickRandomRack(BalancerClusterState cluster) {
@ -67,7 +68,7 @@ abstract class CandidateGenerator {
return -1;
}
return StochasticLoadBalancer.RANDOM.nextInt(cluster.numRacks);
return ThreadLocalRandom.current().nextInt(cluster.numRacks);
}
int pickOtherRandomServer(BalancerClusterState cluster, int serverIndex) {

View File

@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
@ -275,7 +276,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
}
} else {
// We don't have one/more load, lets just choose a random node
s = RANDOM.nextBoolean() ? secondaryHost : tertiaryHost;
s = ThreadLocalRandom.current().nextBoolean() ? secondaryHost : tertiaryHost;
}
addRegionToMap(assignmentMapForFavoredNodes, region, s);
} else if (secondaryHost != null) {
@ -320,7 +321,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
List<ServerName> onlineServers = getOnlineFavoredNodes(servers, favoredNodes);
if (onlineServers.size() > 0) {
destination = onlineServers.get(RANDOM.nextInt(onlineServers.size()));
destination = onlineServers.get(ThreadLocalRandom.current().nextInt(onlineServers.size()));
}
boolean alwaysAssign = getConf().getBoolean(FAVORED_ALWAYS_ASSIGN_REGIONS, true);
@ -398,7 +399,8 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, sn) != null) {
addRegionToMap(assignmentMap, hri, sn);
} else {
ServerName destination = onlineFN.get(RANDOM.nextInt(onlineFN.size()));
ServerName destination =
onlineFN.get(ThreadLocalRandom.current().nextInt(onlineFN.size()));
LOG.warn("Region: " + hri + " not hosted on favored nodes: " + favoredNodes
+ " current: " + sn + " moving to: " + destination);
addRegionToMap(assignmentMap, hri, destination);

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class RandomCandidateGenerator extends CandidateGenerator {
@Override
BalanceAction generate(BalancerClusterState cluster) {
int thisServer = pickRandomServer(cluster);
// Pick the other server
int otherServer = pickOtherRandomServer(cluster, thisServer);
return pickRandomRegions(cluster, thisServer, otherServer);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.balancer;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -27,8 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class RegionReplicaCandidateGenerator extends CandidateGenerator {
StochasticLoadBalancer.RandomCandidateGenerator randomGenerator =
new StochasticLoadBalancer.RandomCandidateGenerator();
protected final RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();
/**
* Randomly select one regionIndex out of all region replicas co-hosted in the same group
@ -56,7 +56,7 @@ class RegionReplicaCandidateGenerator extends CandidateGenerator {
int numReplicas = j - currentPrimaryIndex;
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
// decide to select this primary region id or not
double currentRandom = StochasticLoadBalancer.RANDOM.nextDouble();
double currentRandom = ThreadLocalRandom.current().nextDouble();
// we don't know how many region replicas are co-hosted, we will randomly select one
// using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
if (currentRandom > currentLargestRandom) {

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Generates candidates which moves the replicas out of the rack for co-hosted region replicas in
* the same rack
*/
@InterfaceAudience.Private
class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
@Override
BalanceAction generate(BalancerClusterState cluster) {
int rackIndex = pickRandomRack(cluster);
if (cluster.numRacks <= 1 || rackIndex == -1) {
return super.generate(cluster);
}
int regionIndex = selectCoHostedRegionPerGroup(cluster.primariesOfRegionsPerRack[rackIndex],
cluster.regionsPerRack[rackIndex], cluster.regionIndexToPrimaryIndex);
// if there are no pairs of region replicas co-hosted, default to random generator
if (regionIndex == -1) {
// default to randompicker
return randomGenerator.generate(cluster);
}
int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
int rand = ThreadLocalRandom.current().nextInt(cluster.serversPerRack[toRackIndex].length);
int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
}
}

View File

@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -41,22 +40,20 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
/**
* Makes decisions about the placement and movement of Regions across
* RegionServers.
*
* <p>Cluster-wide load balancing will occur only when there are no regions in
* transition and according to a fixed period of a time using {@link #balanceCluster(Map)}.
*
* <p>On cluster startup, bulk assignment can be used to determine
* locations for all Regions in a cluster.
*
* <p>This classes produces plans for the
* Makes decisions about the placement and movement of Regions across RegionServers.
* <p/>
* Cluster-wide load balancing will occur only when there are no regions in transition and according
* to a fixed period of a time using {@link #balanceCluster(Map)}.
* <p/>
* On cluster startup, bulk assignment can be used to determine locations for all Regions in a
* cluster.
* <p/>
* This classes produces plans for the
* {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to execute.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SimpleLoadBalancer extends BaseLoadBalancer {
private static final Logger LOG = LoggerFactory.getLogger(SimpleLoadBalancer.class);
private static final Random RANDOM = new Random(System.currentTimeMillis());
private RegionInfoComparator riComparator = new RegionInfoComparator();
private RegionPlan.RegionPlanComparator rpComparator = new RegionPlan.RegionPlanComparator();
@ -66,12 +63,12 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
/**
* Stores additional per-server information about the regions added/removed
* during the run of the balancing algorithm.
*
* </p>
* For servers that shed regions, we need to track which regions we have already
* shed. <b>nextRegionForUnload</b> contains the index in the list of regions on
* the server that is the next to be shed.
*/
static class BalanceInfo {
private static final class BalanceInfo {
private int nextRegionForUnload;
private int numRegionsAdded;
@ -136,8 +133,9 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
originSlop, slop, originOverallSlop, overallSlop);
}
private void setLoad(List<ServerAndLoad> slList, int i, int loadChange){
ServerAndLoad newsl = new ServerAndLoad(slList.get(i).getServerName(),slList.get(i).getLoad() + loadChange);
private void setLoad(List<ServerAndLoad> slList, int i, int loadChange) {
ServerAndLoad newsl =
new ServerAndLoad(slList.get(i).getServerName(), slList.get(i).getLoad() + loadChange);
slList.set(i, newsl);
}
@ -310,7 +308,9 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
i++;
regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
numTaken++;
if (numTaken >= numToOffload) break;
if (numTaken >= numToOffload) {
break;
}
}
serverBalanceInfo.put(sal.getServerName(),
new BalanceInfo(numToOffload, (-1)*numTaken, server.getValue()));
@ -325,7 +325,9 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
int maxToTake = numRegions - min;
for (Map.Entry<ServerAndLoad, List<RegionInfo>> server:
serversByLoad.entrySet()) {
if (maxToTake == 0) break; // no more to take
if (maxToTake == 0) {
break; // no more to take
}
int load = server.getKey().getLoad();
if (load >= min) {
continue; // look for other servers which haven't reached min
@ -339,15 +341,19 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
int incr = 1;
List<ServerName> sns =
Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
Collections.shuffle(sns, RANDOM);
Collections.shuffle(sns);
while (regionsToMove.size() > 0) {
int cnt = 0;
int i = incr > 0 ? 0 : underloadedServers.size()-1;
for (; i >= 0 && i < underloadedServers.size(); i += incr) {
if (regionsToMove.isEmpty()) break;
if (regionsToMove.isEmpty()) {
break;
}
ServerName si = sns.get(i);
int numToTake = underloadedServers.get(si);
if (numToTake == 0) continue;
if (numToTake == 0) {
continue;
}
addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
@ -356,7 +362,9 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
BalanceInfo bi = serverBalanceInfo.get(si);
bi.setNumRegionsAdded(bi.getNumRegionsAdded()+1);
}
if (cnt == 0) break;
if (cnt == 0) {
break;
}
// iterates underloadedServers in the other direction
incr = -incr;
}
@ -377,9 +385,13 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
serverBalanceInfo.get(server.getKey().getServerName());
int idx =
balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
if (idx >= server.getValue().size()) break;
if (idx >= server.getValue().size()) {
break;
}
RegionInfo region = server.getValue().get(idx);
if (region.isMetaRegion()) continue; // Don't move meta regions.
if (region.isMetaRegion()) {
continue; // Don't move meta regions.
}
regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
@ -398,7 +410,9 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
for (Map.Entry<ServerAndLoad, List<RegionInfo>> server :
serversByLoad.entrySet()) {
int regionCount = server.getKey().getLoad();
if (regionCount >= min) break;
if (regionCount >= min) {
break;
}
BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
if(balanceInfo != null) {
regionCount += balanceInfo.getNumRegionsAdded();
@ -429,7 +443,9 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
", serversUnderloaded=" + serversUnderloaded);
StringBuilder sb = new StringBuilder();
for (Map.Entry<ServerName, List<RegionInfo>> e: loadOfOneTable.entrySet()) {
if (sb.length() > 0) sb.append(", ");
if (sb.length() > 0) {
sb.append(", ");
}
sb.append(e.getKey().toString());
sb.append(" ");
sb.append(e.getValue().size());
@ -447,15 +463,16 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
}
/**
* If we need to balanceoverall, we need to add one more round to peel off one region from each max.
* Together with other regions left to be assigned, we distribute all regionToMove, to the RS
* If we need to balanceoverall, we need to add one more round to peel off one region from each
* max. Together with other regions left to be assigned, we distribute all regionToMove, to the RS
* that have less regions in whole cluster scope.
*/
public void balanceOverall(List<RegionPlan> regionsToReturn,
Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min ){
Map<ServerName, BalanceInfo> serverBalanceInfo, boolean fetchFromTail,
MinMaxPriorityQueue<RegionPlan> regionsToMove, int max, int min) {
// Step 1.
// A map to record the plan we have already got as status quo, in order to resolve a cyclic assignment pair,
// A map to record the plan we have already got as status quo, in order to resolve a cyclic
// assignment pair,
// e.g. plan 1: A -> B, plan 2: B ->C => resolve plan1 to A -> C, remove plan2
Map<ServerName, List<Integer>> returnMap = new HashMap<>();
for (int i = 0; i < regionsToReturn.size(); i++) {
@ -491,12 +508,14 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
RegionPlan maxPlan = new RegionPlan(hriToPlan, serverload.getServerName(), null);
regionsToMove.add(maxPlan);
setLoad(serverLoadList, i, -1);
}else if(balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max
|| balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min){
LOG.warn("Encounter incorrect region numbers after calculating move plan during balanceOverall, " +
"for this table, " + serverload.getServerName() + " originally has " + balanceInfo.getHriList().size() +
" regions and " + balanceInfo.getNumRegionsAdded() + " regions have been added. Yet, max =" +
max + ", min =" + min + ". Thus stop balance for this table"); // should not happen
} else if (balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() > max ||
balanceInfo.getHriList().size() + balanceInfo.getNumRegionsAdded() < min) {
LOG.warn(
"Encounter incorrect region numbers after calculating move plan during balanceOverall, " +
"for this table, " + serverload.getServerName() + " originally has " +
balanceInfo.getHriList().size() + " regions and " + balanceInfo.getNumRegionsAdded() +
" regions have been added. Yet, max =" + max + ", min =" + min +
". Thus stop balance for this table"); // should not happen
return;
}
}
@ -504,12 +523,16 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
// Step 3. sort the ServerLoadList, the ArrayList hold overall load for each server.
// We only need to assign the regionsToMove to
// the first n = regionsToMove.size() RS that has least load.
Collections.sort(serverLoadList,new Comparator<ServerAndLoad>(){
Collections.sort(serverLoadList, new Comparator<ServerAndLoad>() {
@Override
public int compare(ServerAndLoad s1, ServerAndLoad s2) {
if(s1.getLoad() == s2.getLoad()) return 0;
else return (s1.getLoad() > s2.getLoad())? 1 : -1;
}});
if (s1.getLoad() == s2.getLoad()) {
return 0;
} else {
return (s1.getLoad() > s2.getLoad()) ? 1 : -1;
}
}
});
// Step 4.
// Preparation before assign out all regionsToMove.
@ -524,16 +547,19 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
Pair<ServerAndLoad,Integer> shredLoad;
// A List to help mark the plan in regionsToMove that should be removed
List<RegionPlan> planToRemoveList = new ArrayList<>();
// A structure to record how many times a server becomes the source of a plan, from regionsToMove.
// A structure to record how many times a server becomes the source of a plan, from
// regionsToMove.
Map<ServerName, Integer> sourceMap = new HashMap<>();
// We remove one of the plan which would cause source RS equals destination RS.
// But we should keep in mind that the second plan from such RS should be kept.
for(RegionPlan plan: regionsToMove){
// the source RS's load and index in ServerLoadList
shredLoad = SnLoadMap.get(plan.getSource());
if(!sourceMap.containsKey(plan.getSource())) sourceMap.put(plan.getSource(), 0);
if (!sourceMap.containsKey(plan.getSource())) {
sourceMap.put(plan.getSource(), 0);
}
sourceMap.put(plan.getSource(), sourceMap.get(plan.getSource()) + 1);
if(shredLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
if (shredLoad.getSecond() < assignLength && sourceMap.get(plan.getSource()) == 1) {
planToRemoveList.add(plan);
// While marked as to be removed, the count should be add back to the source RS
setLoad(serverLoadList, shredLoad.getSecond(), 1);
@ -552,14 +578,18 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
// With this strategy adopted, we can gradually achieve the overall balance,
// while keeping table level balanced.
for(int i = 0; i < assignLength; i++){
// skip the RS that is also the source, we have removed them from regionsToMove in previous step
if(sourceMap.containsKey(serverLoadList.get(i).getServerName())) continue;
// skip the RS that is also the source, we have removed them from regionsToMove in previous
// step
if (sourceMap.containsKey(serverLoadList.get(i).getServerName())) {
continue;
}
addRegionPlan(regionsToMove, fetchFromTail,
serverLoadList.get(i).getServerName(), regionsToReturn);
setLoad(serverLoadList, i, 1);
// resolve a possible cyclic assignment pair if we just produced one:
// e.g. plan1: A -> B, plan2: B -> C => resolve plan1 to A -> C and remove plan2
List<Integer> pos = returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
List<Integer> pos =
returnMap.get(regionsToReturn.get(regionsToReturn.size() - 1).getSource());
if (pos != null && pos.size() != 0) {
regionsToReturn.get(pos.get(pos.size() - 1)).setDestination(
regionsToReturn.get(regionsToReturn.size() - 1).getDestination());
@ -574,10 +604,13 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
* Add a region from the head or tail to the List of regions to return.
*/
private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
final boolean fetchFromTail, final ServerName sn, List<RegionPlan> regionsToReturn) {
RegionPlan rp = null;
if (!fetchFromTail) rp = regionsToMove.remove();
else rp = regionsToMove.removeLast();
if (!fetchFromTail) {
rp = regionsToMove.remove();
} else {
rp = regionsToMove.removeLast();
}
rp.setDestination(sn);
regionsToReturn.add(rp);
}

View File

@ -27,7 +27,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
@ -106,6 +106,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
justification="Complaint is about costFunctions not being synchronized; not end of the world")
public class StochasticLoadBalancer extends BaseLoadBalancer {
private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
protected static final String STEPS_PER_REGION_KEY =
"hbase.master.balancer.stochastic.stepsPerRegion";
protected static final String MAX_STEPS_KEY =
@ -122,9 +124,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
"hbase.master.balancer.stochastic.additionalCostFunctions";
protected static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
// values are defaults
@ -365,8 +364,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
BalanceAction nextAction(BalancerClusterState cluster) {
return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size()))
.generate(cluster);
return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()))
.generate(cluster);
}
/**
@ -683,53 +682,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return total;
}
static class RandomCandidateGenerator extends CandidateGenerator {
@Override
BalanceAction generate(BalancerClusterState cluster) {
int thisServer = pickRandomServer(cluster);
// Pick the other server
int otherServer = pickOtherRandomServer(cluster, thisServer);
return pickRandomRegions(cluster, thisServer, otherServer);
}
}
/**
* Generates candidates which moves the replicas out of the rack for
* co-hosted region replicas in the same rack
*/
static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {
@Override
BalanceAction generate(BalancerClusterState cluster) {
int rackIndex = pickRandomRack(cluster);
if (cluster.numRacks <= 1 || rackIndex == -1) {
return super.generate(cluster);
}
int regionIndex = selectCoHostedRegionPerGroup(
cluster.primariesOfRegionsPerRack[rackIndex],
cluster.regionsPerRack[rackIndex],
cluster.regionIndexToPrimaryIndex);
// if there are no pairs of region replicas co-hosted, default to random generator
if (regionIndex == -1) {
// default to randompicker
return randomGenerator.generate(cluster);
}
int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
int toRackIndex = pickOtherRandomRack(cluster, rackIndex);
int rand = RANDOM.nextInt(cluster.serversPerRack[toRackIndex].length);
int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
}
}
/**
* Base class of StochasticLoadBalancer's Cost Functions.
*/
@ -745,6 +697,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
boolean isNeeded() {
return true;
}
float getMultiplier() {
return multiplier;
}

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -57,7 +59,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -93,22 +94,22 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
loadBalancer = new MockBalancer();
loadBalancer.setConf(conf);
MasterServices st = Mockito.mock(MasterServices.class);
Mockito.when(st.getServerName()).thenReturn(master);
MasterServices st = mock(MasterServices.class);
when(st.getServerName()).thenReturn(master);
loadBalancer.setMasterServices(st);
// Set up the rack topologies (5 machines per rack)
rackManager = Mockito.mock(RackManager.class);
rackManager = mock(RackManager.class);
for (int i = 0; i < NUM_SERVERS; i++) {
servers[i] = ServerName.valueOf("foo"+i+":1234",-1);
if (i < 5) {
Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack1");
when(rackManager.getRack(servers[i])).thenReturn("rack1");
}
if (i >= 5 && i < 10) {
Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack2");
when(rackManager.getRack(servers[i])).thenReturn("rack2");
}
if (i >= 10) {
Mockito.when(rackManager.getRack(servers[i])).thenReturn("rack3");
when(rackManager.getRack(servers[i])).thenReturn("rack3");
}
}
}
@ -127,19 +128,6 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
}
}
/**
* All regions have an assignment.
* @param regions
* @param servers
* @param assignments
*/
private void assertImmediateAssignment(List<RegionInfo> regions, List<ServerName> servers,
Map<RegionInfo, ServerName> assignments) {
for (RegionInfo region : regions) {
assertTrue(assignments.containsKey(region));
}
}
/**
* Tests the bulk assignment used during cluster startup.
*
@ -242,11 +230,10 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
Configuration conf = HBaseConfiguration.create();
conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);
balancer.setConf(conf);
ServerManager sm = Mockito.mock(ServerManager.class);
Mockito.when(sm.getOnlineServersListWithPredicator(allServers, BaseLoadBalancer.IDLE_SERVER_PREDICATOR))
.thenReturn(idleServers);
MasterServices services = Mockito.mock(MasterServices.class);
Mockito.when(services.getServerManager()).thenReturn(sm);
ServerManager sm = mock(ServerManager.class);
when(sm.getOnlineServersListWithPredicator(anyList(), any())).thenReturn(idleServers);
MasterServices services = mock(MasterServices.class);
when(services.getServerManager()).thenReturn(sm);
balancer.setMasterServices(services);
RegionInfo hri1 = RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
.setStartKey(Bytes.toBytes("key1"))
@ -254,7 +241,7 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
.setSplit(false)
.setRegionId(100)
.build();
assertNull(balancer.randomAssignment(hri1, Collections.EMPTY_LIST));
assertNull(balancer.randomAssignment(hri1, Collections.emptyList()));
assertNull(balancer.randomAssignment(hri1, null));
for (int i = 0; i != 3; ++i) {
ServerName sn = balancer.randomAssignment(hri1, allServers);

View File

@ -282,7 +282,7 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends BalancerTestBas
}
static class FairRandomCandidateGenerator extends
StochasticLoadBalancer.RandomCandidateGenerator {
RandomCandidateGenerator {
@Override
public BalanceAction pickRandomRegions(BalancerClusterState cluster,