HBASE-25894 Improve the performance for region load and region count related cost functions (#3276)
Signed-off-by: Yi Mei <myimeiyi@gmail.com>
This commit is contained in:
parent
8754e88fa7
commit
2cb6cc8a6c
|
@ -27,38 +27,47 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
@InterfaceAudience.Private
|
||||
abstract class CostFromRegionLoadFunction extends CostFunction {
|
||||
|
||||
private double[] stats;
|
||||
private final DoubleArrayCost cost = new DoubleArrayCost();
|
||||
|
||||
private double computeCostForRegionServer(int regionServerIndex) {
|
||||
// Cost this server has from RegionLoad
|
||||
double cost = 0;
|
||||
|
||||
// for every region on this server get the rl
|
||||
for (int regionIndex : cluster.regionsPerServer[regionServerIndex]) {
|
||||
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
|
||||
|
||||
// Now if we found a region load get the type of cost that was requested.
|
||||
if (regionLoadList != null) {
|
||||
cost += getRegionLoadCost(regionLoadList);
|
||||
}
|
||||
}
|
||||
return cost;
|
||||
}
|
||||
|
||||
@Override
|
||||
void prepare(BalancerClusterState cluster) {
|
||||
super.prepare(cluster);
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
cost.prepare(cluster.numServers);
|
||||
cost.setCosts(costs -> {
|
||||
for (int i = 0; i < costs.length; i++) {
|
||||
costs[i] = computeCostForRegionServer(i);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||
// recompute the stat for the given two region servers
|
||||
cost.setCosts(costs -> {
|
||||
costs[oldServer] = computeCostForRegionServer(oldServer);
|
||||
costs[newServer] = computeCostForRegionServer(newServer);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final double cost() {
|
||||
for (int i = 0; i < stats.length; i++) {
|
||||
// Cost this server has from RegionLoad
|
||||
double cost = 0;
|
||||
|
||||
// for every region on this server get the rl
|
||||
for (int regionIndex : cluster.regionsPerServer[i]) {
|
||||
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
|
||||
|
||||
// Now if we found a region load get the type of cost that was requested.
|
||||
if (regionLoadList != null) {
|
||||
cost += getRegionLoadCost(regionLoadList);
|
||||
}
|
||||
}
|
||||
|
||||
// Add the total cost to the stats.
|
||||
stats[i] = cost;
|
||||
}
|
||||
|
||||
// Now return the scaled cost from data held in the stats object.
|
||||
return costFromArray(stats);
|
||||
return cost.cost();
|
||||
}
|
||||
|
||||
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
|
||||
|
|
|
@ -81,57 +81,6 @@ abstract class CostFunction {
|
|||
|
||||
protected abstract double cost();
|
||||
|
||||
@SuppressWarnings("checkstyle:linelength")
|
||||
/**
|
||||
* Function to compute a scaled cost using
|
||||
* {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
|
||||
* It assumes that this is a zero sum set of costs. It assumes that the worst case possible is all
|
||||
* of the elements in one region server and the rest having 0.
|
||||
* @param stats the costs
|
||||
* @return a scaled set of costs.
|
||||
*/
|
||||
protected final double costFromArray(double[] stats) {
|
||||
double totalCost = 0;
|
||||
double total = getSum(stats);
|
||||
|
||||
double count = stats.length;
|
||||
double mean = total / count;
|
||||
|
||||
// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
|
||||
// a zero sum cost for this to make sense.
|
||||
double max = ((count - 1) * mean) + (total - mean);
|
||||
|
||||
// It's possible that there aren't enough regions to go around
|
||||
double min;
|
||||
if (count > total) {
|
||||
min = ((count - total) * mean) + ((1 - mean) * total);
|
||||
} else {
|
||||
// Some will have 1 more than everything else.
|
||||
int numHigh = (int) (total - (Math.floor(mean) * count));
|
||||
int numLow = (int) (count - numHigh);
|
||||
|
||||
min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
|
||||
|
||||
}
|
||||
min = Math.max(0, min);
|
||||
for (int i = 0; i < stats.length; i++) {
|
||||
double n = stats[i];
|
||||
double diff = Math.abs(mean - n);
|
||||
totalCost += diff;
|
||||
}
|
||||
|
||||
double scaled = scale(min, max, totalCost);
|
||||
return scaled;
|
||||
}
|
||||
|
||||
private double getSum(double[] stats) {
|
||||
double total = 0;
|
||||
for (double s : stats) {
|
||||
total += s;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* Scale the value between 0 and 1.
|
||||
* @param min Min value
|
||||
|
@ -139,7 +88,7 @@ abstract class CostFunction {
|
|||
* @param value The value to be scaled.
|
||||
* @return The scaled value.
|
||||
*/
|
||||
protected final double scale(double min, double max, double value) {
|
||||
protected static double scale(double min, double max, double value) {
|
||||
if (max <= min || value <= min) {
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,100 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A helper class to compute a scaled cost using
|
||||
* {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
|
||||
* It assumes that this is a zero sum set of costs. It assumes that the worst case possible is all
|
||||
* of the elements in one region server and the rest having 0.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
final class DoubleArrayCost {
|
||||
|
||||
private double[] costs;
|
||||
|
||||
// computeCost call is expensive so we use this flag to indicate whether we need to recalculate
|
||||
// the cost by calling computeCost
|
||||
private boolean costsChanged;
|
||||
|
||||
private double cost;
|
||||
|
||||
void prepare(int length) {
|
||||
if (costs == null || costs.length != length) {
|
||||
costs = new double[length];
|
||||
}
|
||||
}
|
||||
|
||||
void setCosts(Consumer<double[]> consumer) {
|
||||
consumer.accept(costs);
|
||||
costsChanged = true;
|
||||
}
|
||||
|
||||
double cost() {
|
||||
if (costsChanged) {
|
||||
cost = computeCost(costs);
|
||||
costsChanged = false;
|
||||
}
|
||||
return cost;
|
||||
}
|
||||
|
||||
private static double computeCost(double[] stats) {
|
||||
double totalCost = 0;
|
||||
double total = getSum(stats);
|
||||
|
||||
double count = stats.length;
|
||||
double mean = total / count;
|
||||
|
||||
// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
|
||||
// a zero sum cost for this to make sense.
|
||||
double max = ((count - 1) * mean) + (total - mean);
|
||||
|
||||
// It's possible that there aren't enough regions to go around
|
||||
double min;
|
||||
if (count > total) {
|
||||
min = ((count - total) * mean) + ((1 - mean) * total);
|
||||
} else {
|
||||
// Some will have 1 more than everything else.
|
||||
int numHigh = (int) (total - (Math.floor(mean) * count));
|
||||
int numLow = (int) (count - numHigh);
|
||||
|
||||
min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
|
||||
|
||||
}
|
||||
min = Math.max(0, min);
|
||||
for (int i = 0; i < stats.length; i++) {
|
||||
double n = stats[i];
|
||||
double diff = Math.abs(mean - n);
|
||||
totalCost += diff;
|
||||
}
|
||||
|
||||
double scaled = CostFunction.scale(min, max, totalCost);
|
||||
return scaled;
|
||||
}
|
||||
|
||||
private static double getSum(double[] stats) {
|
||||
double total = 0;
|
||||
for (double s : stats) {
|
||||
total += s;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
}
|
|
@ -31,14 +31,22 @@ class PrimaryRegionCountSkewCostFunction extends CostFunction {
|
|||
"hbase.master.balancer.stochastic.primaryRegionCountCost";
|
||||
private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
|
||||
|
||||
private final float primaryRegionCountCost;
|
||||
private double[] stats;
|
||||
private final DoubleArrayCost cost = new DoubleArrayCost();
|
||||
|
||||
PrimaryRegionCountSkewCostFunction(Configuration conf) {
|
||||
// Load multiplier should be the greatest as primary regions serve majority of reads/writes.
|
||||
primaryRegionCountCost =
|
||||
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST);
|
||||
this.setMultiplier(primaryRegionCountCost);
|
||||
this.setMultiplier(
|
||||
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
|
||||
}
|
||||
|
||||
private double computeCostForRegionServer(int regionServerIndex) {
|
||||
int cost = 0;
|
||||
for (int regionIdx : cluster.regionsPerServer[regionServerIndex]) {
|
||||
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
|
||||
cost++;
|
||||
}
|
||||
}
|
||||
return cost;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -47,9 +55,20 @@ class PrimaryRegionCountSkewCostFunction extends CostFunction {
|
|||
if (!isNeeded()) {
|
||||
return;
|
||||
}
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
cost.prepare(cluster.numServers);
|
||||
cost.setCosts(costs -> {
|
||||
for (int i = 0; i < costs.length; i++) {
|
||||
costs[i] = computeCostForRegionServer(i);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||
cost.setCosts(costs -> {
|
||||
costs[oldServer] = computeCostForRegionServer(oldServer);
|
||||
costs[newServer] = computeCostForRegionServer(newServer);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -59,15 +78,6 @@ class PrimaryRegionCountSkewCostFunction extends CostFunction {
|
|||
|
||||
@Override
|
||||
protected double cost() {
|
||||
for (int i = 0; i < cluster.numServers; i++) {
|
||||
stats[i] = 0;
|
||||
for (int regionIdx : cluster.regionsPerServer[i]) {
|
||||
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
|
||||
stats[i]++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return costFromArray(stats);
|
||||
return cost.cost();
|
||||
}
|
||||
}
|
|
@ -34,7 +34,7 @@ class RegionCountSkewCostFunction extends CostFunction {
|
|||
"hbase.master.balancer.stochastic.regionCountCost";
|
||||
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
|
||||
|
||||
private double[] stats;
|
||||
private final DoubleArrayCost cost = new DoubleArrayCost();
|
||||
|
||||
RegionCountSkewCostFunction(Configuration conf) {
|
||||
// Load multiplier should be the greatest as it is the most general way to balance data.
|
||||
|
@ -44,9 +44,12 @@ class RegionCountSkewCostFunction extends CostFunction {
|
|||
@Override
|
||||
void prepare(BalancerClusterState cluster) {
|
||||
super.prepare(cluster);
|
||||
if (stats == null || stats.length != cluster.numServers) {
|
||||
stats = new double[cluster.numServers];
|
||||
}
|
||||
cost.prepare(cluster.numServers);
|
||||
cost.setCosts(costs -> {
|
||||
for (int i = 0; i < cluster.numServers; i++) {
|
||||
costs[i] = cluster.regionsPerServer[i].length;
|
||||
}
|
||||
});
|
||||
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
|
||||
cluster.numServers, cluster.numRegions);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
|
@ -59,9 +62,14 @@ class RegionCountSkewCostFunction extends CostFunction {
|
|||
|
||||
@Override
|
||||
protected double cost() {
|
||||
for (int i = 0; i < cluster.numServers; i++) {
|
||||
stats[i] = cluster.regionsPerServer[i].length;
|
||||
}
|
||||
return costFromArray(stats);
|
||||
return cost.cost();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void regionMoved(int region, int oldServer, int newServer) {
|
||||
cost.setCosts(costs -> {
|
||||
costs[oldServer] = cluster.regionsPerServer[oldServer].length;
|
||||
costs[newServer] = cluster.regionsPerServer[newServer].length;
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.balancer;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ MasterTests.class, SmallTests.class })
|
||||
public class TestDoubleArrayCost {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestDoubleArrayCost.class);
|
||||
|
||||
@Test
|
||||
public void testComputeCost() {
|
||||
DoubleArrayCost cost = new DoubleArrayCost();
|
||||
|
||||
cost.prepare(100);
|
||||
cost.setCosts(costs -> {
|
||||
for (int i = 0; i < 100; i++) {
|
||||
costs[i] = 10;
|
||||
}
|
||||
});
|
||||
assertEquals(0, cost.cost(), 0.01);
|
||||
|
||||
cost.prepare(101);
|
||||
cost.setCosts(costs -> {
|
||||
for (int i = 0; i < 100; i++) {
|
||||
costs[i] = 0;
|
||||
}
|
||||
costs[100] = 100;
|
||||
});
|
||||
assertEquals(1, cost.cost(), 0.01);
|
||||
|
||||
cost.prepare(200);
|
||||
cost.setCosts(costs -> {
|
||||
for (int i = 0; i < 100; i++) {
|
||||
costs[i] = 0;
|
||||
costs[i + 100] = 100;
|
||||
}
|
||||
costs[100] = 100;
|
||||
});
|
||||
assertEquals(0.5, cost.cost(), 0.01);
|
||||
}
|
||||
}
|
|
@ -352,33 +352,6 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
|
|||
assertEquals(2.5, result, 0.01);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCostFromArray() {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
CostFromRegionLoadFunction costFunction = new MemStoreSizeCostFunction(conf);
|
||||
costFunction.prepare(mockCluster(new int[] { 0, 0, 0, 0, 1 }));
|
||||
|
||||
double[] statOne = new double[100];
|
||||
for (int i = 0; i < 100; i++) {
|
||||
statOne[i] = 10;
|
||||
}
|
||||
assertEquals(0, costFunction.costFromArray(statOne), 0.01);
|
||||
|
||||
double[] statTwo = new double[101];
|
||||
for (int i = 0; i < 100; i++) {
|
||||
statTwo[i] = 0;
|
||||
}
|
||||
statTwo[100] = 100;
|
||||
assertEquals(1, costFunction.costFromArray(statTwo), 0.01);
|
||||
|
||||
double[] statThree = new double[200];
|
||||
for (int i = 0; i < 100; i++) {
|
||||
statThree[i] = 0;
|
||||
statThree[i + 100] = 100;
|
||||
}
|
||||
assertEquals(0.5, costFunction.costFromArray(statThree), 0.01);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLosingRs() throws Exception {
|
||||
int numNodes = 3;
|
||||
|
|
Loading…
Reference in New Issue