HBASE-24528 : BalancerDecision queue implementation in HMaster with Admin API (#2411)

* Admin API getLogEntries() for ring buffer use-cases: so far, provides balancerDecision and slowLogResponse
* Refactor RPC call for similar use-cases
* Single RPC API getLogEntries() for both Master.proto and Admin.proto

Closes #2261

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Viraj Jasani 2020-09-17 18:51:42 +05:30 committed by GitHub
parent dacedb9d07
commit 4316dc738c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
48 changed files with 1390 additions and 257 deletions

View File

@ -23,12 +23,14 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
@ -3202,9 +3204,30 @@ public interface Admin extends Abortable, Closeable {
* @param logQueryFilter filter to be used if provided (determines slow / large RPC logs)
* @return online slowlog response list
* @throws IOException if a remote or network exception occurs
* @deprecated since 2.4.0 and will be removed in 4.0.0.
* Use {@link #getLogEntries(Set, String, ServerType, int, Map)} instead.
*/
List<OnlineLogRecord> getSlowLogResponses(final Set<ServerName> serverNames,
final LogQueryFilter logQueryFilter) throws IOException;
@Deprecated
default List<OnlineLogRecord> getSlowLogResponses(final Set<ServerName> serverNames,
final LogQueryFilter logQueryFilter) throws IOException {
String logType;
if (LogQueryFilter.Type.LARGE_LOG.equals(logQueryFilter.getType())) {
logType = "LARGE_LOG";
} else {
logType = "SLOW_LOG";
}
Map<String, Object> filterParams = new HashMap<>();
filterParams.put("regionName", logQueryFilter.getRegionName());
filterParams.put("clientAddress", logQueryFilter.getClientAddress());
filterParams.put("tableName", logQueryFilter.getTableName());
filterParams.put("userName", logQueryFilter.getUserName());
filterParams.put("filterByOperator", logQueryFilter.getFilterByOperator().toString());
List<LogEntry> logEntries =
getLogEntries(serverNames, logType, ServerType.REGION_SERVER, logQueryFilter.getLimit(),
filterParams);
return logEntries.stream().map(logEntry -> (OnlineLogRecord) logEntry)
.collect(Collectors.toList());
}
/**
* Clears online slow/large RPC logs from the provided list of
@ -3218,4 +3241,21 @@ public interface Admin extends Abortable, Closeable {
List<Boolean> clearSlowLogResponses(final Set<ServerName> serverNames)
throws IOException;
/**
* Retrieve recent online records from HMaster / RegionServers.
* Examples include slow/large RPC logs, balancer decisions by master.
*
* @param serverNames servers to retrieve records from, useful in case of records maintained
* by RegionServer as we can select specific server. In case of servertype=MASTER, logs will
* only come from the currently active master.
* @param logType string representing type of log records
* @param serverType enum for server type: HMaster or RegionServer
* @param limit put a limit to list of records that server should send in response
* @param filterParams additional filter params
* @return Log entries representing online records from servers
* @throws IOException if a remote or network exception occurs
*/
List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType,
ServerType serverType, int limit, Map<String, Object> filterParams) throws IOException;
}

View File

@ -21,6 +21,7 @@ import com.google.protobuf.RpcChannel;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -28,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
@ -1515,11 +1517,33 @@ public interface AsyncAdmin {
* RegionServers
*
* @param serverNames Server names to get slowlog responses from
* @param slowLogQueryFilter filter to be used if provided
* @param logQueryFilter filter to be used if provided
* @return Online slowlog response list. The return value wrapped by a {@link CompletableFuture}
* @deprecated since 2.4.0 and will be removed in 4.0.0.
* Use {@link #getLogEntries(Set, String, ServerType, int, Map)} instead.
*/
CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(final Set<ServerName> serverNames,
final LogQueryFilter slowLogQueryFilter);
@Deprecated
default CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(
final Set<ServerName> serverNames, final LogQueryFilter logQueryFilter) {
String logType;
if (LogQueryFilter.Type.LARGE_LOG.equals(logQueryFilter.getType())) {
logType = "LARGE_LOG";
} else {
logType = "SLOW_LOG";
}
Map<String, Object> filterParams = new HashMap<>();
filterParams.put("regionName", logQueryFilter.getRegionName());
filterParams.put("clientAddress", logQueryFilter.getClientAddress());
filterParams.put("tableName", logQueryFilter.getTableName());
filterParams.put("userName", logQueryFilter.getUserName());
filterParams.put("filterByOperator", logQueryFilter.getFilterByOperator().toString());
CompletableFuture<List<LogEntry>> logEntries =
getLogEntries(serverNames, logType, ServerType.REGION_SERVER, logQueryFilter.getLimit(),
filterParams);
return logEntries.thenApply(
logEntryList -> logEntryList.stream().map(logEntry -> (OnlineLogRecord) logEntry)
.collect(Collectors.toList()));
}
/**
* Clears online slow RPC logs from the provided list of
@ -1531,4 +1555,19 @@ public interface AsyncAdmin {
*/
CompletableFuture<List<Boolean>> clearSlowLogResponses(final Set<ServerName> serverNames);
/**
* Retrieve recent online records from HMaster / RegionServers.
* Examples include slow/large RPC logs, balancer decisions by master.
*
* @param serverNames servers to retrieve records from, useful in case of records maintained
* by RegionServer as we can select specific server. In case of servertype=MASTER, logs will
* only come from the currently active master.
* @param logType string representing type of log records
* @param serverType enum for server type: HMaster or RegionServer
* @param limit put a limit to list of records that server should send in response
* @param filterParams additional filter params
* @return Log entries representing online records from servers
*/
CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, String logType,
ServerType serverType, int limit, Map<String, Object> filterParams);
}

View File

@ -847,15 +847,15 @@ class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.isSnapshotCleanupEnabled());
}
@Override
public CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(
final Set<ServerName> serverNames, final LogQueryFilter logQueryFilter) {
return wrap(rawAdmin.getSlowLogResponses(serverNames, logQueryFilter));
}
@Override
public CompletableFuture<List<Boolean>> clearSlowLogResponses(Set<ServerName> serverNames) {
return wrap(rawAdmin.clearSlowLogResponses(serverNames));
}
@Override
public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
String logType, ServerType serverType, int limit,
Map<String, Object> filterParams) {
return wrap(rawAdmin.getLogEntries(serverNames, logType, serverType, limit, filterParams));
}
}

View File

