HBASE-25818 Move StochasticLoadBalancer to hbase-balancer module (#3206)

Signed-off-by: Yi Mei <myimeiyi@gmail.com>
This commit is contained in:
Duo Zhang 2021-05-25 23:24:35 +08:00 committed by GitHub
parent 6a77872879
commit 76fbb8b965
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 215 additions and 137 deletions

View File

@ -22,13 +22,17 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -36,7 +40,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* want.
*/
@InterfaceAudience.Private
public interface ClusterInfoProvider {
public interface ClusterInfoProvider extends ConfigurationObserver {
/**
* Get the configuration.
@ -83,4 +87,21 @@ public interface ClusterInfoProvider {
* Get a snapshot of the current assignment status.
*/
Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(Collection<RegionInfo> regions);
/**
* Test whether we are in off peak hour.
* <p/>
* For peak and off peak hours we may have different cost for the same balancing operation.
*/
boolean isOffPeakHour();
/**
* Record the given balancer decision.
*/
void recordBalancerDecision(Supplier<BalancerDecision> decision);
/**
* Record the given balancer rejection.
*/
void recordBalancerRejection(Supplier<BalancerRejection> rejection);
}

View File

@ -16,16 +16,16 @@ package org.apache.hadoop.hbase.master.balancer;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -34,6 +34,9 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.io.CharStreams;
/**
* This is an optional Cost function designed to allow region count skew across RegionServers. A
* rule file is loaded from the local FS or HDFS before balancing. It contains lines of rules. A
@ -169,14 +172,14 @@ public class HeterogeneousRegionCountCostFunction extends CostFunction {
if (line.startsWith("#")) {
continue;
}
final String[] splits = line.split(" ");
if (splits.length != 2) {
final List<String> splits = Splitter.on(' ').splitToList(line);
if (splits.size() != 2) {
throw new IOException(
"line '" + line + "' is malformated, " + "expected [regexp] [limit]. Skipping line");
}
final Pattern pattern = Pattern.compile(splits[0]);
final Integer limit = Integer.parseInt(splits[1]);
final Pattern pattern = Pattern.compile(splits.get(0));
final Integer limit = Integer.parseInt(splits.get(1));
this.limitPerRule.put(pattern, limit);
} catch (final IOException | NumberFormatException | PatternSyntaxException e) {
LOG.error("error on line: " + e);
@ -209,29 +212,17 @@ public class HeterogeneousRegionCountCostFunction extends CostFunction {
private List<String> readFileFromHDFS(final String filename) throws IOException {
final Path path = new Path(filename);
final FileSystem fs = FileSystem.get(this.conf);
final BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
return readLines(reader);
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
return CharStreams.readLines(reader);
}
}
/**
* used to read the rule files from local FS
*/
private List<String> readFileFromLocalFS(final String filename) throws IOException {
BufferedReader reader = new BufferedReader(new FileReader(filename));
return readLines(reader);
}
private List<String> readLines(BufferedReader reader) throws IOException {
final List<String> records = new ArrayList<>();
try {
String line;
while ((line = reader.readLine()) != null) {
records.add(line);
}
} finally {
reader.close();
}
return records;
return Files.readAllLines(Paths.get(filename), StandardCharsets.UTF_8);
}
/**

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.balancer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -38,14 +37,14 @@ class MoveCostFunction extends CostFunction {
private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
private final float maxMovesPercent;
private final OffPeakHours offPeakHours;
private final ClusterInfoProvider provider;
private final float moveCost;
private final float moveCostOffPeak;
MoveCostFunction(Configuration conf) {
MoveCostFunction(Configuration conf, ClusterInfoProvider provider) {
this.provider = provider;
// What percent of the number of regions a single run of the balancer can move.
maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
offPeakHours = OffPeakHours.getInstance(conf);
moveCost = conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST);
moveCostOffPeak = conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK);
// Initialize the multiplier so that addCostFunction will add this cost function.
@ -58,7 +57,7 @@ class MoveCostFunction extends CostFunction {
super.prepare(cluster);
// Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
// that large benefits are need to overcome the cost of a move.
if (offPeakHours.isOffPeakHour()) {
if (provider.isOffPeakHour()) {
this.setMultiplier(moveCostOffPeak);
} else {
this.setMultiplier(moveCost);

View File

@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -39,9 +40,6 @@ import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@ -128,8 +126,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
private int numRegionLoadsToRemember = 15;
private float minCostNeedBalance = 0.05f;
private boolean isBalancerDecisionRecording = false;
private boolean isBalancerRejectionRecording = false;
private List<CandidateGenerator> candidateGenerators;
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
@ -147,11 +143,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
/**
* Use to add balancer decision history to ring-buffer
*/
NamedQueueRecorder namedQueueRecorder;
/**
* The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its
* default MetricsBalancer
@ -233,7 +224,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
costFunctions = new ArrayList<>();
addCostFunction(new RegionCountSkewCostFunction(conf));
addCostFunction(new PrimaryRegionCountSkewCostFunction(conf));
addCostFunction(new MoveCostFunction(conf));
addCostFunction(new MoveCostFunction(conf, provider));
addCostFunction(localityCost);
addCostFunction(rackLocalityCost);
addCostFunction(new TableSkewCostFunction(conf));
@ -249,17 +240,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
curFunctionCosts = new double[costFunctions.size()];
tempFunctionCosts = new double[costFunctions.size()];
isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
isBalancerRejectionRecording =
conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
if (this.namedQueueRecorder == null &&
(isBalancerDecisionRecording || isBalancerRejectionRecording)) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(conf);
}
LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
Arrays.toString(getCostFunctionNames()) + " etc.");
@ -305,6 +285,19 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return false;
}
private String getBalanceReason(double total, double sumMultiplier) {
if (total <= 0) {
return "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
} else if (sumMultiplier <= 0) {
return "sumMultiplier = " + sumMultiplier + " <= 0";
} else if ((total / sumMultiplier) < minCostNeedBalance) {
return "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = " +
(total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
} else {
return "";
}
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
@ -314,10 +307,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
LOG.debug("Not running balancer because only " + cs.getNumServers()
+ " active regionserver(s)");
}
if (this.isBalancerRejectionRecording) {
sendRejectionReasonToRingBuffer("The number of RegionServers " +
cs.getNumServers() + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
}
sendRejectionReasonToRingBuffer(() -> "The number of RegionServers " + cs.getNumServers() +
" < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
return false;
}
if (areSomeRegionReplicasColocated(cluster)) {
@ -346,18 +337,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
boolean balanced = total <= 0 || sumMultiplier <= 0 ||
(sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance);
if(balanced && isBalancerRejectionRecording){
String reason = "";
if (total <= 0) {
reason = "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
} else if (sumMultiplier <= 0) {
reason = "sumMultiplier = " + sumMultiplier + " <= 0";
} else if ((total / sumMultiplier) < minCostNeedBalance) {
reason =
"[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = " + (total
/ sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
}
sendRejectionReasonToRingBuffer(reason, costFunctions);
if (balanced) {
final double calculatedTotal = total;
final double calculatedMultiplier = sumMultiplier;
sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, calculatedMultiplier),
costFunctions);
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} {}; total cost={}, sum multiplier={}; cost/multiplier to need a balance is {}",
@ -500,11 +484,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return null;
}
private void sendRejectionReasonToRingBuffer(String reason, List<CostFunction> costFunctions){
if (this.isBalancerRejectionRecording){
BalancerRejection.Builder builder =
new BalancerRejection.Builder()
.setReason(reason);
private void sendRejectionReasonToRingBuffer(Supplier<String> reason,
List<CostFunction> costFunctions) {
provider.recordBalancerRejection(() -> {
BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get());
if (costFunctions != null) {
for (CostFunction c : costFunctions) {
float multiplier = c.getMultiplier();
@ -514,29 +497,24 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
}
}
namedQueueRecorder.addRecord(new BalancerRejectionDetails(builder.build()));
}
return builder.build();
});
}
private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
double initCost, String initFunctionTotalCosts, long step) {
if (this.isBalancerDecisionRecording) {
double initCost, String initFunctionTotalCosts, long step) {
provider.recordBalancerDecision(() -> {
List<String> regionPlans = new ArrayList<>();
for (RegionPlan plan : plans) {
regionPlans.add(
"table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
+ " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
regionPlans
.add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName() +
" , source: " + plan.getSource() + " , destination: " + plan.getDestination());
}
BalancerDecision balancerDecision =
new BalancerDecision.Builder()
.setInitTotalCost(initCost)
.setInitialFunctionCosts(initFunctionTotalCosts)
.setComputedTotalCost(currentCost)
.setFinalFunctionCosts(totalCostsPerFunc())
.setComputedSteps(step)
.setRegionPlans(regionPlans).build();
namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision));
}
return new BalancerDecision.Builder().setInitTotalCost(initCost)
.setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost)
.setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step)
.setRegionPlans(regionPlans).build();
});
}
/**

View File

@ -23,11 +23,14 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
@ -80,4 +83,21 @@ public class DummyClusterInfoProvider implements ClusterInfoProvider {
public Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(Collection<RegionInfo> regions) {
return Collections.emptyMap();
}
@Override
public boolean isOffPeakHour() {
return false;
}
@Override
public void recordBalancerDecision(Supplier<BalancerDecision> decision) {
}
@Override
public void recordBalancerRejection(Supplier<BalancerRejection> rejection) {
}
@Override
public void onConfigurationChange(Configuration conf) {
}
}

View File

@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
@ -49,7 +48,6 @@ import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -122,11 +120,11 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
},
};
private ServerMetrics mockServerMetricsWithCpRequests(ServerName server,
List<RegionInfo> regionsOnServer, long cpRequestCount) {
private ServerMetrics mockServerMetricsWithCpRequests(List<RegionInfo> regionsOnServer,
long cpRequestCount) {
ServerMetrics serverMetrics = mock(ServerMetrics.class);
Map<byte[], RegionMetrics> regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for(RegionInfo info : regionsOnServer){
for (RegionInfo info : regionsOnServer) {
RegionMetrics rl = mock(RegionMetrics.class);
when(rl.getReadRequestCount()).thenReturn(0L);
when(rl.getCpRequestCount()).thenReturn(cpRequestCount);
@ -157,9 +155,9 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
clusterState.put(serverC, regionsOnServerC);
// mock ClusterMetrics
Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
serverMetricsMap.put(serverA, mockServerMetricsWithCpRequests(serverA, regionsOnServerA, 0));
serverMetricsMap.put(serverB, mockServerMetricsWithCpRequests(serverB, regionsOnServerB, 0));
serverMetricsMap.put(serverC, mockServerMetricsWithCpRequests(serverC, regionsOnServerC, 0));
serverMetricsMap.put(serverA, mockServerMetricsWithCpRequests(regionsOnServerA, 0));
serverMetricsMap.put(serverB, mockServerMetricsWithCpRequests(regionsOnServerB, 0));
serverMetricsMap.put(serverC, mockServerMetricsWithCpRequests(regionsOnServerC, 0));
ClusterMetrics clusterStatus = mock(ClusterMetrics.class);
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
loadBalancer.updateClusterMetrics(clusterStatus);
@ -171,9 +169,9 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
// serverC : 0,0,0
// so should move two regions from serverA to serverB & serverC
serverMetricsMap = new TreeMap<>();
serverMetricsMap.put(serverA, mockServerMetricsWithCpRequests(serverA, regionsOnServerA, 1000));
serverMetricsMap.put(serverB, mockServerMetricsWithCpRequests(serverB, regionsOnServerB, 0));
serverMetricsMap.put(serverC, mockServerMetricsWithCpRequests(serverC, regionsOnServerC, 0));
serverMetricsMap.put(serverA, mockServerMetricsWithCpRequests(regionsOnServerA, 1000));
serverMetricsMap.put(serverB, mockServerMetricsWithCpRequests(regionsOnServerB, 0));
serverMetricsMap.put(serverC, mockServerMetricsWithCpRequests(regionsOnServerC, 0));
clusterStatus = mock(ClusterMetrics.class);
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
loadBalancer.updateClusterMetrics(clusterStatus);
@ -192,7 +190,6 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
assertEquals(2, regionsMoveFromServerA.size());
assertEquals(2, targetServers.size());
assertTrue(regionsOnServerA.containsAll(regionsMoveFromServerA));
assertNull(loadBalancer.namedQueueRecorder);
// reset config
conf.setFloat("hbase.master.balancer.stochastic.cpRequestCost", 5f);
loadBalancer.onConfigurationChange(conf);
@ -220,7 +217,6 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
serverMetricsMap.put(sn, sl);
when(clusterStatus.getLiveServerMetrics()).thenReturn(serverMetricsMap);
// when(clusterStatus.getLoad(sn)).thenReturn(sl);
loadBalancer.updateClusterMetrics(clusterStatus);
}
@ -228,7 +224,6 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
String regionNameAsString = RegionInfo.getRegionNameAsString(Bytes.toBytes(REGION_KEY));
assertTrue(loadBalancer.loads.get(regionNameAsString) != null);
assertTrue(loadBalancer.loads.get(regionNameAsString).size() == 15);
assertNull(loadBalancer.namedQueueRecorder);
Queue<BalancerRegionLoad> loads = loadBalancer.loads.get(regionNameAsString);
int i = 0;
@ -280,14 +275,15 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
double expected = 1 - expectedLocalities[test];
assertEquals(expected, cost, 0.001);
}
assertNull(loadBalancer.namedQueueRecorder);
}
@Test
public void testMoveCostMultiplier() throws Exception {
Configuration conf = HBaseConfiguration.create();
CostFunction
costFunction = new MoveCostFunction(conf);
ClusterInfoProvider provider = mock(ClusterInfoProvider.class);
CostFunction costFunction =
new MoveCostFunction(conf, provider);
when(provider.isOffPeakHour()).thenReturn(false);
BalancerClusterState cluster = mockCluster(clusterStateMocks[0]);
costFunction.prepare(cluster);
costFunction.cost();
@ -295,25 +291,17 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
costFunction.getMultiplier(), 0.01);
// In offpeak hours, the multiplier of move cost should be lower
conf.setInt("hbase.offpeak.start.hour",0);
conf.setInt("hbase.offpeak.end.hour",23);
// Set a fixed time which hour is 15, so it will always in offpeak
// See HBASE-24898 for more info of the calculation here
long deltaFor15 = TimeZone.getDefault().getRawOffset() - 28800000;
long timeFor15 = 1597907081000L - deltaFor15;
EnvironmentEdgeManager.injectEdge(() -> timeFor15);
costFunction = new MoveCostFunction(conf);
when(provider.isOffPeakHour()).thenReturn(true);
costFunction.prepare(cluster);
costFunction.cost();
assertEquals(MoveCostFunction.DEFAULT_MOVE_COST_OFFPEAK
, costFunction.getMultiplier(), 0.01);
assertEquals(MoveCostFunction.DEFAULT_MOVE_COST_OFFPEAK,
costFunction.getMultiplier(), 0.01);
}
@Test
public void testMoveCost() throws Exception {
Configuration conf = HBaseConfiguration.create();
CostFunction
costFunction = new MoveCostFunction(conf);
CostFunction costFunction = new MoveCostFunction(conf, new DummyClusterInfoProvider(conf));
for (int[] mockCluster : clusterStateMocks) {
BalancerClusterState cluster = mockCluster(mockCluster);
costFunction.prepare(cluster);
@ -413,8 +401,8 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
List<BalancerRegionLoad> regionLoads = new ArrayList<>();
for (int i = 1; i < 5; i++) {
BalancerRegionLoad regionLoad = mock(BalancerRegionLoad.class);
when(regionLoad.getReadRequestsCount()).thenReturn(new Long(i));
when(regionLoad.getCpRequestsCount()).thenReturn(new Long(i));
when(regionLoad.getReadRequestsCount()).thenReturn((long) i);
when(regionLoad.getCpRequestsCount()).thenReturn((long) i);
when(regionLoad.getStorefileSizeMB()).thenReturn(i);
regionLoads.add(regionLoad);
}

View File

@ -17,24 +17,32 @@
*/
package org.apache.hadoop.hbase.master.balancer;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -45,8 +53,34 @@ public class MasterClusterInfoProvider implements ClusterInfoProvider {
private final MasterServices services;
private boolean isBalancerDecisionRecording;
private boolean isBalancerRejectionRecording;
/**
* Use to add balancer decision history to ring-buffer
*/
private NamedQueueRecorder namedQueueRecorder;
private OffPeakHours offPeakHours;
private void loadConf(Configuration conf) {
this.offPeakHours = OffPeakHours.getInstance(conf);
isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
isBalancerRejectionRecording =
conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
if (isBalancerDecisionRecording || isBalancerRejectionRecording) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(conf);
} else {
this.namedQueueRecorder = null;
}
}
public MasterClusterInfoProvider(MasterServices services) {
this.services = services;
loadConf(services.getConfiguration());
}
@Override
@ -106,4 +140,34 @@ public class MasterClusterInfoProvider implements ClusterInfoProvider {
return services.getTableDescriptors().getAll().size();
}
@Override
public boolean isOffPeakHour() {
return offPeakHours.isOffPeakHour();
}
@Override
public void onConfigurationChange(Configuration conf) {
loadConf(conf);
}
@Override
public void recordBalancerDecision(Supplier<BalancerDecision> decision) {
if (isBalancerDecisionRecording) {
namedQueueRecorder.addRecord(new BalancerDecisionDetails(decision.get()));
}
}
@Override
public void recordBalancerRejection(Supplier<BalancerRejection> rejection) {
if (isBalancerRejectionRecording) {
namedQueueRecorder.addRecord(new BalancerRejectionDetails(rejection.get()));
}
}
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
NamedQueueRecorder getNamedQueueRecorder() {
return namedQueueRecorder;
}
}

View File

@ -74,6 +74,7 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
private static final Logger LOG = LoggerFactory.getLogger(RSGroupBasedLoadBalancer.class);
private MasterServices masterServices;
private ClusterInfoProvider provider;
private FavoredNodesManager favoredNodesManager;
private volatile RSGroupInfoManager rsGroupInfoManager;
private volatile LoadBalancer internalBalancer;
@ -345,12 +346,13 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
throw new IOException(e);
}
}
this.provider = new MasterClusterInfoProvider(masterServices);
// avoid infinite nesting
if (getClass().isAssignableFrom(balancerClass)) {
balancerClass = LoadBalancerFactory.getDefaultLoadBalancerClass();
}
internalBalancer = ReflectionUtils.newInstance(balancerClass);
internalBalancer.setClusterInfoProvider(new MasterClusterInfoProvider(masterServices));
internalBalancer.setClusterInfoProvider(provider);
// special handling for favor node balancers
if (internalBalancer instanceof FavoredNodesPromoter) {
favoredNodesManager = new FavoredNodesManager(masterServices);
@ -394,6 +396,7 @@ public class RSGroupBasedLoadBalancer implements LoadBalancer {
fallbackEnabled, newFallbackEnabled);
fallbackEnabled = newFallbackEnabled;
}
provider.onConfigurationChange(conf);
internalBalancer.onConfigurationChange(conf);
}

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.master.balancer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -27,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.LogEntry;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
@ -55,6 +59,10 @@ public class TestBalancerDecision extends StochasticBalancerTestBase {
@Test
public void testBalancerDecisions() {
conf.setBoolean("hbase.master.balancer.decision.buffer.enabled", true);
MasterServices services = mock(MasterServices.class);
when(services.getConfiguration()).thenReturn(conf);
MasterClusterInfoProvider provider = new MasterClusterInfoProvider(services);
loadBalancer.setClusterInfoProvider(provider);
loadBalancer.onConfigurationChange(conf);
float minCost = conf.getFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 1.0f);
@ -78,7 +86,7 @@ public class TestBalancerDecision extends StochasticBalancerTestBase {
namedQueueGetRequest
.setBalancerDecisionsRequest(MasterProtos.BalancerDecisionsRequest.getDefaultInstance());
NamedQueueGetResponse namedQueueGetResponse =
loadBalancer.namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
provider.getNamedQueueRecorder().getNamedQueueRecords(namedQueueGetRequest);
List<RecentLogs.BalancerDecision> balancerDecisions =
namedQueueGetResponse.getBalancerDecisions();
MasterProtos.BalancerDecisionsResponse response =

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.master.balancer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@ -26,6 +29,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.LogEntry;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
@ -73,6 +77,10 @@ public class TestBalancerRejection extends StochasticBalancerTestBase {
//enabled balancer rejection recording
conf.setBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED, true);
conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY, MockCostFunction.class.getName());
MasterServices services = mock(MasterServices.class);
when(services.getConfiguration()).thenReturn(conf);
MasterClusterInfoProvider provider = new MasterClusterInfoProvider(services);
loadBalancer.setClusterInfoProvider(provider);
loadBalancer.onConfigurationChange(conf);
//Simulate 2 servers with 5 regions.
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(new int[] { 5, 5 });
@ -91,11 +99,11 @@ public class TestBalancerRejection extends StochasticBalancerTestBase {
//NamedQueue is an async Producer-consumer Pattern, waiting here until it completed
int maxWaitingCount = 10;
while (maxWaitingCount-- > 0 && getBalancerRejectionLogEntries().size() != 2) {
while (maxWaitingCount-- > 0 && getBalancerRejectionLogEntries(provider).size() != 2) {
Thread.sleep(1000);
}
//There are two cases, should be 2 logEntries
List<LogEntry> logEntries = getBalancerRejectionLogEntries();
List<LogEntry> logEntries = getBalancerRejectionLogEntries(provider);
Assert.assertEquals(2, logEntries.size());
Assert.assertTrue(
logEntries.get(0).toJsonPrettyPrint().contains("minCostNeedBalance"));
@ -108,19 +116,18 @@ public class TestBalancerRejection extends StochasticBalancerTestBase {
}
}
private List<LogEntry> getBalancerRejectionLogEntries(){
private List<LogEntry> getBalancerRejectionLogEntries(MasterClusterInfoProvider provider) {
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(BalancerRejectionDetails.BALANCER_REJECTION_EVENT);
namedQueueGetRequest.setBalancerRejectionsRequest(MasterProtos.BalancerRejectionsRequest.getDefaultInstance());
namedQueueGetRequest
.setBalancerRejectionsRequest(MasterProtos.BalancerRejectionsRequest.getDefaultInstance());
NamedQueueGetResponse namedQueueGetResponse =
loadBalancer.namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
List<RecentLogs.BalancerRejection> balancerRejections = namedQueueGetResponse.getBalancerRejections();
MasterProtos.BalancerRejectionsResponse response =
MasterProtos.BalancerRejectionsResponse.newBuilder()
.addAllBalancerRejection(balancerRejections)
.build();
List<LogEntry> balancerRejectionRecords =
ProtobufUtil.getBalancerRejectionEntries(response);
provider.getNamedQueueRecorder().getNamedQueueRecords(namedQueueGetRequest);
List<RecentLogs.BalancerRejection> balancerRejections =
namedQueueGetResponse.getBalancerRejections();
MasterProtos.BalancerRejectionsResponse response = MasterProtos.BalancerRejectionsResponse
.newBuilder().addAllBalancerRejection(balancerRejections).build();
List<LogEntry> balancerRejectionRecords = ProtobufUtil.getBalancerRejectionEntries(response);
return balancerRejectionRecords;
}
}

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BalancerTestBase.MockMapping;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.testclassification.LargeTests;

View File

@ -17,6 +17,7 @@ package org.apache.hadoop.hbase.master.balancer;
import static junit.framework.TestCase.assertNotNull;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@ -27,7 +28,6 @@ import java.util.Queue;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;