HBASE-25790 NamedQueue 'BalancerRejection' for recent history of balancer skipping (#3182) (#3245)

Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
GeorryHuang 2021-05-09 02:39:23 +08:00 committed by GitHub
parent 90dc150b1b
commit 63d49cb7ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 744 additions and 19 deletions

View File

@ -0,0 +1,116 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.gson.Gson;
import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;
/**
* History of detail information that balancer movements was rejected
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
final public class BalancerRejection extends LogEntry {
//The reason why balancer was rejected
private final String reason;
private final List<String> costFuncInfoList;
// used to convert object to pretty printed format
// used by toJsonPrettyPrint()
private static final Gson GSON = GsonUtil.createGson()
.setPrettyPrinting()
.disableHtmlEscaping()
.registerTypeAdapter(BalancerRejection.class, (JsonSerializer<BalancerRejection>)
(balancerRejection, type, jsonSerializationContext) -> {
Gson gson = new Gson();
return gson.toJsonTree(balancerRejection);
}).create();
private BalancerRejection(String reason, List<String> costFuncInfoList) {
this.reason = reason;
if(costFuncInfoList == null){
this.costFuncInfoList = Collections.emptyList();
}
else {
this.costFuncInfoList = costFuncInfoList;
}
}
public String getReason() {
return reason;
}
public List<String> getCostFuncInfoList() {
return costFuncInfoList;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("reason", reason)
.append("costFuncInfoList", costFuncInfoList.toString())
.toString();
}
@Override
public String toJsonPrettyPrint() {
return GSON.toJson(this);
}
public static class Builder {
private String reason;
private List<String> costFuncInfoList;
public Builder setReason(String reason) {
this.reason = reason;
return this;
}
public void addCostFuncInfo(String funcName, double cost, float multiplier){
if(costFuncInfoList == null){
costFuncInfoList = new ArrayList<>();
}
costFuncInfoList.add(
new StringBuilder()
.append(funcName)
.append(" cost:").append(cost)
.append(" multiplier:").append(multiplier)
.toString());
}
public Builder setCostFuncInfoList(List<String> costFuncInfoList){
this.costFuncInfoList = costFuncInfoList;
return this;
}
public BalancerRejection build() {
return new BalancerRejection(reason, costFuncInfoList);
}
}
}

View File

@ -4434,6 +4434,12 @@ public class HBaseAdmin implements Admin {
"Balancer Decision logs are not maintained by HRegionServer");
}
return getBalancerDecisions(limit);
} else if (logType.equals("BALANCER_REJECTION")) {
if (ServerType.REGION_SERVER.equals(serverType)) {
throw new IllegalArgumentException(
"Balancer Rejection logs are not maintained by HRegionServer");
}
return getBalancerRejections(limit);
}
return Collections.emptyList();
}
@ -4450,6 +4456,18 @@ public class HBaseAdmin implements Admin {
});
}
private List<LogEntry> getBalancerRejections(final int limit) throws IOException {
return executeCallable(new MasterCallable<List<LogEntry>>(getConnection(),
getRpcControllerFactory()) {
@Override
protected List<LogEntry> rpcCall() throws Exception {
HBaseProtos.LogEntry logEntry =
master.getLogEntries(getRpcController(), ProtobufUtil.toBalancerRejectionRequest(limit));
return ProtobufUtil.toBalancerRejectionResponse(logEntry);
}
});
}
private Boolean clearSlowLogsResponses(final ServerName serverName) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
return executeCallable(new RpcRetryingCallable<Boolean>() {

View File

@ -3968,6 +3968,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.call();
}
private CompletableFuture<List<LogEntry>> getBalancerRejections(final int limit) {
return this.<List<LogEntry>>newMasterCaller()
.action((controller, stub) ->
this.call(controller, stub,
ProtobufUtil.toBalancerRejectionRequest(limit),
MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerRejectionResponse))
.call();
}
@Override
public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
String logType, ServerType serverType, int limit,
@ -3975,18 +3984,27 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
if (logType == null || serverType == null) {
throw new IllegalArgumentException("logType and/or serverType cannot be empty");
}
if (logType.equals("SLOW_LOG") || logType.equals("LARGE_LOG")) {
switch (logType){
case "SLOW_LOG":
case "LARGE_LOG":
if (ServerType.MASTER.equals(serverType)) {
throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
}
return getSlowLogResponses(filterParams, serverNames, limit, logType);
} else if (logType.equals("BALANCER_DECISION")) {
case "BALANCER_DECISION":
if (ServerType.REGION_SERVER.equals(serverType)) {
throw new IllegalArgumentException(
"Balancer Decision logs are not maintained by HRegionServer");
}
return getBalancerDecisions(limit);
case "BALANCER_REJECTION":
if (ServerType.REGION_SERVER.equals(serverType)) {
throw new IllegalArgumentException(
"Balancer Rejection logs are not maintained by HRegionServer");
}
return getBalancerRejections(limit);
default:
return CompletableFuture.completedFuture(Collections.emptyList());
}
}
}

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ClientUtil;
@ -3686,6 +3687,25 @@ public final class ProtobufUtil {
throw new RuntimeException("Invalid response from server");
}
public static List<LogEntry> toBalancerRejectionResponse(
HBaseProtos.LogEntry logEntry) {
try {
final String logClassName = logEntry.getLogClassName();
Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
Method method = logClass.getMethod("parseFrom", ByteString.class);
if (logClassName.contains("BalancerRejectionsResponse")) {
MasterProtos.BalancerRejectionsResponse response =
(MasterProtos.BalancerRejectionsResponse) method
.invoke(null, logEntry.getLogMessage());
return getBalancerRejectionEntries(response);
}
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
throw new RuntimeException("Error while retrieving response from server");
}
throw new RuntimeException("Invalid response from server");
}
public static List<LogEntry> getBalancerDecisionEntries(
MasterProtos.BalancerDecisionsResponse response) {
List<RecentLogs.BalancerDecision> balancerDecisions = response.getBalancerDecisionList();
@ -3702,6 +3722,19 @@ public final class ProtobufUtil {
.collect(Collectors.toList());
}
public static List<LogEntry> getBalancerRejectionEntries(
MasterProtos.BalancerRejectionsResponse response) {
List<RecentLogs.BalancerRejection> balancerRejections = response.getBalancerRejectionList();
if (CollectionUtils.isEmpty(balancerRejections)) {
return Collections.emptyList();
}
return balancerRejections.stream().map(balancerRejection -> new BalancerRejection.Builder()
.setReason(balancerRejection.getReason())
.setCostFuncInfoList(balancerRejection.getCostFuncInfoList())
.build())
.collect(Collectors.toList());
}
public static HBaseProtos.LogRequest toBalancerDecisionRequest(int limit) {
MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest =
MasterProtos.BalancerDecisionsRequest.newBuilder().setLimit(limit).build();
@ -3711,4 +3744,13 @@ public final class ProtobufUtil {
.build();
}
public static HBaseProtos.LogRequest toBalancerRejectionRequest(int limit) {
MasterProtos.BalancerRejectionsRequest balancerRejectionsRequest =
MasterProtos.BalancerRejectionsRequest.newBuilder().setLimit(limit).build();
return HBaseProtos.LogRequest.newBuilder()
.setLogClassName(balancerRejectionsRequest.getClass().getName())
.setLogMessage(balancerRejectionsRequest.toByteString())
.build();
}
}

View File

@ -2011,7 +2011,7 @@ possible configurations would overwhelm and obscure the important.
</property>
<property>
<name>hbase.namedqueue.provider.classes</name>
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService</value>
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService</value>
<description>
Default values for NamedQueueService implementors. This comma separated full class names
represent all implementors of NamedQueueService that we would like to be invoked by
@ -2030,4 +2030,13 @@ possible configurations would overwhelm and obscure the important.
the ring buffer is indicated by config: hbase.master.balancer.decision.queue.size
</description>
</property>
<property>
<name>hbase.master.balancer.rejection.buffer.enabled</name>
<value>false</value>
<description>
Indicates whether active HMaster has ring buffer running for storing
balancer rejection in FIFO manner with limited entries. The size of
the ring buffer is indicated by config: hbase.master.balancer.rejection.queue.size
</description>
</property>
</configuration>

View File

@ -705,6 +705,13 @@ message BalancerDecisionsRequest {
optional uint32 limit = 1;
}
/**
* Same as BalancerDecision but used for BalancerRejection
*/
message BalancerRejectionsRequest {
optional uint32 limit = 1;
}
/**
* BalancerDecision (LogEntry) use-case specific RPC response. This response payload will be
* converted in bytes by servers and sent as response to generic RPC API: GetLogEntries
@ -716,6 +723,10 @@ message BalancerDecisionsResponse {
repeated BalancerDecision balancer_decision = 1;
}
message BalancerRejectionsResponse {
repeated BalancerRejection balancer_rejection = 1;
}
service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)

View File

@ -37,3 +37,8 @@ message BalancerDecision {
repeated string region_plans = 6;
}
message BalancerRejection {
required string reason = 1;
repeated string cost_func_info = 2;
}

View File

@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedu
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
@ -3048,6 +3049,16 @@ public class MasterRpcServices extends RSRpcServices implements
.setLogClassName(balancerDecisionsResponse.getClass().getName())
.setLogMessage(balancerDecisionsResponse.toByteString())
.build();
}else if (logClassName.contains("BalancerRejectionsRequest")){
MasterProtos.BalancerRejectionsRequest balancerRejectionsRequest =
(MasterProtos.BalancerRejectionsRequest) method
.invoke(null, request.getLogMessage());
MasterProtos.BalancerRejectionsResponse balancerRejectionsResponse =
getBalancerRejections(balancerRejectionsRequest);
return HBaseProtos.LogEntry.newBuilder()
.setLogClassName(balancerRejectionsResponse.getClass().getName())
.setLogMessage(balancerRejectionsResponse.toByteString())
.build();
}
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
@ -3075,4 +3086,22 @@ public class MasterRpcServices extends RSRpcServices implements
.addAllBalancerDecision(balancerDecisions).build();
}
private MasterProtos.BalancerRejectionsResponse getBalancerRejections(
MasterProtos.BalancerRejectionsRequest request) {
final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
if (namedQueueRecorder == null) {
return MasterProtos.BalancerRejectionsResponse.newBuilder()
.addAllBalancerRejection(Collections.emptyList()).build();
}
final NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(BalancerRejectionDetails.BALANCER_REJECTION_EVENT);
namedQueueGetRequest.setBalancerRejectionsRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
List<RecentLogs.BalancerRejection> balancerRejections =
namedQueueGetResponse.getBalancerRejections();
return MasterProtos.BalancerRejectionsResponse.newBuilder()
.addAllBalancerRejection(balancerRejections).build();
}
}

View File

@ -71,6 +71,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
"hbase.master.balancer.decision.buffer.enabled";
public static final boolean DEFAULT_BALANCER_DECISION_BUFFER_ENABLED = false;
public static final String BALANCER_REJECTION_BUFFER_ENABLED =
"hbase.master.balancer.rejection.buffer.enabled";
public static final boolean DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED = false;
protected static final int MIN_SERVER_BALANCE = 2;
private volatile boolean stopped = false;

View File

@ -35,9 +35,11 @@ import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@ -129,6 +131,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private long maxRunningTime = 30 * 1000 * 1; // 30 seconds.
private int numRegionLoadsToRemember = 15;
private float minCostNeedBalance = 0.05f;
private boolean isBalancerDecisionRecording = false;
private boolean isBalancerRejectionRecording = false;
private List<CandidateGenerator> candidateGenerators;
private CostFromRegionLoadFunction[] regionLoadFunctions;
@ -215,11 +219,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
curFunctionCosts = new double[costFunctions.size()];
tempFunctionCosts = new double[costFunctions.size()];
boolean isBalancerDecisionRecording = getConf()
isBalancerDecisionRecording = getConf()
.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
if (this.namedQueueRecorder == null && isBalancerDecisionRecording) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(conf);
isBalancerRejectionRecording = getConf()
.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
if (this.namedQueueRecorder == null && (isBalancerDecisionRecording
|| isBalancerRejectionRecording)) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(getConf());
}
LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
@ -325,6 +334,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
LOG.debug("Not running balancer because only " + cs.getNumServers()
+ " active regionserver(s)");
}
if (this.isBalancerRejectionRecording) {
sendRejectionReasonToRingBuffer("The number of RegionServers " +
cs.getNumServers() + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
}
return false;
}
if (areSomeRegionReplicasColocated(cluster)) {
@ -353,6 +366,19 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
boolean balanced = total <= 0 || sumMultiplier <= 0 ||
(sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance);
if(balanced && isBalancerRejectionRecording){
String reason = "";
if (total <= 0) {
reason = "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
} else if (sumMultiplier <= 0) {
reason = "sumMultiplier = " + sumMultiplier + " <= 0";
} else if ((total / sumMultiplier) < minCostNeedBalance) {
reason =
"[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = " + (total
/ sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
}
sendRejectionReasonToRingBuffer(reason, costFunctions);
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} {}; total cost={}, sum multiplier={}; cost/multiplier to need a balance is {}",
balanced ? "Skipping load balancing because balanced" : "We need to load balance",
@ -499,9 +525,27 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return null;
}
private void sendRejectionReasonToRingBuffer(String reason, List<CostFunction> costFunctions){
if (this.isBalancerRejectionRecording){
BalancerRejection.Builder builder =
new BalancerRejection.Builder()
.setReason(reason);
if (costFunctions != null) {
for (CostFunction c : costFunctions) {
float multiplier = c.getMultiplier();
if (multiplier <= 0 || !c.isNeeded()) {
continue;
}
builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
}
}
namedQueueRecorder.addRecord(new BalancerRejectionDetails(builder.build()));
}
}
private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
double initCost, String initFunctionTotalCosts, long step) {
if (this.namedQueueRecorder != null) {
if (this.isBalancerDecisionRecording) {
List<String> regionPlans = new ArrayList<>();
for (RegionPlan plan : plans) {
regionPlans.add(

View File

@ -0,0 +1,51 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Balancer rejection details that would be passed on to ring buffer for history
*/
@InterfaceAudience.Private
public class BalancerRejectionDetails extends NamedQueuePayload {
public static final int BALANCER_REJECTION_EVENT = 2;
private final BalancerRejection balancerRejection;
public BalancerRejectionDetails(BalancerRejection balancerRejection) {
super(BALANCER_REJECTION_EVENT);
this.balancerRejection = balancerRejection;
}
public BalancerRejection getBalancerRejection() {
return balancerRejection;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("balancerRejection", balancerRejection)
.toString();
}
}

View File

@ -31,7 +31,8 @@ public class NamedQueuePayload {
public enum NamedQueueEvent {
SLOW_LOG(0),
BALANCE_DECISION(1);
BALANCE_DECISION(1),
BALANCE_REJECTION(2);
private final int value;
@ -47,6 +48,9 @@ public class NamedQueuePayload {
case 1: {
return BALANCE_DECISION;
}
case 2: {
return BALANCE_REJECTION;
}
default: {
throw new IllegalArgumentException(
"NamedQueue event with ordinal " + value + " not defined");

View File

@ -0,0 +1,133 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueService;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
/**
* In-memory Queue service provider for Balancer Rejection events
*/
@InterfaceAudience.Private
public class BalancerRejectionQueueService implements NamedQueueService {
private static final Logger LOG = LoggerFactory.getLogger(BalancerRejectionQueueService.class);
private final boolean isBalancerRejectionRecording;
private static final String BALANCER_REJECTION_QUEUE_SIZE =
"hbase.master.balancer.rejection.queue.size";
private static final int DEFAULT_BALANCER_REJECTION_QUEUE_SIZE = 250;
private final Queue<RecentLogs.BalancerRejection> balancerRejectionQueue;
public BalancerRejectionQueueService(Configuration conf) {
isBalancerRejectionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
if (!isBalancerRejectionRecording) {
balancerRejectionQueue = null;
return;
}
final int queueSize =
conf.getInt(BALANCER_REJECTION_QUEUE_SIZE, DEFAULT_BALANCER_REJECTION_QUEUE_SIZE);
final EvictingQueue<RecentLogs.BalancerRejection> evictingQueue =
EvictingQueue.create(queueSize);
balancerRejectionQueue = Queues.synchronizedQueue(evictingQueue);
}
@Override
public NamedQueuePayload.NamedQueueEvent getEvent() {
return NamedQueuePayload.NamedQueueEvent.BALANCE_REJECTION;
}
@Override
public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
if (!isBalancerRejectionRecording) {
return;
}
if (!(namedQueuePayload instanceof BalancerRejectionDetails)) {
LOG.warn(
"BalancerRejectionQueueService: NamedQueuePayload is not of type BalancerRejectionDetails.");
return;
}
BalancerRejectionDetails balancerRejectionDetails = (BalancerRejectionDetails) namedQueuePayload;
BalancerRejection balancerRejectionRecord =
balancerRejectionDetails.getBalancerRejection();
RecentLogs.BalancerRejection BalancerRejection = RecentLogs.BalancerRejection.newBuilder()
.setReason(balancerRejectionRecord.getReason())
.addAllCostFuncInfo(balancerRejectionRecord.getCostFuncInfoList())
.build();
balancerRejectionQueue.add(BalancerRejection);
}
@Override
public boolean clearNamedQueue() {
if (!isBalancerRejectionRecording) {
return false;
}
LOG.debug("Received request to clean up balancer rejection queue.");
balancerRejectionQueue.clear();
return true;
}
@Override
public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
if (!isBalancerRejectionRecording) {
return null;
}
List<RecentLogs.BalancerRejection> balancerRejections =
Arrays.stream(balancerRejectionQueue.toArray(new RecentLogs.BalancerRejection[0]))
.collect(Collectors.toList());
// latest records should be displayed first, hence reverse order sorting
Collections.reverse(balancerRejections);
int limit = balancerRejections.size();
if (request.getBalancerRejectionsRequest().hasLimit()) {
limit = Math.min(request.getBalancerRejectionsRequest().getLimit(), balancerRejections.size());
}
// filter limit if provided
balancerRejections = balancerRejections.subList(0, limit);
final NamedQueueGetResponse namedQueueGetResponse = new NamedQueueGetResponse();
namedQueueGetResponse.setNamedQueueEvent(BalancerRejectionDetails.BALANCER_REJECTION_EVENT);
namedQueueGetResponse.setBalancerRejections(balancerRejections);
return namedQueueGetResponse;
}
@Override
public void persistAll() {
// no-op for now
}
}

View File

@ -38,6 +38,7 @@ public class NamedQueueGetRequest {
private AdminProtos.SlowLogResponseRequest slowLogResponseRequest;
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
private MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest;
private MasterProtos.BalancerRejectionsRequest balancerRejectionsRequest;
public AdminProtos.SlowLogResponseRequest getSlowLogResponseRequest() {
return slowLogResponseRequest;
@ -52,11 +53,20 @@ public class NamedQueueGetRequest {
return balancerDecisionsRequest;
}
public MasterProtos.BalancerRejectionsRequest getBalancerRejectionsRequest() {
return balancerRejectionsRequest;
}
public void setBalancerDecisionsRequest(
MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest) {
this.balancerDecisionsRequest = balancerDecisionsRequest;
}
public void setBalancerRejectionsRequest(
MasterProtos.BalancerRejectionsRequest balancerRejectionsRequest) {
this.balancerRejectionsRequest = balancerRejectionsRequest;
}
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
@ -71,6 +81,7 @@ public class NamedQueueGetRequest {
.append("slowLogResponseRequest", slowLogResponseRequest)
.append("namedQueueEvent", namedQueueEvent)
.append("balancerDecisionsRequest", balancerDecisionsRequest)
.append("balancerRejectionsRequest", balancerRejectionsRequest)
.toString();
}
}

View File

@ -34,6 +34,7 @@ public class NamedQueueGetResponse {
private List<TooSlowLog.SlowLogPayload> slowLogPayloads;
private List<RecentLogs.BalancerDecision> balancerDecisions;
private List<RecentLogs.BalancerRejection> balancerRejections;
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
public List<TooSlowLog.SlowLogPayload> getSlowLogPayloads() {
@ -52,6 +53,14 @@ public class NamedQueueGetResponse {
this.balancerDecisions = balancerDecisions;
}
public List<RecentLogs.BalancerRejection> getBalancerRejections() {
return balancerRejections;
}
public void setBalancerRejections(List<RecentLogs.BalancerRejection> balancerRejections) {
this.balancerRejections = balancerRejections;
}
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
@ -65,6 +74,7 @@ public class NamedQueueGetResponse {
return new ToStringBuilder(this)
.append("slowLogPayloads", slowLogPayloads)
.append("balancerDecisions", balancerDecisions)
.append("balancerRejections", balancerRejections)
.append("namedQueueEvent", namedQueueEvent)
.toString();
}

View File

@ -719,7 +719,10 @@ public class HRegionServer extends Thread implements
final boolean isBalancerDecisionRecording = conf
.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
if (isBalancerDecisionRecording) {
final boolean isBalancerRejectionRecording = conf
.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
if (isBalancerDecisionRecording || isBalancerRejectionRecording) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.LogEntry;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
/**
* Test BalancerRejection ring buffer using namedQueue interface
*/
@Category({ MasterTests.class, MediumTests.class })
public class TestBalancerRejection extends BalancerTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBalancerRejection.class);
static class MockCostFunction extends CostFunction{
public static double mockCost;
public MockCostFunction(Configuration c) {
}
@Override
protected double cost() {
return mockCost;
}
@Override
boolean isNeeded() {
return super.isNeeded();
}
@Override
float getMultiplier() {
return 1;
}
}
@Test
public void testBalancerRejections() throws Exception{
try {
//enabled balancer rejection recording
conf.setBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED, true);
conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY, MockCostFunction.class.getName());
loadBalancer.setConf(conf);
//Simulate 2 servers with 5 regions.
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(new int[] { 5, 5 });
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable = (Map) mockClusterServersWithTables(servers);
//Reject case 1: Total cost < 0
MockCostFunction.mockCost = -Double.MAX_VALUE;
//Since the Balancer was rejected, there should not be any plans
Assert.assertNull(loadBalancer.balanceCluster(LoadOfAllTable));
//Reject case 2: Cost < minCostNeedBalance
MockCostFunction.mockCost = 1;
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", Float.MAX_VALUE);
loadBalancer.setConf(conf);
Assert.assertNull(loadBalancer.balanceCluster(LoadOfAllTable));
//NamedQueue is an async Producer-consumer Pattern, waiting here until it completed
int maxWaitingCount = 10;
while (maxWaitingCount-- > 0 && getBalancerRejectionLogEntries().size() != 2) {
Thread.sleep(1000);
}
//There are two cases, should be 2 logEntries
List<LogEntry> logEntries = getBalancerRejectionLogEntries();
Assert.assertEquals(2, logEntries.size());
Assert.assertTrue(
logEntries.get(0).toJsonPrettyPrint().contains("minCostNeedBalance"));
Assert.assertTrue(
logEntries.get(1).toJsonPrettyPrint().contains("cost1*multiplier1"));
}finally {
conf.unset(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY);
conf.unset(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED);
loadBalancer.setConf(conf);
}
}
private List<LogEntry> getBalancerRejectionLogEntries(){
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(BalancerRejectionDetails.BALANCER_REJECTION_EVENT);
namedQueueGetRequest.setBalancerRejectionsRequest(MasterProtos.BalancerRejectionsRequest.getDefaultInstance());
NamedQueueGetResponse namedQueueGetResponse =
loadBalancer.namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
List<RecentLogs.BalancerRejection> balancerRejections = namedQueueGetResponse.getBalancerRejections();
MasterProtos.BalancerRejectionsResponse response =
MasterProtos.BalancerRejectionsResponse.newBuilder()
.addAllBalancerRejection(balancerRejections)
.build();
List<LogEntry> balancerRejectionRecords =
ProtobufUtil.getBalancerRejectionEntries(response);
return balancerRejectionRecords;
}
}

View File

@ -1695,6 +1695,25 @@ module Hbase
balancer_decisions_resp_arr
end
#----------------------------------------------------------------------------------------------
# Retrieve latest balancer rejections made by LoadBalancers
def get_balancer_rejections(args)
if args.key? 'LIMIT'
limit = args['LIMIT']
else
limit = 250
end
log_type = 'BALANCER_REJECTION'
log_dest = org.apache.hadoop.hbase.client.ServerType::MASTER
balancer_rejections_responses = @admin.getLogEntries(nil, log_type, log_dest, limit, nil)
balancer_rejections_resp_arr = []
balancer_rejections_responses.each { |balancer_dec_resp|
balancer_rejections_resp_arr << balancer_dec_resp.toJsonPrettyPrint
}
balancer_rejections_resp_arr
end
#----------------------------------------------------------------------------------------------
# Stop the active Master
def stop_master

View File

@ -455,6 +455,7 @@ Shell.load_command_group(
compaction_switch
flush
get_balancer_decisions
get_balancer_rejections
get_slowlog_responses
get_largelog_responses
major_compact

View File

@ -0,0 +1,49 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with this
# work for additional information regarding copyright ownership. The ASF
# licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Retrieve latest balancer rejections maintained in memory by HMaster
module Shell
module Commands
# Retrieve latest large log responses
class GetBalancerRejections < Command
def help
<<-EOF
Retrieve latest balancer rejections made by LoadBalancers.
Examples:
hbase> get_balancer_rejections => Retrieve recent balancer rejections with
region plans
hbase> get_balancer_rejections LIMIT => 10 => Retrieve 10 most recent balancer rejections
with region plans
EOF
end
def command(args = {})
unless args.is_a? Hash
raise 'Filter parameters are not Hash'
end
balancer_rejections_resp_arr = admin.get_balancer_rejections(args)
puts 'Retrieved BalancerRejection Responses'
puts balancer_rejections_resp_arr
end
end
end
end

View File

@ -2334,3 +2334,17 @@ The percent of region server RPC threads failed to abort RS.
.Default
`false`
[[hbase.master.balancer.rejection.buffer.enabled]]
*`hbase.master.balancer.rejection.buffer.enabled`*::
+
.Description
Indicates whether active HMaster has ring buffer running for storing
balancer rejection in FIFO manner with limited entries. The size of
the ring buffer is indicated by config:
hbase.master.balancer.rejection.queue.size
+
.Default
`false`