@ -0,0 +1,152 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.util.List;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.gson.Gson;
import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;
/**
* History of balancer decisions taken for region movements.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
final public class BalancerDecision extends LogEntry {
private final String initialFunctionCosts;
private final String finalFunctionCosts;
private final double initTotalCost;
private final double computedTotalCost;
private final long computedSteps;
private final List<String> regionPlans;
// used to convert object to pretty printed format
// used by toJsonPrettyPrint()
private static final Gson GSON = GsonUtil.createGson()
.setPrettyPrinting()
.registerTypeAdapter(BalancerDecision.class, (JsonSerializer<BalancerDecision>)
(balancerDecision, type, jsonSerializationContext) -> {
Gson gson = new Gson();
return gson.toJsonTree(balancerDecision);
}).create();
private BalancerDecision(String initialFunctionCosts, String finalFunctionCosts,
double initTotalCost, double computedTotalCost, List<String> regionPlans,
long computedSteps) {
this.initialFunctionCosts = initialFunctionCosts;
this.finalFunctionCosts = finalFunctionCosts;
this.initTotalCost = initTotalCost;
this.computedTotalCost = computedTotalCost;
this.regionPlans = regionPlans;
this.computedSteps = computedSteps;
}
public String getInitialFunctionCosts() {
return initialFunctionCosts;
}
public String getFinalFunctionCosts() {
return finalFunctionCosts;
}
public double getInitTotalCost() {
return initTotalCost;
}
public double getComputedTotalCost() {
return computedTotalCost;
}
public List<String> getRegionPlans() {
return regionPlans;
}
public long getComputedSteps() {
return computedSteps;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("initialFunctionCosts", initialFunctionCosts)
.append("finalFunctionCosts", finalFunctionCosts)
.append("initTotalCost", initTotalCost)
.append("computedTotalCost", computedTotalCost)
.append("computedSteps", computedSteps)
.append("regionPlans", regionPlans)
.toString();
}
@Override
public String toJsonPrettyPrint() {
return GSON.toJson(this);
}
public static class Builder {
private String initialFunctionCosts;
private String finalFunctionCosts;
private double initTotalCost;
private double computedTotalCost;
private long computedSteps;
private List<String> regionPlans;
public Builder setInitialFunctionCosts(String initialFunctionCosts) {
this.initialFunctionCosts = initialFunctionCosts;
return this;
}
public Builder setFinalFunctionCosts(String finalFunctionCosts) {
this.finalFunctionCosts = finalFunctionCosts;
return this;
}
public Builder setInitTotalCost(double initTotalCost) {
this.initTotalCost = initTotalCost;
return this;
}
public Builder setComputedTotalCost(double computedTotalCost) {
this.computedTotalCost = computedTotalCost;
return this;
}
public Builder setRegionPlans(List<String> regionPlans) {
this.regionPlans = regionPlans;
return this;
}
public Builder setComputedSteps(long computedSteps) {
this.computedSteps = computedSteps;
return this;
}
public BalancerDecision build() {
return new BalancerDecision(initialFunctionCosts, finalFunctionCosts,
initTotalCost, computedTotalCost, regionPlans, computedSteps);
}
}
}

View File

@ -101,6 +101,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Has
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
@ -1825,6 +1826,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
HasUserPermissionsRequest request) throws ServiceException {
return stub.hasUserPermissions(controller, request);
}
@Override
public HBaseProtos.LogEntry getLogEntries(RpcController controller,
HBaseProtos.LogRequest request) throws ServiceException {
return stub.getLogEntries(controller, request);
}
};
}

View File

@ -4370,15 +4370,15 @@ public class HBaseAdmin implements Admin {
}
@Override
public List<OnlineLogRecord> getSlowLogResponses(@Nullable final Set<ServerName> serverNames,
final LogQueryFilter logQueryFilter) throws IOException {
private List<LogEntry> getSlowLogResponses(
final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
final String logType) {
if (CollectionUtils.isEmpty(serverNames)) {
return Collections.emptyList();
}
return serverNames.stream().map(serverName -> {
try {
return getSlowLogResponseFromServer(serverName, logQueryFilter);
return getSlowLogResponseFromServer(serverName, filterParams, limit, logType);
} catch (IOException e) {
throw new RuntimeException(e);
}
@ -4386,29 +4386,17 @@ public class HBaseAdmin implements Admin {
).flatMap(List::stream).collect(Collectors.toList());
}
private List<OnlineLogRecord> getSlowLogResponseFromServer(final ServerName serverName,
final LogQueryFilter logQueryFilter) throws IOException {
return getSlowLogResponsesFromServer(this.connection.getAdmin(serverName), logQueryFilter);
}
private List<OnlineLogRecord> getSlowLogResponsesFromServer(AdminService.BlockingInterface admin,
LogQueryFilter logQueryFilter) throws IOException {
return executeCallable(new RpcRetryingCallable<List<OnlineLogRecord>>() {
private List<LogEntry> getSlowLogResponseFromServer(ServerName serverName,
Map<String, Object> filterParams, int limit, String logType) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
return executeCallable(new RpcRetryingCallable<List<LogEntry>>() {
@Override
protected List<OnlineLogRecord> rpcCall(int callTimeout) throws Exception {
protected List<LogEntry> rpcCall(int callTimeout) throws Exception {
HBaseRpcController controller = rpcControllerFactory.newController();
if (logQueryFilter.getType() == null
|| logQueryFilter.getType() == LogQueryFilter.Type.SLOW_LOG) {
AdminProtos.SlowLogResponses slowLogResponses =
admin.getSlowLogResponses(controller,
RequestConverter.buildSlowLogResponseRequest(logQueryFilter));
return ProtobufUtil.toSlowLogPayloads(slowLogResponses);
} else {
AdminProtos.SlowLogResponses slowLogResponses =
admin.getLargeLogResponses(controller,
RequestConverter.buildSlowLogResponseRequest(logQueryFilter));
return ProtobufUtil.toSlowLogPayloads(slowLogResponses);
}
HBaseProtos.LogRequest logRequest =
RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType);
HBaseProtos.LogEntry logEntry = admin.getLogEntries(controller, logRequest);
return ProtobufUtil.toSlowLogPayloads(logEntry);
}
});
}
@ -4428,6 +4416,39 @@ public class HBaseAdmin implements Admin {
}).collect(Collectors.toList());
}
@Override
public List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType,
ServerType serverType, int limit, Map<String, Object> filterParams) throws IOException {
if (logType == null || serverType == null) {
throw new IllegalArgumentException("logType and/or serverType cannot be empty");
}
if (logType.equals("SLOW_LOG") || logType.equals("LARGE_LOG")) {
if (ServerType.MASTER.equals(serverType)) {
throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
}
return getSlowLogResponses(filterParams, serverNames, limit, logType);
} else if (logType.equals("BALANCER_DECISION")) {
if (ServerType.REGION_SERVER.equals(serverType)) {
throw new IllegalArgumentException(
"Balancer Decision logs are not maintained by HRegionServer");
}
return getBalancerDecisions(limit);
}
return Collections.emptyList();
}
private List<LogEntry> getBalancerDecisions(final int limit) throws IOException {
return executeCallable(new MasterCallable<List<LogEntry>>(getConnection(),
getRpcControllerFactory()) {
@Override
protected List<LogEntry> rpcCall() throws Exception {
HBaseProtos.LogEntry logEntry =
master.getLogEntries(getRpcController(), ProtobufUtil.toBalancerDecisionRequest(limit));
return ProtobufUtil.toBalancerDecisionResponse(logEntry);
}
});
}
private Boolean clearSlowLogsResponses(final ServerName serverName) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
return executeCallable(new RpcRetryingCallable<Boolean>() {

View File

@ -0,0 +1,39 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
* Abstract response class representing online logs response from ring-buffer use-cases
* e.g slow/large RPC logs, balancer decision logs
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class LogEntry {
/**
* Based on response sent by server, provide pretty printed Json representation in string
* @return Pretty printed Json representation
*/
public abstract String toJsonPrettyPrint();
}

View File

@ -23,12 +23,16 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
* Slow/Large Log Query Filter with all filter and limit parameters
* Used by Admin API: getSlowLogResponses
* Extends generic LogRequest used by Admin API getLogEntries
* @deprecated as of 2.4.0. Will be removed in 4.0.0.
*/
@InterfaceAudience.Private
@InterfaceAudience.Public
@InterfaceStability.Evolving
@Deprecated
public class LogQueryFilter {
private String regionName;
@ -153,4 +157,5 @@ public class LogQueryFilter {
.append("filterByOperator", filterByOperator)
.toString();
}
}

View File

