HBASE-25873 Refactor and cleanup the code for CostFunction (#3274)
Signed-off-by: Yi Mei <myimeiyi@gmail.com>
This commit is contained in:
parent
fe70fced07
commit
f94f4e29fe
@ -50,5 +50,4 @@ class AssignRegionAction extends BalanceAction {
|
||||
public String toString() {
|
||||
return getType() + ": " + region + ":" + server;
|
||||
}
|
||||
|
||||
}
|
@ -18,6 +18,7 @@
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
@ -30,18 +31,20 @@ abstract class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFuncti
|
||||
|
||||
@Override
|
||||
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
|
||||
double cost = 0;
|
||||
double previous = 0;
|
||||
boolean isFirst = true;
|
||||
for (BalancerRegionLoad rl : regionLoadList) {
|
||||
double current = getCostFromRl(rl);
|
||||
if (isFirst) {
|
||||
isFirst = false;
|
||||
} else {
|
||||
cost += current - previous;
|
||||
}
|
||||
previous = current;
|
||||
Iterator<BalancerRegionLoad> iter = regionLoadList.iterator();
|
||||
if (!iter.hasNext()) {
|
||||
return 0;
|
||||
}
|
||||
double previous = getCostFromRl(iter.next());
|
||||
if (!iter.hasNext()) {
|
||||
return 0;
|
||||
}
|
||||
double cost = 0;
|
||||
do {
|
||||
double current = getCostFromRl(iter.next());
|
||||
cost += current - previous;
|
||||
previous = current;
|
||||
} while (iter.hasNext());
|
||||
return Math.max(0, cost / (regionLoadList.size() - 1));
|
||||
}
|
||||
}
|
@ -18,9 +18,6 @@
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Deque;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
@ -30,31 +27,21 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||
@InterfaceAudience.Private
|
||||
abstract class CostFromRegionLoadFunction extends CostFunction {
|
||||
|
||||
private ClusterMetrics clusterStatus;
|
||||
private Map<String, Deque<BalancerRegionLoad>> loads;
|
||||
private double[] stats;
|
||||
|
||||
void setClusterMetrics(ClusterMetrics status) {
|
||||
this.clusterStatus = status;
|
||||
}
|
||||
|
||||
void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
|
||||
this.loads = l;
|
||||
@Override
|
||||
void prepare(BalancerClusterState cluster) {
|
||||
super.prepare(cluster);
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final double cost() {
|
||||
if (clusterStatus == null || loads == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
|
||||
for (int i = 0; i < stats.length; i++) {
|
||||
// Cost this server has from RegionLoad
|
||||
long cost = 0;
|
||||
double cost = 0;
|
||||
|
||||
// for every region on this server get the rl
|
||||
for (int regionIndex : cluster.regionsPerServer[i]) {
|
||||
@ -62,7 +49,7 @@ abstract class CostFromRegionLoadFunction extends CostFunction {
|
||||
|
||||
// Now if we found a region load get the type of cost that was requested.
|
||||
if (regionLoadList != null) {
|
||||
cost = (long) (cost + getRegionLoadCost(regionLoadList));
|
||||
cost += getRegionLoadCost(regionLoadList);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -45,7 +45,7 @@ abstract class CostFunction {
|
||||
* Called once per LB invocation to give the cost function a chance to initialize its state and perform
|
||||
* any costly calculation.
|
||||
*/
|
||||
void init(BalancerClusterState cluster) {
|
||||
void prepare(BalancerClusterState cluster) {
|
||||
this.cluster = cluster;
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import com.google.errorprone.annotations.RestrictedApi;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
@ -119,7 +120,7 @@ public class HeterogeneousRegionCountCostFunction extends CostFunction {
|
||||
* any costly calculation.
|
||||
*/
|
||||
@Override
|
||||
void init(final BalancerClusterState cluster) {
|
||||
void prepare(final BalancerClusterState cluster) {
|
||||
this.cluster = cluster;
|
||||
this.loadRules();
|
||||
}
|
||||
@ -148,6 +149,8 @@ public class HeterogeneousRegionCountCostFunction extends CostFunction {
|
||||
/**
|
||||
* used to load the rule files.
|
||||
*/
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|HeterogeneousRegionCountCostFunction).java")
|
||||
void loadRules() {
|
||||
final List<String> lines = readFile(this.rulesPath);
|
||||
if (null == lines) {
|
||||
|
@ -47,8 +47,8 @@ abstract class LocalityBasedCostFunction extends CostFunction {
|
||||
abstract int regionIndexToEntityIndex(int region);
|
||||
|
||||
@Override
|
||||
void init(BalancerClusterState cluster) {
|
||||
super.init(cluster);
|
||||
void prepare(BalancerClusterState cluster) {
|
||||
super.prepare(cluster);
|
||||
locality = 0.0;
|
||||
bestLocality = 0.0;
|
||||
|
||||
|
@ -38,27 +38,35 @@ class MoveCostFunction extends CostFunction {
|
||||
private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
|
||||
|
||||
private final float maxMovesPercent;
|
||||
private final Configuration conf;
|
||||
private final OffPeakHours offPeakHours;
|
||||
private final float moveCost;
|
||||
private final float moveCostOffPeak;
|
||||
|
||||
MoveCostFunction(Configuration conf) {
|
||||
this.conf = conf;
|
||||
// What percent of the number of regions a single run of the balancer can move.
|
||||
maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
|
||||
|
||||
offPeakHours = OffPeakHours.getInstance(conf);
|
||||
moveCost = conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST);
|
||||
moveCostOffPeak = conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK);
|
||||
// Initialize the multiplier so that addCostFunction will add this cost function.
|
||||
// It may change during later evaluations, due to OffPeakHours.
|
||||
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
|
||||
this.setMultiplier(moveCost);
|
||||
}
|
||||
|
||||
@Override
|
||||
void prepare(BalancerClusterState cluster) {
|
||||
super.prepare(cluster);
|
||||
// Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
|
||||
// that large benefits are need to overcome the cost of a move.
|
||||
if (offPeakHours.isOffPeakHour()) {
|
||||
this.setMultiplier(moveCostOffPeak);
|
||||
} else {
|
||||
this.setMultiplier(moveCost);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
// Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
|
||||
// that large benefits are need to overcome the cost of a move.
|
||||
if (OffPeakHours.getInstance(conf).isOffPeakHour()) {
|
||||
this.setMultiplier(conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK));
|
||||
} else {
|
||||
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
|
||||
}
|
||||
// Try and size the max number of Moves, but always be prepared to move some.
|
||||
int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent), DEFAULT_MAX_MOVES);
|
||||
|
||||
|
@ -31,12 +31,25 @@ class PrimaryRegionCountSkewCostFunction extends CostFunction {
|
||||
"hbase.master.balancer.stochastic.primaryRegionCountCost";
|
||||
private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
|
||||
|
||||
private final float primaryRegionCountCost;
|
||||
private double[] stats;
|
||||
|
||||
PrimaryRegionCountSkewCostFunction(Configuration conf) {
|
||||
// Load multiplier should be the greatest as primary regions serve majority of reads/writes.
|
||||
this.setMultiplier(
|
||||
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
|
||||
primaryRegionCountCost =
|
||||
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST);
|
||||
this.setMultiplier(primaryRegionCountCost);
|
||||
}
|
||||
|
||||
@Override
|
||||
void prepare(BalancerClusterState cluster) {
|
||||
super.prepare(cluster);
|
||||
if (!isNeeded()) {
|
||||
return;
|
||||
}
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -46,13 +59,6 @@ class PrimaryRegionCountSkewCostFunction extends CostFunction {
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
if (!cluster.hasRegionReplicas) {
|
||||
return 0;
|
||||
}
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
|
||||
for (int i = 0; i < cluster.numServers; i++) {
|
||||
stats[i] = 0;
|
||||
for (int regionIdx : cluster.regionsPerServer[i]) {
|
||||
|
@ -34,7 +34,7 @@ class RegionCountSkewCostFunction extends CostFunction {
|
||||
"hbase.master.balancer.stochastic.regionCountCost";
|
||||
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
|
||||
|
||||
private double[] stats = null;
|
||||
private double[] stats;
|
||||
|
||||
RegionCountSkewCostFunction(Configuration conf) {
|
||||
// Load multiplier should be the greatest as it is the most general way to balance data.
|
||||
@ -42,8 +42,11 @@ class RegionCountSkewCostFunction extends CostFunction {
|
||||
}
|
||||
|
||||
@Override
|
||||
void init(BalancerClusterState cluster) {
|
||||
super.init(cluster);
|
||||
void prepare(BalancerClusterState cluster) {
|
||||
super.prepare(cluster);
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
|
||||
cluster.numServers, cluster.numRegions);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
@ -56,9 +59,6 @@ class RegionCountSkewCostFunction extends CostFunction {
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
for (int i = 0; i < cluster.numServers; i++) {
|
||||
stats[i] = cluster.regionsPerServer[i].length;
|
||||
}
|
||||
|
@ -0,0 +1,104 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.Arrays;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A cost function for region replicas. We give a high cost for hosting replicas of the same region
|
||||
* in the same server, host or rack. We do not prevent the case though, since if numReplicas >
|
||||
* numRegionServers, we still want to keep the replica open.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
abstract class RegionReplicaGroupingCostFunction extends CostFunction {
|
||||
|
||||
protected long maxCost = 0;
|
||||
protected long[] costsPerGroup; // group is either server, host or rack
|
||||
|
||||
@Override
|
||||
final void prepare(BalancerClusterState cluster) {
|
||||
super.prepare(cluster);
|
||||
if (!isNeeded()) {
|
||||
return;
|
||||
}
|
||||
loadCosts();
|
||||
}
|
||||
|
||||
protected abstract void loadCosts();
|
||||
|
||||
protected final long getMaxCost(BalancerClusterState cluster) {
|
||||
// max cost is the case where every region replica is hosted together regardless of host
|
||||
int[] primariesOfRegions = new int[cluster.numRegions];
|
||||
System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
|
||||
cluster.regions.length);
|
||||
|
||||
Arrays.sort(primariesOfRegions);
|
||||
|
||||
// compute numReplicas from the sorted array
|
||||
return costPerGroup(primariesOfRegions);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isNeeded() {
|
||||
return cluster.hasRegionReplicas;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
if (maxCost <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
long totalCost = 0;
|
||||
for (int i = 0; i < costsPerGroup.length; i++) {
|
||||
totalCost += costsPerGroup[i];
|
||||
}
|
||||
return scale(0, maxCost, totalCost);
|
||||
}
|
||||
|
||||
/**
|
||||
* For each primary region, it computes the total number of replicas in the array (numReplicas)
|
||||
* and returns a sum of numReplicas-1 squared. For example, if the server hosts regions a, b, c,
|
||||
* d, e, f where a and b are same replicas, and c,d,e are same replicas, it returns (2-1) * (2-1)
|
||||
* + (3-1) * (3-1) + (1-1) * (1-1).
|
||||
* @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
|
||||
* @return a sum of numReplicas-1 squared for each primary region in the group.
|
||||
*/
|
||||
protected final long costPerGroup(int[] primariesOfRegions) {
|
||||
long cost = 0;
|
||||
int currentPrimary = -1;
|
||||
int currentPrimaryIndex = -1;
|
||||
// primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
|
||||
// sharing the same primary will have consecutive numbers in the array.
|
||||
for (int j = 0; j <= primariesOfRegions.length; j++) {
|
||||
int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
|
||||
if (primary != currentPrimary) { // we see a new primary
|
||||
int numReplicas = j - currentPrimaryIndex;
|
||||
// square the cost
|
||||
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
|
||||
cost += (numReplicas - 1) * (numReplicas - 1);
|
||||
}
|
||||
currentPrimary = primary;
|
||||
currentPrimaryIndex = j;
|
||||
}
|
||||
}
|
||||
|
||||
return cost;
|
||||
}
|
||||
}
|
@ -17,7 +17,6 @@
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.Arrays;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@ -27,15 +26,13 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||
* numRegionServers, we still want to keep the replica open.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class RegionReplicaHostCostFunction extends CostFunction {
|
||||
class RegionReplicaHostCostFunction extends RegionReplicaGroupingCostFunction {
|
||||
|
||||
private static final String REGION_REPLICA_HOST_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.regionReplicaHostCostKey";
|
||||
private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
|
||||
|
||||
long maxCost = 0;
|
||||
long[] costsPerGroup; // group is either server, host or rack
|
||||
int[][] primariesOfRegionsPerGroup;
|
||||
private int[][] primariesOfRegionsPerGroup;
|
||||
|
||||
public RegionReplicaHostCostFunction(Configuration conf) {
|
||||
this.setMultiplier(
|
||||
@ -43,8 +40,7 @@ class RegionReplicaHostCostFunction extends CostFunction {
|
||||
}
|
||||
|
||||
@Override
|
||||
void init(BalancerClusterState cluster) {
|
||||
super.init(cluster);
|
||||
protected void loadCosts() {
|
||||
// max cost is the case where every region replica is hosted together regardless of host
|
||||
maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
|
||||
costsPerGroup = new long[cluster.numHosts];
|
||||
@ -55,69 +51,6 @@ class RegionReplicaHostCostFunction extends CostFunction {
|
||||
}
|
||||
}
|
||||
|
||||
protected final long getMaxCost(BalancerClusterState cluster) {
|
||||
if (!cluster.hasRegionReplicas) {
|
||||
return 0; // short circuit
|
||||
}
|
||||
// max cost is the case where every region replica is hosted together regardless of host
|
||||
int[] primariesOfRegions = new int[cluster.numRegions];
|
||||
System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
|
||||
cluster.regions.length);
|
||||
|
||||
Arrays.sort(primariesOfRegions);
|
||||
|
||||
// compute numReplicas from the sorted array
|
||||
return costPerGroup(primariesOfRegions);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isNeeded() {
|
||||
return cluster.hasRegionReplicas;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
if (maxCost <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
long totalCost = 0;
|
||||
for (int i = 0; i < costsPerGroup.length; i++) {
|
||||
totalCost += costsPerGroup[i];
|
||||
}
|
||||
return scale(0, maxCost, totalCost);
|
||||
}
|
||||
|
||||
/**
|
||||
* For each primary region, it computes the total number of replicas in the array (numReplicas)
|
||||
* and returns a sum of numReplicas-1 squared. For example, if the server hosts regions a, b, c,
|
||||
* d, e, f where a and b are same replicas, and c,d,e are same replicas, it returns (2-1) * (2-1)
|
||||
* + (3-1) * (3-1) + (1-1) * (1-1).
|
||||
* @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
|
||||
* @return a sum of numReplicas-1 squared for each primary region in the group.
|
||||
*/
|
||||
protected final long costPerGroup(int[] primariesOfRegions) {
|
||||
long cost = 0;
|
||||
int currentPrimary = -1;
|
||||
int currentPrimaryIndex = -1;
|
||||
// primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
|
||||
// sharing the same primary will have consecutive numbers in the array.
|
||||
for (int j = 0; j <= primariesOfRegions.length; j++) {
|
||||
int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
|
||||
if (primary != currentPrimary) { // we see a new primary
|
||||
int numReplicas = j - currentPrimaryIndex;
|
||||
// square the cost
|
||||
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
|
||||
cost += (numReplicas - 1) * (numReplicas - 1);
|
||||
}
|
||||
currentPrimary = primary;
|
||||
currentPrimaryIndex = j;
|
||||
}
|
||||
}
|
||||
|
||||
return cost;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||
if (maxCost <= 0) {
|
||||
|
@ -25,21 +25,19 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||
* hosting replicas of the same region in the same rack. We do not prevent the case though.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
|
||||
class RegionReplicaRackCostFunction extends RegionReplicaGroupingCostFunction {
|
||||
|
||||
private static final String REGION_REPLICA_RACK_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.regionReplicaRackCostKey";
|
||||
private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
|
||||
|
||||
public RegionReplicaRackCostFunction(Configuration conf) {
|
||||
super(conf);
|
||||
this.setMultiplier(
|
||||
conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
|
||||
}
|
||||
|
||||
@Override
|
||||
void init(BalancerClusterState cluster) {
|
||||
this.cluster = cluster;
|
||||
protected void loadCosts() {
|
||||
if (cluster.numRacks <= 1) {
|
||||
maxCost = 0;
|
||||
return; // disabled for 1 rack
|
||||
|
@ -132,7 +132,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
private boolean isBalancerRejectionRecording = false;
|
||||
|
||||
private List<CandidateGenerator> candidateGenerators;
|
||||
private CostFromRegionLoadFunction[] regionLoadFunctions;
|
||||
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
|
||||
|
||||
// to save and report costs to JMX
|
||||
@ -222,17 +221,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
|
||||
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
|
||||
minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
|
||||
if (localityCandidateGenerator == null) {
|
||||
localityCandidateGenerator = new LocalityBasedCandidateGenerator();
|
||||
}
|
||||
localityCandidateGenerator = new LocalityBasedCandidateGenerator();
|
||||
localityCost = new ServerLocalityCostFunction(conf);
|
||||
rackLocalityCost = new RackLocalityCostFunction(conf);
|
||||
|
||||
this.candidateGenerators = createCandidateGenerators();
|
||||
|
||||
regionLoadFunctions = new CostFromRegionLoadFunction[] { new ReadRequestCostFunction(conf),
|
||||
new CPRequestCostFunction(conf), new WriteRequestCostFunction(conf),
|
||||
new MemStoreSizeCostFunction(conf), new StoreFileCostFunction(conf) };
|
||||
regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
|
||||
regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
|
||||
|
||||
@ -245,11 +239,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
addCostFunction(new TableSkewCostFunction(conf));
|
||||
addCostFunction(regionReplicaHostCostFunction);
|
||||
addCostFunction(regionReplicaRackCostFunction);
|
||||
addCostFunction(regionLoadFunctions[0]);
|
||||
addCostFunction(regionLoadFunctions[1]);
|
||||
addCostFunction(regionLoadFunctions[2]);
|
||||
addCostFunction(regionLoadFunctions[3]);
|
||||
addCostFunction(regionLoadFunctions[4]);
|
||||
addCostFunction(new ReadRequestCostFunction(conf));
|
||||
addCostFunction(new CPRequestCostFunction(conf));
|
||||
addCostFunction(new WriteRequestCostFunction(conf));
|
||||
addCostFunction(new MemStoreSizeCostFunction(conf));
|
||||
addCostFunction(new StoreFileCostFunction(conf));
|
||||
loadCustomCostFunctions(conf);
|
||||
|
||||
curFunctionCosts = new double[costFunctions.size()];
|
||||
@ -275,9 +269,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
public void updateClusterMetrics(ClusterMetrics st) {
|
||||
super.updateClusterMetrics(st);
|
||||
updateRegionLoad();
|
||||
for (CostFromRegionLoadFunction cost : regionLoadFunctions) {
|
||||
cost.setClusterMetrics(st);
|
||||
}
|
||||
|
||||
// update metrics size
|
||||
try {
|
||||
@ -303,11 +294,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
}
|
||||
|
||||
private boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
|
||||
regionReplicaHostCostFunction.init(c);
|
||||
regionReplicaHostCostFunction.prepare(c);
|
||||
if (regionReplicaHostCostFunction.cost() > 0) {
|
||||
return true;
|
||||
}
|
||||
regionReplicaRackCostFunction.init(c);
|
||||
regionReplicaRackCostFunction.prepare(c);
|
||||
if (regionReplicaRackCostFunction.cost() > 0) {
|
||||
return true;
|
||||
}
|
||||
@ -393,6 +384,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
this.rackManager = rackManager;
|
||||
}
|
||||
|
||||
private long calculateMaxSteps(BalancerClusterState cluster) {
|
||||
return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given the cluster state this will try and approach an optimal balance. This
|
||||
* should always approach the optimal state given enough steps.
|
||||
@ -432,11 +427,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
|
||||
long computedMaxSteps;
|
||||
if (runMaxSteps) {
|
||||
computedMaxSteps = Math.max(this.maxSteps,
|
||||
((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
|
||||
computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster));
|
||||
} else {
|
||||
long calculatedMaxSteps = (long)cluster.numRegions * (long)this.stepsPerRegion *
|
||||
(long)cluster.numServers;
|
||||
long calculatedMaxSteps = calculateMaxSteps(cluster);
|
||||
computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
|
||||
if (calculatedMaxSteps > maxSteps) {
|
||||
LOG.warn("calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
|
||||
@ -581,12 +574,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
|
||||
private String functionCost() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
for (CostFunction c:costFunctions) {
|
||||
for (CostFunction c : costFunctions) {
|
||||
builder.append(c.getClass().getSimpleName());
|
||||
builder.append(" : (");
|
||||
builder.append(c.getMultiplier());
|
||||
builder.append(", ");
|
||||
builder.append(c.cost());
|
||||
if (c.isNeeded()) {
|
||||
builder.append(c.getMultiplier());
|
||||
builder.append(", ");
|
||||
builder.append(c.cost());
|
||||
} else {
|
||||
builder.append("not needed");
|
||||
}
|
||||
builder.append("); ");
|
||||
}
|
||||
return builder.toString();
|
||||
@ -595,11 +592,15 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
private String totalCostsPerFunc() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
for (CostFunction c : costFunctions) {
|
||||
if (c.getMultiplier() * c.cost() > 0.0) {
|
||||
if (c.getMultiplier() <= 0 || !c.isNeeded()) {
|
||||
continue;
|
||||
}
|
||||
double cost = c.getMultiplier() * c.cost();
|
||||
if (cost > 0.0) {
|
||||
builder.append(" ");
|
||||
builder.append(c.getClass().getSimpleName());
|
||||
builder.append(" : ");
|
||||
builder.append(c.getMultiplier() * c.cost());
|
||||
builder.append(cost);
|
||||
builder.append(";");
|
||||
}
|
||||
}
|
||||
@ -661,17 +662,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
loads.put(regionNameAsString, rLoads);
|
||||
});
|
||||
});
|
||||
|
||||
for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
|
||||
cost.setLoads(loads);
|
||||
}
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||
void initCosts(BalancerClusterState cluster) {
|
||||
for (CostFunction c:costFunctions) {
|
||||
c.init(cluster);
|
||||
for (CostFunction c : costFunctions) {
|
||||
c.prepare(cluster);
|
||||
}
|
||||
}
|
||||
|
||||
@ -679,7 +676,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||
void updateCostsWithAction(BalancerClusterState cluster, BalanceAction action) {
|
||||
for (CostFunction c : costFunctions) {
|
||||
c.postAction(action);
|
||||
if (c.getMultiplier() > 0 && c.isNeeded()) {
|
||||
c.postAction(action);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -689,9 +688,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||
String[] getCostFunctionNames() {
|
||||
if (costFunctions == null) {
|
||||
return null;
|
||||
}
|
||||
String[] ret = new String[costFunctions.size()];
|
||||
for (int i = 0; i < costFunctions.size(); i++) {
|
||||
CostFunction c = costFunctions.get(i);
|
||||
@ -719,14 +715,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
CostFunction c = costFunctions.get(i);
|
||||
this.tempFunctionCosts[i] = 0.0;
|
||||
|
||||
if (c.getMultiplier() <= 0) {
|
||||
if (c.getMultiplier() <= 0 || !c.isNeeded()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Float multiplier = c.getMultiplier();
|
||||
double cost = c.cost();
|
||||
|
||||
this.tempFunctionCosts[i] = multiplier*cost;
|
||||
this.tempFunctionCosts[i] = multiplier * cost;
|
||||
total += this.tempFunctionCosts[i];
|
||||
|
||||
if (total > previousCost) {
|
||||
|
@ -46,6 +46,7 @@ public class StochasticBalancerTestBase extends BalancerTestBase {
|
||||
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
|
||||
conf.setFloat("hbase.regions.slop", 0.0f);
|
||||
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
|
||||
conf.setLong(StochasticLoadBalancer.MAX_RUNNING_TIME_KEY, 3 * 60 * 1000L);
|
||||
loadBalancer = new StochasticLoadBalancer();
|
||||
loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
|
||||
loadBalancer.initialize();
|
||||
|
@ -27,7 +27,7 @@ public class StochasticBalancerTestBase2 extends StochasticBalancerTestBase {
|
||||
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
|
||||
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
|
||||
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
|
||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
|
||||
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 3 * 60 * 1000L);
|
||||
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
|
||||
loadBalancer.onConfigurationChange(conf);
|
||||
}
|
||||
|
@ -20,15 +20,12 @@ package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.LogEntry;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
||||
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
|
||||
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
|
||||
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
|
||||
@ -36,7 +33,6 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.Assert;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@ -65,11 +61,6 @@ public class TestBalancerRejection extends StochasticBalancerTestBase {
|
||||
return mockCost;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isNeeded() {
|
||||
return super.isNeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
float getMultiplier() {
|
||||
return 1;
|
||||
|
@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Size;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.MockNoopMasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
@ -270,14 +269,13 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||
@Test
|
||||
public void testLocalityCost() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
MockNoopMasterServices master = new MockNoopMasterServices();
|
||||
CostFunction
|
||||
costFunction = new ServerLocalityCostFunction(conf);
|
||||
|
||||
for (int test = 0; test < clusterRegionLocationMocks.length; test++) {
|
||||
int[][] clusterRegionLocations = clusterRegionLocationMocks[test];
|
||||
MockCluster cluster = new MockCluster(clusterRegionLocations);
|
||||
costFunction.init(cluster);
|
||||
costFunction.prepare(cluster);
|
||||
double cost = costFunction.cost();
|
||||
double expected = 1 - expectedLocalities[test];
|
||||
assertEquals(expected, cost, 0.001);
|
||||
@ -291,7 +289,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||
CostFunction
|
||||
costFunction = new MoveCostFunction(conf);
|
||||
BalancerClusterState cluster = mockCluster(clusterStateMocks[0]);
|
||||
costFunction.init(cluster);
|
||||
costFunction.prepare(cluster);
|
||||
costFunction.cost();
|
||||
assertEquals(MoveCostFunction.DEFAULT_MOVE_COST,
|
||||
costFunction.getMultiplier(), 0.01);
|
||||
@ -305,7 +303,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||
long timeFor15 = 1597907081000L - deltaFor15;
|
||||
EnvironmentEdgeManager.injectEdge(() -> timeFor15);
|
||||
costFunction = new MoveCostFunction(conf);
|
||||
costFunction.init(cluster);
|
||||
costFunction.prepare(cluster);
|
||||
costFunction.cost();
|
||||
assertEquals(MoveCostFunction.DEFAULT_MOVE_COST_OFFPEAK
|
||||
, costFunction.getMultiplier(), 0.01);
|
||||
@ -318,7 +316,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||
costFunction = new MoveCostFunction(conf);
|
||||
for (int[] mockCluster : clusterStateMocks) {
|
||||
BalancerClusterState cluster = mockCluster(mockCluster);
|
||||
costFunction.init(cluster);
|
||||
costFunction.prepare(cluster);
|
||||
double cost = costFunction.cost();
|
||||
assertEquals(0.0f, cost, 0.001);
|
||||
|
||||
@ -355,23 +353,23 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||
CostFunction
|
||||
costFunction = new RegionCountSkewCostFunction(conf);
|
||||
for (int[] mockCluster : clusterStateMocks) {
|
||||
costFunction.init(mockCluster(mockCluster));
|
||||
costFunction.prepare(mockCluster(mockCluster));
|
||||
double cost = costFunction.cost();
|
||||
assertTrue(cost >= 0);
|
||||
assertTrue(cost <= 1.01);
|
||||
}
|
||||
|
||||
costFunction.init(mockCluster(new int[]{0, 0, 0, 0, 1}));
|
||||
costFunction.prepare(mockCluster(new int[]{0, 0, 0, 0, 1}));
|
||||
assertEquals(0,costFunction.cost(), 0.01);
|
||||
costFunction.init(mockCluster(new int[]{0, 0, 0, 1, 1}));
|
||||
costFunction.prepare(mockCluster(new int[]{0, 0, 0, 1, 1}));
|
||||
assertEquals(0, costFunction.cost(), 0.01);
|
||||
costFunction.init(mockCluster(new int[]{0, 0, 1, 1, 1}));
|
||||
costFunction.prepare(mockCluster(new int[]{0, 0, 1, 1, 1}));
|
||||
assertEquals(0, costFunction.cost(), 0.01);
|
||||
costFunction.init(mockCluster(new int[]{0, 1, 1, 1, 1}));
|
||||
costFunction.prepare(mockCluster(new int[]{0, 1, 1, 1, 1}));
|
||||
assertEquals(0, costFunction.cost(), 0.01);
|
||||
costFunction.init(mockCluster(new int[]{1, 1, 1, 1, 1}));
|
||||
costFunction.prepare(mockCluster(new int[]{1, 1, 1, 1, 1}));
|
||||
assertEquals(0, costFunction.cost(), 0.01);
|
||||
costFunction.init(mockCluster(new int[]{10000, 0, 0, 0, 0}));
|
||||
costFunction.prepare(mockCluster(new int[]{10000, 0, 0, 0, 0}));
|
||||
assertEquals(1, costFunction.cost(), 0.01);
|
||||
}
|
||||
|
||||
@ -403,7 +401,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||
costFunction = new TableSkewCostFunction(conf);
|
||||
for (int[] mockCluster : clusterStateMocks) {
|
||||
BalancerClusterState cluster = mockCluster(mockCluster);
|
||||
costFunction.init(cluster);
|
||||
costFunction.prepare(cluster);
|
||||
double cost = costFunction.cost();
|
||||
assertTrue(cost >= 0);
|
||||
assertTrue(cost <= 1.01);
|
||||
@ -445,7 +443,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
|
||||
public void testCostFromArray() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
CostFromRegionLoadFunction costFunction = new MemStoreSizeCostFunction(conf);
|
||||
costFunction.init(mockCluster(new int[] { 0, 0, 0, 0, 1 }));
|
||||
costFunction.prepare(mockCluster(new int[] { 0, 0, 0, 0, 1 }));
|
||||
|
||||
double[] statOne = new double[100];
|
||||
for (int i = 0; i < 100; i++) {
|
||||
|
@ -198,7 +198,7 @@ public class TestStochasticLoadBalancerHeterogeneousCost extends StochasticBalan
|
||||
assertNotNull(cf);
|
||||
BalancerClusterState cluster =
|
||||
new BalancerClusterState(serverMap, null, null, null);
|
||||
cf.init(cluster);
|
||||
cf.prepare(cluster);
|
||||
|
||||
// checking that we all hosts have a number of regions below their limit
|
||||
for (final ServerAndLoad serverAndLoad : balancedCluster) {
|
||||
|
@ -54,7 +54,7 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT
|
||||
new RegionReplicaHostCostFunction(conf);
|
||||
for (int[] mockCluster : clusterStateMocks) {
|
||||
BalancerClusterState cluster = mockCluster(mockCluster);
|
||||
costFunction.init(cluster);
|
||||
costFunction.prepare(cluster);
|
||||
double cost = costFunction.cost();
|
||||
assertTrue(cost >= 0);
|
||||
assertTrue(cost <= 1.01);
|
||||
@ -73,7 +73,7 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT
|
||||
BalancerClusterState cluster;
|
||||
|
||||
cluster = new BalancerClusterState(clusterState, null, null, null);
|
||||
costFunction.init(cluster);
|
||||
costFunction.prepare(cluster);
|
||||
double costWithoutReplicas = costFunction.cost();
|
||||
assertEquals(0, costWithoutReplicas, 0);
|
||||
|
||||
@ -83,7 +83,7 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT
|
||||
clusterState.lastEntry().getValue().add(replica1);
|
||||
|
||||
cluster = new BalancerClusterState(clusterState, null, null, null);
|
||||
costFunction.init(cluster);
|
||||
costFunction.prepare(cluster);
|
||||
double costWith1ReplicaDifferentServer = costFunction.cost();
|
||||
|
||||
assertEquals(0, costWith1ReplicaDifferentServer, 0);
|
||||
@ -93,7 +93,7 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT
|
||||
clusterState.lastEntry().getValue().add(replica2);
|
||||
|
||||
cluster = new BalancerClusterState(clusterState, null, null, null);
|
||||
costFunction.init(cluster);
|
||||
costFunction.prepare(cluster);
|
||||
double costWith1ReplicaSameServer = costFunction.cost();
|
||||
|
||||
assertTrue(costWith1ReplicaDifferentServer < costWith1ReplicaSameServer);
|
||||
@ -116,7 +116,7 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT
|
||||
it.next().getValue().add(replica3); // 2nd server
|
||||
|
||||
cluster = new BalancerClusterState(clusterState, null, null, null);
|
||||
costFunction.init(cluster);
|
||||
costFunction.prepare(cluster);
|
||||
double costWith3ReplicasSameServer = costFunction.cost();
|
||||
|
||||
clusterState = mockClusterServers(servers);
|
||||
@ -130,7 +130,7 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT
|
||||
clusterState.lastEntry().getValue().add(replica3);
|
||||
|
||||
cluster = new BalancerClusterState(clusterState, null, null, null);
|
||||
costFunction.init(cluster);
|
||||
costFunction.prepare(cluster);
|
||||
double costWith2ReplicasOnTwoServers = costFunction.cost();
|
||||
|
||||
assertTrue(costWith2ReplicasOnTwoServers < costWith3ReplicasSameServer);
|
||||
|
Loading…
x
Reference in New Issue
Block a user