HBASE-25819 Fix style issues for StochasticLoadBalancer (#3207)
Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
parent
3a01941963
commit
c52c091609
|
@ -52,7 +52,7 @@ public class ReflectionUtils {
|
|||
}
|
||||
}
|
||||
|
||||
private static <T> T instantiate(final String className, Constructor<T> ctor, Object[] ctorArgs) {
|
||||
public static <T> T instantiate(final String className, Constructor<T> ctor, Object... ctorArgs) {
|
||||
try {
|
||||
ctor.setAccessible(true);
|
||||
return ctor.newInstance(ctorArgs);
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.Collection;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Class to be used for the subset of RegionLoad costs that should be treated as rates. We do not
|
||||
* compare about the actual rate in requests per second but rather the rate relative to the rest of
|
||||
* the regions.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
abstract class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
|
||||
|
||||
@Override
|
||||
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
|
||||
double cost = 0;
|
||||
double previous = 0;
|
||||
boolean isFirst = true;
|
||||
for (BalancerRegionLoad rl : regionLoadList) {
|
||||
double current = getCostFromRl(rl);
|
||||
if (isFirst) {
|
||||
isFirst = false;
|
||||
} else {
|
||||
cost += current - previous;
|
||||
}
|
||||
previous = current;
|
||||
}
|
||||
return Math.max(0, cost / (regionLoadList.size() - 1));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Deque;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Base class the allows writing costs functions from rolling average of some number from
|
||||
* RegionLoad.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
abstract class CostFromRegionLoadFunction extends CostFunction {
|
||||
|
||||
private ClusterMetrics clusterStatus;
|
||||
private Map<String, Deque<BalancerRegionLoad>> loads;
|
||||
private double[] stats;
|
||||
|
||||
void setClusterMetrics(ClusterMetrics status) {
|
||||
this.clusterStatus = status;
|
||||
}
|
||||
|
||||
void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
|
||||
this.loads = l;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final double cost() {
|
||||
if (clusterStatus == null || loads == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
|
||||
for (int i = 0; i < stats.length; i++) {
|
||||
// Cost this server has from RegionLoad
|
||||
long cost = 0;
|
||||
|
||||
// for every region on this server get the rl
|
||||
for (int regionIndex : cluster.regionsPerServer[i]) {
|
||||
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
|
||||
|
||||
// Now if we found a region load get the type of cost that was requested.
|
||||
if (regionLoadList != null) {
|
||||
cost = (long) (cost + getRegionLoadCost(regionLoadList));
|
||||
}
|
||||
}
|
||||
|
||||
// Add the total cost to the stats.
|
||||
stats[i] = cost;
|
||||
}
|
||||
|
||||
// Now return the scaled cost from data held in the stats object.
|
||||
return costFromArray(stats);
|
||||
}
|
||||
|
||||
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
|
||||
double cost = 0;
|
||||
for (BalancerRegionLoad rl : regionLoadList) {
|
||||
cost += getCostFromRl(rl);
|
||||
}
|
||||
return cost / regionLoadList.size();
|
||||
}
|
||||
|
||||
protected abstract double getCostFromRl(BalancerRegionLoad rl);
|
||||
}
|
|
@ -0,0 +1,152 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Base class of StochasticLoadBalancer's Cost Functions.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
abstract class CostFunction {
|
||||
|
||||
private float multiplier = 0;
|
||||
|
||||
protected BalancerClusterState cluster;
|
||||
|
||||
boolean isNeeded() {
|
||||
return true;
|
||||
}
|
||||
|
||||
float getMultiplier() {
|
||||
return multiplier;
|
||||
}
|
||||
|
||||
void setMultiplier(float m) {
|
||||
this.multiplier = m;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called once per LB invocation to give the cost function to initialize it's state, and perform
|
||||
* any costly calculation.
|
||||
*/
|
||||
void init(BalancerClusterState cluster) {
|
||||
this.cluster = cluster;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called once per cluster Action to give the cost function an opportunity to update it's state.
|
||||
* postAction() is always called at least once before cost() is called with the cluster that this
|
||||
* action is performed on.
|
||||
*/
|
||||
void postAction(BalanceAction action) {
|
||||
switch (action.getType()) {
|
||||
case NULL:
|
||||
break;
|
||||
case ASSIGN_REGION:
|
||||
AssignRegionAction ar = (AssignRegionAction) action;
|
||||
regionMoved(ar.getRegion(), -1, ar.getServer());
|
||||
break;
|
||||
case MOVE_REGION:
|
||||
MoveRegionAction mra = (MoveRegionAction) action;
|
||||
regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
|
||||
break;
|
||||
case SWAP_REGIONS:
|
||||
SwapRegionsAction a = (SwapRegionsAction) action;
|
||||
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
|
||||
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Uknown action:" + action.getType());
|
||||
}
|
||||
}
|
||||
|
||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||
}
|
||||
|
||||
protected abstract double cost();
|
||||
|
||||
@SuppressWarnings("checkstyle:linelength")
|
||||
/**
|
||||
* Function to compute a scaled cost using
|
||||
* {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
|
||||
* It assumes that this is a zero sum set of costs. It assumes that the worst case possible is all
|
||||
* of the elements in one region server and the rest having 0.
|
||||
* @param stats the costs
|
||||
* @return a scaled set of costs.
|
||||
*/
|
||||
protected final double costFromArray(double[] stats) {
|
||||
double totalCost = 0;
|
||||
double total = getSum(stats);
|
||||
|
||||
double count = stats.length;
|
||||
double mean = total / count;
|
||||
|
||||
// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
|
||||
// a zero sum cost for this to make sense.
|
||||
double max = ((count - 1) * mean) + (total - mean);
|
||||
|
||||
// It's possible that there aren't enough regions to go around
|
||||
double min;
|
||||
if (count > total) {
|
||||
min = ((count - total) * mean) + ((1 - mean) * total);
|
||||
} else {
|
||||
// Some will have 1 more than everything else.
|
||||
int numHigh = (int) (total - (Math.floor(mean) * count));
|
||||
int numLow = (int) (count - numHigh);
|
||||
|
||||
min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
|
||||
|
||||
}
|
||||
min = Math.max(0, min);
|
||||
for (int i = 0; i < stats.length; i++) {
|
||||
double n = stats[i];
|
||||
double diff = Math.abs(mean - n);
|
||||
totalCost += diff;
|
||||
}
|
||||
|
||||
double scaled = scale(min, max, totalCost);
|
||||
return scaled;
|
||||
}
|
||||
|
||||
private double getSum(double[] stats) {
|
||||
double total = 0;
|
||||
for (double s : stats) {
|
||||
total += s;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* Scale the value between 0 and 1.
|
||||
* @param min Min value
|
||||
* @param max The Max value
|
||||
* @param value The value to be scaled.
|
||||
* @return The scaled value.
|
||||
*/
|
||||
protected final double scale(double min, double max, double value) {
|
||||
if (max <= min || value <= min) {
|
||||
return 0;
|
||||
}
|
||||
if ((max - min) == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
|
||||
}
|
||||
}
|
|
@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory;
|
|||
* The rule file can be located on local FS or HDFS, depending on the prefix (file//: or hdfs://).
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class HeterogeneousRegionCountCostFunction extends StochasticLoadBalancer.CostFunction {
|
||||
public class HeterogeneousRegionCountCostFunction extends CostFunction {
|
||||
|
||||
/**
|
||||
* configuration used for the path where the rule file is stored.
|
||||
|
@ -94,7 +94,6 @@ public class HeterogeneousRegionCountCostFunction extends StochasticLoadBalancer
|
|||
double overallUsage;
|
||||
|
||||
HeterogeneousRegionCountCostFunction(final Configuration conf) {
|
||||
super(conf);
|
||||
this.conf = conf;
|
||||
this.limitPerRS = new HashMap<>();
|
||||
this.limitPerRule = new HashMap<>();
|
||||
|
@ -108,8 +107,8 @@ public class HeterogeneousRegionCountCostFunction extends StochasticLoadBalancer
|
|||
+ "'. Setting default to 200");
|
||||
this.defaultNumberOfRegions = 200;
|
||||
}
|
||||
if (conf.getFloat(StochasticLoadBalancer.RegionCountSkewCostFunction.REGION_COUNT_SKEW_COST_KEY,
|
||||
StochasticLoadBalancer.RegionCountSkewCostFunction.DEFAULT_REGION_COUNT_SKEW_COST) > 0) {
|
||||
if (conf.getFloat(RegionCountSkewCostFunction.REGION_COUNT_SKEW_COST_KEY,
|
||||
RegionCountSkewCostFunction.DEFAULT_REGION_COUNT_SKEW_COST) > 0) {
|
||||
LOG.warn("regionCountCost is not set to 0, "
|
||||
+ " this will interfere with the HeterogeneousRegionCountCostFunction!");
|
||||
}
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Compute a cost of a potential cluster configuration based upon where
|
||||
* {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
abstract class LocalityBasedCostFunction extends CostFunction {
|
||||
|
||||
private final LocalityType type;
|
||||
|
||||
private double bestLocality; // best case locality across cluster weighted by local data size
|
||||
private double locality; // current locality across cluster weighted by local data size
|
||||
|
||||
LocalityBasedCostFunction(Configuration conf, LocalityType type, String localityCostKey,
|
||||
float defaultLocalityCost) {
|
||||
this.type = type;
|
||||
this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
|
||||
this.locality = 0.0;
|
||||
this.bestLocality = 0.0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps region to the current entity (server or rack) on which it is stored
|
||||
*/
|
||||
abstract int regionIndexToEntityIndex(int region);
|
||||
|
||||
@Override
|
||||
void init(BalancerClusterState cluster) {
|
||||
super.init(cluster);
|
||||
locality = 0.0;
|
||||
bestLocality = 0.0;
|
||||
|
||||
for (int region = 0; region < cluster.numRegions; region++) {
|
||||
locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
|
||||
bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
|
||||
}
|
||||
|
||||
// We normalize locality to be a score between 0 and 1.0 representing how good it
|
||||
// is compared to how good it could be. If bestLocality is 0, assume locality is 100
|
||||
// (and the cost is 0)
|
||||
locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||
int oldEntity =
|
||||
type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
|
||||
int newEntity =
|
||||
type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
|
||||
double localityDelta =
|
||||
getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
|
||||
double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
|
||||
locality += normalizedDelta;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final double cost() {
|
||||
return 1 - locality;
|
||||
}
|
||||
|
||||
private int getMostLocalEntityForRegion(int region) {
|
||||
return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
|
||||
}
|
||||
|
||||
private double getWeightedLocality(int region, int entity) {
|
||||
return cluster.getOrComputeWeightedLocality(region, entity, type);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Compute the cost of total memstore size. The more unbalanced the higher the computed cost will
|
||||
* be. This uses a rolling average of regionload.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
|
||||
|
||||
private static final String MEMSTORE_SIZE_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.memstoreSizeCost";
|
||||
private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
|
||||
|
||||
MemStoreSizeCostFunction(Configuration conf) {
|
||||
this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||
return rl.getMemStoreSizeMB();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Given the starting state of the regions and a potential ending state compute cost based upon the
|
||||
* number of regions that have moved.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class MoveCostFunction extends CostFunction {
|
||||
private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
|
||||
private static final String MOVE_COST_OFFPEAK_KEY =
|
||||
"hbase.master.balancer.stochastic.moveCost.offpeak";
|
||||
private static final String MAX_MOVES_PERCENT_KEY =
|
||||
"hbase.master.balancer.stochastic.maxMovePercent";
|
||||
static final float DEFAULT_MOVE_COST = 7;
|
||||
static final float DEFAULT_MOVE_COST_OFFPEAK = 3;
|
||||
private static final int DEFAULT_MAX_MOVES = 600;
|
||||
private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
|
||||
|
||||
private final float maxMovesPercent;
|
||||
private final Configuration conf;
|
||||
|
||||
MoveCostFunction(Configuration conf) {
|
||||
this.conf = conf;
|
||||
// What percent of the number of regions a single run of the balancer can move.
|
||||
maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
|
||||
|
||||
// Initialize the multiplier so that addCostFunction will add this cost function.
|
||||
// It may change during later evaluations, due to OffPeakHours.
|
||||
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
// Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
|
||||
// that large benefits are need to overcome the cost of a move.
|
||||
if (OffPeakHours.getInstance(conf).isOffPeakHour()) {
|
||||
this.setMultiplier(conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK));
|
||||
} else {
|
||||
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
|
||||
}
|
||||
// Try and size the max number of Moves, but always be prepared to move some.
|
||||
int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent), DEFAULT_MAX_MOVES);
|
||||
|
||||
double moveCost = cluster.numMovedRegions;
|
||||
|
||||
// Don't let this single balance move more than the max moves.
|
||||
// This allows better scaling to accurately represent the actual cost of a move.
|
||||
if (moveCost > maxMoves) {
|
||||
return 1000000; // return a number much greater than any of the other cost
|
||||
}
|
||||
|
||||
return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Compute the cost of a potential cluster state from skew in number of primary regions on a
|
||||
* cluster.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class PrimaryRegionCountSkewCostFunction extends CostFunction {
|
||||
|
||||
private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.primaryRegionCountCost";
|
||||
private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
|
||||
|
||||
private double[] stats;
|
||||
|
||||
PrimaryRegionCountSkewCostFunction(Configuration conf) {
|
||||
// Load multiplier should be the greatest as primary regions serve majority of reads/writes.
|
||||
this.setMultiplier(
|
||||
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isNeeded() {
|
||||
return cluster.hasRegionReplicas;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
if (!cluster.hasRegionReplicas) {
|
||||
return 0;
|
||||
}
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
|
||||
for (int i = 0; i < cluster.numServers; i++) {
|
||||
stats[i] = 0;
|
||||
for (int regionIdx : cluster.regionsPerServer[i]) {
|
||||
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
|
||||
stats[i]++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return costFromArray(stats);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
class RackLocalityCostFunction extends LocalityBasedCostFunction {
|
||||
|
||||
private static final String RACK_LOCALITY_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.rackLocalityCost";
|
||||
private static final float DEFAULT_RACK_LOCALITY_COST = 15;
|
||||
|
||||
public RackLocalityCostFunction(Configuration conf) {
|
||||
super(conf, LocalityType.RACK, RACK_LOCALITY_COST_KEY, DEFAULT_RACK_LOCALITY_COST);
|
||||
}
|
||||
|
||||
@Override
|
||||
int regionIndexToEntityIndex(int region) {
|
||||
return cluster.getRackForRegion(region);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Compute the cost of total number of read requests The more unbalanced the higher the computed
|
||||
* cost will be. This uses a rolling average of regionload.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
|
||||
|
||||
private static final String READ_REQUEST_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.readRequestCost";
|
||||
private static final float DEFAULT_READ_REQUEST_COST = 5;
|
||||
|
||||
ReadRequestCostFunction(Configuration conf) {
|
||||
this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||
return rl.getReadRequestsCount();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Compute the cost of a potential cluster state from skew in number of regions on a cluster.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class RegionCountSkewCostFunction extends CostFunction {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RegionCountSkewCostFunction.class);
|
||||
|
||||
static final String REGION_COUNT_SKEW_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.regionCountCost";
|
||||
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
|
||||
|
||||
private double[] stats = null;
|
||||
|
||||
RegionCountSkewCostFunction(Configuration conf) {
|
||||
// Load multiplier should be the greatest as it is the most general way to balance data.
|
||||
this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
void init(BalancerClusterState cluster) {
|
||||
super.init(cluster);
|
||||
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
|
||||
cluster.numServers, cluster.numRegions);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
for (int i = 0; i < cluster.numServers; i++) {
|
||||
LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(),
|
||||
cluster.servers[i], cluster.regionsPerServer[i].length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
for (int i = 0; i < cluster.numServers; i++) {
|
||||
stats[i] = cluster.regionsPerServer[i].length;
|
||||
}
|
||||
return costFromArray(stats);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,138 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.Arrays;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A cost function for region replicas. We give a very high cost to hosting replicas of the same
|
||||
* region in the same host. We do not prevent the case though, since if numReplicas >
|
||||
* numRegionServers, we still want to keep the replica open.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class RegionReplicaHostCostFunction extends CostFunction {
|
||||
|
||||
private static final String REGION_REPLICA_HOST_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.regionReplicaHostCostKey";
|
||||
private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
|
||||
|
||||
long maxCost = 0;
|
||||
long[] costsPerGroup; // group is either server, host or rack
|
||||
int[][] primariesOfRegionsPerGroup;
|
||||
|
||||
public RegionReplicaHostCostFunction(Configuration conf) {
|
||||
this.setMultiplier(
|
||||
conf.getFloat(REGION_REPLICA_HOST_COST_KEY, DEFAULT_REGION_REPLICA_HOST_COST_KEY));
|
||||
}
|
||||
|
||||
@Override
|
||||
void init(BalancerClusterState cluster) {
|
||||
super.init(cluster);
|
||||
// max cost is the case where every region replica is hosted together regardless of host
|
||||
maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
|
||||
costsPerGroup = new long[cluster.numHosts];
|
||||
primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
|
||||
? cluster.primariesOfRegionsPerHost : cluster.primariesOfRegionsPerServer;
|
||||
for (int i = 0; i < primariesOfRegionsPerGroup.length; i++) {
|
||||
costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
|
||||
}
|
||||
}
|
||||
|
||||
protected final long getMaxCost(BalancerClusterState cluster) {
|
||||
if (!cluster.hasRegionReplicas) {
|
||||
return 0; // short circuit
|
||||
}
|
||||
// max cost is the case where every region replica is hosted together regardless of host
|
||||
int[] primariesOfRegions = new int[cluster.numRegions];
|
||||
System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
|
||||
cluster.regions.length);
|
||||
|
||||
Arrays.sort(primariesOfRegions);
|
||||
|
||||
// compute numReplicas from the sorted array
|
||||
return costPerGroup(primariesOfRegions);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isNeeded() {
|
||||
return cluster.hasRegionReplicas;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
if (maxCost <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
long totalCost = 0;
|
||||
for (int i = 0; i < costsPerGroup.length; i++) {
|
||||
totalCost += costsPerGroup[i];
|
||||
}
|
||||
return scale(0, maxCost, totalCost);
|
||||
}
|
||||
|
||||
/**
|
||||
* For each primary region, it computes the total number of replicas in the array (numReplicas)
|
||||
* and returns a sum of numReplicas-1 squared. For example, if the server hosts regions a, b, c,
|
||||
* d, e, f where a and b are same replicas, and c,d,e are same replicas, it returns (2-1) * (2-1)
|
||||
* + (3-1) * (3-1) + (1-1) * (1-1).
|
||||
* @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
|
||||
* @return a sum of numReplicas-1 squared for each primary region in the group.
|
||||
*/
|
||||
protected final long costPerGroup(int[] primariesOfRegions) {
|
||||
long cost = 0;
|
||||
int currentPrimary = -1;
|
||||
int currentPrimaryIndex = -1;
|
||||
// primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
|
||||
// sharing the same primary will have consecutive numbers in the array.
|
||||
for (int j = 0; j <= primariesOfRegions.length; j++) {
|
||||
int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
|
||||
if (primary != currentPrimary) { // we see a new primary
|
||||
int numReplicas = j - currentPrimaryIndex;
|
||||
// square the cost
|
||||
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
|
||||
cost += (numReplicas - 1) * (numReplicas - 1);
|
||||
}
|
||||
currentPrimary = primary;
|
||||
currentPrimaryIndex = j;
|
||||
}
|
||||
}
|
||||
|
||||
return cost;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||
if (maxCost <= 0) {
|
||||
return; // no need to compute
|
||||
}
|
||||
if (cluster.multiServersPerHost) {
|
||||
int oldHost = cluster.serverIndexToHostIndex[oldServer];
|
||||
int newHost = cluster.serverIndexToHostIndex[newServer];
|
||||
if (newHost != oldHost) {
|
||||
costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
|
||||
costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
|
||||
}
|
||||
} else {
|
||||
costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
|
||||
costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A cost function for region replicas for the rack distribution. We give a relatively high cost to
|
||||
* hosting replicas of the same region in the same rack. We do not prevent the case though.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
|
||||
|
||||
private static final String REGION_REPLICA_RACK_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.regionReplicaRackCostKey";
|
||||
private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
|
||||
|
||||
public RegionReplicaRackCostFunction(Configuration conf) {
|
||||
super(conf);
|
||||
this.setMultiplier(
|
||||
conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
|
||||
}
|
||||
|
||||
@Override
|
||||
void init(BalancerClusterState cluster) {
|
||||
this.cluster = cluster;
|
||||
if (cluster.numRacks <= 1) {
|
||||
maxCost = 0;
|
||||
return; // disabled for 1 rack
|
||||
}
|
||||
// max cost is the case where every region replica is hosted together regardless of rack
|
||||
maxCost = getMaxCost(cluster);
|
||||
costsPerGroup = new long[cluster.numRacks];
|
||||
for (int i = 0; i < cluster.primariesOfRegionsPerRack.length; i++) {
|
||||
costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||
if (maxCost <= 0) {
|
||||
return; // no need to compute
|
||||
}
|
||||
int oldRack = cluster.serverIndexToRackIndex[oldServer];
|
||||
int newRack = cluster.serverIndexToRackIndex[newServer];
|
||||
if (newRack != oldRack) {
|
||||
costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
|
||||
costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
class ServerLocalityCostFunction extends LocalityBasedCostFunction {
|
||||
|
||||
private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
|
||||
private static final float DEFAULT_LOCALITY_COST = 25;
|
||||
|
||||
ServerLocalityCostFunction(Configuration conf) {
|
||||
super(conf, LocalityType.SERVER, LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST);
|
||||
}
|
||||
|
||||
@Override
|
||||
int regionIndexToEntityIndex(int region) {
|
||||
return cluster.regionIndexToServerIndex[region];
|
||||
}
|
||||
}
|
|
@ -17,18 +17,16 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import com.google.errorprone.annotations.RestrictedApi;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
|
@ -39,10 +37,8 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.client.BalancerDecision;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
|
||||
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
|
||||
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -82,7 +78,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|||
* <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>All custom Cost Functions needs to extends {@link StochasticLoadBalancer.CostFunction}</p>
|
||||
* <p>All custom Cost Functions needs to extends {@link CostFunction}</p>
|
||||
*
|
||||
* <p>In addition to the above configurations, the balancer can be tuned by the following
|
||||
* configuration values:</p>
|
||||
|
@ -231,38 +227,44 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
Arrays.toString(getCostFunctionNames()) + " etc.");
|
||||
}
|
||||
|
||||
private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
|
||||
Configuration conf) {
|
||||
try {
|
||||
Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class);
|
||||
return ReflectionUtils.instantiate(clazz.getName(), ctor, conf);
|
||||
} catch (NoSuchMethodException e) {
|
||||
// will try construct with no parameter
|
||||
}
|
||||
return ReflectionUtils.newInstance(clazz);
|
||||
}
|
||||
|
||||
private void loadCustomCostFunctions(Configuration conf) {
|
||||
String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
|
||||
|
||||
if (null == functionsNames) {
|
||||
return;
|
||||
}
|
||||
|
||||
costFunctions.addAll(Arrays.stream(functionsNames).map(c -> {
|
||||
Class<? extends CostFunction> klass = null;
|
||||
for (String className : functionsNames) {
|
||||
Class<? extends CostFunction> clazz;
|
||||
try {
|
||||
klass = Class.forName(c).asSubclass(CostFunction.class);
|
||||
clazz = Class.forName(className).asSubclass(CostFunction.class);
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.warn("Cannot load class " + c + "': " + e.getMessage());
|
||||
LOG.warn("Cannot load class '{}': {}", className, e.getMessage());
|
||||
continue;
|
||||
}
|
||||
if (null == klass) {
|
||||
return null;
|
||||
CostFunction func = createCostFunction(clazz, conf);
|
||||
LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
|
||||
costFunctions.add(func);
|
||||
}
|
||||
CostFunction reflected = ReflectionUtils.newInstance(klass, conf);
|
||||
LOG.info(
|
||||
"Successfully loaded custom CostFunction '" + reflected.getClass().getSimpleName() + "'");
|
||||
return reflected;
|
||||
}).filter(Objects::nonNull).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
|
||||
this.candidateGenerators = customCandidateGenerators;
|
||||
}
|
||||
|
||||
/**
|
||||
* Exposed for Testing!
|
||||
*/
|
||||
public List<CandidateGenerator> getCandidateGenerators() {
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*/src/test/.*")
|
||||
List<CandidateGenerator> getCandidateGenerators() {
|
||||
return this.candidateGenerators;
|
||||
}
|
||||
|
||||
|
@ -294,7 +296,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
/**
|
||||
* Update the number of metrics that are reported to JMX
|
||||
*/
|
||||
public void updateMetricsSize(int size) {
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||
void updateMetricsSize(int size) {
|
||||
if (metricsBalancer instanceof MetricsStochasticBalancer) {
|
||||
((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
|
||||
}
|
||||
|
@ -361,6 +365,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
return !balanced;
|
||||
}
|
||||
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||
BalanceAction nextAction(BalancerClusterState cluster) {
|
||||
return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()))
|
||||
.generate(cluster);
|
||||
|
@ -583,7 +589,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
* @return List of RegionPlan's that represent the moves needed to get to desired final state.
|
||||
*/
|
||||
private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) {
|
||||
List<RegionPlan> plans = new LinkedList<>();
|
||||
List<RegionPlan> plans = new ArrayList<>();
|
||||
for (int regionIndex = 0;
|
||||
regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
|
||||
int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
|
||||
|
@ -633,13 +639,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
protected void initCosts(BalancerClusterState cluster) {
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||
void initCosts(BalancerClusterState cluster) {
|
||||
for (CostFunction c:costFunctions) {
|
||||
c.init(cluster);
|
||||
}
|
||||
}
|
||||
|
||||
protected void updateCostsWithAction(BalancerClusterState cluster, BalanceAction action) {
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||
void updateCostsWithAction(BalancerClusterState cluster, BalanceAction action) {
|
||||
for (CostFunction c : costFunctions) {
|
||||
c.postAction(action);
|
||||
}
|
||||
|
@ -648,8 +658,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
/**
|
||||
* Get the names of the cost functions
|
||||
*/
|
||||
public String[] getCostFunctionNames() {
|
||||
if (costFunctions == null) return null;
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||
String[] getCostFunctionNames() {
|
||||
if (costFunctions == null) {
|
||||
return null;
|
||||
}
|
||||
String[] ret = new String[costFunctions.size()];
|
||||
for (int i = 0; i < costFunctions.size(); i++) {
|
||||
CostFunction c = costFunctions.get(i);
|
||||
|
@ -668,7 +682,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
* @return a double of a cost associated with the proposed cluster state. This cost is an
|
||||
* aggregate of all individual cost functions.
|
||||
*/
|
||||
protected double computeCost(BalancerClusterState cluster, double previousCost) {
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
|
||||
double computeCost(BalancerClusterState cluster, double previousCost) {
|
||||
double total = 0;
|
||||
|
||||
for (int i = 0; i < costFunctions.size(); i++) {
|
||||
|
@ -693,747 +709,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* Base class of StochasticLoadBalancer's Cost Functions.
|
||||
*/
|
||||
public abstract static class CostFunction {
|
||||
|
||||
private float multiplier = 0;
|
||||
|
||||
protected BalancerClusterState cluster;
|
||||
|
||||
public CostFunction(Configuration c) {
|
||||
}
|
||||
|
||||
boolean isNeeded() {
|
||||
return true;
|
||||
}
|
||||
|
||||
float getMultiplier() {
|
||||
return multiplier;
|
||||
}
|
||||
|
||||
void setMultiplier(float m) {
|
||||
this.multiplier = m;
|
||||
}
|
||||
|
||||
/** Called once per LB invocation to give the cost function
|
||||
* to initialize it's state, and perform any costly calculation.
|
||||
*/
|
||||
void init(BalancerClusterState cluster) {
|
||||
this.cluster = cluster;
|
||||
}
|
||||
|
||||
/** Called once per cluster Action to give the cost function
|
||||
* an opportunity to update it's state. postAction() is always
|
||||
* called at least once before cost() is called with the cluster
|
||||
* that this action is performed on. */
|
||||
void postAction(BalanceAction action) {
|
||||
switch (action.getType()) {
|
||||
case NULL:
|
||||
break;
|
||||
case ASSIGN_REGION:
|
||||
AssignRegionAction ar = (AssignRegionAction) action;
|
||||
regionMoved(ar.getRegion(), -1, ar.getServer());
|
||||
break;
|
||||
case MOVE_REGION:
|
||||
MoveRegionAction mra = (MoveRegionAction) action;
|
||||
regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
|
||||
break;
|
||||
case SWAP_REGIONS:
|
||||
SwapRegionsAction a = (SwapRegionsAction) action;
|
||||
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
|
||||
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Uknown action:" + action.getType());
|
||||
}
|
||||
}
|
||||
|
||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||
}
|
||||
|
||||
protected abstract double cost();
|
||||
|
||||
@SuppressWarnings("checkstyle:linelength")
|
||||
/**
|
||||
* Function to compute a scaled cost using
|
||||
* {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
|
||||
* It assumes that this is a zero sum set of costs. It assumes that the worst case
|
||||
* possible is all of the elements in one region server and the rest having 0.
|
||||
*
|
||||
* @param stats the costs
|
||||
* @return a scaled set of costs.
|
||||
*/
|
||||
protected double costFromArray(double[] stats) {
|
||||
double totalCost = 0;
|
||||
double total = getSum(stats);
|
||||
|
||||
double count = stats.length;
|
||||
double mean = total/count;
|
||||
|
||||
// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
|
||||
// a zero sum cost for this to make sense.
|
||||
double max = ((count - 1) * mean) + (total - mean);
|
||||
|
||||
// It's possible that there aren't enough regions to go around
|
||||
double min;
|
||||
if (count > total) {
|
||||
min = ((count - total) * mean) + ((1 - mean) * total);
|
||||
} else {
|
||||
// Some will have 1 more than everything else.
|
||||
int numHigh = (int) (total - (Math.floor(mean) * count));
|
||||
int numLow = (int) (count - numHigh);
|
||||
|
||||
min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
|
||||
|
||||
}
|
||||
min = Math.max(0, min);
|
||||
for (int i=0; i<stats.length; i++) {
|
||||
double n = stats[i];
|
||||
double diff = Math.abs(mean - n);
|
||||
totalCost += diff;
|
||||
}
|
||||
|
||||
double scaled = scale(min, max, totalCost);
|
||||
return scaled;
|
||||
}
|
||||
|
||||
private double getSum(double[] stats) {
|
||||
double total = 0;
|
||||
for(double s:stats) {
|
||||
total += s;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* Scale the value between 0 and 1.
|
||||
*
|
||||
* @param min Min value
|
||||
* @param max The Max value
|
||||
* @param value The value to be scaled.
|
||||
* @return The scaled value.
|
||||
*/
|
||||
protected double scale(double min, double max, double value) {
|
||||
if (max <= min || value <= min) {
|
||||
return 0;
|
||||
}
|
||||
if ((max - min) == 0) return 0;
|
||||
|
||||
return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Given the starting state of the regions and a potential ending state
|
||||
* compute cost based upon the number of regions that have moved.
|
||||
*/
|
||||
static class MoveCostFunction extends CostFunction {
|
||||
private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
|
||||
private static final String MOVE_COST_OFFPEAK_KEY =
|
||||
"hbase.master.balancer.stochastic.moveCost.offpeak";
|
||||
private static final String MAX_MOVES_PERCENT_KEY =
|
||||
"hbase.master.balancer.stochastic.maxMovePercent";
|
||||
static final float DEFAULT_MOVE_COST = 7;
|
||||
static final float DEFAULT_MOVE_COST_OFFPEAK = 3;
|
||||
private static final int DEFAULT_MAX_MOVES = 600;
|
||||
private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
|
||||
|
||||
private final float maxMovesPercent;
|
||||
private final Configuration conf;
|
||||
|
||||
MoveCostFunction(Configuration conf) {
|
||||
super(conf);
|
||||
this.conf = conf;
|
||||
// What percent of the number of regions a single run of the balancer can move.
|
||||
maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
|
||||
|
||||
// Initialize the multiplier so that addCostFunction will add this cost function.
|
||||
// It may change during later evaluations, due to OffPeakHours.
|
||||
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
// Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
|
||||
// that large benefits are need to overcome the cost of a move.
|
||||
if (OffPeakHours.getInstance(conf).isOffPeakHour()) {
|
||||
this.setMultiplier(conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK));
|
||||
} else {
|
||||
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
|
||||
}
|
||||
// Try and size the max number of Moves, but always be prepared to move some.
|
||||
int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
|
||||
DEFAULT_MAX_MOVES);
|
||||
|
||||
double moveCost = cluster.numMovedRegions;
|
||||
|
||||
// Don't let this single balance move more than the max moves.
|
||||
// This allows better scaling to accurately represent the actual cost of a move.
|
||||
if (moveCost > maxMoves) {
|
||||
return 1000000; // return a number much greater than any of the other cost
|
||||
}
|
||||
|
||||
return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the cost of a potential cluster state from skew in number of
|
||||
* regions on a cluster.
|
||||
*/
|
||||
static class RegionCountSkewCostFunction extends CostFunction {
|
||||
static final String REGION_COUNT_SKEW_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.regionCountCost";
|
||||
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
|
||||
|
||||
private double[] stats = null;
|
||||
|
||||
RegionCountSkewCostFunction(Configuration conf) {
|
||||
super(conf);
|
||||
// Load multiplier should be the greatest as it is the most general way to balance data.
|
||||
this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
void init(BalancerClusterState cluster) {
|
||||
super.init(cluster);
|
||||
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
|
||||
cluster.numServers, cluster.numRegions);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
for (int i =0; i < cluster.numServers; i++) {
|
||||
LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(),
|
||||
cluster.servers[i], cluster.regionsPerServer[i].length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
for (int i =0; i < cluster.numServers; i++) {
|
||||
stats[i] = cluster.regionsPerServer[i].length;
|
||||
}
|
||||
return costFromArray(stats);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the cost of a potential cluster state from skew in number of
|
||||
* primary regions on a cluster.
|
||||
*/
|
||||
static class PrimaryRegionCountSkewCostFunction extends CostFunction {
|
||||
private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.primaryRegionCountCost";
|
||||
private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
|
||||
|
||||
private double[] stats = null;
|
||||
|
||||
PrimaryRegionCountSkewCostFunction(Configuration conf) {
|
||||
super(conf);
|
||||
// Load multiplier should be the greatest as primary regions serve majority of reads/writes.
|
||||
this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
|
||||
DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isNeeded() {
|
||||
return cluster.hasRegionReplicas;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
if (!cluster.hasRegionReplicas) {
|
||||
return 0;
|
||||
}
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
|
||||
for (int i = 0; i < cluster.numServers; i++) {
|
||||
stats[i] = 0;
|
||||
for (int regionIdx : cluster.regionsPerServer[i]) {
|
||||
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
|
||||
stats[i]++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return costFromArray(stats);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the cost of a potential cluster configuration based upon how evenly
|
||||
* distributed tables are.
|
||||
*/
|
||||
static class TableSkewCostFunction extends CostFunction {
|
||||
|
||||
private static final String TABLE_SKEW_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.tableSkewCost";
|
||||
private static final float DEFAULT_TABLE_SKEW_COST = 35;
|
||||
|
||||
TableSkewCostFunction(Configuration conf) {
|
||||
super(conf);
|
||||
this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
double max = cluster.numRegions;
|
||||
double min = ((double) cluster.numRegions) / cluster.numServers;
|
||||
double value = 0;
|
||||
|
||||
for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
|
||||
value += cluster.numMaxRegionsPerTable[i];
|
||||
}
|
||||
|
||||
return scale(min, max, value);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute a cost of a potential cluster configuration based upon where
|
||||
* {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
|
||||
*/
|
||||
static abstract class LocalityBasedCostFunction extends CostFunction {
|
||||
|
||||
private final LocalityType type;
|
||||
|
||||
private double bestLocality; // best case locality across cluster weighted by local data size
|
||||
private double locality; // current locality across cluster weighted by local data size
|
||||
|
||||
LocalityBasedCostFunction(Configuration conf, LocalityType type, String localityCostKey,
|
||||
float defaultLocalityCost) {
|
||||
super(conf);
|
||||
this.type = type;
|
||||
this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
|
||||
this.locality = 0.0;
|
||||
this.bestLocality = 0.0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps region to the current entity (server or rack) on which it is stored
|
||||
*/
|
||||
abstract int regionIndexToEntityIndex(int region);
|
||||
|
||||
@Override
|
||||
void init(BalancerClusterState cluster) {
|
||||
super.init(cluster);
|
||||
locality = 0.0;
|
||||
bestLocality = 0.0;
|
||||
|
||||
for (int region = 0; region < cluster.numRegions; region++) {
|
||||
locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
|
||||
bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
|
||||
}
|
||||
|
||||
// We normalize locality to be a score between 0 and 1.0 representing how good it
|
||||
// is compared to how good it could be. If bestLocality is 0, assume locality is 100
|
||||
// (and the cost is 0)
|
||||
locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||
int oldEntity = type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
|
||||
int newEntity = type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
|
||||
double localityDelta = getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
|
||||
double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
|
||||
locality += normalizedDelta;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
return 1 - locality;
|
||||
}
|
||||
|
||||
private int getMostLocalEntityForRegion(int region) {
|
||||
return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
|
||||
}
|
||||
|
||||
private double getWeightedLocality(int region, int entity) {
|
||||
return cluster.getOrComputeWeightedLocality(region, entity, type);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
|
||||
|
||||
private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
|
||||
private static final float DEFAULT_LOCALITY_COST = 25;
|
||||
|
||||
ServerLocalityCostFunction(Configuration conf) {
|
||||
super(conf, LocalityType.SERVER, LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST);
|
||||
}
|
||||
|
||||
@Override
|
||||
int regionIndexToEntityIndex(int region) {
|
||||
return cluster.regionIndexToServerIndex[region];
|
||||
}
|
||||
}
|
||||
|
||||
static class RackLocalityCostFunction extends LocalityBasedCostFunction {
|
||||
|
||||
private static final String RACK_LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.rackLocalityCost";
|
||||
private static final float DEFAULT_RACK_LOCALITY_COST = 15;
|
||||
|
||||
public RackLocalityCostFunction(Configuration conf) {
|
||||
super(conf, LocalityType.RACK, RACK_LOCALITY_COST_KEY, DEFAULT_RACK_LOCALITY_COST);
|
||||
}
|
||||
|
||||
@Override
|
||||
int regionIndexToEntityIndex(int region) {
|
||||
return cluster.getRackForRegion(region);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Base class the allows writing costs functions from rolling average of some
|
||||
* number from RegionLoad.
|
||||
*/
|
||||
abstract static class CostFromRegionLoadFunction extends CostFunction {
|
||||
|
||||
private ClusterMetrics clusterStatus = null;
|
||||
private Map<String, Deque<BalancerRegionLoad>> loads = null;
|
||||
private double[] stats = null;
|
||||
CostFromRegionLoadFunction(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
void setClusterMetrics(ClusterMetrics status) {
|
||||
this.clusterStatus = status;
|
||||
}
|
||||
|
||||
void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
|
||||
this.loads = l;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
if (clusterStatus == null || loads == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
|
||||
for (int i =0; i < stats.length; i++) {
|
||||
//Cost this server has from RegionLoad
|
||||
long cost = 0;
|
||||
|
||||
// for every region on this server get the rl
|
||||
for(int regionIndex:cluster.regionsPerServer[i]) {
|
||||
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
|
||||
|
||||
// Now if we found a region load get the type of cost that was requested.
|
||||
if (regionLoadList != null) {
|
||||
cost = (long) (cost + getRegionLoadCost(regionLoadList));
|
||||
}
|
||||
}
|
||||
|
||||
// Add the total cost to the stats.
|
||||
stats[i] = cost;
|
||||
}
|
||||
|
||||
// Now return the scaled cost from data held in the stats object.
|
||||
return costFromArray(stats);
|
||||
}
|
||||
|
||||
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
|
||||
double cost = 0;
|
||||
for (BalancerRegionLoad rl : regionLoadList) {
|
||||
cost += getCostFromRl(rl);
|
||||
}
|
||||
return cost / regionLoadList.size();
|
||||
}
|
||||
|
||||
protected abstract double getCostFromRl(BalancerRegionLoad rl);
|
||||
}
|
||||
|
||||
/**
|
||||
* Class to be used for the subset of RegionLoad costs that should be treated as rates.
|
||||
* We do not compare about the actual rate in requests per second but rather the rate relative
|
||||
* to the rest of the regions.
|
||||
*/
|
||||
abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
|
||||
|
||||
CostFromRegionLoadAsRateFunction(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
|
||||
double cost = 0;
|
||||
double previous = 0;
|
||||
boolean isFirst = true;
|
||||
for (BalancerRegionLoad rl : regionLoadList) {
|
||||
double current = getCostFromRl(rl);
|
||||
if (isFirst) {
|
||||
isFirst = false;
|
||||
} else {
|
||||
cost += current - previous;
|
||||
}
|
||||
previous = current;
|
||||
}
|
||||
return Math.max(0, cost / (regionLoadList.size() - 1));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the cost of total number of read requests The more unbalanced the higher the
|
||||
* computed cost will be. This uses a rolling average of regionload.
|
||||
*/
|
||||
|
||||
static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
|
||||
|
||||
private static final String READ_REQUEST_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.readRequestCost";
|
||||
private static final float DEFAULT_READ_REQUEST_COST = 5;
|
||||
|
||||
ReadRequestCostFunction(Configuration conf) {
|
||||
super(conf);
|
||||
this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||
return rl.getReadRequestsCount();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the cost of total number of write requests. The more unbalanced the higher the
|
||||
* computed cost will be. This uses a rolling average of regionload.
|
||||
*/
|
||||
static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
|
||||
|
||||
private static final String WRITE_REQUEST_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.writeRequestCost";
|
||||
private static final float DEFAULT_WRITE_REQUEST_COST = 5;
|
||||
|
||||
WriteRequestCostFunction(Configuration conf) {
|
||||
super(conf);
|
||||
this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||
return rl.getWriteRequestsCount();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A cost function for region replicas. We give a very high cost to hosting
|
||||
* replicas of the same region in the same host. We do not prevent the case
|
||||
* though, since if numReplicas > numRegionServers, we still want to keep the
|
||||
* replica open.
|
||||
*/
|
||||
static class RegionReplicaHostCostFunction extends CostFunction {
|
||||
private static final String REGION_REPLICA_HOST_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.regionReplicaHostCostKey";
|
||||
private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
|
||||
|
||||
long maxCost = 0;
|
||||
long[] costsPerGroup; // group is either server, host or rack
|
||||
int[][] primariesOfRegionsPerGroup;
|
||||
|
||||
public RegionReplicaHostCostFunction(Configuration conf) {
|
||||
super(conf);
|
||||
this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
|
||||
DEFAULT_REGION_REPLICA_HOST_COST_KEY));
|
||||
}
|
||||
|
||||
@Override
|
||||
void init(BalancerClusterState cluster) {
|
||||
super.init(cluster);
|
||||
// max cost is the case where every region replica is hosted together regardless of host
|
||||
maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
|
||||
costsPerGroup = new long[cluster.numHosts];
|
||||
primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
|
||||
? cluster.primariesOfRegionsPerHost
|
||||
: cluster.primariesOfRegionsPerServer;
|
||||
for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
|
||||
costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
|
||||
}
|
||||
}
|
||||
|
||||
long getMaxCost(BalancerClusterState cluster) {
|
||||
if (!cluster.hasRegionReplicas) {
|
||||
return 0; // short circuit
|
||||
}
|
||||
// max cost is the case where every region replica is hosted together regardless of host
|
||||
int[] primariesOfRegions = new int[cluster.numRegions];
|
||||
System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
|
||||
cluster.regions.length);
|
||||
|
||||
Arrays.sort(primariesOfRegions);
|
||||
|
||||
// compute numReplicas from the sorted array
|
||||
return costPerGroup(primariesOfRegions);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isNeeded() {
|
||||
return cluster.hasRegionReplicas;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
if (maxCost <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
long totalCost = 0;
|
||||
for (int i = 0 ; i < costsPerGroup.length; i++) {
|
||||
totalCost += costsPerGroup[i];
|
||||
}
|
||||
return scale(0, maxCost, totalCost);
|
||||
}
|
||||
|
||||
/**
|
||||
* For each primary region, it computes the total number of replicas in the array (numReplicas)
|
||||
* and returns a sum of numReplicas-1 squared. For example, if the server hosts
|
||||
* regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
|
||||
* returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
|
||||
* @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
|
||||
* @return a sum of numReplicas-1 squared for each primary region in the group.
|
||||
*/
|
||||
protected long costPerGroup(int[] primariesOfRegions) {
|
||||
long cost = 0;
|
||||
int currentPrimary = -1;
|
||||
int currentPrimaryIndex = -1;
|
||||
// primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
|
||||
// sharing the same primary will have consecutive numbers in the array.
|
||||
for (int j = 0 ; j <= primariesOfRegions.length; j++) {
|
||||
int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
|
||||
if (primary != currentPrimary) { // we see a new primary
|
||||
int numReplicas = j - currentPrimaryIndex;
|
||||
// square the cost
|
||||
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
|
||||
cost += (numReplicas - 1) * (numReplicas - 1);
|
||||
}
|
||||
currentPrimary = primary;
|
||||
currentPrimaryIndex = j;
|
||||
}
|
||||
}
|
||||
|
||||
return cost;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||
if (maxCost <= 0) {
|
||||
return; // no need to compute
|
||||
}
|
||||
if (cluster.multiServersPerHost) {
|
||||
int oldHost = cluster.serverIndexToHostIndex[oldServer];
|
||||
int newHost = cluster.serverIndexToHostIndex[newServer];
|
||||
if (newHost != oldHost) {
|
||||
costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
|
||||
costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
|
||||
}
|
||||
} else {
|
||||
costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
|
||||
costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A cost function for region replicas for the rack distribution. We give a relatively high
|
||||
* cost to hosting replicas of the same region in the same rack. We do not prevent the case
|
||||
* though.
|
||||
*/
|
||||
static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
|
||||
private static final String REGION_REPLICA_RACK_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.regionReplicaRackCostKey";
|
||||
private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
|
||||
|
||||
public RegionReplicaRackCostFunction(Configuration conf) {
|
||||
super(conf);
|
||||
this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
|
||||
DEFAULT_REGION_REPLICA_RACK_COST_KEY));
|
||||
}
|
||||
|
||||
@Override
|
||||
void init(BalancerClusterState cluster) {
|
||||
this.cluster = cluster;
|
||||
if (cluster.numRacks <= 1) {
|
||||
maxCost = 0;
|
||||
return; // disabled for 1 rack
|
||||
}
|
||||
// max cost is the case where every region replica is hosted together regardless of rack
|
||||
maxCost = getMaxCost(cluster);
|
||||
costsPerGroup = new long[cluster.numRacks];
|
||||
for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
|
||||
costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||
if (maxCost <= 0) {
|
||||
return; // no need to compute
|
||||
}
|
||||
int oldRack = cluster.serverIndexToRackIndex[oldServer];
|
||||
int newRack = cluster.serverIndexToRackIndex[newServer];
|
||||
if (newRack != oldRack) {
|
||||
costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
|
||||
costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the cost of total memstore size. The more unbalanced the higher the
|
||||
* computed cost will be. This uses a rolling average of regionload.
|
||||
*/
|
||||
static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
|
||||
|
||||
private static final String MEMSTORE_SIZE_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.memstoreSizeCost";
|
||||
private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
|
||||
|
||||
MemStoreSizeCostFunction(Configuration conf) {
|
||||
super(conf);
|
||||
this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||
return rl.getMemStoreSizeMB();
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Compute the cost of total open storefiles size. The more unbalanced the higher the
|
||||
* computed cost will be. This uses a rolling average of regionload.
|
||||
*/
|
||||
static class StoreFileCostFunction extends CostFromRegionLoadFunction {
|
||||
|
||||
private static final String STOREFILE_SIZE_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.storefileSizeCost";
|
||||
private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
|
||||
|
||||
StoreFileCostFunction(Configuration conf) {
|
||||
super(conf);
|
||||
this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||
return rl.getStorefileSizeMB();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper function to compose the attribute name from tablename and costfunction name
|
||||
*/
|
||||
public static String composeAttributeName(String tableName, String costFunctionName) {
|
||||
static String composeAttributeName(String tableName, String costFunctionName) {
|
||||
return tableName + TABLE_FUNCTION_SEP + costFunctionName;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Compute the cost of total open storefiles size. The more unbalanced the higher the computed cost
|
||||
* will be. This uses a rolling average of regionload.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class StoreFileCostFunction extends CostFromRegionLoadFunction {
|
||||
|
||||
private static final String STOREFILE_SIZE_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.storefileSizeCost";
|
||||
private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
|
||||
|
||||
StoreFileCostFunction(Configuration conf) {
|
||||
this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||
return rl.getStorefileSizeMB();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Compute the cost of a potential cluster configuration based upon how evenly distributed tables
|
||||
* are.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class TableSkewCostFunction extends CostFunction {
|
||||
|
||||
private static final String TABLE_SKEW_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.tableSkewCost";
|
||||
private static final float DEFAULT_TABLE_SKEW_COST = 35;
|
||||
|
||||
TableSkewCostFunction(Configuration conf) {
|
||||
this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
double max = cluster.numRegions;
|
||||
double min = ((double) cluster.numRegions) / cluster.numServers;
|
||||
double value = 0;
|
||||
|
||||
for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
|
||||
value += cluster.numMaxRegionsPerTable[i];
|
||||
}
|
||||
|
||||
return scale(min, max, value);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Compute the cost of total number of write requests. The more unbalanced the higher the computed
|
||||
* cost will be. This uses a rolling average of regionload.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
|
||||
|
||||
private static final String WRITE_REQUEST_COST_KEY =
|
||||
"hbase.master.balancer.stochastic.writeRequestCost";
|
||||
private static final float DEFAULT_WRITE_REQUEST_COST = 5;
|
||||
|
||||
WriteRequestCostFunction(Configuration conf) {
|
||||
this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double getCostFromRl(BalancerRegionLoad rl) {
|
||||
return rl.getWriteRequestsCount();
|
||||
}
|
||||
}
|
|
@ -17,12 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
public class DummyCostFunction extends StochasticLoadBalancer.CostFunction {
|
||||
public DummyCostFunction(Configuration c) {
|
||||
super(c);
|
||||
}
|
||||
public class DummyCostFunction extends CostFunction {
|
||||
|
||||
@Override
|
||||
protected double cost() {
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -35,10 +35,14 @@ import javax.management.ObjectName;
|
|||
import javax.management.remote.JMXConnector;
|
||||
import javax.management.remote.JMXConnectorFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.JMXListener;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.balancer.BalancerTestBase;
|
||||
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
@ -271,13 +275,4 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
|
|||
|
||||
return ret;
|
||||
}
|
||||
|
||||
private static void printMetrics(Set<String> metrics, String info) {
|
||||
if (null != info) LOG.info("++++ ------ " + info + " ------");
|
||||
|
||||
LOG.info("++++ metrics count = " + metrics.size());
|
||||
for (String str : metrics) {
|
||||
LOG.info(" ++++ " + str);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.master.MockNoopMasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.ServerLocalityCostFunction;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -73,16 +72,14 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
* [test][region + 1][0] = server that region is hosted on
|
||||
* [test][region + 1][server + 1] = locality for region on server
|
||||
*/
|
||||
|
||||
private int[][][] clusterRegionLocationMocks = new int[][][]{
|
||||
|
||||
// Test 1: each region is entirely on server that hosts it
|
||||
new int[][]{
|
||||
new int[]{2, 1, 1},
|
||||
new int[]{2, 0, 0, 100}, // region 0 is hosted and entirely local on server 2
|
||||
new int[]{0, 100, 0, 0}, // region 1 is hosted and entirely on server 0
|
||||
new int[]{0, 100, 0, 0}, // region 2 is hosted and entirely on server 0
|
||||
new int[]{1, 0, 100, 0}, // region 1 is hosted and entirely on server 1
|
||||
new int[]{1, 0, 100, 0}, // region 3 is hosted and entirely on server 1
|
||||
},
|
||||
|
||||
// Test 2: each region is 0% local on the server that hosts it
|
||||
|
@ -91,7 +88,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
new int[]{0, 0, 0, 100}, // region 0 is hosted and entirely local on server 2
|
||||
new int[]{1, 100, 0, 0}, // region 1 is hosted and entirely on server 0
|
||||
new int[]{1, 100, 0, 0}, // region 2 is hosted and entirely on server 0
|
||||
new int[]{2, 0, 100, 0}, // region 1 is hosted and entirely on server 1
|
||||
new int[]{2, 0, 100, 0}, // region 3 is hosted and entirely on server 1
|
||||
},
|
||||
|
||||
// Test 3: each region is 25% local on the server that hosts it (and 50% locality is possible)
|
||||
|
@ -100,7 +97,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
new int[]{0, 25, 0, 50}, // region 0 is hosted and entirely local on server 2
|
||||
new int[]{1, 50, 25, 0}, // region 1 is hosted and entirely on server 0
|
||||
new int[]{1, 50, 25, 0}, // region 2 is hosted and entirely on server 0
|
||||
new int[]{2, 0, 50, 25}, // region 1 is hosted and entirely on server 1
|
||||
new int[]{2, 0, 50, 25}, // region 3 is hosted and entirely on server 1
|
||||
},
|
||||
|
||||
// Test 4: each region is 25% local on the server that hosts it (and 100% locality is possible)
|
||||
|
@ -109,22 +106,22 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
new int[]{0, 25, 0, 100}, // region 0 is hosted and entirely local on server 2
|
||||
new int[]{1, 100, 25, 0}, // region 1 is hosted and entirely on server 0
|
||||
new int[]{1, 100, 25, 0}, // region 2 is hosted and entirely on server 0
|
||||
new int[]{2, 0, 100, 25}, // region 1 is hosted and entirely on server 1
|
||||
new int[]{2, 0, 100, 25}, // region 3 is hosted and entirely on server 1
|
||||
},
|
||||
|
||||
// Test 5: each region is 75% local on the server that hosts it (and 75% locality is possible everywhere)
|
||||
// Test 5: each region is 75% local on the server that hosts it (and 75% locality is possible
|
||||
// everywhere)
|
||||
new int[][]{
|
||||
new int[]{1, 2, 1},
|
||||
new int[]{0, 75, 75, 75}, // region 0 is hosted and entirely local on server 2
|
||||
new int[]{1, 75, 75, 75}, // region 1 is hosted and entirely on server 0
|
||||
new int[]{1, 75, 75, 75}, // region 2 is hosted and entirely on server 0
|
||||
new int[]{2, 75, 75, 75}, // region 1 is hosted and entirely on server 1
|
||||
new int[]{2, 75, 75, 75}, // region 3 is hosted and entirely on server 1
|
||||
},
|
||||
};
|
||||
|
||||
@Test
|
||||
public void testKeepRegionLoad() throws Exception {
|
||||
|
||||
ServerName sn = ServerName.valueOf("test:8080", 100);
|
||||
int numClusterStatusToAdd = 20000;
|
||||
for (int i = 0; i < numClusterStatusToAdd; i++) {
|
||||
|
@ -194,7 +191,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
public void testLocalityCost() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
MockNoopMasterServices master = new MockNoopMasterServices();
|
||||
StochasticLoadBalancer.CostFunction
|
||||
CostFunction
|
||||
costFunction = new ServerLocalityCostFunction(conf);
|
||||
|
||||
for (int test = 0; test < clusterRegionLocationMocks.length; test++) {
|
||||
|
@ -211,12 +208,12 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
@Test
|
||||
public void testMoveCostMultiplier() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
StochasticLoadBalancer.CostFunction
|
||||
costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
|
||||
CostFunction
|
||||
costFunction = new MoveCostFunction(conf);
|
||||
BalancerClusterState cluster = mockCluster(clusterStateMocks[0]);
|
||||
costFunction.init(cluster);
|
||||
costFunction.cost();
|
||||
assertEquals(StochasticLoadBalancer.MoveCostFunction.DEFAULT_MOVE_COST,
|
||||
assertEquals(MoveCostFunction.DEFAULT_MOVE_COST,
|
||||
costFunction.getMultiplier(), 0.01);
|
||||
|
||||
// In offpeak hours, the multiplier of move cost should be lower
|
||||
|
@ -227,18 +224,18 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
long deltaFor15 = TimeZone.getDefault().getRawOffset() - 28800000;
|
||||
long timeFor15 = 1597907081000L - deltaFor15;
|
||||
EnvironmentEdgeManager.injectEdge(() -> timeFor15);
|
||||
costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
|
||||
costFunction = new MoveCostFunction(conf);
|
||||
costFunction.init(cluster);
|
||||
costFunction.cost();
|
||||
assertEquals(StochasticLoadBalancer.MoveCostFunction.DEFAULT_MOVE_COST_OFFPEAK
|
||||
assertEquals(MoveCostFunction.DEFAULT_MOVE_COST_OFFPEAK
|
||||
, costFunction.getMultiplier(), 0.01);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveCost() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
StochasticLoadBalancer.CostFunction
|
||||
costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
|
||||
CostFunction
|
||||
costFunction = new MoveCostFunction(conf);
|
||||
for (int[] mockCluster : clusterStateMocks) {
|
||||
BalancerClusterState cluster = mockCluster(mockCluster);
|
||||
costFunction.init(cluster);
|
||||
|
@ -275,8 +272,8 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
@Test
|
||||
public void testSkewCost() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
StochasticLoadBalancer.CostFunction
|
||||
costFunction = new StochasticLoadBalancer.RegionCountSkewCostFunction(conf);
|
||||
CostFunction
|
||||
costFunction = new RegionCountSkewCostFunction(conf);
|
||||
for (int[] mockCluster : clusterStateMocks) {
|
||||
costFunction.init(mockCluster(mockCluster));
|
||||
double cost = costFunction.cost();
|
||||
|
@ -322,8 +319,8 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
@Test
|
||||
public void testTableSkewCost() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
StochasticLoadBalancer.CostFunction
|
||||
costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf);
|
||||
CostFunction
|
||||
costFunction = new TableSkewCostFunction(conf);
|
||||
for (int[] mockCluster : clusterStateMocks) {
|
||||
BalancerClusterState cluster = mockCluster(mockCluster);
|
||||
costFunction.init(cluster);
|
||||
|
@ -344,14 +341,14 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
}
|
||||
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
StochasticLoadBalancer.ReadRequestCostFunction readCostFunction =
|
||||
new StochasticLoadBalancer.ReadRequestCostFunction(conf);
|
||||
ReadRequestCostFunction readCostFunction =
|
||||
new ReadRequestCostFunction(conf);
|
||||
double rateResult = readCostFunction.getRegionLoadCost(regionLoads);
|
||||
// read requests are treated as a rate so the average rate here is simply 1
|
||||
assertEquals(1, rateResult, 0.01);
|
||||
|
||||
StochasticLoadBalancer.StoreFileCostFunction storeFileCostFunction =
|
||||
new StochasticLoadBalancer.StoreFileCostFunction(conf);
|
||||
StoreFileCostFunction storeFileCostFunction =
|
||||
new StoreFileCostFunction(conf);
|
||||
double result = storeFileCostFunction.getRegionLoadCost(regionLoads);
|
||||
// storefile size cost is simply an average of it's value over time
|
||||
assertEquals(2.5, result, 0.01);
|
||||
|
@ -360,27 +357,26 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
@Test
|
||||
public void testCostFromArray() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
StochasticLoadBalancer.CostFromRegionLoadFunction
|
||||
costFunction = new StochasticLoadBalancer.MemStoreSizeCostFunction(conf);
|
||||
costFunction.init(mockCluster(new int[]{0, 0, 0, 0, 1}));
|
||||
CostFromRegionLoadFunction costFunction = new MemStoreSizeCostFunction(conf);
|
||||
costFunction.init(mockCluster(new int[] { 0, 0, 0, 0, 1 }));
|
||||
|
||||
double[] statOne = new double[100];
|
||||
for (int i =0; i < 100; i++) {
|
||||
for (int i = 0; i < 100; i++) {
|
||||
statOne[i] = 10;
|
||||
}
|
||||
assertEquals(0, costFunction.costFromArray(statOne), 0.01);
|
||||
|
||||
double[] statTwo= new double[101];
|
||||
for (int i =0; i < 100; i++) {
|
||||
double[] statTwo = new double[101];
|
||||
for (int i = 0; i < 100; i++) {
|
||||
statTwo[i] = 0;
|
||||
}
|
||||
statTwo[100] = 100;
|
||||
assertEquals(1, costFunction.costFromArray(statTwo), 0.01);
|
||||
|
||||
double[] statThree = new double[200];
|
||||
for (int i =0; i < 100; i++) {
|
||||
statThree[i] = (0);
|
||||
statThree[i+100] = 100;
|
||||
for (int i = 0; i < 100; i++) {
|
||||
statThree[i] = 0;
|
||||
statThree[i + 100] = 100;
|
||||
}
|
||||
assertEquals(0.5, costFunction.costFromArray(statThree), 0.01);
|
||||
}
|
||||
|
@ -435,17 +431,17 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
@Test
|
||||
public void testDefaultCostFunctionList() {
|
||||
List<String> expected = Arrays.asList(
|
||||
StochasticLoadBalancer.RegionCountSkewCostFunction.class.getSimpleName(),
|
||||
StochasticLoadBalancer.PrimaryRegionCountSkewCostFunction.class.getSimpleName(),
|
||||
StochasticLoadBalancer.MoveCostFunction.class.getSimpleName(),
|
||||
StochasticLoadBalancer.RackLocalityCostFunction.class.getSimpleName(),
|
||||
StochasticLoadBalancer.TableSkewCostFunction.class.getSimpleName(),
|
||||
StochasticLoadBalancer.RegionReplicaHostCostFunction.class.getSimpleName(),
|
||||
StochasticLoadBalancer.RegionReplicaRackCostFunction.class.getSimpleName(),
|
||||
StochasticLoadBalancer.ReadRequestCostFunction.class.getSimpleName(),
|
||||
StochasticLoadBalancer.WriteRequestCostFunction.class.getSimpleName(),
|
||||
StochasticLoadBalancer.MemStoreSizeCostFunction.class.getSimpleName(),
|
||||
StochasticLoadBalancer.StoreFileCostFunction.class.getSimpleName()
|
||||
RegionCountSkewCostFunction.class.getSimpleName(),
|
||||
PrimaryRegionCountSkewCostFunction.class.getSimpleName(),
|
||||
MoveCostFunction.class.getSimpleName(),
|
||||
RackLocalityCostFunction.class.getSimpleName(),
|
||||
TableSkewCostFunction.class.getSimpleName(),
|
||||
RegionReplicaHostCostFunction.class.getSimpleName(),
|
||||
RegionReplicaRackCostFunction.class.getSimpleName(),
|
||||
ReadRequestCostFunction.class.getSimpleName(),
|
||||
WriteRequestCostFunction.class.getSimpleName(),
|
||||
MemStoreSizeCostFunction.class.getSimpleName(),
|
||||
StoreFileCostFunction.class.getSimpleName()
|
||||
);
|
||||
|
||||
List<String> actual = Arrays.asList(loadBalancer.getCostFunctionNames());
|
||||
|
@ -453,8 +449,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
CollectionUtils.isEqualCollection(expected, actual));
|
||||
}
|
||||
|
||||
private boolean needsBalanceIdleRegion(int[] cluster){
|
||||
return (Arrays.stream(cluster).anyMatch(x -> x>1)) && (Arrays.stream(cluster).anyMatch(x -> x<1));
|
||||
private boolean needsBalanceIdleRegion(int[] cluster) {
|
||||
return Arrays.stream(cluster).anyMatch(x -> x > 1) &&
|
||||
Arrays.stream(cluster).anyMatch(x -> x < 1);
|
||||
}
|
||||
|
||||
// This mock allows us to test the LocalityCostFunction
|
||||
|
@ -474,7 +471,8 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
regionIndexToServerIndex[regionIndex] = regions[i][0];
|
||||
for (int j = 1; j < regions[i].length; j++) {
|
||||
int serverIndex = j - 1;
|
||||
localities[regionIndex][serverIndex] = regions[i][j] > 100 ? regions[i][j] % 100 : regions[i][j];
|
||||
localities[regionIndex][serverIndex] =
|
||||
regions[i][j] > 100 ? regions[i][j] % 100 : regions[i][j];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.util.HashMap;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.TreeMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
|
@ -51,8 +50,8 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
|
|||
@Test
|
||||
public void testReplicaCost() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
StochasticLoadBalancer.CostFunction costFunction =
|
||||
new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf);
|
||||
CostFunction costFunction =
|
||||
new RegionReplicaHostCostFunction(conf);
|
||||
for (int[] mockCluster : clusterStateMocks) {
|
||||
BalancerClusterState cluster = mockCluster(mockCluster);
|
||||
costFunction.init(cluster);
|
||||
|
@ -65,8 +64,8 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
|
|||
@Test
|
||||
public void testReplicaCostForReplicas() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
StochasticLoadBalancer.CostFunction costFunction =
|
||||
new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf);
|
||||
CostFunction costFunction =
|
||||
new RegionReplicaHostCostFunction(conf);
|
||||
|
||||
int[] servers = new int[] { 3, 3, 3, 3, 3 };
|
||||
TreeMap<ServerName, List<RegionInfo>> clusterState = mockClusterServers(servers);
|
||||
|
@ -102,8 +101,8 @@ public class TestStochasticLoadBalancerRegionReplica extends BalancerTestBase {
|
|||
// test with replication = 4 for following:
|
||||
|
||||
RegionInfo replica3;
|
||||
Iterator<Entry<ServerName, List<RegionInfo>>> it;
|
||||
Entry<ServerName, List<RegionInfo>> entry;
|
||||
Iterator<Map.Entry<ServerName, List<RegionInfo>>> it;
|
||||
Map.Entry<ServerName, List<RegionInfo>> entry;
|
||||
|
||||
clusterState = mockClusterServers(servers);
|
||||
it = clusterState.entrySet().iterator();
|
||||
|
|
Loading…
Reference in New Issue