@ -24,6 +24,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.gson.Gson;
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
@ -33,8 +34,9 @@ import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;
* Slow/Large Log payload for hbase-client, to be used by Admin API get_slow_responses and
* get_large_responses
*/
@InterfaceAudience.Private
final public class OnlineLogRecord {
@InterfaceAudience.Public
@InterfaceStability.Evolving
final public class OnlineLogRecord extends LogEntry {
// used to convert object to pretty printed format
// used by toJsonPrettyPrint()
@ -56,22 +58,22 @@ final public class OnlineLogRecord {
return jsonObj;
}).create();
private long startTime;
private int processingTime;
private int queueTime;
private long responseSize;
private String clientAddress;
private String serverClass;
private String methodName;
private String callDetails;
private String param;
private final long startTime;
private final int processingTime;
private final int queueTime;
private final long responseSize;
private final String clientAddress;
private final String serverClass;
private final String methodName;
private final String callDetails;
private final String param;
// we don't want to serialize region name, it is just for the filter purpose
// hence avoiding deserialization
private transient String regionName;
private String userName;
private int multiGetsCount;
private int multiMutationsCount;
private int multiServiceCalls;
private final transient String regionName;
private final String userName;
private final int multiGetsCount;
private final int multiMutationsCount;
private final int multiServiceCalls;
public long getStartTime() {
return startTime;
@ -293,6 +295,7 @@ final public class OnlineLogRecord {
.toHashCode();
}
@Override
public String toJsonPrettyPrint() {
return GSON.toJson(this);
}

View File

@ -3906,49 +3906,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.call();
}
@Override
public CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(
@Nullable final Set<ServerName> serverNames, final LogQueryFilter logQueryFilter) {
private CompletableFuture<List<LogEntry>> getSlowLogResponses(
final Map<String, Object> filterParams, final Set<ServerName> serverNames, final int limit,
final String logType) {
if (CollectionUtils.isEmpty(serverNames)) {
return CompletableFuture.completedFuture(Collections.emptyList());
}
if (logQueryFilter.getType() == null
|| logQueryFilter.getType() == LogQueryFilter.Type.SLOW_LOG) {
return CompletableFuture.supplyAsync(() -> serverNames.stream()
.map((ServerName serverName) ->
getSlowLogResponseFromServer(serverName, logQueryFilter))
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList()));
} else {
return CompletableFuture.supplyAsync(() -> serverNames.stream()
.map((ServerName serverName) ->
getLargeLogResponseFromServer(serverName, logQueryFilter))
getSlowLogResponseFromServer(serverName, filterParams, limit, logType))
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList()));
}
}
private CompletableFuture<List<OnlineLogRecord>> getLargeLogResponseFromServer(
final ServerName serverName, final LogQueryFilter logQueryFilter) {
return this.<List<OnlineLogRecord>>newAdminCaller()
.action((controller, stub) -> this
.adminCall(
controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter),
AdminService.Interface::getLargeLogResponses,
ProtobufUtil::toSlowLogPayloads))
.serverName(serverName).call();
}
private CompletableFuture<List<OnlineLogRecord>> getSlowLogResponseFromServer(
final ServerName serverName, final LogQueryFilter logQueryFilter) {
return this.<List<OnlineLogRecord>>newAdminCaller()
.action((controller, stub) -> this
.adminCall(
controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter),
AdminService.Interface::getSlowLogResponses,
ProtobufUtil::toSlowLogPayloads))
private CompletableFuture<List<LogEntry>> getSlowLogResponseFromServer(ServerName serverName,
Map<String, Object> filterParams, int limit, String logType) {
return this.<List<LogEntry>>newAdminCaller().action((controller, stub) -> this
.adminCall(controller, stub,
RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType),
AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads))
.serverName(serverName).call();
}
@ -3985,4 +3962,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
);
}
private CompletableFuture<List<LogEntry>> getBalancerDecisions(final int limit) {
return this.<List<LogEntry>>newMasterCaller()
.action((controller, stub) ->
this.call(controller, stub,
ProtobufUtil.toBalancerDecisionRequest(limit),
MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerDecisionResponse))
.call();
}
@Override
public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames,
String logType, ServerType serverType, int limit,
Map<String, Object> filterParams) {
if (logType == null || serverType == null) {
throw new IllegalArgumentException("logType and/or serverType cannot be empty");
}
if (logType.equals("SLOW_LOG") || logType.equals("LARGE_LOG")) {
if (ServerType.MASTER.equals(serverType)) {
throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster");
}
return getSlowLogResponses(filterParams, serverNames, limit, logType);
} else if (logType.equals("BALANCER_DECISION")) {
if (ServerType.REGION_SERVER.equals(serverType)) {
throw new IllegalArgumentException(
"Balancer Decision logs are not maintained by HRegionServer");
}
return getBalancerDecisions(limit);
}
return CompletableFuture.completedFuture(Collections.emptyList());
}
}

View File

@ -0,0 +1,33 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Select server type i.e destination for RPC request associated with ring buffer.
* e.g slow/large log records are maintained by HRegionServer, whereas balancer decisions
* are maintained by HMaster.
*/
@InterfaceAudience.Public
public enum ServerType {
MASTER,
REGION_SERVER
}

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Rev
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
@ -472,6 +472,12 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
return stub.listNamespaces(controller, request);
}
@Override
public HBaseProtos.LogEntry getLogEntries(RpcController controller,
HBaseProtos.LogRequest request) throws ServiceException {
return stub.getLogEntries(controller, request);
}
@Override
public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
RpcController controller, MajorCompactionTimestampForRegionRequest request)

View File

