HBASE-25819 Fix style issues for StochasticLoadBalancer (#3207)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
Duo Zhang 2021-04-29 11:03:55 +08:00 committed by GitHub
parent b061b0c4ed
commit 6c65314cdf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1275 additions and 915 deletions

View File

@ -52,7 +52,7 @@ public class ReflectionUtils {
}
}
private static <T> T instantiate(final String className, Constructor<T> ctor, Object[] ctorArgs) {
public static <T> T instantiate(final String className, Constructor<T> ctor, Object... ctorArgs) {
try {
ctor.setAccessible(true);
return ctor.newInstance(ctorArgs);

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Compute the cost of total number of coprocessor requests The more unbalanced the higher the
* computed cost will be. This uses a rolling average of regionload.
*/
@InterfaceAudience.Private
class CPRequestCostFunction extends CostFromRegionLoadAsRateFunction {
private static final String CP_REQUEST_COST_KEY =
"hbase.master.balancer.stochastic.cpRequestCost";
private static final float DEFAULT_CP_REQUEST_COST = 5;
CPRequestCostFunction(Configuration conf) {
this.setMultiplier(conf.getFloat(CP_REQUEST_COST_KEY, DEFAULT_CP_REQUEST_COST));
}
@Override
protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getCpRequestsCount();
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.Collection;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Class to be used for the subset of RegionLoad costs that should be treated as rates. We do not
* compare about the actual rate in requests per second but rather the rate relative to the rest of
* the regions.
*/
@InterfaceAudience.Private
abstract class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
@Override
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
double cost = 0;
double previous = 0;
boolean isFirst = true;
for (BalancerRegionLoad rl : regionLoadList) {
double current = getCostFromRl(rl);
if (isFirst) {
isFirst = false;
} else {
cost += current - previous;
}
previous = current;
}
return Math.max(0, cost / (regionLoadList.size() - 1));
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.Collection;
import java.util.Deque;
import java.util.Map;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Base class the allows writing costs functions from rolling average of some number from
* RegionLoad.
*/
@InterfaceAudience.Private
abstract class CostFromRegionLoadFunction extends CostFunction {
private ClusterMetrics clusterStatus;
private Map<String, Deque<BalancerRegionLoad>> loads;
private double[] stats;
void setClusterMetrics(ClusterMetrics status) {
this.clusterStatus = status;
}
void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
this.loads = l;
}
@Override
protected final double cost() {
if (clusterStatus == null || loads == null) {
return 0;
}
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
for (int i = 0; i < stats.length; i++) {
// Cost this server has from RegionLoad
long cost = 0;
// for every region on this server get the rl
for (int regionIndex : cluster.regionsPerServer[i]) {
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
// Now if we found a region load get the type of cost that was requested.
if (regionLoadList != null) {
cost = (long) (cost + getRegionLoadCost(regionLoadList));
}
}
// Add the total cost to the stats.
stats[i] = cost;
}
// Now return the scaled cost from data held in the stats object.
return costFromArray(stats);
}
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
double cost = 0;
for (BalancerRegionLoad rl : regionLoadList) {
cost += getCostFromRl(rl);
}
return cost / regionLoadList.size();
}
protected abstract double getCostFromRl(BalancerRegionLoad rl);
}

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Base class of StochasticLoadBalancer's Cost Functions.
*/
@InterfaceAudience.Private
abstract class CostFunction {
private float multiplier = 0;
protected BalancerClusterState cluster;
boolean isNeeded() {
return true;
}
float getMultiplier() {
return multiplier;
}
void setMultiplier(float m) {
this.multiplier = m;
}
/**
* Called once per LB invocation to give the cost function to initialize it's state, and perform
* any costly calculation.
*/
void init(BalancerClusterState cluster) {
this.cluster = cluster;
}
/**
* Called once per cluster Action to give the cost function an opportunity to update it's state.
* postAction() is always called at least once before cost() is called with the cluster that this
* action is performed on.
*/
void postAction(BalanceAction action) {
switch (action.getType()) {
case NULL:
break;
case ASSIGN_REGION:
AssignRegionAction ar = (AssignRegionAction) action;
regionMoved(ar.getRegion(), -1, ar.getServer());
break;
case MOVE_REGION:
MoveRegionAction mra = (MoveRegionAction) action;
regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
break;
case SWAP_REGIONS:
SwapRegionsAction a = (SwapRegionsAction) action;
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
break;
default:
throw new RuntimeException("Uknown action:" + action.getType());
}
}
protected void regionMoved(int region, int oldServer, int newServer) {
}
protected abstract double cost();
@SuppressWarnings("checkstyle:linelength")
/**
* Function to compute a scaled cost using
* {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
* It assumes that this is a zero sum set of costs. It assumes that the worst case possible is all
* of the elements in one region server and the rest having 0.
* @param stats the costs
* @return a scaled set of costs.
*/
protected final double costFromArray(double[] stats) {
double totalCost = 0;
double total = getSum(stats);
double count = stats.length;
double mean = total / count;
// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((count - 1) * mean) + (total - mean);
// It's possible that there aren't enough regions to go around
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);
min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
}
min = Math.max(0, min);
for (int i = 0; i < stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}
double scaled = scale(min, max, totalCost);
return scaled;
}
private double getSum(double[] stats) {
double total = 0;
for (double s : stats) {
total += s;
}
return total;
}
/**
* Scale the value between 0 and 1.
* @param min Min value
* @param max The Max value
* @param value The value to be scaled.
* @return The scaled value.
*/
protected final double scale(double min, double max, double value) {
if (max <= min || value <= min) {
return 0;
}
if ((max - min) == 0) {
return 0;
}
return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
}
}

View File

