HBASE-25894 Improve the performance for region load and region count related cost functions (#3276)

Signed-off-by: Yi Mei <myimeiyi@gmail.com>
This commit is contained in:
Duo Zhang 2021-05-25 18:04:06 +08:00 committed by GitHub
parent 36affdaa8e
commit 6a77872879
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 245 additions and 129 deletions

View File

@ -27,24 +27,14 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
abstract class CostFromRegionLoadFunction extends CostFunction {
private double[] stats;
private final DoubleArrayCost cost = new DoubleArrayCost();
@Override
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
}
@Override
protected final double cost() {
for (int i = 0; i < stats.length; i++) {
private double computeCostForRegionServer(int regionServerIndex) {
// Cost this server has from RegionLoad
double cost = 0;
// for every region on this server get the rl
for (int regionIndex : cluster.regionsPerServer[i]) {
for (int regionIndex : cluster.regionsPerServer[regionServerIndex]) {
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
// Now if we found a region load get the type of cost that was requested.
@ -52,13 +42,32 @@ abstract class CostFromRegionLoadFunction extends CostFunction {
cost += getRegionLoadCost(regionLoadList);
}
}
// Add the total cost to the stats.
stats[i] = cost;
return cost;
}
// Now return the scaled cost from data held in the stats object.
return costFromArray(stats);
@Override
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
cost.prepare(cluster.numServers);
cost.setCosts(costs -> {
for (int i = 0; i < costs.length; i++) {
costs[i] = computeCostForRegionServer(i);
}
});
}
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
// recompute the stat for the given two region servers
cost.setCosts(costs -> {
costs[oldServer] = computeCostForRegionServer(oldServer);
costs[newServer] = computeCostForRegionServer(newServer);
});
}
@Override
protected final double cost() {
return cost.cost();
}
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {

View File

@ -81,57 +81,6 @@ abstract class CostFunction {
protected abstract double cost();
@SuppressWarnings("checkstyle:linelength")
/**
* Function to compute a scaled cost using
* {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
* It assumes that this is a zero sum set of costs. It assumes that the worst case possible is all
* of the elements in one region server and the rest having 0.
* @param stats the costs
* @return a scaled set of costs.
*/
protected final double costFromArray(double[] stats) {
double totalCost = 0;
double total = getSum(stats);
double count = stats.length;
double mean = total / count;
// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((count - 1) * mean) + (total - mean);
// It's possible that there aren't enough regions to go around
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);
min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
}
min = Math.max(0, min);
for (int i = 0; i < stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}
double scaled = scale(min, max, totalCost);
return scaled;
}
private double getSum(double[] stats) {
double total = 0;
for (double s : stats) {
total += s;
}
return total;
}
/**
* Scale the value between 0 and 1.
* @param min Min value
@ -139,7 +88,7 @@ abstract class CostFunction {
* @param value The value to be scaled.
* @return The scaled value.
*/
protected final double scale(double min, double max, double value) {
protected static double scale(double min, double max, double value) {
if (max <= min || value <= min) {
return 0;
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.function.Consumer;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A helper class to compute a scaled cost using
* {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
* It assumes that this is a zero sum set of costs. It assumes that the worst case possible is all
* of the elements in one region server and the rest having 0.
*/
@InterfaceAudience.Private
final class DoubleArrayCost {
private double[] costs;
// computeCost call is expensive so we use this flag to indicate whether we need to recalculate
// the cost by calling computeCost
private boolean costsChanged;
private double cost;
void prepare(int length) {
if (costs == null || costs.length != length) {
costs = new double[length];
}
}
void setCosts(Consumer<double[]> consumer) {
consumer.accept(costs);
costsChanged = true;
}
double cost() {
if (costsChanged) {
cost = computeCost(costs);
costsChanged = false;
}
return cost;
}
private static double computeCost(double[] stats) {
double totalCost = 0;
double total = getSum(stats);
double count = stats.length;
double mean = total / count;
// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((count - 1) * mean) + (total - mean);
// It's possible that there aren't enough regions to go around
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);
min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
}
min = Math.max(0, min);
for (int i = 0; i < stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}
double scaled = CostFunction.scale(min, max, totalCost);
return scaled;
}
private static double getSum(double[] stats) {
double total = 0;
for (double s : stats) {
total += s;
}
return total;
}
}

View File

@ -31,14 +31,22 @@ class PrimaryRegionCountSkewCostFunction extends CostFunction {
"hbase.master.balancer.stochastic.primaryRegionCountCost";
private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;
private final float primaryRegionCountCost;
private double[] stats;
private final DoubleArrayCost cost = new DoubleArrayCost();
PrimaryRegionCountSkewCostFunction(Configuration conf) {
// Load multiplier should be the greatest as primary regions serve majority of reads/writes.
primaryRegionCountCost =
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST);
this.setMultiplier(primaryRegionCountCost);
this.setMultiplier(
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
}
private double computeCostForRegionServer(int regionServerIndex) {
int cost = 0;
for (int regionIdx : cluster.regionsPerServer[regionServerIndex]) {
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
cost++;
}
}
return cost;
}
@Override
@ -47,9 +55,20 @@ class PrimaryRegionCountSkewCostFunction extends CostFunction {
if (!isNeeded()) {
return;
}
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
cost.prepare(cluster.numServers);
cost.setCosts(costs -> {
for (int i = 0; i < costs.length; i++) {
costs[i] = computeCostForRegionServer(i);
}
});
}
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
cost.setCosts(costs -> {
costs[oldServer] = computeCostForRegionServer(oldServer);
costs[newServer] = computeCostForRegionServer(newServer);
});
}
@Override
@ -59,15 +78,6 @@ class PrimaryRegionCountSkewCostFunction extends CostFunction {
@Override
protected double cost() {
for (int i = 0; i < cluster.numServers; i++) {
stats[i] = 0;
for (int regionIdx : cluster.regionsPerServer[i]) {
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
stats[i]++;
}
}
}
return costFromArray(stats);
return cost.cost();
}
}

View File

@ -34,7 +34,7 @@ class RegionCountSkewCostFunction extends CostFunction {
"hbase.master.balancer.stochastic.regionCountCost";
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
private double[] stats;
private final DoubleArrayCost cost = new DoubleArrayCost();
RegionCountSkewCostFunction(Configuration conf) {
// Load multiplier should be the greatest as it is the most general way to balance data.
@ -44,9 +44,12 @@ class RegionCountSkewCostFunction extends CostFunction {
@Override
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
cost.prepare(cluster.numServers);
cost.setCosts(costs -> {
for (int i = 0; i < cluster.numServers; i++) {
costs[i] = cluster.regionsPerServer[i].length;
}
});
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
cluster.numServers, cluster.numRegions);
if (LOG.isTraceEnabled()) {
@ -59,9 +62,14 @@ class RegionCountSkewCostFunction extends CostFunction {
@Override
protected double cost() {
for (int i = 0; i < cluster.numServers; i++) {
stats[i] = cluster.regionsPerServer[i].length;
return cost.cost();
}
return costFromArray(stats);
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
cost.setCosts(costs -> {
costs[oldServer] = cluster.regionsPerServer[oldServer].length;
costs[newServer] = cluster.regionsPerServer[newServer].length;
});
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, SmallTests.class })
public class TestDoubleArrayCost {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDoubleArrayCost.class);
@Test
public void testComputeCost() {
DoubleArrayCost cost = new DoubleArrayCost();
cost.prepare(100);
cost.setCosts(costs -> {
for (int i = 0; i < 100; i++) {
costs[i] = 10;
}
});
assertEquals(0, cost.cost(), 0.01);
cost.prepare(101);
cost.setCosts(costs -> {
for (int i = 0; i < 100; i++) {
costs[i] = 0;
}
costs[100] = 100;
});
assertEquals(1, cost.cost(), 0.01);
cost.prepare(200);
cost.setCosts(costs -> {
for (int i = 0; i < 100; i++) {
costs[i] = 0;
costs[i + 100] = 100;
}
costs[100] = 100;
});
assertEquals(0.5, cost.cost(), 0.01);
}
}

View File

@ -439,33 +439,6 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
assertEquals(2.5, result, 0.01);
}
@Test
public void testCostFromArray() {
Configuration conf = HBaseConfiguration.create();
CostFromRegionLoadFunction costFunction = new MemStoreSizeCostFunction(conf);
costFunction.prepare(mockCluster(new int[] { 0, 0, 0, 0, 1 }));
double[] statOne = new double[100];
for (int i = 0; i < 100; i++) {
statOne[i] = 10;
}
assertEquals(0, costFunction.costFromArray(statOne), 0.01);
double[] statTwo = new double[101];
for (int i = 0; i < 100; i++) {
statTwo[i] = 0;
}
statTwo[100] = 100;
assertEquals(1, costFunction.costFromArray(statTwo), 0.01);
double[] statThree = new double[200];
for (int i = 0; i < 100; i++) {
statThree[i] = 0;
statThree[i + 100] = 100;
}
assertEquals(0.5, costFunction.costFromArray(statThree), 0.01);
}
@Test
public void testLosingRs() throws Exception {
int numNodes = 3;