@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.security.AccessController;
@ -66,6 +67,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ClientUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@ -77,6 +79,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.LogEntry;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OnlineLogRecord;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
@ -133,6 +136,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@ -191,6 +195,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableD
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
@ -3435,7 +3440,7 @@ public final class ProtobufUtil {
* @param slowLogPayload SlowLog Payload protobuf instance
* @return SlowLog Payload for client usecase
*/
private static OnlineLogRecord getSlowLogRecord(
private static LogEntry getSlowLogRecord(
final TooSlowLog.SlowLogPayload slowLogPayload) {
OnlineLogRecord onlineLogRecord = new OnlineLogRecord.OnlineLogRecordBuilder()
.setCallDetails(slowLogPayload.getCallDetails())
@ -3459,14 +3464,26 @@ public final class ProtobufUtil {
/**
* Convert AdminProtos#SlowLogResponses to list of {@link OnlineLogRecord}
*
* @param slowLogResponses slowlog response protobuf instance
* @param logEntry slowlog response protobuf instance
* @return list of SlowLog payloads for client usecase
*/
public static List<OnlineLogRecord> toSlowLogPayloads(
final AdminProtos.SlowLogResponses slowLogResponses) {
List<OnlineLogRecord> slowLogRecords = slowLogResponses.getSlowLogPayloadsList()
.stream().map(ProtobufUtil::getSlowLogRecord).collect(Collectors.toList());
return slowLogRecords;
public static List<LogEntry> toSlowLogPayloads(
final HBaseProtos.LogEntry logEntry) {
try {
final String logClassName = logEntry.getLogClassName();
Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
Method method = logClass.getMethod("parseFrom", ByteString.class);
if (logClassName.contains("SlowLogResponses")) {
AdminProtos.SlowLogResponses slowLogResponses = (AdminProtos.SlowLogResponses) method
.invoke(null, logEntry.getLogMessage());
return slowLogResponses.getSlowLogPayloadsList().stream()
.map(ProtobufUtil::getSlowLogRecord).collect(Collectors.toList());
}
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
throw new RuntimeException("Error while retrieving response from server");
}
throw new RuntimeException("Invalid response from server");
}
/**
@ -3551,4 +3568,49 @@ public final class ProtobufUtil {
throw new DoNotRetryIOException(e.getMessage());
}
}
public static List<LogEntry> toBalancerDecisionResponse(
HBaseProtos.LogEntry logEntry) {
try {
final String logClassName = logEntry.getLogClassName();
Class<?> logClass = Class.forName(logClassName).asSubclass(Message.class);
Method method = logClass.getMethod("parseFrom", ByteString.class);
if (logClassName.contains("BalancerDecisionsResponse")) {
MasterProtos.BalancerDecisionsResponse response =
(MasterProtos.BalancerDecisionsResponse) method
.invoke(null, logEntry.getLogMessage());
return getBalancerDecisionEntries(response);
}
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
throw new RuntimeException("Error while retrieving response from server");
}
throw new RuntimeException("Invalid response from server");
}
public static List<LogEntry> getBalancerDecisionEntries(
MasterProtos.BalancerDecisionsResponse response) {
List<RecentLogs.BalancerDecision> balancerDecisions = response.getBalancerDecisionList();
if (CollectionUtils.isEmpty(balancerDecisions)) {
return Collections.emptyList();
}
return balancerDecisions.stream().map(balancerDecision -> new BalancerDecision.Builder()
.setInitTotalCost(balancerDecision.getInitTotalCost())
.setInitialFunctionCosts(balancerDecision.getInitialFunctionCosts())
.setComputedTotalCost(balancerDecision.getComputedTotalCost())
.setFinalFunctionCosts(balancerDecision.getFinalFunctionCosts())
.setComputedSteps(balancerDecision.getComputedSteps())
.setRegionPlans(balancerDecision.getRegionPlansList()).build())
.collect(Collectors.toList());
}
public static HBaseProtos.LogRequest toBalancerDecisionRequest(int limit) {
MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest =
MasterProtos.BalancerDecisionsRequest.newBuilder().setLimit(limit).build();
return HBaseProtos.LogRequest.newBuilder()
.setLogClassName(balancerDecisionsRequest.getClass().getName())
.setLogMessage(balancerDecisionsRequest.toByteString())
.build();
}
}

View File

@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.LogQueryFilter;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
@ -71,6 +70,8 @@ import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
@ -2068,45 +2069,67 @@ public final class RequestConverter {
}
/**
* Create a protocol buffer {@link SlowLogResponseRequest}
* Build RPC request payload for getLogEntries
*
* @param logQueryFilter filter to use if provided
* @return a protocol buffer SlowLogResponseRequest
* @param filterParams map of filter params
* @param limit limit for no of records that server returns
* @param logType type of the log records
* @return request payload {@link HBaseProtos.LogRequest}
*/
public static SlowLogResponseRequest buildSlowLogResponseRequest(
final LogQueryFilter logQueryFilter) {
public static HBaseProtos.LogRequest buildSlowLogResponseRequest(
final Map<String, Object> filterParams, final int limit, final String logType) {
SlowLogResponseRequest.Builder builder = SlowLogResponseRequest.newBuilder();
if (logQueryFilter == null) {
return builder.build();
builder.setLimit(limit);
if (logType.equals("SLOW_LOG")) {
builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG);
} else if (logType.equals("LARGE_LOG")) {
builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG);
}
final String clientAddress = logQueryFilter.getClientAddress();
boolean filterByAnd = false;
if (MapUtils.isNotEmpty(filterParams)) {
if (filterParams.containsKey("clientAddress")) {
final String clientAddress = (String) filterParams.get("clientAddress");
if (StringUtils.isNotEmpty(clientAddress)) {
builder.setClientAddress(clientAddress);
}
final String regionName = logQueryFilter.getRegionName();
}
if (filterParams.containsKey("regionName")) {
final String regionName = (String) filterParams.get("regionName");
if (StringUtils.isNotEmpty(regionName)) {
builder.setRegionName(regionName);
}
final String tableName = logQueryFilter.getTableName();
}
if (filterParams.containsKey("tableName")) {
final String tableName = (String) filterParams.get("tableName");
if (StringUtils.isNotEmpty(tableName)) {
builder.setTableName(tableName);
}
final String userName = logQueryFilter.getUserName();
}
if (filterParams.containsKey("userName")) {
final String userName = (String) filterParams.get("userName");
if (StringUtils.isNotEmpty(userName)) {
builder.setUserName(userName);
}
LogQueryFilter.FilterByOperator filterByOperator = logQueryFilter.getFilterByOperator();
if (LogQueryFilter.FilterByOperator.AND.equals(filterByOperator)) {
}
if (filterParams.containsKey("filterByOperator")) {
final String filterByOperator = (String) filterParams.get("filterByOperator");
if (StringUtils.isNotEmpty(filterByOperator)) {
if (filterByOperator.toUpperCase().equals("AND")) {
filterByAnd = true;
}
}
}
}
if (filterByAnd) {
builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.AND);
} else {
builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR);
}
if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) {
builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG);
} else {
builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG);
}
return builder.setLimit(logQueryFilter.getLimit()).build();
SlowLogResponseRequest slowLogResponseRequest = builder.build();
return HBaseProtos.LogRequest.newBuilder()
.setLogClassName(slowLogResponseRequest.getClass().getName())
.setLogMessage(slowLogResponseRequest.toByteString())
.build();
}
/**

View File

@ -1998,7 +1998,7 @@ possible configurations would overwhelm and obscure the important.
</property>
<property>
<name>hbase.namedqueue.provider.classes</name>
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService</value>
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService</value>
<description>
Default values for NamedQueueService implementors. This comma separated full class names
represent all implementors of NamedQueueService that we would like to be invoked by
@ -2008,4 +2008,13 @@ possible configurations would overwhelm and obscure the important.
"org.apache.hadoop.hbase.namequeues.impl"
</description>
</property>
<property>
<name>hbase.master.balancer.decision.buffer.enabled</name>
<value>false</value>
<description>
Indicates whether active HMaster has ring buffer running for storing
balancer decisions in FIFO manner with limited entries. The size of
the ring buffer is indicated by config: hbase.master.balancer.decision.queue.size
</description>
</property>
</configuration>

View File

@ -37,7 +37,7 @@ public class MetricsStochasticBalancerSourceImpl extends MetricsBalancerSourceIm
private int metricsSize = 1000;
private int mruCap = calcMruCap(metricsSize);
private Map<String, Map<String, Double>> stochasticCosts =
private final Map<String, Map<String, Double>> stochasticCosts =
new LinkedHashMap<String, Map<String, Double>>(mruCap, MRU_LOAD_FACTOR, true) {
private static final long serialVersionUID = 8204713453436906599L;
@ -71,7 +71,6 @@ public class MetricsStochasticBalancerSourceImpl extends MetricsBalancerSourceIm
if (tableName == null || costFunctionName == null || cost == null) {
return;
}
if (functionDesc != null) {
costFunctionDescs.put(costFunctionName, functionDesc);
}
@ -81,7 +80,6 @@ public class MetricsStochasticBalancerSourceImpl extends MetricsBalancerSourceIm
if (costs == null) {
costs = new ConcurrentHashMap<>();
}
costs.put(costFunctionName, cost);
stochasticCosts.put(tableName, costs);
}

View File

@ -282,6 +282,13 @@ message ExecuteProceduresRequest {
message ExecuteProceduresResponse {
}
/**
* Slow/Large log (LogRequest) use-case specific RPC request. This request payload will be
* converted in bytes and sent to generic RPC API: GetLogEntries
* LogRequest message has two params:
* 1. log_class_name: SlowLogResponseRequest (for Slow/Large log use-case)
* 2. log_message: SlowLogResponseRequest converted in bytes (for Slow/Large log use-case)
*/
message SlowLogResponseRequest {
enum FilterByOperator {
AND = 0;
@ -302,6 +309,13 @@ message SlowLogResponseRequest {
optional LogType log_type = 7;
}
/**
* Slow/Large log (LogEntry) use-case specific RPC response. This response payload will be
* converted in bytes by servers and sent as response to generic RPC API: GetLogEntries
* LogEntry message has two params:
* 1. log_class_name: SlowLogResponses (for Slow/Large log use-case)
* 2. log_message: SlowLogResponses converted in bytes (for Slow/Large log use-case)
*/
message SlowLogResponses {
repeated SlowLogPayload slow_log_payloads = 1;
}
@ -387,4 +401,8 @@ service AdminService {
rpc ClearSlowLogsResponses(ClearSlowLogResponseRequest)
returns(ClearSlowLogResponses);
rpc GetLogEntries(LogRequest)
returns(LogEntry);
}

View File

@ -259,3 +259,13 @@ message RegionLocation {
optional ServerName server_name = 2;
required int64 seq_num = 3;
}
message LogRequest {
required string log_class_name = 1;
required bytes log_message = 2;
}
message LogEntry {
required string log_class_name = 1;
required bytes log_message = 2;
}

View File

@ -37,6 +37,7 @@ import "Quota.proto";
import "Replication.proto";
import "Snapshot.proto";
import "AccessControl.proto";
import "RecentLogs.proto";
/* Column-level protobufs */
@ -692,6 +693,28 @@ message SwitchExceedThrottleQuotaResponse {
required bool previous_exceed_throttle_quota_enabled = 1;
}
/**
* BalancerDecision (LogRequest) use-case specific RPC request. This request payload will be
* converted in bytes and sent to generic RPC API: GetLogEntries
* LogRequest message has two params:
* 1. log_class_name: BalancerDecisionsRequest (for BalancerDecision use-case)
* 2. log_message: BalancerDecisionsRequest converted in bytes (for BalancerDecision use-case)
*/
message BalancerDecisionsRequest {
optional uint32 limit = 1;
}
/**
* BalancerDecision (LogEntry) use-case specific RPC response. This response payload will be
* converted in bytes by servers and sent as response to generic RPC API: GetLogEntries
* LogEntry message has two params:
* 1. log_class_name: BalancerDecisionsResponse (for BalancerDecision use-case)
* 2. log_message: BalancerDecisionsResponse converted in bytes (for BalancerDecision use-case)
*/
message BalancerDecisionsResponse {
repeated BalancerDecision balancer_decision = 1;
}
service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@ -1079,6 +1102,9 @@ service MasterService {
/** returns a list of namespace names */
rpc ListNamespaces(ListNamespacesRequest)
returns(ListNamespacesResponse);
rpc GetLogEntries(LogRequest)
returns(LogEntry);
}
// HBCK Service definitions.

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
// This file contains protocol buffers that are used for Online BalancerDecision history
// To be used as Ring Buffer payload
package hbase.pb;
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "RecentLogs";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message BalancerDecision {
required string initial_function_costs = 1;
required string final_function_costs = 2;
required double init_total_cost = 3;
required double computed_total_cost = 4;
required uint64 computed_steps = 5;
repeated string region_plans = 6;
}

View File

@ -21,10 +21,13 @@ import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -81,6 +84,10 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure2.LockType;
import org.apache.hadoop.hbase.procedure2.LockedResource;
@ -123,6 +130,8 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
@ -324,6 +333,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaSta
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
@ -2981,4 +2991,49 @@ public class MasterRpcServices extends RSRpcServices implements
location -> response.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
return response.build();
}
@Override
public HBaseProtos.LogEntry getLogEntries(RpcController controller,
HBaseProtos.LogRequest request) throws ServiceException {
try {
final String logClassName = request.getLogClassName();
Class<?> logClass = Class.forName(logClassName)
.asSubclass(Message.class);
Method method = logClass.getMethod("parseFrom", ByteString.class);
if (logClassName.contains("BalancerDecisionsRequest")) {
MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest =
(MasterProtos.BalancerDecisionsRequest) method
.invoke(null, request.getLogMessage());
MasterProtos.BalancerDecisionsResponse balancerDecisionsResponse =
getBalancerDecisions(balancerDecisionsRequest);
return HBaseProtos.LogEntry.newBuilder()
.setLogClassName(balancerDecisionsResponse.getClass().getName())
.setLogMessage(balancerDecisionsResponse.toByteString())
.build();
}
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
LOG.error("Error while retrieving log entries.", e);
throw new ServiceException(e);
}
throw new ServiceException("Invalid request params");
}
private MasterProtos.BalancerDecisionsResponse getBalancerDecisions(
MasterProtos.BalancerDecisionsRequest request) {
final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder();
if (namedQueueRecorder == null) {
return MasterProtos.BalancerDecisionsResponse.newBuilder()
.addAllBalancerDecision(Collections.emptyList()).build();
}
final NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT);
namedQueueGetRequest.setBalancerDecisionsRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
List<RecentLogs.BalancerDecision> balancerDecisions =
namedQueueGetResponse.getBalancerDecisions();
return MasterProtos.BalancerDecisionsResponse.newBuilder()
.addAllBalancerDecision(balancerDecisions).build();
}
}

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
@ -73,6 +74,11 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
public abstract class BaseLoadBalancer implements LoadBalancer {
public static final String BALANCER_DECISION_BUFFER_ENABLED =
"hbase.master.balancer.decision.buffer.enabled";
public static final boolean DEFAULT_BALANCER_DECISION_BUFFER_ENABLED = false;
protected static final int MIN_SERVER_BALANCE = 2;
private volatile boolean stopped = false;
@ -85,6 +91,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
protected boolean useRegionFinder;
protected boolean isByTable = false;
/**
* Use to add balancer decision history to ring-buffer
*/
protected NamedQueueRecorder namedQueueRecorder;
private static class DefaultRackManager extends RackManager {
@Override
public String getRack(ServerName server) {

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
@ -46,6 +47,8 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRe
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@ -221,6 +224,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
curFunctionCosts = new Double[costFunctions.size()];
tempFunctionCosts = new Double[costFunctions.size()];
boolean isBalancerDecisionRecording = getConf()
.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
if (this.namedQueueRecorder == null && isBalancerDecisionRecording) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(getConf());
}
LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
Arrays.toString(getCostFunctionNames()) + " etc.");
@ -233,8 +243,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return;
}
costFunctions.addAll(Arrays.stream(functionsNames)
.map(c -> {
costFunctions.addAll(Arrays.stream(functionsNames).map(c -> {
Class<? extends CostFunction> klass = null;
try {
klass = (Class<? extends CostFunction>) Class.forName(c);
@ -244,15 +253,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
if (null == klass) {
return null;
}
CostFunction reflected = ReflectionUtils.newInstance(klass, conf);
LOG.info("Successfully loaded custom CostFunction '" +
reflected.getClass().getSimpleName() + "'");
LOG.info(
"Successfully loaded custom CostFunction '" + reflected.getClass().getSimpleName() + "'");
return reflected;
})
.filter(Objects::nonNull)
.collect(Collectors.toList()));
}).filter(Objects::nonNull).collect(Collectors.toList()));
}
protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
@ -411,7 +416,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
curOverallCost = currentCost;
System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
double initCost = currentCost;
double newCost = currentCost;
double newCost;
long computedMaxSteps;
if (runMaxSteps) {
@ -432,6 +437,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
+ functionCost() + " computedMaxSteps: " + computedMaxSteps);
final String initFunctionTotalCosts = totalCostsPerFunc();
// Perform a stochastic walk to see if we can get a good fit.
long step;
@ -480,6 +486,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
"{} regions; Going from a computed cost of {}" +
" to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime),
step, plans.size(), initCost, currentCost);
sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
return plans;
}
LOG.info("Could not find a better load balance plan. Tried {} different configurations in " +
@ -488,6 +495,27 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return null;
}
private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
double initCost, String initFunctionTotalCosts, long step) {
if (this.namedQueueRecorder != null) {
List<String> regionPlans = new ArrayList<>();
for (RegionPlan plan : plans) {
regionPlans.add(
"table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
+ " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
}
BalancerDecision balancerDecision =
new BalancerDecision.Builder()
.setInitTotalCost(initCost)
.setInitialFunctionCosts(initFunctionTotalCosts)
.setComputedTotalCost(currentCost)
.setFinalFunctionCosts(totalCostsPerFunc())
.setComputedSteps(step)
.setRegionPlans(regionPlans).build();
namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision));
}
}
/**
* update costs to JMX
*/
@ -532,6 +560,23 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
return builder.toString();
}
private String totalCostsPerFunc() {
StringBuilder builder = new StringBuilder();
for (CostFunction c : costFunctions) {
if (c.getMultiplier() * c.cost() > 0.0) {
builder.append(" ");
builder.append(c.getClass().getSimpleName());
builder.append(" : ");
builder.append(c.getMultiplier() * c.cost());
builder.append(";");
}
}
if (builder.length() > 0) {
builder.deleteCharAt(builder.length() - 1);
}
return builder.toString();
}
/**
* Create all of the RegionPlan's needed to move from the initial cluster state to the desired
* state.

View File

@ -0,0 +1,51 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Balancer decision details that would be passed on to ring buffer for history
*/
@InterfaceAudience.Private
public class BalancerDecisionDetails extends NamedQueuePayload {
public static final int BALANCER_DECISION_EVENT = 1;
private final BalancerDecision balancerDecision;
public BalancerDecisionDetails(BalancerDecision balancerDecision) {
super(BALANCER_DECISION_EVENT);
this.balancerDecision = balancerDecision;
}
public BalancerDecision getBalancerDecision() {
return balancerDecision;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("balancerDecision", balancerDecision)
.toString();
}
}