@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory;
* The rule file can be located on local FS or HDFS, depending on the prefix (file//: or hdfs://).
*/
@InterfaceAudience.Private
public class HeterogeneousRegionCountCostFunction extends StochasticLoadBalancer.CostFunction {
public class HeterogeneousRegionCountCostFunction extends CostFunction {
/**
* configuration used for the path where the rule file is stored.
@ -94,7 +94,6 @@ public class HeterogeneousRegionCountCostFunction extends StochasticLoadBalancer
double overallUsage;
HeterogeneousRegionCountCostFunction(final Configuration conf) {
super(conf);
this.conf = conf;
this.limitPerRS = new HashMap<>();
this.limitPerRule = new HashMap<>();
@ -108,8 +107,8 @@ public class HeterogeneousRegionCountCostFunction extends StochasticLoadBalancer
+ "'. Setting default to 200");
this.defaultNumberOfRegions = 200;
}
if (conf.getFloat(StochasticLoadBalancer.RegionCountSkewCostFunction.REGION_COUNT_SKEW_COST_KEY,
StochasticLoadBalancer.RegionCountSkewCostFunction.DEFAULT_REGION_COUNT_SKEW_COST) > 0) {
if (conf.getFloat(RegionCountSkewCostFunction.REGION_COUNT_SKEW_COST_KEY,
RegionCountSkewCostFunction.DEFAULT_REGION_COUNT_SKEW_COST) > 0) {
LOG.warn("regionCountCost is not set to 0, "
+ " this will interfere with the HeterogeneousRegionCountCostFunction!");
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Compute a cost of a potential cluster configuration based upon where
* {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
*/
@InterfaceAudience.Private
abstract class LocalityBasedCostFunction extends CostFunction {
private final LocalityType type;
private double bestLocality; // best case locality across cluster weighted by local data size
private double locality; // current locality across cluster weighted by local data size
LocalityBasedCostFunction(Configuration conf, LocalityType type, String localityCostKey,
float defaultLocalityCost) {
this.type = type;
this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
this.locality = 0.0;
this.bestLocality = 0.0;
}
/**
* Maps region to the current entity (server or rack) on which it is stored
*/
abstract int regionIndexToEntityIndex(int region);
@Override
void init(BalancerClusterState cluster) {
super.init(cluster);
locality = 0.0;
bestLocality = 0.0;
for (int region = 0; region < cluster.numRegions; region++) {
locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
}
// We normalize locality to be a score between 0 and 1.0 representing how good it
// is compared to how good it could be. If bestLocality is 0, assume locality is 100
// (and the cost is 0)
locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
}
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
int oldEntity =
type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
int newEntity =
type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
double localityDelta =
getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
locality += normalizedDelta;
}
@Override
protected final double cost() {
return 1 - locality;
}
private int getMostLocalEntityForRegion(int region) {
return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
}
private double getWeightedLocality(int region, int entity) {
return cluster.getOrComputeWeightedLocality(region, entity, type);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Compute the cost of total memstore size. The more unbalanced the higher the computed cost will
* be. This uses a rolling average of regionload.
*/
@InterfaceAudience.Private
class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
private static final String MEMSTORE_SIZE_COST_KEY =
"hbase.master.balancer.stochastic.memstoreSizeCost";
private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
MemStoreSizeCostFunction(Configuration conf) {
this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
}
@Override
protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getMemStoreSizeMB();
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Given the starting state of the regions and a potential ending state compute cost based upon the
* number of regions that have moved.
*/
@InterfaceAudience.Private
class MoveCostFunction extends CostFunction {
private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
private static final String MOVE_COST_OFFPEAK_KEY =
"hbase.master.balancer.stochastic.moveCost.offpeak";
private static final String MAX_MOVES_PERCENT_KEY =
"hbase.master.balancer.stochastic.maxMovePercent";
static final float DEFAULT_MOVE_COST = 7;
static final float DEFAULT_MOVE_COST_OFFPEAK = 3;
private static final int DEFAULT_MAX_MOVES = 600;
private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
private final float maxMovesPercent;
private final Configuration conf;
MoveCostFunction(Configuration conf) {
this.conf = conf;
// What percent of the number of regions a single run of the balancer can move.
maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
// Initialize the multiplier so that addCostFunction will add this cost function.
// It may change during later evaluations, due to OffPeakHours.
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
}
@Override
protected double cost() {
// Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
// that large benefits are need to overcome the cost of a move.
if (OffPeakHours.getInstance(conf).isOffPeakHour()) {
this.setMultiplier(conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK));
} else {
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
}
// Try and size the max number of Moves, but always be prepared to move some.
int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent), DEFAULT_MAX_MOVES);
double moveCost = cluster.numMovedRegions;
// Don't let this single balance move more than the max moves.
// This allows better scaling to accurately represent the actual cost of a move.
if (moveCost > maxMoves) {
return 1000000; // return a number much greater than any of the other cost
}
return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Compute the cost of a potential cluster state from skew in number of primary regions on a
* cluster.
*/
@InterfaceAudience.Private
class PrimaryRegionCountSkewCostFunction extends CostFunction {
private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
"hbase.master.balancer.stochastic.primaryRegionCountCost";
private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
private double[] stats;
PrimaryRegionCountSkewCostFunction(Configuration conf) {
// Load multiplier should be the greatest as primary regions serve majority of reads/writes.
this.setMultiplier(
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
}
@Override
boolean isNeeded() {
return cluster.hasRegionReplicas;
}
@Override
protected double cost() {
if (!cluster.hasRegionReplicas) {
return 0;
}
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
for (int i = 0; i < cluster.numServers; i++) {
stats[i] = 0;
for (int regionIdx : cluster.regionsPerServer[i]) {
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
stats[i]++;
}
}
}
return costFromArray(stats);
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class RackLocalityCostFunction extends LocalityBasedCostFunction {
private static final String RACK_LOCALITY_COST_KEY =
"hbase.master.balancer.stochastic.rackLocalityCost";
private static final float DEFAULT_RACK_LOCALITY_COST = 15;
public RackLocalityCostFunction(Configuration conf) {
super(conf, LocalityType.RACK, RACK_LOCALITY_COST_KEY, DEFAULT_RACK_LOCALITY_COST);
}
@Override
int regionIndexToEntityIndex(int region) {
return cluster.getRackForRegion(region);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Compute the cost of total number of read requests The more unbalanced the higher the computed
* cost will be. This uses a rolling average of regionload.
*/
@InterfaceAudience.Private
class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
private static final String READ_REQUEST_COST_KEY =
"hbase.master.balancer.stochastic.readRequestCost";
private static final float DEFAULT_READ_REQUEST_COST = 5;
ReadRequestCostFunction(Configuration conf) {
this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
}
@Override
protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getReadRequestsCount();
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Compute the cost of a potential cluster state from skew in number of regions on a cluster.
*/
@InterfaceAudience.Private
class RegionCountSkewCostFunction extends CostFunction {
private static final Logger LOG = LoggerFactory.getLogger(RegionCountSkewCostFunction.class);
static final String REGION_COUNT_SKEW_COST_KEY =
"hbase.master.balancer.stochastic.regionCountCost";
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
private double[] stats = null;
RegionCountSkewCostFunction(Configuration conf) {
// Load multiplier should be the greatest as it is the most general way to balance data.
this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
}
@Override
void init(BalancerClusterState cluster) {
super.init(cluster);
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
cluster.numServers, cluster.numRegions);
if (LOG.isTraceEnabled()) {
for (int i = 0; i < cluster.numServers; i++) {
LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(),
cluster.servers[i], cluster.regionsPerServer[i].length);
}
}
}
@Override
protected double cost() {
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
for (int i = 0; i < cluster.numServers; i++) {
stats[i] = cluster.regionsPerServer[i].length;
}
return costFromArray(stats);
}
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A cost function for region replicas. We give a very high cost to hosting replicas of the same
* region in the same host. We do not prevent the case though, since if numReplicas >
* numRegionServers, we still want to keep the replica open.
*/
@InterfaceAudience.Private
class RegionReplicaHostCostFunction extends CostFunction {
private static final String REGION_REPLICA_HOST_COST_KEY =
"hbase.master.balancer.stochastic.regionReplicaHostCostKey";
private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
long maxCost = 0;
long[] costsPerGroup; // group is either server, host or rack
int[][] primariesOfRegionsPerGroup;
public RegionReplicaHostCostFunction(Configuration conf) {
this.setMultiplier(
conf.getFloat(REGION_REPLICA_HOST_COST_KEY, DEFAULT_REGION_REPLICA_HOST_COST_KEY));
}
@Override
void init(BalancerClusterState cluster) {
super.init(cluster);
// max cost is the case where every region replica is hosted together regardless of host
maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
costsPerGroup = new long[cluster.numHosts];
primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
? cluster.primariesOfRegionsPerHost : cluster.primariesOfRegionsPerServer;
for (int i = 0; i < primariesOfRegionsPerGroup.length; i++) {
costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
}
}
protected final long getMaxCost(BalancerClusterState cluster) {
if (!cluster.hasRegionReplicas) {
return 0; // short circuit
}
// max cost is the case where every region replica is hosted together regardless of host
int[] primariesOfRegions = new int[cluster.numRegions];
System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
cluster.regions.length);
Arrays.sort(primariesOfRegions);
// compute numReplicas from the sorted array
return costPerGroup(primariesOfRegions);
}
@Override
boolean isNeeded() {
return cluster.hasRegionReplicas;
}
@Override
protected double cost() {
if (maxCost <= 0) {
return 0;
}
long totalCost = 0;
for (int i = 0; i < costsPerGroup.length; i++) {
totalCost += costsPerGroup[i];
}
return scale(0, maxCost, totalCost);
}
/**
* For each primary region, it computes the total number of replicas in the array (numReplicas)
* and returns a sum of numReplicas-1 squared. For example, if the server hosts regions a, b, c,
* d, e, f where a and b are same replicas, and c,d,e are same replicas, it returns (2-1) * (2-1)
* + (3-1) * (3-1) + (1-1) * (1-1).
* @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
* @return a sum of numReplicas-1 squared for each primary region in the group.
*/
protected final long costPerGroup(int[] primariesOfRegions) {
long cost = 0;
int currentPrimary = -1;
int currentPrimaryIndex = -1;
// primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
// sharing the same primary will have consecutive numbers in the array.
for (int j = 0; j <= primariesOfRegions.length; j++) {
int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
if (primary != currentPrimary) { // we see a new primary
int numReplicas = j - currentPrimaryIndex;
// square the cost
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
cost += (numReplicas - 1) * (numReplicas - 1);
}
currentPrimary = primary;
currentPrimaryIndex = j;
}
}
return cost;
}
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
if (maxCost <= 0) {
return; // no need to compute
}
if (cluster.multiServersPerHost) {
int oldHost = cluster.serverIndexToHostIndex[oldServer];
int newHost = cluster.serverIndexToHostIndex[newServer];
if (newHost != oldHost) {
costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
}
} else {
costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
}
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A cost function for region replicas for the rack distribution. We give a relatively high cost to
* hosting replicas of the same region in the same rack. We do not prevent the case though.
*/
@InterfaceAudience.Private
class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
private static final String REGION_REPLICA_RACK_COST_KEY =
"hbase.master.balancer.stochastic.regionReplicaRackCostKey";
private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
public RegionReplicaRackCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(
conf.getFloat(REGION_REPLICA_RACK_COST_KEY, DEFAULT_REGION_REPLICA_RACK_COST_KEY));
}
@Override
void init(BalancerClusterState cluster) {
this.cluster = cluster;
if (cluster.numRacks <= 1) {
maxCost = 0;
return; // disabled for 1 rack
}
// max cost is the case where every region replica is hosted together regardless of rack
maxCost = getMaxCost(cluster);
costsPerGroup = new long[cluster.numRacks];
for (int i = 0; i < cluster.primariesOfRegionsPerRack.length; i++) {
costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
}
}
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
if (maxCost <= 0) {
return; // no need to compute
}
int oldRack = cluster.serverIndexToRackIndex[oldServer];
int newRack = cluster.serverIndexToRackIndex[newServer];
if (newRack != oldRack) {
costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class ServerLocalityCostFunction extends LocalityBasedCostFunction {
private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
private static final float DEFAULT_LOCALITY_COST = 25;
ServerLocalityCostFunction(Configuration conf) {
super(conf, LocalityType.SERVER, LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST);
}
@Override
int regionIndexToEntityIndex(int region) {
return cluster.regionIndexToServerIndex[region];
}
}

View File

@ -17,18 +17,16 @@
*/
package org.apache.hadoop.hbase.master.balancer;
import com.google.errorprone.annotations.RestrictedApi;
import java.lang.reflect.Constructor;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -39,10 +37,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@ -82,7 +78,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
* <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
* </ul>
*
* <p>All custom Cost Functions needs to extends {@link StochasticLoadBalancer.CostFunction}</p>
* <p>All custom Cost Functions needs to extends {@link CostFunction}</p>
*
* <p>In addition to the above configurations, the balancer can be tuned by the following
* configuration values:</p>
@ -233,38 +229,44 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
Arrays.toString(getCostFunctionNames()) + " etc.");
}
private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
Configuration conf) {
try {
Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class);
return ReflectionUtils.instantiate(clazz.getName(), ctor, conf);
} catch (NoSuchMethodException e) {
// will try construct with no parameter
}
return ReflectionUtils.newInstance(clazz);
}
private void loadCustomCostFunctions(Configuration conf) {
String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
if (null == functionsNames) {
return;
}
costFunctions.addAll(Arrays.stream(functionsNames).map(c -> {
Class<? extends CostFunction> klass = null;
for (String className : functionsNames) {
Class<? extends CostFunction> clazz;
try {
klass = Class.forName(c).asSubclass(CostFunction.class);
clazz = Class.forName(className).asSubclass(CostFunction.class);
} catch (ClassNotFoundException e) {
LOG.warn("Cannot load class " + c + "': " + e.getMessage());
LOG.warn("Cannot load class '{}': {}", className, e.getMessage());
continue;
}
if (null == klass) {
return null;
}
CostFunction reflected = ReflectionUtils.newInstance(klass, conf);
LOG.info(
"Successfully loaded custom CostFunction '" + reflected.getClass().getSimpleName() + "'");
return reflected;
}).filter(Objects::nonNull).collect(Collectors.toList()));
CostFunction func = createCostFunction(clazz, conf);
LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
costFunctions.add(func);
}
}
protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
this.candidateGenerators = customCandidateGenerators;
}
/**
* Exposed for Testing!
*/
public List<CandidateGenerator> getCandidateGenerators() {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
List<CandidateGenerator> getCandidateGenerators() {
return this.candidateGenerators;
}
@ -296,7 +298,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
/**
* Update the number of metrics that are reported to JMX
*/
public void updateMetricsSize(int size) {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void updateMetricsSize(int size) {
if (metricsBalancer instanceof MetricsStochasticBalancer) {
((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
}
@ -363,6 +367,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return !balanced;
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
BalanceAction nextAction(BalancerClusterState cluster) {
return candidateGenerators.get(ThreadLocalRandom.current().nextInt(candidateGenerators.size()))
.generate(cluster);
@ -574,7 +580,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* @return List of RegionPlan's that represent the moves needed to get to desired final state.
*/
private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) {
List<RegionPlan> plans = new LinkedList<>();
List<RegionPlan> plans = new ArrayList<>();
for (int regionIndex = 0;
regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
@ -624,13 +630,17 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
}
protected void initCosts(BalancerClusterState cluster) {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void initCosts(BalancerClusterState cluster) {
for (CostFunction c:costFunctions) {
c.init(cluster);
}
}
protected void updateCostsWithAction(BalancerClusterState cluster, BalanceAction action) {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void updateCostsWithAction(BalancerClusterState cluster, BalanceAction action) {
for (CostFunction c : costFunctions) {
c.postAction(action);
}
@ -639,7 +649,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
/**
* Get the names of the cost functions
*/
public String[] getCostFunctionNames() {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
String[] getCostFunctionNames() {
if (costFunctions == null) {
return null;
}
@ -661,7 +673,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
* @return a double of a cost associated with the proposed cluster state. This cost is an
* aggregate of all individual cost functions.
*/
protected double computeCost(BalancerClusterState cluster, double previousCost) {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
double computeCost(BalancerClusterState cluster, double previousCost) {
double total = 0;
for (int i = 0; i < costFunctions.size(); i++) {
@ -686,776 +700,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return total;
}
/**
* Base class of StochasticLoadBalancer's Cost Functions.
*/
public abstract static class CostFunction {
private float multiplier = 0;
protected BalancerClusterState cluster;
public CostFunction(Configuration c) {
}
boolean isNeeded() {
return true;
}
float getMultiplier() {
return multiplier;
}
void setMultiplier(float m) {
this.multiplier = m;
}
/** Called once per LB invocation to give the cost function
* to initialize it's state, and perform any costly calculation.
*/
void init(BalancerClusterState cluster) {
this.cluster = cluster;
}
/** Called once per cluster Action to give the cost function
* an opportunity to update it's state. postAction() is always
* called at least once before cost() is called with the cluster
* that this action is performed on. */
void postAction(BalanceAction action) {
switch (action.getType()) {
case NULL:
break;
case ASSIGN_REGION:
AssignRegionAction ar = (AssignRegionAction) action;
regionMoved(ar.getRegion(), -1, ar.getServer());
break;
case MOVE_REGION:
MoveRegionAction mra = (MoveRegionAction) action;
regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
break;
case SWAP_REGIONS:
SwapRegionsAction a = (SwapRegionsAction) action;
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
break;
default:
throw new RuntimeException("Uknown action:" + action.getType());
}
}
protected void regionMoved(int region, int oldServer, int newServer) {
}
protected abstract double cost();
@SuppressWarnings("checkstyle:linelength")
/**
* Function to compute a scaled cost using
* {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
* It assumes that this is a zero sum set of costs. It assumes that the worst case
* possible is all of the elements in one region server and the rest having 0.
*
* @param stats the costs
* @return a scaled set of costs.
*/
protected double costFromArray(double[] stats) {
double totalCost = 0;
double total = getSum(stats);
double count = stats.length;
double mean = total/count;
// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((count - 1) * mean) + (total - mean);
// It's possible that there aren't enough regions to go around
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);
min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
}
min = Math.max(0, min);
for (int i=0; i<stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}
double scaled = scale(min, max, totalCost);
return scaled;
}
private double getSum(double[] stats) {
double total = 0;
for(double s:stats) {
total += s;
}
return total;
}
/**
* Scale the value between 0 and 1.
*
* @param min Min value
* @param max The Max value
* @param value The value to be scaled.
* @return The scaled value.
*/
protected double scale(double min, double max, double value) {
if (max <= min || value <= min) {
return 0;
}
if ((max - min) == 0) {
return 0;
}
return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
}
}
/**
* Given the starting state of the regions and a potential ending state
* compute cost based upon the number of regions that have moved.
*/
static class MoveCostFunction extends CostFunction {
private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
private static final String MOVE_COST_OFFPEAK_KEY =
"hbase.master.balancer.stochastic.moveCost.offpeak";
private static final String MAX_MOVES_PERCENT_KEY =
"hbase.master.balancer.stochastic.maxMovePercent";
static final float DEFAULT_MOVE_COST = 7;
static final float DEFAULT_MOVE_COST_OFFPEAK = 3;
private static final int DEFAULT_MAX_MOVES = 600;
private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
private final float maxMovesPercent;
private final Configuration conf;
MoveCostFunction(Configuration conf) {
super(conf);
this.conf = conf;
// What percent of the number of regions a single run of the balancer can move.
maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
// Initialize the multiplier so that addCostFunction will add this cost function.
// It may change during later evaluations, due to OffPeakHours.
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
}
@Override
protected double cost() {
// Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
// that large benefits are need to overcome the cost of a move.
if (OffPeakHours.getInstance(conf).isOffPeakHour()) {
this.setMultiplier(conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK));
} else {
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
}
// Try and size the max number of Moves, but always be prepared to move some.
int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
DEFAULT_MAX_MOVES);
double moveCost = cluster.numMovedRegions;
// Don't let this single balance move more than the max moves.
// This allows better scaling to accurately represent the actual cost of a move.
if (moveCost > maxMoves) {
return 1000000; // return a number much greater than any of the other cost
}
return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
}
}
/**
* Compute the cost of a potential cluster state from skew in number of
* regions on a cluster.
*/
static class RegionCountSkewCostFunction extends CostFunction {
static final String REGION_COUNT_SKEW_COST_KEY =
"hbase.master.balancer.stochastic.regionCountCost";
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
private double[] stats = null;
RegionCountSkewCostFunction(Configuration conf) {
super(conf);
// Load multiplier should be the greatest as it is the most general way to balance data.
this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
}
@Override
void init(BalancerClusterState cluster) {
super.init(cluster);
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
cluster.numServers, cluster.numRegions);
if (LOG.isTraceEnabled()) {
for (int i =0; i < cluster.numServers; i++) {
LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(),
cluster.servers[i], cluster.regionsPerServer[i].length);
}
}
}
@Override
protected double cost() {
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
for (int i =0; i < cluster.numServers; i++) {
stats[i] = cluster.regionsPerServer[i].length;
}
return costFromArray(stats);
}
}
/**
* Compute the cost of a potential cluster state from skew in number of
* primary regions on a cluster.
*/
static class PrimaryRegionCountSkewCostFunction extends CostFunction {
private static final String PRIMARY_REGION_COUNT_SKEW_COST_KEY =
"hbase.master.balancer.stochastic.primaryRegionCountCost";
private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
private double[] stats = null;
PrimaryRegionCountSkewCostFunction(Configuration conf) {
super(conf);
// Load multiplier should be the greatest as primary regions serve majority of reads/writes.
this.setMultiplier(conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY,
DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
}
@Override
boolean isNeeded() {
return cluster.hasRegionReplicas;
}
@Override
protected double cost() {
if (!cluster.hasRegionReplicas) {
return 0;
}
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
for (int i = 0; i < cluster.numServers; i++) {
stats[i] = 0;
for (int regionIdx : cluster.regionsPerServer[i]) {
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
stats[i]++;
}
}
}
return costFromArray(stats);
}
}
/**
* Compute the cost of a potential cluster configuration based upon how evenly
* distributed tables are.
*/
static class TableSkewCostFunction extends CostFunction {
private static final String TABLE_SKEW_COST_KEY =
"hbase.master.balancer.stochastic.tableSkewCost";
private static final float DEFAULT_TABLE_SKEW_COST = 35;
TableSkewCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
}
@Override
protected double cost() {
double max = cluster.numRegions;
double min = ((double) cluster.numRegions) / cluster.numServers;
double value = 0;
for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
value += cluster.numMaxRegionsPerTable[i];
}
return scale(min, max, value);
}
}
/**
* Compute a cost of a potential cluster configuration based upon where
* {@link org.apache.hadoop.hbase.regionserver.HStoreFile}s are located.
*/
static abstract class LocalityBasedCostFunction extends CostFunction {
private final LocalityType type;
private double bestLocality; // best case locality across cluster weighted by local data size
private double locality; // current locality across cluster weighted by local data size
LocalityBasedCostFunction(Configuration conf, LocalityType type, String localityCostKey,
float defaultLocalityCost) {
super(conf);
this.type = type;
this.setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
this.locality = 0.0;
this.bestLocality = 0.0;
}
/**
* Maps region to the current entity (server or rack) on which it is stored
*/
abstract int regionIndexToEntityIndex(int region);
@Override
void init(BalancerClusterState cluster) {
super.init(cluster);
locality = 0.0;
bestLocality = 0.0;
for (int region = 0; region < cluster.numRegions; region++) {
locality += getWeightedLocality(region, regionIndexToEntityIndex(region));
bestLocality += getWeightedLocality(region, getMostLocalEntityForRegion(region));
}
// We normalize locality to be a score between 0 and 1.0 representing how good it
// is compared to how good it could be. If bestLocality is 0, assume locality is 100
// (and the cost is 0)
locality = bestLocality == 0 ? 1.0 : locality / bestLocality;
}
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
int oldEntity =
type == LocalityType.SERVER ? oldServer : cluster.serverIndexToRackIndex[oldServer];
int newEntity =
type == LocalityType.SERVER ? newServer : cluster.serverIndexToRackIndex[newServer];
double localityDelta =
getWeightedLocality(region, newEntity) - getWeightedLocality(region, oldEntity);
double normalizedDelta = bestLocality == 0 ? 0.0 : localityDelta / bestLocality;
locality += normalizedDelta;
}
@Override
protected double cost() {
return 1 - locality;
}
private int getMostLocalEntityForRegion(int region) {
return cluster.getOrComputeRegionsToMostLocalEntities(type)[region];
}
private double getWeightedLocality(int region, int entity) {
return cluster.getOrComputeWeightedLocality(region, entity, type);
}
}
static class ServerLocalityCostFunction extends LocalityBasedCostFunction {
private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
private static final float DEFAULT_LOCALITY_COST = 25;
ServerLocalityCostFunction(Configuration conf) {
super(conf, LocalityType.SERVER, LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST);
}
@Override
int regionIndexToEntityIndex(int region) {
return cluster.regionIndexToServerIndex[region];
}
}
static class RackLocalityCostFunction extends LocalityBasedCostFunction {
private static final String RACK_LOCALITY_COST_KEY =
"hbase.master.balancer.stochastic.rackLocalityCost";
private static final float DEFAULT_RACK_LOCALITY_COST = 15;
public RackLocalityCostFunction(Configuration conf) {
super(conf, LocalityType.RACK, RACK_LOCALITY_COST_KEY, DEFAULT_RACK_LOCALITY_COST);
}
@Override
int regionIndexToEntityIndex(int region) {
return cluster.getRackForRegion(region);
}
}
/**
* Base class the allows writing costs functions from rolling average of some
* number from RegionLoad.
*/
abstract static class CostFromRegionLoadFunction extends CostFunction {
private ClusterMetrics clusterStatus = null;
private Map<String, Deque<BalancerRegionLoad>> loads = null;
private double[] stats = null;
CostFromRegionLoadFunction(Configuration conf) {
super(conf);
}
void setClusterMetrics(ClusterMetrics status) {
this.clusterStatus = status;
}
void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
this.loads = l;
}
@Override
protected double cost() {
if (clusterStatus == null || loads == null) {
return 0;
}
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
for (int i =0; i < stats.length; i++) {
//Cost this server has from RegionLoad
long cost = 0;
// for every region on this server get the rl
for(int regionIndex:cluster.regionsPerServer[i]) {
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
// Now if we found a region load get the type of cost that was requested.
if (regionLoadList != null) {
cost = (long) (cost + getRegionLoadCost(regionLoadList));
}
}
// Add the total cost to the stats.
stats[i] = cost;
}
// Now return the scaled cost from data held in the stats object.
return costFromArray(stats);
}
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
double cost = 0;
for (BalancerRegionLoad rl : regionLoadList) {
cost += getCostFromRl(rl);
}
return cost / regionLoadList.size();
}
protected abstract double getCostFromRl(BalancerRegionLoad rl);
}
/**
* Class to be used for the subset of RegionLoad costs that should be treated as rates.
* We do not compare about the actual rate in requests per second but rather the rate relative
* to the rest of the regions.
*/
abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFunction {
CostFromRegionLoadAsRateFunction(Configuration conf) {
super(conf);
}
@Override
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
double cost = 0;
double previous = 0;
boolean isFirst = true;
for (BalancerRegionLoad rl : regionLoadList) {
double current = getCostFromRl(rl);
if (isFirst) {
isFirst = false;
} else {
cost += current - previous;
}
previous = current;
}
return Math.max(0, cost / (regionLoadList.size() - 1));
}
}
/**
* Compute the cost of total number of read requests The more unbalanced the higher the
* computed cost will be. This uses a rolling average of regionload.
*/
static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
private static final String READ_REQUEST_COST_KEY =
"hbase.master.balancer.stochastic.readRequestCost";
private static final float DEFAULT_READ_REQUEST_COST = 5;
ReadRequestCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
}
@Override
protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getReadRequestsCount();
}
}
/**
* Compute the cost of total number of coprocessor requests The more unbalanced the higher the
* computed cost will be. This uses a rolling average of regionload.
*/
static class CPRequestCostFunction extends CostFromRegionLoadAsRateFunction {
private static final String CP_REQUEST_COST_KEY =
"hbase.master.balancer.stochastic.cpRequestCost";
private static final float DEFAULT_CP_REQUEST_COST = 5;
CPRequestCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(conf.getFloat(CP_REQUEST_COST_KEY, DEFAULT_CP_REQUEST_COST));
}
@Override
protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getCpRequestsCount();
}
}
/**
* Compute the cost of total number of write requests. The more unbalanced the higher the
* computed cost will be. This uses a rolling average of regionload.
*/
static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
private static final String WRITE_REQUEST_COST_KEY =
"hbase.master.balancer.stochastic.writeRequestCost";
private static final float DEFAULT_WRITE_REQUEST_COST = 5;
WriteRequestCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
}
@Override
protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getWriteRequestsCount();
}
}
/**
* A cost function for region replicas. We give a very high cost to hosting
* replicas of the same region in the same host. We do not prevent the case
* though, since if numReplicas > numRegionServers, we still want to keep the
* replica open.
*/
static class RegionReplicaHostCostFunction extends CostFunction {
private static final String REGION_REPLICA_HOST_COST_KEY =
"hbase.master.balancer.stochastic.regionReplicaHostCostKey";
private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;
long maxCost = 0;
long[] costsPerGroup; // group is either server, host or rack
int[][] primariesOfRegionsPerGroup;
public RegionReplicaHostCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(conf.getFloat(REGION_REPLICA_HOST_COST_KEY,
DEFAULT_REGION_REPLICA_HOST_COST_KEY));
}
@Override
void init(BalancerClusterState cluster) {
super.init(cluster);
// max cost is the case where every region replica is hosted together regardless of host
maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0;
costsPerGroup = new long[cluster.numHosts];
primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
? cluster.primariesOfRegionsPerHost
: cluster.primariesOfRegionsPerServer;
for (int i = 0 ; i < primariesOfRegionsPerGroup.length; i++) {
costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
}
}
long getMaxCost(BalancerClusterState cluster) {
if (!cluster.hasRegionReplicas) {
return 0; // short circuit
}
// max cost is the case where every region replica is hosted together regardless of host
int[] primariesOfRegions = new int[cluster.numRegions];
System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
cluster.regions.length);
Arrays.sort(primariesOfRegions);
// compute numReplicas from the sorted array
return costPerGroup(primariesOfRegions);
}
@Override
boolean isNeeded() {
return cluster.hasRegionReplicas;
}
@Override
protected double cost() {
if (maxCost <= 0) {
return 0;
}
long totalCost = 0;
for (int i = 0 ; i < costsPerGroup.length; i++) {
totalCost += costsPerGroup[i];
}
return scale(0, maxCost, totalCost);
}
/**
* For each primary region, it computes the total number of replicas in the array (numReplicas)
* and returns a sum of numReplicas-1 squared. For example, if the server hosts
* regions a, b, c, d, e, f where a and b are same replicas, and c,d,e are same replicas, it
* returns (2-1) * (2-1) + (3-1) * (3-1) + (1-1) * (1-1).
* @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
* @return a sum of numReplicas-1 squared for each primary region in the group.
*/
protected long costPerGroup(int[] primariesOfRegions) {
long cost = 0;
int currentPrimary = -1;
int currentPrimaryIndex = -1;
// primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
// sharing the same primary will have consecutive numbers in the array.
for (int j = 0 ; j <= primariesOfRegions.length; j++) {
int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
if (primary != currentPrimary) { // we see a new primary
int numReplicas = j - currentPrimaryIndex;
// square the cost
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
cost += (numReplicas - 1) * (numReplicas - 1);
}
currentPrimary = primary;
currentPrimaryIndex = j;
}
}
return cost;
}
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
if (maxCost <= 0) {
return; // no need to compute
}
if (cluster.multiServersPerHost) {
int oldHost = cluster.serverIndexToHostIndex[oldServer];
int newHost = cluster.serverIndexToHostIndex[newServer];
if (newHost != oldHost) {
costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
}
} else {
costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
}
}
}
/**
* A cost function for region replicas for the rack distribution. We give a relatively high
* cost to hosting replicas of the same region in the same rack. We do not prevent the case
* though.
*/
static class RegionReplicaRackCostFunction extends RegionReplicaHostCostFunction {
private static final String REGION_REPLICA_RACK_COST_KEY =
"hbase.master.balancer.stochastic.regionReplicaRackCostKey";
private static final float DEFAULT_REGION_REPLICA_RACK_COST_KEY = 10000;
public RegionReplicaRackCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(conf.getFloat(REGION_REPLICA_RACK_COST_KEY,
DEFAULT_REGION_REPLICA_RACK_COST_KEY));
}
@Override
void init(BalancerClusterState cluster) {
this.cluster = cluster;
if (cluster.numRacks <= 1) {
maxCost = 0;
return; // disabled for 1 rack
}
// max cost is the case where every region replica is hosted together regardless of rack
maxCost = getMaxCost(cluster);
costsPerGroup = new long[cluster.numRacks];
for (int i = 0 ; i < cluster.primariesOfRegionsPerRack.length; i++) {
costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
}
}
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
if (maxCost <= 0) {
return; // no need to compute
}
int oldRack = cluster.serverIndexToRackIndex[oldServer];
int newRack = cluster.serverIndexToRackIndex[newServer];
if (newRack != oldRack) {
costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
}
}
}
/**
* Compute the cost of total memstore size. The more unbalanced the higher the
* computed cost will be. This uses a rolling average of regionload.
*/
static class MemStoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
private static final String MEMSTORE_SIZE_COST_KEY =
"hbase.master.balancer.stochastic.memstoreSizeCost";
private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
MemStoreSizeCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
}
@Override
protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getMemStoreSizeMB();
}
}
/**
* Compute the cost of total open storefiles size. The more unbalanced the higher the
* computed cost will be. This uses a rolling average of regionload.
*/
static class StoreFileCostFunction extends CostFromRegionLoadFunction {
private static final String STOREFILE_SIZE_COST_KEY =
"hbase.master.balancer.stochastic.storefileSizeCost";
private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
StoreFileCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
}
@Override
protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getStorefileSizeMB();
}
}
/**
* A helper function to compose the attribute name from tablename and costfunction name
*/
public static String composeAttributeName(String tableName, String costFunctionName) {
static String composeAttributeName(String tableName, String costFunctionName) {
return tableName + TABLE_FUNCTION_SEP + costFunctionName;
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Compute the cost of total open storefiles size. The more unbalanced the higher the computed cost
* will be. This uses a rolling average of regionload.
*/
@InterfaceAudience.Private
class StoreFileCostFunction extends CostFromRegionLoadFunction {
private static final String STOREFILE_SIZE_COST_KEY =
"hbase.master.balancer.stochastic.storefileSizeCost";
private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
StoreFileCostFunction(Configuration conf) {
this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
}
@Override
protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getStorefileSizeMB();
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Compute the cost of a potential cluster configuration based upon how evenly distributed tables
* are.
*/
@InterfaceAudience.Private
class TableSkewCostFunction extends CostFunction {
private static final String TABLE_SKEW_COST_KEY =
"hbase.master.balancer.stochastic.tableSkewCost";
private static final float DEFAULT_TABLE_SKEW_COST = 35;
TableSkewCostFunction(Configuration conf) {
this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
}
@Override
protected double cost() {
double max = cluster.numRegions;
double min = ((double) cluster.numRegions) / cluster.numServers;
double value = 0;
for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
value += cluster.numMaxRegionsPerTable[i];
}
return scale(min, max, value);
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Compute the cost of total number of write requests. The more unbalanced the higher the computed
* cost will be. This uses a rolling average of regionload.
*/
@InterfaceAudience.Private
class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
private static final String WRITE_REQUEST_COST_KEY =
"hbase.master.balancer.stochastic.writeRequestCost";
private static final float DEFAULT_WRITE_REQUEST_COST = 5;
WriteRequestCostFunction(Configuration conf) {
this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
}
@Override
protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getWriteRequestsCount();
}
}

View File

@ -17,12 +17,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
public class DummyCostFunction extends StochasticLoadBalancer.CostFunction {
public DummyCostFunction(Configuration c) {
super(c);
}
public class DummyCostFunction extends CostFunction {
@Override
protected double cost() {

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertTrue;
@ -35,10 +35,14 @@ import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.JMXListener;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.balancer.BalancerTestBase;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Threads;
@ -271,13 +275,4 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
return ret;
}
private static void printMetrics(Set<String> metrics, String info) {
if (null != info) LOG.info("++++ ------ " + info + " ------");
LOG.info("++++ metrics count = " + metrics.size());
for (String str : metrics) {
LOG.info(" ++++ " + str);
}
}
}

View File

@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MockNoopMasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.ServerLocalityCostFunction;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -76,58 +75,56 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
* [test][region + 1][0] = server that region is hosted on
* [test][region + 1][server + 1] = locality for region on server
*/
private int[][][] clusterRegionLocationMocks = new int[][][]{
// Test 1: each region is entirely on server that hosts it
new int[][]{
new int[]{2, 1, 1},
new int[]{2, 0, 0, 100}, // region 0 is hosted and entirely local on server 2
new int[]{0, 100, 0, 0}, // region 1 is hosted and entirely on server 0
new int[]{0, 100, 0, 0}, // region 2 is hosted and entirely on server 0
new int[]{1, 0, 100, 0}, // region 3 is hosted and entirely on server 1
},
// Test 1: each region is entirely on server that hosts it
new int[][]{
new int[]{2, 1, 1},
new int[]{2, 0, 0, 100}, // region 0 is hosted and entirely local on server 2
new int[]{0, 100, 0, 0}, // region 1 is hosted and entirely on server 0
new int[]{0, 100, 0, 0}, // region 2 is hosted and entirely on server 0
new int[]{1, 0, 100, 0}, // region 3 is hosted and entirely on server 1
},
// Test 2: each region is 0% local on the server that hosts it
new int[][]{
new int[]{1, 2, 1},
new int[]{0, 0, 0, 100}, // region 0 is hosted and entirely local on server 2
new int[]{1, 100, 0, 0}, // region 1 is hosted and entirely on server 0
new int[]{1, 100, 0, 0}, // region 2 is hosted and entirely on server 0
new int[]{2, 0, 100, 0}, // region 3 is hosted and entirely on server 1
},
// Test 2: each region is 0% local on the server that hosts it
new int[][]{
new int[]{1, 2, 1},
new int[]{0, 0, 0, 100}, // region 0 is hosted and entirely local on server 2
new int[]{1, 100, 0, 0}, // region 1 is hosted and entirely on server 0
new int[]{1, 100, 0, 0}, // region 2 is hosted and entirely on server 0
new int[]{2, 0, 100, 0}, // region 3 is hosted and entirely on server 1
},
// Test 3: each region is 25% local on the server that hosts it (and 50% locality is possible)
new int[][]{
new int[]{1, 2, 1},
new int[]{0, 25, 0, 50}, // region 0 is hosted and entirely local on server 2
new int[]{1, 50, 25, 0}, // region 1 is hosted and entirely on server 0
new int[]{1, 50, 25, 0}, // region 2 is hosted and entirely on server 0
new int[]{2, 0, 50, 25}, // region 3 is hosted and entirely on server 1
},
// Test 3: each region is 25% local on the server that hosts it (and 50% locality is possible)
new int[][]{
new int[]{1, 2, 1},
new int[]{0, 25, 0, 50}, // region 0 is hosted and entirely local on server 2
new int[]{1, 50, 25, 0}, // region 1 is hosted and entirely on server 0
new int[]{1, 50, 25, 0}, // region 2 is hosted and entirely on server 0
new int[]{2, 0, 50, 25}, // region 3 is hosted and entirely on server 1
},
// Test 4: each region is 25% local on the server that hosts it (and 100% locality is possible)
new int[][]{
new int[]{1, 2, 1},
new int[]{0, 25, 0, 100}, // region 0 is hosted and entirely local on server 2
new int[]{1, 100, 25, 0}, // region 1 is hosted and entirely on server 0
new int[]{1, 100, 25, 0}, // region 2 is hosted and entirely on server 0
new int[]{2, 0, 100, 25}, // region 3 is hosted and entirely on server 1
},
// Test 4: each region is 25% local on the server that hosts it (and 100% locality is possible)
new int[][]{
new int[]{1, 2, 1},
new int[]{0, 25, 0, 100}, // region 0 is hosted and entirely local on server 2
new int[]{1, 100, 25, 0}, // region 1 is hosted and entirely on server 0
new int[]{1, 100, 25, 0}, // region 2 is hosted and entirely on server 0
new int[]{2, 0, 100, 25}, // region 3 is hosted and entirely on server 1
},
// Test 5: each region is 75% local on the server that hosts it (and 75% locality is possible everywhere)
new int[][]{
new int[]{1, 2, 1},
new int[]{0, 75, 75, 75}, // region 0 is hosted and entirely local on server 2
new int[]{1, 75, 75, 75}, // region 1 is hosted and entirely on server 0
new int[]{1, 75, 75, 75}, // region 2 is hosted and entirely on server 0
new int[]{2, 75, 75, 75}, // region 3 is hosted and entirely on server 1
},
// Test 5: each region is 75% local on the server that hosts it (and 75% locality is possible
// everywhere)
new int[][]{
new int[]{1, 2, 1},
new int[]{0, 75, 75, 75}, // region 0 is hosted and entirely local on server 2
new int[]{1, 75, 75, 75}, // region 1 is hosted and entirely on server 0
new int[]{1, 75, 75, 75}, // region 2 is hosted and entirely on server 0
new int[]{2, 75, 75, 75}, // region 3 is hosted and entirely on server 1
},
};
private ServerMetrics mockServerMetricsWithCpRequests(ServerName server,
List<RegionInfo> regionsOnServer,
long cpRequestCount) {
List<RegionInfo> regionsOnServer, long cpRequestCount) {
ServerMetrics serverMetrics = mock(ServerMetrics.class);
Map<byte[], RegionMetrics> regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for(RegionInfo info : regionsOnServer){
@ -204,7 +201,6 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
@Test
public void testKeepRegionLoad() throws Exception {
ServerName sn = ServerName.valueOf("test:8080", 100);
int numClusterStatusToAdd = 20000;
for (int i = 0; i < numClusterStatusToAdd; i++) {
@ -275,7 +271,7 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
public void testLocalityCost() throws Exception {
Configuration conf = HBaseConfiguration.create();
MockNoopMasterServices master = new MockNoopMasterServices();
StochasticLoadBalancer.CostFunction
CostFunction
costFunction = new ServerLocalityCostFunction(conf);
for (int test = 0; test < clusterRegionLocationMocks.length; test++) {
@ -292,12 +288,12 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
@Test
public void testMoveCostMultiplier() throws Exception {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFunction
costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
CostFunction
costFunction = new MoveCostFunction(conf);
BalancerClusterState cluster = mockCluster(clusterStateMocks[0]);
costFunction.init(cluster);
costFunction.cost();
assertEquals(StochasticLoadBalancer.MoveCostFunction.DEFAULT_MOVE_COST,
assertEquals(MoveCostFunction.DEFAULT_MOVE_COST,
costFunction.getMultiplier(), 0.01);
// In offpeak hours, the multiplier of move cost should be lower
@ -308,18 +304,18 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
long deltaFor15 = TimeZone.getDefault().getRawOffset() - 28800000;
long timeFor15 = 1597907081000L - deltaFor15;
EnvironmentEdgeManager.injectEdge(() -> timeFor15);
costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
costFunction = new MoveCostFunction(conf);
costFunction.init(cluster);
costFunction.cost();
assertEquals(StochasticLoadBalancer.MoveCostFunction.DEFAULT_MOVE_COST_OFFPEAK
assertEquals(MoveCostFunction.DEFAULT_MOVE_COST_OFFPEAK
, costFunction.getMultiplier(), 0.01);
}
@Test
public void testMoveCost() throws Exception {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFunction
costFunction = new StochasticLoadBalancer.MoveCostFunction(conf);
CostFunction
costFunction = new MoveCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) {
BalancerClusterState cluster = mockCluster(mockCluster);
costFunction.init(cluster);
@ -356,8 +352,8 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
@Test
public void testSkewCost() {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFunction
costFunction = new StochasticLoadBalancer.RegionCountSkewCostFunction(conf);
CostFunction
costFunction = new RegionCountSkewCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) {
costFunction.init(mockCluster(mockCluster));
double cost = costFunction.cost();
@ -403,8 +399,8 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
@Test
public void testTableSkewCost() {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFunction
costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf);
CostFunction
costFunction = new TableSkewCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) {
BalancerClusterState cluster = mockCluster(mockCluster);
costFunction.init(cluster);
@ -426,20 +422,20 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
}
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.ReadRequestCostFunction readCostFunction =
new StochasticLoadBalancer.ReadRequestCostFunction(conf);
ReadRequestCostFunction readCostFunction =
new ReadRequestCostFunction(conf);
double rateResult = readCostFunction.getRegionLoadCost(regionLoads);
// read requests are treated as a rate so the average rate here is simply 1
assertEquals(1, rateResult, 0.01);
StochasticLoadBalancer.CPRequestCostFunction cpCostFunction =
new StochasticLoadBalancer.CPRequestCostFunction(conf);
CPRequestCostFunction cpCostFunction =
new CPRequestCostFunction(conf);
rateResult = cpCostFunction.getRegionLoadCost(regionLoads);
// coprocessor requests are treated as a rate so the average rate here is simply 1
assertEquals(1, rateResult, 0.01);
StochasticLoadBalancer.StoreFileCostFunction storeFileCostFunction =
new StochasticLoadBalancer.StoreFileCostFunction(conf);
StoreFileCostFunction storeFileCostFunction =
new StoreFileCostFunction(conf);
double result = storeFileCostFunction.getRegionLoadCost(regionLoads);
// storefile size cost is simply an average of it's value over time
assertEquals(2.5, result, 0.01);
@ -448,27 +444,26 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
@Test
public void testCostFromArray() {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFromRegionLoadFunction
costFunction = new StochasticLoadBalancer.MemStoreSizeCostFunction(conf);
costFunction.init(mockCluster(new int[]{0, 0, 0, 0, 1}));
CostFromRegionLoadFunction costFunction = new MemStoreSizeCostFunction(conf);
costFunction.init(mockCluster(new int[] { 0, 0, 0, 0, 1 }));
double[] statOne = new double[100];
for (int i =0; i < 100; i++) {
for (int i = 0; i < 100; i++) {
statOne[i] = 10;
}
assertEquals(0, costFunction.costFromArray(statOne), 0.01);
double[] statTwo= new double[101];
for (int i =0; i < 100; i++) {
double[] statTwo = new double[101];
for (int i = 0; i < 100; i++) {
statTwo[i] = 0;
}
statTwo[100] = 100;
assertEquals(1, costFunction.costFromArray(statTwo), 0.01);
double[] statThree = new double[200];
for (int i =0; i < 100; i++) {
statThree[i] = (0);
statThree[i+100] = 100;
for (int i = 0; i < 100; i++) {
statThree[i] = 0;
statThree[i + 100] = 100;
}
assertEquals(0.5, costFunction.costFromArray(statThree), 0.01);
}
@ -523,18 +518,18 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
@Test
public void testDefaultCostFunctionList() {
List<String> expected = Arrays.asList(
StochasticLoadBalancer.RegionCountSkewCostFunction.class.getSimpleName(),
StochasticLoadBalancer.PrimaryRegionCountSkewCostFunction.class.getSimpleName(),
StochasticLoadBalancer.MoveCostFunction.class.getSimpleName(),
StochasticLoadBalancer.RackLocalityCostFunction.class.getSimpleName(),
StochasticLoadBalancer.TableSkewCostFunction.class.getSimpleName(),
StochasticLoadBalancer.RegionReplicaHostCostFunction.class.getSimpleName(),
StochasticLoadBalancer.RegionReplicaRackCostFunction.class.getSimpleName(),
StochasticLoadBalancer.ReadRequestCostFunction.class.getSimpleName(),
StochasticLoadBalancer.CPRequestCostFunction.class.getSimpleName(),
StochasticLoadBalancer.WriteRequestCostFunction.class.getSimpleName(),
StochasticLoadBalancer.MemStoreSizeCostFunction.class.getSimpleName(),
StochasticLoadBalancer.StoreFileCostFunction.class.getSimpleName()
RegionCountSkewCostFunction.class.getSimpleName(),
PrimaryRegionCountSkewCostFunction.class.getSimpleName(),
MoveCostFunction.class.getSimpleName(),
RackLocalityCostFunction.class.getSimpleName(),
TableSkewCostFunction.class.getSimpleName(),
RegionReplicaHostCostFunction.class.getSimpleName(),
RegionReplicaRackCostFunction.class.getSimpleName(),
ReadRequestCostFunction.class.getSimpleName(),
CPRequestCostFunction.class.getSimpleName(),
WriteRequestCostFunction.class.getSimpleName(),
MemStoreSizeCostFunction.class.getSimpleName(),
StoreFileCostFunction.class.getSimpleName()
);
List<String> actual = Arrays.asList(loadBalancer.getCostFunctionNames());
@ -542,8 +537,9 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
CollectionUtils.isEqualCollection(expected, actual));
}
private boolean needsBalanceIdleRegion(int[] cluster){
return (Arrays.stream(cluster).anyMatch(x -> x>1)) && (Arrays.stream(cluster).anyMatch(x -> x<1));
private boolean needsBalanceIdleRegion(int[] cluster) {
return Arrays.stream(cluster).anyMatch(x -> x > 1) &&
Arrays.stream(cluster).anyMatch(x -> x < 1);
}
// This mock allows us to test the LocalityCostFunction
@ -563,7 +559,8 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
regionIndexToServerIndex[regionIndex] = regions[i][0];
for (int j = 1; j < regions[i].length; j++) {
int serverIndex = j - 1;
localities[regionIndex][serverIndex] = regions[i][j] > 100 ? regions[i][j] % 100 : regions[i][j];
localities[regionIndex][serverIndex] =
regions[i][j] > 100 ? regions[i][j] % 100 : regions[i][j];
}
}
}

View File

@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -51,8 +50,8 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT
@Test
public void testReplicaCost() {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFunction costFunction =
new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf);
CostFunction costFunction =
new RegionReplicaHostCostFunction(conf);
for (int[] mockCluster : clusterStateMocks) {
BalancerClusterState cluster = mockCluster(mockCluster);
costFunction.init(cluster);
@ -65,8 +64,8 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT
@Test
public void testReplicaCostForReplicas() {
Configuration conf = HBaseConfiguration.create();
StochasticLoadBalancer.CostFunction costFunction =
new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf);
CostFunction costFunction =
new RegionReplicaHostCostFunction(conf);
int[] servers = new int[] { 3, 3, 3, 3, 3 };
TreeMap<ServerName, List<RegionInfo>> clusterState = mockClusterServers(servers);
@ -102,8 +101,8 @@ public class TestStochasticLoadBalancerRegionReplica extends StochasticBalancerT
// test with replication = 4 for following:
RegionInfo replica3;
Iterator<Entry<ServerName, List<RegionInfo>>> it;
Entry<ServerName, List<RegionInfo>> entry;
Iterator<Map.Entry<ServerName, List<RegionInfo>>> it;
Map.Entry<ServerName, List<RegionInfo>> entry;
clusterState = mockClusterServers(servers);
it = clusterState.entrySet().iterator();