View File

@ -30,16 +30,39 @@ import org.apache.yetus.audience.InterfaceAudience;
public class NamedQueuePayload {
public enum NamedQueueEvent {
SLOW_LOG
SLOW_LOG(0),
BALANCE_DECISION(1);
private final int value;
NamedQueueEvent(int i) {
this.value = i;
}
public static NamedQueueEvent getEventByOrdinal(int value){
switch (value) {
case 0: {
return SLOW_LOG;
}
case 1: {
return BALANCE_DECISION;
}
default: {
throw new IllegalArgumentException(
"NamedQueue event with ordinal " + value + " not defined");
}
}
}
public int getValue() {
return value;
}
}
private final NamedQueueEvent namedQueueEvent;
public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
if (namedQueueEvent == null) {
throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
}
this.namedQueueEvent = namedQueueEvent;
public NamedQueuePayload(int eventOrdinal) {
this.namedQueueEvent = NamedQueueEvent.getEventByOrdinal(eventOrdinal);
}
public NamedQueueEvent getNamedQueueEvent() {

View File

@ -43,7 +43,7 @@ final class RingBufferEnvelope {
}
/**
* Retrieve current rpcCall details {@link RpcLogDetails} available on Envelope and
* Retrieve current namedQueue payload {@link NamedQueuePayload} available on Envelope and
* free up the Envelope
*
* @return Retrieve rpc log details

View File

@ -30,6 +30,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class RpcLogDetails extends NamedQueuePayload {
public static final int SLOW_LOG_EVENT = 0;
private final RpcCall rpcCall;
private final Message param;
private final String clientAddress;
@ -40,7 +42,7 @@ public class RpcLogDetails extends NamedQueuePayload {
public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize,
String className, boolean isSlowLog, boolean isLargeLog) {
super(NamedQueueEvent.SLOW_LOG);
super(SLOW_LOG_EVENT);
this.rpcCall = rpcCall;
this.param = param;
this.clientAddress = clientAddress;

View File

@ -0,0 +1,150 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueService;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
/**
* In-memory Queue service provider for Balancer Decision events
*/
@InterfaceAudience.Private
public class BalancerDecisionQueueService implements NamedQueueService {
private static final Logger LOG = LoggerFactory.getLogger(BalancerDecisionQueueService.class);
private final boolean isBalancerDecisionRecording;
private static final String BALANCER_DECISION_QUEUE_SIZE =
"hbase.master.balancer.decision.queue.size";
private static final int DEFAULT_BALANCER_DECISION_QUEUE_SIZE = 250;
private static final int REGION_PLANS_THRESHOLD_PER_BALANCER = 15;
private final Queue<RecentLogs.BalancerDecision> balancerDecisionQueue;
public BalancerDecisionQueueService(Configuration conf) {
isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
if (!isBalancerDecisionRecording) {
balancerDecisionQueue = null;
return;
}
final int queueSize =
conf.getInt(BALANCER_DECISION_QUEUE_SIZE, DEFAULT_BALANCER_DECISION_QUEUE_SIZE);
final EvictingQueue<RecentLogs.BalancerDecision> evictingQueue =
EvictingQueue.create(queueSize);
balancerDecisionQueue = Queues.synchronizedQueue(evictingQueue);
}
@Override
public NamedQueuePayload.NamedQueueEvent getEvent() {
return NamedQueuePayload.NamedQueueEvent.BALANCE_DECISION;
}
@Override
public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
if (!isBalancerDecisionRecording) {
return;
}
if (!(namedQueuePayload instanceof BalancerDecisionDetails)) {
LOG.warn(
"BalancerDecisionQueueService: NamedQueuePayload is not of type BalancerDecisionDetails.");
return;
}
BalancerDecisionDetails balancerDecisionDetails = (BalancerDecisionDetails) namedQueuePayload;
BalancerDecision balancerDecisionRecords =
balancerDecisionDetails.getBalancerDecision();
List<String> regionPlans = balancerDecisionRecords.getRegionPlans();
List<List<String>> regionPlansList;
if (regionPlans.size() > REGION_PLANS_THRESHOLD_PER_BALANCER) {
regionPlansList = Lists.partition(regionPlans, REGION_PLANS_THRESHOLD_PER_BALANCER);
} else {
regionPlansList = Collections.singletonList(regionPlans);
}
for (List<String> regionPlansPerBalancer : regionPlansList) {
RecentLogs.BalancerDecision balancerDecision = RecentLogs.BalancerDecision.newBuilder()
.setInitTotalCost(balancerDecisionRecords.getInitTotalCost())
.setInitialFunctionCosts(balancerDecisionRecords.getInitialFunctionCosts())
.setComputedTotalCost(balancerDecisionRecords.getComputedTotalCost())
.setFinalFunctionCosts(balancerDecisionRecords.getFinalFunctionCosts())
.setComputedSteps(balancerDecisionRecords.getComputedSteps())
.addAllRegionPlans(regionPlansPerBalancer)
.build();
balancerDecisionQueue.add(balancerDecision);
}
}
@Override
public boolean clearNamedQueue() {
if (!isBalancerDecisionRecording) {
return false;
}
LOG.debug("Received request to clean up balancer decision queue.");
balancerDecisionQueue.clear();
return true;
}
@Override
public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
if (!isBalancerDecisionRecording) {
return null;
}
List<RecentLogs.BalancerDecision> balancerDecisions =
Arrays.stream(balancerDecisionQueue.toArray(new RecentLogs.BalancerDecision[0]))
.collect(Collectors.toList());
// latest records should be displayed first, hence reverse order sorting
Collections.reverse(balancerDecisions);
int limit = balancerDecisions.size();
if (request.getBalancerDecisionsRequest().hasLimit()) {
limit = Math.min(request.getBalancerDecisionsRequest().getLimit(), balancerDecisions.size());
}
// filter limit if provided
balancerDecisions = balancerDecisions.subList(0, limit);
final NamedQueueGetResponse namedQueueGetResponse = new NamedQueueGetResponse();
namedQueueGetResponse.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT);
namedQueueGetResponse.setBalancerDecisions(balancerDecisions);
return namedQueueGetResponse;
}
@Override
public void persistAll() {
// no-op for now
}
}

View File

@ -19,6 +19,12 @@
package org.apache.hadoop.hbase.namequeues.impl;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
@ -31,22 +37,19 @@ import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
/**
* In-memory Queue service provider for Slow/LargeLog events
@ -201,7 +204,7 @@ public class SlowLogQueueService implements NamedQueueService {
slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest);
}
NamedQueueGetResponse response = new NamedQueueGetResponse();
response.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
response.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
response.setSlowLogPayloads(slowLogPayloads);
return response;
}

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.namequeues.request;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -36,6 +37,7 @@ public class NamedQueueGetRequest {
private AdminProtos.SlowLogResponseRequest slowLogResponseRequest;
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
private MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest;
public AdminProtos.SlowLogResponseRequest getSlowLogResponseRequest() {
return slowLogResponseRequest;
@ -46,12 +48,21 @@ public class NamedQueueGetRequest {
this.slowLogResponseRequest = slowLogResponseRequest;
}
public MasterProtos.BalancerDecisionsRequest getBalancerDecisionsRequest() {
return balancerDecisionsRequest;
}
public void setBalancerDecisionsRequest(
MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest) {
this.balancerDecisionsRequest = balancerDecisionsRequest;
}
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
this.namedQueueEvent = namedQueueEvent;
public void setNamedQueueEvent(int eventOrdinal) {
this.namedQueueEvent = NamedQueuePayload.NamedQueueEvent.getEventByOrdinal(eventOrdinal);
}
@Override
@ -59,7 +70,7 @@ public class NamedQueueGetRequest {
return new ToStringBuilder(this)
.append("slowLogResponseRequest", slowLogResponseRequest)
.append("namedQueueEvent", namedQueueEvent)
.append("balancerDecisionsRequest", balancerDecisionsRequest)
.toString();
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.namequeues.response;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.yetus.audience.InterfaceAudience;
import java.util.List;
@ -32,6 +33,7 @@ import java.util.List;
public class NamedQueueGetResponse {
private List<TooSlowLog.SlowLogPayload> slowLogPayloads;
private List<RecentLogs.BalancerDecision> balancerDecisions;
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
public List<TooSlowLog.SlowLogPayload> getSlowLogPayloads() {
@ -42,18 +44,27 @@ public class NamedQueueGetResponse {
this.slowLogPayloads = slowLogPayloads;
}
public List<RecentLogs.BalancerDecision> getBalancerDecisions() {
return balancerDecisions;
}
public void setBalancerDecisions(List<RecentLogs.BalancerDecision> balancerDecisions) {
this.balancerDecisions = balancerDecisions;
}
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
this.namedQueueEvent = namedQueueEvent;
public void setNamedQueueEvent(int eventOrdinal) {
this.namedQueueEvent = NamedQueuePayload.NamedQueueEvent.getEventByOrdinal(eventOrdinal);
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("slowLogPayloads", slowLogPayloads)
.append("balancerDecisions", balancerDecisions)
.append("namedQueueEvent", namedQueueEvent)
.toString();
}

View File

@ -121,6 +121,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
@ -596,14 +597,7 @@ public class HRegionServer extends Thread implements
this.abortRequested = new AtomicBoolean(false);
this.stopped = false;
if (!(this instanceof HMaster)) {
final boolean isOnlineLogProviderEnabled = conf.getBoolean(
HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
if (isOnlineLogProviderEnabled) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
}
}
initNamedQueueRecorder(conf);
rpcServices = createRpcServices();
useThisHostnameInstead = getUseThisHostnameInstead(conf);
String hostName =
@ -679,6 +673,24 @@ public class HRegionServer extends Thread implements
}
}
private void initNamedQueueRecorder(Configuration conf) {
if (!(this instanceof HMaster)) {
final boolean isOnlineLogProviderEnabled = conf.getBoolean(
HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
if (isOnlineLogProviderEnabled) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
}
} else {
final boolean isBalancerDecisionRecording = conf
.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
if (isBalancerDecisionRecording) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
}
}
}
// HMaster should override this method to load the specific config for master
protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
String hostname = conf.get(RS_HOSTNAME_KEY);

View File

@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -106,6 +107,7 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.net.Address;
@ -241,6 +243,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanReques
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
@ -3787,7 +3790,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
List<SlowLogPayload> slowLogPayloads;
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
@ -3826,6 +3829,36 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return clearSlowLogResponses;
}
@Override
public HBaseProtos.LogEntry getLogEntries(RpcController controller,
HBaseProtos.LogRequest request) throws ServiceException {
try {
final String logClassName = request.getLogClassName();
Class<?> logClass = Class.forName(logClassName)
.asSubclass(Message.class);
Method method = logClass.getMethod("parseFrom", ByteString.class);
if (logClassName.contains("SlowLogResponseRequest")) {
SlowLogResponseRequest slowLogResponseRequest =
(SlowLogResponseRequest) method.invoke(null, request.getLogMessage());
final NamedQueueRecorder namedQueueRecorder =
this.regionServer.getNamedQueueRecorder();
final List<SlowLogPayload> slowLogPayloads =
getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder);
SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
.addAllSlowLogPayloads(slowLogPayloads)
.build();
return HBaseProtos.LogEntry.newBuilder()
.setLogClassName(slowLogResponses.getClass().getName())
.setLogMessage(slowLogResponses.toByteString()).build();
}
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException e) {
LOG.error("Error while retrieving log entries.", e);
throw new ServiceException(e);
}
throw new ServiceException("Invalid request params");
}
@VisibleForTesting
public RpcScheduler getRpcScheduler() {
return rpcServer.getScheduler();

View File

@ -888,12 +888,13 @@ public class TestAdmin2 extends TestAdminBase {
}
Assert.assertEquals(countFailedClearSlowResponse, 0);
LogQueryFilter logQueryFilter = new LogQueryFilter();
List<OnlineLogRecord> onlineLogRecords = ADMIN.getSlowLogResponses(new HashSet<>(serverNames),
logQueryFilter);
List<LogEntry> onlineLogRecords = ADMIN.getLogEntries(new HashSet<>(serverNames),
"SLOW_LOG", ServerType.REGION_SERVER, 100, null);
// after cleanup of slowlog responses, total count of slowlog payloads should be 0
Assert.assertEquals(onlineLogRecords.size(), 0);
List<LogEntry> balancerDecisionRecords =
ADMIN.getLogEntries(null, "BALANCER_DECISION", ServerType.MASTER, 100, null);
Assert.assertEquals(balancerDecisionRecords.size(), 0);
}
@Test

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@ -74,13 +75,10 @@ public class TestAsyncTableGetMultiThreaded {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] QUALIFIER = Bytes.toBytes("cq");
private static int COUNT = 1000;
private static final TableName TABLE_NAME = TableName.valueOf("async");
private static final byte[] FAMILY = Bytes.toBytes("cf");
private static final byte[] QUALIFIER = Bytes.toBytes("cq");
private static final int COUNT = 1000;
private static AsyncConnection CONN;
@ -99,6 +97,7 @@ public class TestAsyncTableGetMultiThreaded {
TEST_UTIL.getConfiguration().setInt(MAX_BUFFER_COUNT_KEY, 100);
TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
String.valueOf(memoryCompaction));
TEST_UTIL.getConfiguration().setBoolean("hbase.master.balancer.decision.buffer.enabled", true);
TEST_UTIL.startMiniCluster(3);
SPLIT_KEYS = new byte[8][];
@ -211,6 +210,9 @@ public class TestAsyncTableGetMultiThreaded {
LOG.info("====== Move meta done ======");
Thread.sleep(5000);
}
List<LogEntry> balancerDecisionRecords =
admin.getLogEntries(null, "BALANCER_DECISION", ServerType.MASTER, 2, null);
Assert.assertEquals(balancerDecisionRecords.size(), 2);
LOG.info("====== Read test finished, shutdown thread pool ======");
stop.set(true);
executor.shutdown();

View File

@ -137,6 +137,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
@ -699,6 +700,12 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
return null;
}
@Override
public HBaseProtos.LogEntry getLogEntries(RpcController controller,
HBaseProtos.LogRequest request) throws ServiceException {
return null;
}
@Override
public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
RpcController controller, GetSpaceQuotaSnapshotsRequest request)

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.LogEntry;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
/**
* Test BalancerDecision ring buffer using namedQueue interface
*/
@Category({ MasterTests.class, MediumTests.class })
public class TestBalancerDecision extends BalancerTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBalancerDecision.class);
@Test
public void testBalancerDecisions() {
conf.setBoolean("hbase.master.balancer.decision.buffer.enabled", true);
loadBalancer.setConf(conf);
float minCost = conf.getFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 1.0f);
try {
// Test with/without per table balancer.
boolean[] perTableBalancerConfigs = {true, false};
for (boolean isByTable : perTableBalancerConfigs) {
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
loadBalancer.setConf(conf);
for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster);
Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
(Map) mockClusterServersWithTables(servers);
List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
boolean emptyPlans = plans == null || plans.isEmpty();
Assert.assertTrue(emptyPlans || needsBalanceIdleRegion(mockCluster));
}
}
final NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT);
namedQueueGetRequest
.setBalancerDecisionsRequest(MasterProtos.BalancerDecisionsRequest.getDefaultInstance());
NamedQueueGetResponse namedQueueGetResponse =
loadBalancer.namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
List<RecentLogs.BalancerDecision> balancerDecisions =
namedQueueGetResponse.getBalancerDecisions();
MasterProtos.BalancerDecisionsResponse response =
MasterProtos.BalancerDecisionsResponse.newBuilder()
.addAllBalancerDecision(balancerDecisions)
.build();
List<LogEntry> balancerDecisionRecords =
ProtobufUtil.getBalancerDecisionEntries(response);
Assert.assertTrue(balancerDecisionRecords.size() > 160);
} finally {
// reset config
conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE);
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", minCost);
loadBalancer.setConf(conf);
}
}
private static boolean needsBalanceIdleRegion(int[] cluster) {
return (Arrays.stream(cluster).anyMatch(x -> x > 1)) && (Arrays.stream(cluster)
.anyMatch(x -> x < 1));
}
}

View File

@ -151,6 +151,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
String regionNameAsString = RegionInfo.getRegionNameAsString(Bytes.toBytes(REGION_KEY));
assertTrue(loadBalancer.loads.get(regionNameAsString) != null);
assertTrue(loadBalancer.loads.get(regionNameAsString).size() == 15);
assertNull(loadBalancer.namedQueueRecorder);
Queue<BalancerRegionLoad> loads = loadBalancer.loads.get(regionNameAsString);
int i = 0;
@ -203,6 +204,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
double expected = 1 - expectedLocalities[test];
assertEquals(expected, cost, 0.001);
}
assertNull(loadBalancer.namedQueueRecorder);
}
@Test

View File

@ -234,7 +234,7 @@ public class TestNamedQueueRecorder {
private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);

View File

@ -101,7 +101,7 @@ public class TestSlowLogAccessor {
private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
AdminProtos.SlowLogResponseRequest request) {
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);

View File

@ -1496,7 +1496,7 @@ module Hbase
#----------------------------------------------------------------------------------------------
# Retrieve SlowLog Responses from RegionServers
def get_slowlog_responses(server_names, args)
def get_slowlog_responses(server_names, args, is_large_log = false)
unless server_names.is_a?(Array) || server_names.is_a?(String)
raise(ArgumentError,
"#{server_names.class} of #{server_names.inspect} is not of Array/String type")
@ -1508,38 +1508,44 @@ module Hbase
server_names = getServerNames(server_names_list, false)
end
filter_params = get_filter_params(args)
filter_params.setType(org.apache.hadoop.hbase.client.LogQueryFilter::Type::SLOW_LOG)
slow_log_responses = @admin.getSlowLogResponses(java.util.HashSet.new(server_names),
if args.key? 'LIMIT'
limit = args['LIMIT']
else
limit = 10
end
if is_large_log
log_type = 'LARGE_LOG'
else
log_type = 'SLOW_LOG'
end
log_dest = org.apache.hadoop.hbase.client.ServerType::REGION_SERVER
server_names_set = java.util.HashSet.new(server_names)
slow_log_responses = @admin.getLogEntries(server_names_set, log_type, log_dest, limit,
filter_params)
slow_log_responses_arr = []
for slow_log_response in slow_log_responses
slow_log_responses.each { |slow_log_response|
slow_log_responses_arr << slow_log_response.toJsonPrettyPrint
end
puts 'Retrieved SlowLog Responses from RegionServers'
puts slow_log_responses_arr
}
slow_log_responses_arr
end
def get_filter_params(args)
filter_params = org.apache.hadoop.hbase.client.LogQueryFilter.new
filter_params = java.util.HashMap.new
if args.key? 'REGION_NAME'
region_name = args['REGION_NAME']
filter_params.setRegionName(region_name)
filter_params.put('regionName', region_name)
end
if args.key? 'TABLE_NAME'
table_name = args['TABLE_NAME']
filter_params.setTableName(table_name)
filter_params.put('tableName', table_name)
end
if args.key? 'CLIENT_IP'
client_ip = args['CLIENT_IP']
filter_params.setClientAddress(client_ip)
client_address = args['CLIENT_IP']
filter_params.put('clientAddress', client_address)
end
if args.key? 'USER'
user = args['USER']
filter_params.setUserName(user)
end
if args.key? 'LIMIT'
limit = args['LIMIT']
filter_params.setLimit(limit)
filter_params.put('userName', user)
end
if args.key? 'FILTER_BY_OP'
filter_by_op = args['FILTER_BY_OP']
@ -1547,38 +1553,12 @@ module Hbase
raise(ArgumentError, "FILTER_BY_OP should be either OR / AND")
end
if filter_by_op == 'AND'
filter_params.setFilterByOperator(
org.apache.hadoop.hbase.client.LogQueryFilter::FilterByOperator::AND)
filter_params.put('filterByOperator', 'AND')
end
end
filter_params
end
#----------------------------------------------------------------------------------------------
# Retrieve LargeLog Responses from RegionServers
def get_largelog_responses(server_names, args)
unless server_names.is_a?(Array) || server_names.is_a?(String)
raise(ArgumentError,
"#{server_names.class} of #{server_names.inspect} is not of Array/String type")
end
if server_names == '*'
server_names = getServerNames([], true)
else
server_names_list = to_server_names(server_names)
server_names = getServerNames(server_names_list, false)
end
filter_params = get_filter_params(args)
filter_params.setType(org.apache.hadoop.hbase.client.LogQueryFilter::Type::LARGE_LOG)
large_log_responses = @admin.getSlowLogResponses(java.util.HashSet.new(server_names),
filter_params)
large_log_responses_arr = []
for large_log_response in large_log_responses
large_log_responses_arr << large_log_response.toJsonPrettyPrint
end
puts 'Retrieved LargeLog Responses from RegionServers'
puts large_log_responses_arr
end
#----------------------------------------------------------------------------------------------
# Clears SlowLog Responses from RegionServers
def clear_slowlog_responses(server_names)
@ -1667,6 +1647,24 @@ module Hbase
@admin.recommissionRegionServer(server_name, region_names_in_bytes)
end
#----------------------------------------------------------------------------------------------
# Retrieve latest balancer decisions made by LoadBalancers
def get_balancer_decisions(args)
if args.key? 'LIMIT'
limit = args['LIMIT']
else
limit = 250
end
log_type = 'BALANCER_DECISION'
log_dest = org.apache.hadoop.hbase.client.ServerType::MASTER
balancer_decisions_responses = @admin.getLogEntries(nil, log_type, log_dest, limit, nil)
balancer_decisions_resp_arr = []
balancer_decisions_responses.each { |balancer_dec_resp|
balancer_decisions_resp_arr << balancer_dec_resp.toJsonPrettyPrint
}
balancer_decisions_resp_arr
end
#----------------------------------------------------------------------------------------------
# Stop the active Master
def stop_master

View File

@ -338,6 +338,7 @@ Shell.load_command_group(
compact
compaction_switch
flush
get_balancer_decisions
get_slowlog_responses
get_largelog_responses
major_compact

View File

@ -0,0 +1,49 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with this
# work for additional information regarding copyright ownership. The ASF
# licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Retrieve latest balancer decisions maintained in memory by HMaster
module Shell
module Commands
# Retrieve latest large log responses
class GetBalancerDecisions < Command
def help
<<-EOF
Retrieve latest balancer decisions made by LoadBalancers.
Examples:
hbase> get_balancer_decisions => Retrieve recent balancer decisions with
region plans
hbase> get_balancer_decisions LIMIT => 10 => Retrieve 10 most recent balancer decisions
with region plans
EOF
end
def command(args = {})
unless args.is_a? Hash
raise 'Filter parameters are not Hash'
end
balancer_decisions_resp_arr = admin.get_balancer_decisions(args)
puts 'Retrieved BalancerDecision Responses'
puts balancer_decisions_resp_arr
end
end
end
end

View File

@ -90,8 +90,9 @@ echo "get_largelog_responses '*'" | hbase shell > xyz.out 2>&1
unless args.is_a? Hash
raise 'Filter parameters are not Hash'
end
admin.get_largelog_responses(server_names, args)
large_log_responses_arr = admin.get_slowlog_responses(server_names, args, true)
puts 'Retrieved LargeLog Responses from RegionServers'
puts large_log_responses_arr
end
end
end

View File

@ -90,8 +90,9 @@ echo "get_slowlog_responses '*'" | hbase shell > xyz.out 2>&1
unless args.is_a? Hash
raise 'Filter parameters are not Hash'
end
admin.get_slowlog_responses(server_names, args)
slow_log_responses_arr = admin.get_slowlog_responses(server_names, args)
puts 'Retrieved SlowLog Responses from RegionServers'
puts slow_log_responses_arr
end
end
end

View File

@ -386,8 +386,8 @@ module Hbase
#-------------------------------------------------------------------------------
define_test 'get slowlog responses should work' do
output = capture_stdout { command(:get_slowlog_responses, '*', {}) }
assert(output.include?('Retrieved SlowLog Responses from RegionServers'))
output = command(:get_slowlog_responses, '*', {})
assert(output.nil?)
end
#-------------------------------------------------------------------------------

View File

@ -44,10 +44,12 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactType;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.LogEntry;
import org.apache.hadoop.hbase.client.LogQueryFilter;
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
import org.apache.hadoop.hbase.client.OnlineLogRecord;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.ServerType;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.TableDescriptor;
@ -1438,4 +1440,10 @@ public class ThriftAdmin implements Admin {
return splitRegionAsync(regionName, null);
}
@Override
public List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType,
ServerType serverType, int limit, Map<String, Object> filterParams)
throws IOException {
throw new NotImplementedException("getLogEntries not supported in ThriftAdmin");
}
}

View File

@ -2308,3 +2308,17 @@ The percent of region server RPC threads failed to abort RS.
.Default
`false`
[[hbase.master.balancer.decision.buffer.enabled]]
*`hbase.master.balancer.decision.buffer.enabled`*::
+
.Description
Indicates whether active HMaster has ring buffer running for storing
balancer decisions in FIFO manner with limited entries. The size of
the ring buffer is indicated by config:
hbase.master.balancer.decision.queue.size
+
.Default
`false`