diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index d76ca51d08f..e59c6cb94d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -23,12 +23,14 @@ import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CacheEvictionStats; @@ -3202,9 +3204,30 @@ public interface Admin extends Abortable, Closeable { * @param logQueryFilter filter to be used if provided (determines slow / large RPC logs) * @return online slowlog response list * @throws IOException if a remote or network exception occurs + * @deprecated since 2.4.0 and will be removed in 4.0.0. + * Use {@link #getLogEntries(Set, String, ServerType, int, Map)} instead. 
*/ - List getSlowLogResponses(final Set serverNames, - final LogQueryFilter logQueryFilter) throws IOException; + @Deprecated + default List getSlowLogResponses(final Set serverNames, + final LogQueryFilter logQueryFilter) throws IOException { + String logType; + if (LogQueryFilter.Type.LARGE_LOG.equals(logQueryFilter.getType())) { + logType = "LARGE_LOG"; + } else { + logType = "SLOW_LOG"; + } + Map filterParams = new HashMap<>(); + filterParams.put("regionName", logQueryFilter.getRegionName()); + filterParams.put("clientAddress", logQueryFilter.getClientAddress()); + filterParams.put("tableName", logQueryFilter.getTableName()); + filterParams.put("userName", logQueryFilter.getUserName()); + filterParams.put("filterByOperator", logQueryFilter.getFilterByOperator().toString()); + List logEntries = + getLogEntries(serverNames, logType, ServerType.REGION_SERVER, logQueryFilter.getLimit(), + filterParams); + return logEntries.stream().map(logEntry -> (OnlineLogRecord) logEntry) + .collect(Collectors.toList()); + } /** * Clears online slow/large RPC logs from the provided list of @@ -3218,4 +3241,21 @@ public interface Admin extends Abortable, Closeable { List clearSlowLogResponses(final Set serverNames) throws IOException; + + /** + * Retrieve recent online records from HMaster / RegionServers. + * Examples include slow/large RPC logs, balancer decisions by master. + * + * @param serverNames servers to retrieve records from, useful in case of records maintained + * by RegionServer as we can select specific server. In case of servertype=MASTER, logs will + * only come from the currently active master. 
+ * @param logType string representing type of log records + * @param serverType enum for server type: HMaster or RegionServer + * @param limit put a limit to list of records that server should send in response + * @param filterParams additional filter params + * @return Log entries representing online records from servers + * @throws IOException if a remote or network exception occurs + */ + List getLogEntries(Set serverNames, String logType, + ServerType serverType, int limit, Map filterParams) throws IOException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index b37785d2eb6..27d15533d75 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -21,6 +21,7 @@ import com.google.protobuf.RpcChannel; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -28,6 +29,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.CacheEvictionStats; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; @@ -1515,11 +1517,33 @@ public interface AsyncAdmin { * RegionServers * * @param serverNames Server names to get slowlog responses from - * @param slowLogQueryFilter filter to be used if provided + * @param logQueryFilter filter to be used if provided * @return Online slowlog response list. The return value wrapped by a {@link CompletableFuture} + * @deprecated since 2.4.0 and will be removed in 4.0.0. + * Use {@link #getLogEntries(Set, String, ServerType, int, Map)} instead. 
*/ - CompletableFuture> getSlowLogResponses(final Set serverNames, - final LogQueryFilter slowLogQueryFilter); + @Deprecated + default CompletableFuture> getSlowLogResponses( + final Set serverNames, final LogQueryFilter logQueryFilter) { + String logType; + if (LogQueryFilter.Type.LARGE_LOG.equals(logQueryFilter.getType())) { + logType = "LARGE_LOG"; + } else { + logType = "SLOW_LOG"; + } + Map filterParams = new HashMap<>(); + filterParams.put("regionName", logQueryFilter.getRegionName()); + filterParams.put("clientAddress", logQueryFilter.getClientAddress()); + filterParams.put("tableName", logQueryFilter.getTableName()); + filterParams.put("userName", logQueryFilter.getUserName()); + filterParams.put("filterByOperator", logQueryFilter.getFilterByOperator().toString()); + CompletableFuture> logEntries = + getLogEntries(serverNames, logType, ServerType.REGION_SERVER, logQueryFilter.getLimit(), + filterParams); + return logEntries.thenApply( + logEntryList -> logEntryList.stream().map(logEntry -> (OnlineLogRecord) logEntry) + .collect(Collectors.toList())); + } /** * Clears online slow RPC logs from the provided list of @@ -1531,4 +1555,19 @@ public interface AsyncAdmin { */ CompletableFuture> clearSlowLogResponses(final Set serverNames); + /** + * Retrieve recent online records from HMaster / RegionServers. + * Examples include slow/large RPC logs, balancer decisions by master. + * + * @param serverNames servers to retrieve records from, useful in case of records maintained + * by RegionServer as we can select specific server. In case of servertype=MASTER, logs will + * only come from the currently active master. 
+ * @param logType string representing type of log records + * @param serverType enum for server type: HMaster or RegionServer + * @param limit put a limit to list of records that server should send in response + * @param filterParams additional filter params + * @return Log entries representing online records from servers + */ + CompletableFuture> getLogEntries(Set serverNames, String logType, + ServerType serverType, int limit, Map filterParams); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 29bf6ea043f..7c51ed0a74b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -847,15 +847,15 @@ class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.isSnapshotCleanupEnabled()); } - @Override - public CompletableFuture> getSlowLogResponses( - final Set serverNames, final LogQueryFilter logQueryFilter) { - return wrap(rawAdmin.getSlowLogResponses(serverNames, logQueryFilter)); - } - @Override public CompletableFuture> clearSlowLogResponses(Set serverNames) { return wrap(rawAdmin.clearSlowLogResponses(serverNames)); } + @Override + public CompletableFuture> getLogEntries(Set serverNames, + String logType, ServerType serverType, int limit, + Map filterParams) { + return wrap(rawAdmin.getLogEntries(serverNames, logType, serverType, limit, filterParams)); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BalancerDecision.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BalancerDecision.java new file mode 100644 index 00000000000..e2bf2e28e0e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BalancerDecision.java @@ -0,0 +1,152 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.util.List; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.hadoop.hbase.util.GsonUtil; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +import org.apache.hbase.thirdparty.com.google.gson.Gson; +import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer; + +/** + * History of balancer decisions taken for region movements. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +final public class BalancerDecision extends LogEntry { + + private final String initialFunctionCosts; + private final String finalFunctionCosts; + private final double initTotalCost; + private final double computedTotalCost; + private final long computedSteps; + private final List regionPlans; + + // used to convert object to pretty printed format + // used by toJsonPrettyPrint() + private static final Gson GSON = GsonUtil.createGson() + .setPrettyPrinting() + .registerTypeAdapter(BalancerDecision.class, (JsonSerializer) + (balancerDecision, type, jsonSerializationContext) -> { + Gson gson = new Gson(); + return gson.toJsonTree(balancerDecision); + }).create(); + + private BalancerDecision(String initialFunctionCosts, String finalFunctionCosts, + double initTotalCost, double computedTotalCost, List regionPlans, + long computedSteps) { + this.initialFunctionCosts = initialFunctionCosts; + this.finalFunctionCosts = finalFunctionCosts; + this.initTotalCost = initTotalCost; + this.computedTotalCost = computedTotalCost; + this.regionPlans = regionPlans; + this.computedSteps = computedSteps; + } + + public String getInitialFunctionCosts() { + return initialFunctionCosts; + } + + public String getFinalFunctionCosts() { + return finalFunctionCosts; + } + + public double getInitTotalCost() { + return initTotalCost; + } + + public double getComputedTotalCost() { + return computedTotalCost; + } + + public List getRegionPlans() { + return regionPlans; + } + + public long getComputedSteps() { + return computedSteps; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("initialFunctionCosts", initialFunctionCosts) + .append("finalFunctionCosts", finalFunctionCosts) + .append("initTotalCost", initTotalCost) + .append("computedTotalCost", computedTotalCost) + .append("computedSteps", computedSteps) + .append("regionPlans", regionPlans) + .toString(); + } + + @Override + public 
String toJsonPrettyPrint() { + return GSON.toJson(this); + } + + public static class Builder { + private String initialFunctionCosts; + private String finalFunctionCosts; + private double initTotalCost; + private double computedTotalCost; + private long computedSteps; + private List regionPlans; + + public Builder setInitialFunctionCosts(String initialFunctionCosts) { + this.initialFunctionCosts = initialFunctionCosts; + return this; + } + + public Builder setFinalFunctionCosts(String finalFunctionCosts) { + this.finalFunctionCosts = finalFunctionCosts; + return this; + } + + public Builder setInitTotalCost(double initTotalCost) { + this.initTotalCost = initTotalCost; + return this; + } + + public Builder setComputedTotalCost(double computedTotalCost) { + this.computedTotalCost = computedTotalCost; + return this; + } + + public Builder setRegionPlans(List regionPlans) { + this.regionPlans = regionPlans; + return this; + } + + public Builder setComputedSteps(long computedSteps) { + this.computedSteps = computedSteps; + return this; + } + + public BalancerDecision build() { + return new BalancerDecision(initialFunctionCosts, finalFunctionCosts, + initTotalCost, computedTotalCost, regionPlans, computedSteps); + } + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index b3b7b7db486..be85011e665 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -101,6 +101,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Has import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; @@ -1825,6 +1826,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable { HasUserPermissionsRequest request) throws ServiceException { return stub.hasUserPermissions(controller, request); } + + @Override + public HBaseProtos.LogEntry getLogEntries(RpcController controller, + HBaseProtos.LogRequest request) throws ServiceException { + return stub.getLogEntries(controller, request); + } }; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 03ada1713b3..b652143d50f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -4370,15 +4370,15 @@ public class HBaseAdmin implements Admin { } - @Override - public List getSlowLogResponses(@Nullable final Set serverNames, - final LogQueryFilter logQueryFilter) throws IOException { + private List getSlowLogResponses( + final Map filterParams, final Set serverNames, final int limit, + final String logType) { if (CollectionUtils.isEmpty(serverNames)) { return Collections.emptyList(); } return serverNames.stream().map(serverName -> { try { - return getSlowLogResponseFromServer(serverName, logQueryFilter); + return getSlowLogResponseFromServer(serverName, filterParams, limit, logType); } catch (IOException e) { throw new RuntimeException(e); } @@ -4386,29 +4386,17 @@ public class HBaseAdmin implements Admin { 
).flatMap(List::stream).collect(Collectors.toList()); } - private List getSlowLogResponseFromServer(final ServerName serverName, - final LogQueryFilter logQueryFilter) throws IOException { - return getSlowLogResponsesFromServer(this.connection.getAdmin(serverName), logQueryFilter); - } - - private List getSlowLogResponsesFromServer(AdminService.BlockingInterface admin, - LogQueryFilter logQueryFilter) throws IOException { - return executeCallable(new RpcRetryingCallable>() { + private List getSlowLogResponseFromServer(ServerName serverName, + Map filterParams, int limit, String logType) throws IOException { + AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); + return executeCallable(new RpcRetryingCallable>() { @Override - protected List rpcCall(int callTimeout) throws Exception { + protected List rpcCall(int callTimeout) throws Exception { HBaseRpcController controller = rpcControllerFactory.newController(); - if (logQueryFilter.getType() == null - || logQueryFilter.getType() == LogQueryFilter.Type.SLOW_LOG) { - AdminProtos.SlowLogResponses slowLogResponses = - admin.getSlowLogResponses(controller, - RequestConverter.buildSlowLogResponseRequest(logQueryFilter)); - return ProtobufUtil.toSlowLogPayloads(slowLogResponses); - } else { - AdminProtos.SlowLogResponses slowLogResponses = - admin.getLargeLogResponses(controller, - RequestConverter.buildSlowLogResponseRequest(logQueryFilter)); - return ProtobufUtil.toSlowLogPayloads(slowLogResponses); - } + HBaseProtos.LogRequest logRequest = + RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType); + HBaseProtos.LogEntry logEntry = admin.getLogEntries(controller, logRequest); + return ProtobufUtil.toSlowLogPayloads(logEntry); } }); } @@ -4428,6 +4416,39 @@ public class HBaseAdmin implements Admin { }).collect(Collectors.toList()); } + @Override + public List getLogEntries(Set serverNames, String logType, + ServerType serverType, int limit, Map filterParams) throws IOException 
{ + if (logType == null || serverType == null) { + throw new IllegalArgumentException("logType and/or serverType cannot be empty"); + } + if (logType.equals("SLOW_LOG") || logType.equals("LARGE_LOG")) { + if (ServerType.MASTER.equals(serverType)) { + throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); + } + return getSlowLogResponses(filterParams, serverNames, limit, logType); + } else if (logType.equals("BALANCER_DECISION")) { + if (ServerType.REGION_SERVER.equals(serverType)) { + throw new IllegalArgumentException( + "Balancer Decision logs are not maintained by HRegionServer"); + } + return getBalancerDecisions(limit); + } + return Collections.emptyList(); + } + + private List getBalancerDecisions(final int limit) throws IOException { + return executeCallable(new MasterCallable>(getConnection(), + getRpcControllerFactory()) { + @Override + protected List rpcCall() throws Exception { + HBaseProtos.LogEntry logEntry = + master.getLogEntries(getRpcController(), ProtobufUtil.toBalancerDecisionRequest(limit)); + return ProtobufUtil.toBalancerDecisionResponse(logEntry); + } + }); + } + private Boolean clearSlowLogsResponses(final ServerName serverName) throws IOException { AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); return executeCallable(new RpcRetryingCallable() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogEntry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogEntry.java new file mode 100644 index 00000000000..41f79cf8e81 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogEntry.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Abstract response class representing online logs response from ring-buffer use-cases + * e.g. slow/large RPC logs, balancer decision logs + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public abstract class LogEntry { + + /** + * Based on response sent by server, provide pretty printed Json representation in string + * @return Pretty printed Json representation + */ + public abstract String toJsonPrettyPrint(); + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogQueryFilter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogQueryFilter.java index 728aba4a60b..506fc4f7652 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogQueryFilter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LogQueryFilter.java @@ -23,12 +23,16 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; /** * Slow/Large Log Query Filter with all filter and limit parameters - * Used by Admin API: getSlowLogResponses + * Extends generic
LogRequest used by Admin API getLogEntries + * @deprecated as of 2.4.0. Will be removed in 4.0.0. */ -@InterfaceAudience.Private +@InterfaceAudience.Public +@InterfaceStability.Evolving +@Deprecated public class LogQueryFilter { private String regionName; @@ -153,4 +157,5 @@ public class LogQueryFilter { .append("filterByOperator", filterByOperator) .toString(); } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java index 8af001369d6..115e55f336f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hbase.util.GsonUtil; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; import org.apache.hbase.thirdparty.com.google.gson.Gson; import org.apache.hbase.thirdparty.com.google.gson.JsonObject; @@ -33,8 +34,9 @@ import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer; * Slow/Large Log payload for hbase-client, to be used by Admin API get_slow_responses and * get_large_responses */ -@InterfaceAudience.Private -final public class OnlineLogRecord { +@InterfaceAudience.Public +@InterfaceStability.Evolving +final public class OnlineLogRecord extends LogEntry { // used to convert object to pretty printed format // used by toJsonPrettyPrint() @@ -56,22 +58,22 @@ final public class OnlineLogRecord { return jsonObj; }).create(); - private long startTime; - private int processingTime; - private int queueTime; - private long responseSize; - private String clientAddress; - private String serverClass; - private String methodName; - private String callDetails; - private String param; + private final long startTime; + private 
final int processingTime; + private final int queueTime; + private final long responseSize; + private final String clientAddress; + private final String serverClass; + private final String methodName; + private final String callDetails; + private final String param; // we don't want to serialize region name, it is just for the filter purpose // hence avoiding deserialization - private transient String regionName; - private String userName; - private int multiGetsCount; - private int multiMutationsCount; - private int multiServiceCalls; + private final transient String regionName; + private final String userName; + private final int multiGetsCount; + private final int multiMutationsCount; + private final int multiServiceCalls; public long getStartTime() { return startTime; @@ -293,6 +295,7 @@ final public class OnlineLogRecord { .toHashCode(); } + @Override public String toJsonPrettyPrint() { return GSON.toJson(this); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 392447a4063..7f939141bec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -3906,49 +3906,26 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { .call(); } - @Override - public CompletableFuture> getSlowLogResponses( - @Nullable final Set serverNames, final LogQueryFilter logQueryFilter) { + private CompletableFuture> getSlowLogResponses( + final Map filterParams, final Set serverNames, final int limit, + final String logType) { if (CollectionUtils.isEmpty(serverNames)) { return CompletableFuture.completedFuture(Collections.emptyList()); } - if (logQueryFilter.getType() == null - || logQueryFilter.getType() == LogQueryFilter.Type.SLOW_LOG) { - return CompletableFuture.supplyAsync(() -> serverNames.stream() - .map((ServerName 
serverName) -> - getSlowLogResponseFromServer(serverName, logQueryFilter)) - .map(CompletableFuture::join) - .flatMap(List::stream) - .collect(Collectors.toList())); - } else { - return CompletableFuture.supplyAsync(() -> serverNames.stream() - .map((ServerName serverName) -> - getLargeLogResponseFromServer(serverName, logQueryFilter)) - .map(CompletableFuture::join) - .flatMap(List::stream) - .collect(Collectors.toList())); - } + return CompletableFuture.supplyAsync(() -> serverNames.stream() + .map((ServerName serverName) -> + getSlowLogResponseFromServer(serverName, filterParams, limit, logType)) + .map(CompletableFuture::join) + .flatMap(List::stream) + .collect(Collectors.toList())); } - private CompletableFuture> getLargeLogResponseFromServer( - final ServerName serverName, final LogQueryFilter logQueryFilter) { - return this.>newAdminCaller() - .action((controller, stub) -> this - .adminCall( - controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter), - AdminService.Interface::getLargeLogResponses, - ProtobufUtil::toSlowLogPayloads)) - .serverName(serverName).call(); - } - - private CompletableFuture> getSlowLogResponseFromServer( - final ServerName serverName, final LogQueryFilter logQueryFilter) { - return this.>newAdminCaller() - .action((controller, stub) -> this - .adminCall( - controller, stub, RequestConverter.buildSlowLogResponseRequest(logQueryFilter), - AdminService.Interface::getSlowLogResponses, - ProtobufUtil::toSlowLogPayloads)) + private CompletableFuture> getSlowLogResponseFromServer(ServerName serverName, + Map filterParams, int limit, String logType) { + return this.>newAdminCaller().action((controller, stub) -> this + .adminCall(controller, stub, + RequestConverter.buildSlowLogResponseRequest(filterParams, limit, logType), + AdminService.Interface::getLogEntries, ProtobufUtil::toSlowLogPayloads)) .serverName(serverName).call(); } @@ -3985,4 +3962,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { ); } + private 
CompletableFuture> getBalancerDecisions(final int limit) { + return this.>newMasterCaller() + .action((controller, stub) -> + this.call(controller, stub, + ProtobufUtil.toBalancerDecisionRequest(limit), + MasterService.Interface::getLogEntries, ProtobufUtil::toBalancerDecisionResponse)) + .call(); + } + + @Override + public CompletableFuture> getLogEntries(Set serverNames, + String logType, ServerType serverType, int limit, + Map filterParams) { + if (logType == null || serverType == null) { + throw new IllegalArgumentException("logType and/or serverType cannot be empty"); + } + if (logType.equals("SLOW_LOG") || logType.equals("LARGE_LOG")) { + if (ServerType.MASTER.equals(serverType)) { + throw new IllegalArgumentException("Slow/Large logs are not maintained by HMaster"); + } + return getSlowLogResponses(filterParams, serverNames, limit, logType); + } else if (logType.equals("BALANCER_DECISION")) { + if (ServerType.REGION_SERVER.equals(serverType)) { + throw new IllegalArgumentException( + "Balancer Decision logs are not maintained by HRegionServer"); + } + return getBalancerDecisions(limit); + } + return CompletableFuture.completedFuture(Collections.emptyList()); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerType.java new file mode 100644 index 00000000000..1d1bf6e3c6d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerType.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Select server type i.e. destination for RPC request associated with ring buffer. + * e.g. slow/large log records are maintained by HRegionServer, whereas balancer decisions + * are maintained by HMaster. + */ +@InterfaceAudience.Public +public enum ServerType { + MASTER, + REGION_SERVER +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java index 8d9c9b7ab22..b2055041f7c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Rev import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.RevokeResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; - +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; @@
-472,6 +472,12 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection { return stub.listNamespaces(controller, request); } + @Override + public HBaseProtos.LogEntry getLogEntries(RpcController controller, + HBaseProtos.LogRequest request) throws ServiceException { + return stub.getLogEntries(controller, request); + } + @Override public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion( RpcController controller, MajorCompactionTimestampForRegionRequest request) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index b9231bc97ad..d9aa0eae945 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.security.AccessController; @@ -66,6 +67,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.BalancerDecision; import org.apache.hadoop.hbase.client.CheckAndMutate; import org.apache.hadoop.hbase.client.ClientUtil; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -77,6 +79,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.LogEntry; import org.apache.hadoop.hbase.client.Mutation; import 
org.apache.hadoop.hbase.client.OnlineLogRecord; import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor; @@ -133,6 +136,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; @@ -191,6 +195,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableD import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; @@ -3435,7 +3440,7 @@ public final class ProtobufUtil { * @param slowLogPayload SlowLog Payload protobuf instance * @return SlowLog Payload for client usecase */ - private static OnlineLogRecord getSlowLogRecord( + private static LogEntry getSlowLogRecord( final TooSlowLog.SlowLogPayload slowLogPayload) { OnlineLogRecord onlineLogRecord = new OnlineLogRecord.OnlineLogRecordBuilder() .setCallDetails(slowLogPayload.getCallDetails()) @@ -3459,14 +3464,26 @@ public final class ProtobufUtil { /** * Convert AdminProtos#SlowLogResponses to list of {@link OnlineLogRecord} * - * @param slowLogResponses slowlog response protobuf 
instance + * @param logEntry slowlog response protobuf instance * @return list of SlowLog payloads for client usecase */ - public static List toSlowLogPayloads( - final AdminProtos.SlowLogResponses slowLogResponses) { - List slowLogRecords = slowLogResponses.getSlowLogPayloadsList() - .stream().map(ProtobufUtil::getSlowLogRecord).collect(Collectors.toList()); - return slowLogRecords; + public static List toSlowLogPayloads( + final HBaseProtos.LogEntry logEntry) { + try { + final String logClassName = logEntry.getLogClassName(); + Class logClass = Class.forName(logClassName).asSubclass(Message.class); + Method method = logClass.getMethod("parseFrom", ByteString.class); + if (logClassName.contains("SlowLogResponses")) { + AdminProtos.SlowLogResponses slowLogResponses = (AdminProtos.SlowLogResponses) method + .invoke(null, logEntry.getLogMessage()); + return slowLogResponses.getSlowLogPayloadsList().stream() + .map(ProtobufUtil::getSlowLogRecord).collect(Collectors.toList()); + } + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException + | InvocationTargetException e) { + throw new RuntimeException("Error while retrieving response from server"); + } + throw new RuntimeException("Invalid response from server"); } /** @@ -3551,4 +3568,49 @@ public final class ProtobufUtil { throw new DoNotRetryIOException(e.getMessage()); } } + + public static List toBalancerDecisionResponse( + HBaseProtos.LogEntry logEntry) { + try { + final String logClassName = logEntry.getLogClassName(); + Class logClass = Class.forName(logClassName).asSubclass(Message.class); + Method method = logClass.getMethod("parseFrom", ByteString.class); + if (logClassName.contains("BalancerDecisionsResponse")) { + MasterProtos.BalancerDecisionsResponse response = + (MasterProtos.BalancerDecisionsResponse) method + .invoke(null, logEntry.getLogMessage()); + return getBalancerDecisionEntries(response); + } + } catch (ClassNotFoundException | NoSuchMethodException | 
IllegalAccessException + | InvocationTargetException e) { + throw new RuntimeException("Error while retrieving response from server"); + } + throw new RuntimeException("Invalid response from server"); + } + + public static List getBalancerDecisionEntries( + MasterProtos.BalancerDecisionsResponse response) { + List balancerDecisions = response.getBalancerDecisionList(); + if (CollectionUtils.isEmpty(balancerDecisions)) { + return Collections.emptyList(); + } + return balancerDecisions.stream().map(balancerDecision -> new BalancerDecision.Builder() + .setInitTotalCost(balancerDecision.getInitTotalCost()) + .setInitialFunctionCosts(balancerDecision.getInitialFunctionCosts()) + .setComputedTotalCost(balancerDecision.getComputedTotalCost()) + .setFinalFunctionCosts(balancerDecision.getFinalFunctionCosts()) + .setComputedSteps(balancerDecision.getComputedSteps()) + .setRegionPlans(balancerDecision.getRegionPlansList()).build()) + .collect(Collectors.toList()); + } + + public static HBaseProtos.LogRequest toBalancerDecisionRequest(int limit) { + MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest = + MasterProtos.BalancerDecisionsRequest.newBuilder().setLimit(limit).build(); + return HBaseProtos.LogRequest.newBuilder() + .setLogClassName(balancerDecisionsRequest.getClass().getName()) + .setLogMessage(balancerDecisionsRequest.toByteString()) + .build(); + } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index d64968a68fc..825e914ed32 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import 
org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.LogQueryFilter; import org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.NormalizeTableFilterParams; @@ -71,6 +70,8 @@ import org.apache.hadoop.security.token.Token; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; + import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; @@ -2068,45 +2069,67 @@ public final class RequestConverter { } /** - * Create a protocol buffer {@link SlowLogResponseRequest} + * Build RPC request payload for getLogEntries * - * @param logQueryFilter filter to use if provided - * @return a protocol buffer SlowLogResponseRequest + * @param filterParams map of filter params + * @param limit limit for no of records that server returns + * @param logType type of the log records + * @return request payload {@link HBaseProtos.LogRequest} */ - public static SlowLogResponseRequest buildSlowLogResponseRequest( - final LogQueryFilter logQueryFilter) { + public static HBaseProtos.LogRequest buildSlowLogResponseRequest( + final Map filterParams, final int limit, final String logType) { SlowLogResponseRequest.Builder builder = SlowLogResponseRequest.newBuilder(); - if (logQueryFilter == null) { - return builder.build(); + builder.setLimit(limit); + if (logType.equals("SLOW_LOG")) { + builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG); + } else if (logType.equals("LARGE_LOG")) { + builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG); } - final String clientAddress = 
logQueryFilter.getClientAddress(); - if (StringUtils.isNotEmpty(clientAddress)) { - builder.setClientAddress(clientAddress); + boolean filterByAnd = false; + if (MapUtils.isNotEmpty(filterParams)) { + if (filterParams.containsKey("clientAddress")) { + final String clientAddress = (String) filterParams.get("clientAddress"); + if (StringUtils.isNotEmpty(clientAddress)) { + builder.setClientAddress(clientAddress); + } + } + if (filterParams.containsKey("regionName")) { + final String regionName = (String) filterParams.get("regionName"); + if (StringUtils.isNotEmpty(regionName)) { + builder.setRegionName(regionName); + } + } + if (filterParams.containsKey("tableName")) { + final String tableName = (String) filterParams.get("tableName"); + if (StringUtils.isNotEmpty(tableName)) { + builder.setTableName(tableName); + } + } + if (filterParams.containsKey("userName")) { + final String userName = (String) filterParams.get("userName"); + if (StringUtils.isNotEmpty(userName)) { + builder.setUserName(userName); + } + } + if (filterParams.containsKey("filterByOperator")) { + final String filterByOperator = (String) filterParams.get("filterByOperator"); + if (StringUtils.isNotEmpty(filterByOperator)) { + if (filterByOperator.toUpperCase().equals("AND")) { + filterByAnd = true; + } + } + } } - final String regionName = logQueryFilter.getRegionName(); - if (StringUtils.isNotEmpty(regionName)) { - builder.setRegionName(regionName); - } - final String tableName = logQueryFilter.getTableName(); - if (StringUtils.isNotEmpty(tableName)) { - builder.setTableName(tableName); - } - final String userName = logQueryFilter.getUserName(); - if (StringUtils.isNotEmpty(userName)) { - builder.setUserName(userName); - } - LogQueryFilter.FilterByOperator filterByOperator = logQueryFilter.getFilterByOperator(); - if (LogQueryFilter.FilterByOperator.AND.equals(filterByOperator)) { + if (filterByAnd) { builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.AND); } else { 
builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR); } - if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) { - builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG); - } else { - builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG); - } - return builder.setLimit(logQueryFilter.getLimit()).build(); + SlowLogResponseRequest slowLogResponseRequest = builder.build(); + return HBaseProtos.LogRequest.newBuilder() + .setLogClassName(slowLogResponseRequest.getClass().getName()) + .setLogMessage(slowLogResponseRequest.toByteString()) + .build(); } /** diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 5b77dfa2977..0f734bcfd74 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1998,7 +1998,7 @@ possible configurations would overwhelm and obscure the important. hbase.namedqueue.provider.classes - org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService + org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService Default values for NamedQueueService implementors. This comma separated full class names represent all implementors of NamedQueueService that we would like to be invoked by @@ -2008,4 +2008,13 @@ possible configurations would overwhelm and obscure the important. "org.apache.hadoop.hbase.namequeues.impl" + + hbase.master.balancer.decision.buffer.enabled + false + + Indicates whether active HMaster has ring buffer running for storing + balancer decisions in FIFO manner with limited entries. 
The size of + the ring buffer is indicated by config: hbase.master.balancer.decision.queue.size + + diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/balancer/MetricsStochasticBalancerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/balancer/MetricsStochasticBalancerSourceImpl.java index 3b4ba363b1e..de1dd81b17f 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/balancer/MetricsStochasticBalancerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/balancer/MetricsStochasticBalancerSourceImpl.java @@ -37,7 +37,7 @@ public class MetricsStochasticBalancerSourceImpl extends MetricsBalancerSourceIm private int metricsSize = 1000; private int mruCap = calcMruCap(metricsSize); - private Map> stochasticCosts = + private final Map> stochasticCosts = new LinkedHashMap>(mruCap, MRU_LOAD_FACTOR, true) { private static final long serialVersionUID = 8204713453436906599L; @@ -71,7 +71,6 @@ public class MetricsStochasticBalancerSourceImpl extends MetricsBalancerSourceIm if (tableName == null || costFunctionName == null || cost == null) { return; } - if (functionDesc != null) { costFunctionDescs.put(costFunctionName, functionDesc); } @@ -81,7 +80,6 @@ public class MetricsStochasticBalancerSourceImpl extends MetricsBalancerSourceIm if (costs == null) { costs = new ConcurrentHashMap<>(); } - costs.put(costFunctionName, cost); stochasticCosts.put(tableName, costs); } diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto index b5bf2ea4210..768c5356969 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto @@ -282,6 +282,13 @@ message ExecuteProceduresRequest { message ExecuteProceduresResponse { } +/** + * Slow/Large log (LogRequest) use-case specific RPC request. 
This request payload will be + * converted into bytes and sent to generic RPC API: GetLogEntries + * LogRequest message has two params: + * 1. log_class_name: SlowLogResponseRequest (for Slow/Large log use-case) + * 2. log_message: SlowLogResponseRequest converted into bytes (for Slow/Large log use-case) + */ message SlowLogResponseRequest { enum FilterByOperator { AND = 0; @@ -302,6 +309,13 @@ message SlowLogResponseRequest { optional LogType log_type = 7; } +/** + * Slow/Large log (LogEntry) use-case specific RPC response. This response payload will be + * converted into bytes by servers and sent as response to generic RPC API: GetLogEntries + * LogEntry message has two params: + * 1. log_class_name: SlowLogResponses (for Slow/Large log use-case) + * 2. log_message: SlowLogResponses converted into bytes (for Slow/Large log use-case) + */ message SlowLogResponses { repeated SlowLogPayload slow_log_payloads = 1; } @@ -387,4 +401,8 @@ service AdminService { rpc ClearSlowLogsResponses(ClearSlowLogResponseRequest) returns(ClearSlowLogResponses); + + rpc GetLogEntries(LogRequest) + returns(LogEntry); + } diff --git a/hbase-protocol-shaded/src/main/protobuf/HBase.proto b/hbase-protocol-shaded/src/main/protobuf/HBase.proto index 133b1c2e604..4ea158761d1 100644 --- a/hbase-protocol-shaded/src/main/protobuf/HBase.proto +++ b/hbase-protocol-shaded/src/main/protobuf/HBase.proto @@ -258,4 +258,14 @@ message RegionLocation { required RegionInfo region_info = 1; optional ServerName server_name = 2; required int64 seq_num = 3; -} \ No newline at end of file +} + +message LogRequest { + required string log_class_name = 1; + required bytes log_message = 2; +} + +message LogEntry { + required string log_class_name = 1; + required bytes log_message = 2; +} diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index 4f9d75ecaac..66b175c3085 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ 
b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -37,6 +37,7 @@ import "Quota.proto"; import "Replication.proto"; import "Snapshot.proto"; import "AccessControl.proto"; +import "RecentLogs.proto"; /* Column-level protobufs */ @@ -692,6 +693,28 @@ message SwitchExceedThrottleQuotaResponse { required bool previous_exceed_throttle_quota_enabled = 1; } +/** + * BalancerDecision (LogRequest) use-case specific RPC request. This request payload will be + * converted into bytes and sent to generic RPC API: GetLogEntries + * LogRequest message has two params: + * 1. log_class_name: BalancerDecisionsRequest (for BalancerDecision use-case) + * 2. log_message: BalancerDecisionsRequest converted into bytes (for BalancerDecision use-case) + */ +message BalancerDecisionsRequest { + optional uint32 limit = 1; +} + +/** + * BalancerDecision (LogEntry) use-case specific RPC response. This response payload will be + * converted into bytes by servers and sent as response to generic RPC API: GetLogEntries + * LogEntry message has two params: + * 1. log_class_name: BalancerDecisionsResponse (for BalancerDecision use-case) + * 2. log_message: BalancerDecisionsResponse converted into bytes (for BalancerDecision use-case) + */ +message BalancerDecisionsResponse { + repeated BalancerDecision balancer_decision = 1; +} + service MasterService { /** Used by the client to get the number of regions that have received the updated schema */ rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest) @@ -1079,6 +1102,9 @@ service MasterService { /** returns a list of namespace names */ rpc ListNamespaces(ListNamespacesRequest) returns(ListNamespacesResponse); + + rpc GetLogEntries(LogRequest) + returns(LogEntry); } // HBCK Service definitions. 
diff --git a/hbase-protocol-shaded/src/main/protobuf/RecentLogs.proto b/hbase-protocol-shaded/src/main/protobuf/RecentLogs.proto new file mode 100644 index 00000000000..ea50b818500 --- /dev/null +++ b/hbase-protocol-shaded/src/main/protobuf/RecentLogs.proto @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +syntax = "proto2"; + +// This file contains protocol buffers that are used for Online BalancerDecision history +// To be used as Ring Buffer payload +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated"; +option java_outer_classname = "RecentLogs"; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +message BalancerDecision { + + required string initial_function_costs = 1; + required string final_function_costs = 2; + required double init_total_cost = 3; + required double computed_total_cost = 4; + required uint64 computed_steps = 5; + repeated string region_plans = 6; + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 6d134800f01..2e286c4398d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -21,10 +21,13 @@ import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -81,6 +84,10 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; +import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; +import 
org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.procedure.MasterProcedureManager; import org.apache.hadoop.hbase.procedure2.LockType; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -123,6 +130,8 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; @@ -324,6 +333,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaSta import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; @@ -2981,4 +2991,49 @@ public class MasterRpcServices extends RSRpcServices implements location -> response.addMetaLocations(ProtobufUtil.toRegionLocation(location)))); return response.build(); } + @Override + public HBaseProtos.LogEntry getLogEntries(RpcController controller, + HBaseProtos.LogRequest request) throws ServiceException { + try { + final String logClassName 
= request.getLogClassName(); + Class logClass = Class.forName(logClassName) + .asSubclass(Message.class); + Method method = logClass.getMethod("parseFrom", ByteString.class); + if (logClassName.contains("BalancerDecisionsRequest")) { + MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest = + (MasterProtos.BalancerDecisionsRequest) method + .invoke(null, request.getLogMessage()); + MasterProtos.BalancerDecisionsResponse balancerDecisionsResponse = + getBalancerDecisions(balancerDecisionsRequest); + return HBaseProtos.LogEntry.newBuilder() + .setLogClassName(balancerDecisionsResponse.getClass().getName()) + .setLogMessage(balancerDecisionsResponse.toByteString()) + .build(); + } + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException + | InvocationTargetException e) { + LOG.error("Error while retrieving log entries.", e); + throw new ServiceException(e); + } + throw new ServiceException("Invalid request params"); + } + + private MasterProtos.BalancerDecisionsResponse getBalancerDecisions( + MasterProtos.BalancerDecisionsRequest request) { + final NamedQueueRecorder namedQueueRecorder = this.regionServer.getNamedQueueRecorder(); + if (namedQueueRecorder == null) { + return MasterProtos.BalancerDecisionsResponse.newBuilder() + .addAllBalancerDecision(Collections.emptyList()).build(); + } + final NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); + namedQueueGetRequest.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT); + namedQueueGetRequest.setBalancerDecisionsRequest(request); + NamedQueueGetResponse namedQueueGetResponse = + namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); + List balancerDecisions = + namedQueueGetResponse.getBalancerDecisions(); + return MasterProtos.BalancerDecisionsResponse.newBuilder() + .addAllBalancerDecision(balancerDecisions).build(); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 7d05c41e0e0..b6ec9180bbb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; +import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Joiner; import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; @@ -73,6 +74,11 @@ import org.slf4j.LoggerFactory; */ @InterfaceAudience.Private public abstract class BaseLoadBalancer implements LoadBalancer { + + public static final String BALANCER_DECISION_BUFFER_ENABLED = + "hbase.master.balancer.decision.buffer.enabled"; + public static final boolean DEFAULT_BALANCER_DECISION_BUFFER_ENABLED = false; + protected static final int MIN_SERVER_BALANCE = 2; private volatile boolean stopped = false; @@ -85,6 +91,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer { protected boolean useRegionFinder; protected boolean isByTable = false; + /** + * Use to add balancer decision history to ring-buffer + */ + protected NamedQueueRecorder namedQueueRecorder; + private static class DefaultRackManager extends RackManager { @Override public String getRack(ServerName server) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 855087748f8..4f515072bed 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.RegionMetrics; import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BalancerDecision; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionPlan; @@ -46,6 +47,8 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRe import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction; +import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; +import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -221,6 +224,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { curFunctionCosts = new Double[costFunctions.size()]; tempFunctionCosts = new Double[costFunctions.size()]; + boolean isBalancerDecisionRecording = getConf() + .getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED, + BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED); + if (this.namedQueueRecorder == null && isBalancerDecisionRecording) { + this.namedQueueRecorder = NamedQueueRecorder.getInstance(getConf()); + } + LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" + 
Arrays.toString(getCostFunctionNames()) + " etc."); @@ -233,26 +243,21 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return; } - costFunctions.addAll(Arrays.stream(functionsNames) - .map(c -> { - Class klass = null; - try { - klass = (Class) Class.forName(c); - } catch (ClassNotFoundException e) { - LOG.warn("Cannot load class " + c + "': " + e.getMessage()); - } - if (null == klass) { - return null; - } - - CostFunction reflected = ReflectionUtils.newInstance(klass, conf); - LOG.info("Successfully loaded custom CostFunction '" + - reflected.getClass().getSimpleName() + "'"); - - return reflected; - }) - .filter(Objects::nonNull) - .collect(Collectors.toList())); + costFunctions.addAll(Arrays.stream(functionsNames).map(c -> { + Class klass = null; + try { + klass = (Class) Class.forName(c); + } catch (ClassNotFoundException e) { + LOG.warn("Cannot load class " + c + "': " + e.getMessage()); + } + if (null == klass) { + return null; + } + CostFunction reflected = ReflectionUtils.newInstance(klass, conf); + LOG.info( + "Successfully loaded custom CostFunction '" + reflected.getClass().getSimpleName() + "'"); + return reflected; + }).filter(Objects::nonNull).collect(Collectors.toList())); } protected void setCandidateGenerators(List customCandidateGenerators) { @@ -411,7 +416,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { curOverallCost = currentCost; System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); double initCost = currentCost; - double newCost = currentCost; + double newCost; long computedMaxSteps; if (runMaxSteps) { @@ -432,6 +437,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost=" + functionCost() + " computedMaxSteps: " + computedMaxSteps); + final String initFunctionTotalCosts = totalCostsPerFunc(); // Perform a stochastic walk to see if we can get a good fit. 
long step; @@ -480,6 +486,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { "{} regions; Going from a computed cost of {}" + " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime), step, plans.size(), initCost, currentCost); + sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step); return plans; } LOG.info("Could not find a better load balance plan. Tried {} different configurations in " + @@ -488,6 +495,27 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return null; } + private void sendRegionPlansToRingBuffer(List plans, double currentCost, + double initCost, String initFunctionTotalCosts, long step) { + if (this.namedQueueRecorder != null) { + List regionPlans = new ArrayList<>(); + for (RegionPlan plan : plans) { + regionPlans.add( + "table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName() + + " , source: " + plan.getSource() + " , destination: " + plan.getDestination()); + } + BalancerDecision balancerDecision = + new BalancerDecision.Builder() + .setInitTotalCost(initCost) + .setInitialFunctionCosts(initFunctionTotalCosts) + .setComputedTotalCost(currentCost) + .setFinalFunctionCosts(totalCostsPerFunc()) + .setComputedSteps(step) + .setRegionPlans(regionPlans).build(); + namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision)); + } + } + /** * update costs to JMX */ @@ -532,6 +560,23 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return builder.toString(); } + private String totalCostsPerFunc() { + StringBuilder builder = new StringBuilder(); + for (CostFunction c : costFunctions) { + if (c.getMultiplier() * c.cost() > 0.0) { + builder.append(" "); + builder.append(c.getClass().getSimpleName()); + builder.append(" : "); + builder.append(c.getMultiplier() * c.cost()); + builder.append(";"); + } + } + if (builder.length() > 0) { + builder.deleteCharAt(builder.length() - 1); + } + return builder.toString(); + } + 
/** * Create all of the RegionPlan's needed to move from the initial cluster state to the desired * state. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/BalancerDecisionDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/BalancerDecisionDetails.java new file mode 100644 index 00000000000..99a490d57ff --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/BalancerDecisionDetails.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.namequeues; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.hadoop.hbase.client.BalancerDecision; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Balancer decision details that would be passed on to ring buffer for history + */ +@InterfaceAudience.Private +public class BalancerDecisionDetails extends NamedQueuePayload { + + public static final int BALANCER_DECISION_EVENT = 1; + + private final BalancerDecision balancerDecision; + + public BalancerDecisionDetails(BalancerDecision balancerDecision) { + super(BALANCER_DECISION_EVENT); + this.balancerDecision = balancerDecision; + } + + public BalancerDecision getBalancerDecision() { + return balancerDecision; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("balancerDecision", balancerDecision) + .toString(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java index 7aa87fab038..eff2df9a47f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java @@ -30,16 +30,39 @@ import org.apache.yetus.audience.InterfaceAudience; public class NamedQueuePayload { public enum NamedQueueEvent { - SLOW_LOG + SLOW_LOG(0), + BALANCE_DECISION(1); + + private final int value; + + NamedQueueEvent(int i) { + this.value = i; + } + + public static NamedQueueEvent getEventByOrdinal(int value){ + switch (value) { + case 0: { + return SLOW_LOG; + } + case 1: { + return BALANCE_DECISION; + } + default: { + throw new IllegalArgumentException( + "NamedQueue event with ordinal " + value + " not defined"); + } + } + } + + public int getValue() { + return value; + } } private final NamedQueueEvent namedQueueEvent; - public NamedQueuePayload(NamedQueueEvent 
namedQueueEvent) { - if (namedQueueEvent == null) { - throw new RuntimeException("NamedQueuePayload with null namedQueueEvent"); - } - this.namedQueueEvent = namedQueueEvent; + public NamedQueuePayload(int eventOrdinal) { + this.namedQueueEvent = NamedQueueEvent.getEventByOrdinal(eventOrdinal); } public NamedQueueEvent getNamedQueueEvent() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java index f93baaa1046..9b422ab4889 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java @@ -43,7 +43,7 @@ final class RingBufferEnvelope { } /** - * Retrieve current rpcCall details {@link RpcLogDetails} available on Envelope and + * Retrieve current namedQueue payload {@link NamedQueuePayload} available on Envelope and * free up the Envelope * * @return Retrieve rpc log details diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java index 581d1a316aa..91ac91ea4a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java @@ -30,6 +30,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public class RpcLogDetails extends NamedQueuePayload { + public static final int SLOW_LOG_EVENT = 0; + private final RpcCall rpcCall; private final Message param; private final String clientAddress; @@ -40,7 +42,7 @@ public class RpcLogDetails extends NamedQueuePayload { public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize, String className, boolean isSlowLog, boolean isLargeLog) { - super(NamedQueueEvent.SLOW_LOG); + 
super(SLOW_LOG_EVENT); this.rpcCall = rpcCall; this.param = param; this.clientAddress = clientAddress; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java new file mode 100644 index 00000000000..e6fb982b930 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java @@ -0,0 +1,150 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.namequeues.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.BalancerDecision; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; +import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; +import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; +import org.apache.hadoop.hbase.namequeues.NamedQueueService; +import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs; +import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.common.collect.Queues; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.stream.Collectors; + +/** + * In-memory Queue service provider for Balancer Decision events + */ +@InterfaceAudience.Private +public class BalancerDecisionQueueService implements NamedQueueService { + + private static final Logger LOG = LoggerFactory.getLogger(BalancerDecisionQueueService.class); + + private final boolean isBalancerDecisionRecording; + + private static final String BALANCER_DECISION_QUEUE_SIZE = + "hbase.master.balancer.decision.queue.size"; + private static final int DEFAULT_BALANCER_DECISION_QUEUE_SIZE = 250; + + private static final int REGION_PLANS_THRESHOLD_PER_BALANCER = 15; + + private final Queue balancerDecisionQueue; + + public BalancerDecisionQueueService(Configuration conf) { + isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED, + BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED); + if 
(!isBalancerDecisionRecording) { + balancerDecisionQueue = null; + return; + } + final int queueSize = + conf.getInt(BALANCER_DECISION_QUEUE_SIZE, DEFAULT_BALANCER_DECISION_QUEUE_SIZE); + final EvictingQueue evictingQueue = + EvictingQueue.create(queueSize); + balancerDecisionQueue = Queues.synchronizedQueue(evictingQueue); + } + + @Override + public NamedQueuePayload.NamedQueueEvent getEvent() { + return NamedQueuePayload.NamedQueueEvent.BALANCE_DECISION; + } + + @Override + public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { + if (!isBalancerDecisionRecording) { + return; + } + if (!(namedQueuePayload instanceof BalancerDecisionDetails)) { + LOG.warn( + "BalancerDecisionQueueService: NamedQueuePayload is not of type BalancerDecisionDetails."); + return; + } + BalancerDecisionDetails balancerDecisionDetails = (BalancerDecisionDetails) namedQueuePayload; + BalancerDecision balancerDecisionRecords = + balancerDecisionDetails.getBalancerDecision(); + List regionPlans = balancerDecisionRecords.getRegionPlans(); + List> regionPlansList; + if (regionPlans.size() > REGION_PLANS_THRESHOLD_PER_BALANCER) { + regionPlansList = Lists.partition(regionPlans, REGION_PLANS_THRESHOLD_PER_BALANCER); + } else { + regionPlansList = Collections.singletonList(regionPlans); + } + for (List regionPlansPerBalancer : regionPlansList) { + RecentLogs.BalancerDecision balancerDecision = RecentLogs.BalancerDecision.newBuilder() + .setInitTotalCost(balancerDecisionRecords.getInitTotalCost()) + .setInitialFunctionCosts(balancerDecisionRecords.getInitialFunctionCosts()) + .setComputedTotalCost(balancerDecisionRecords.getComputedTotalCost()) + .setFinalFunctionCosts(balancerDecisionRecords.getFinalFunctionCosts()) + .setComputedSteps(balancerDecisionRecords.getComputedSteps()) + .addAllRegionPlans(regionPlansPerBalancer) + .build(); + balancerDecisionQueue.add(balancerDecision); + } + } + + @Override + public boolean clearNamedQueue() { + if (!isBalancerDecisionRecording) 
{ + return false; + } + LOG.debug("Received request to clean up balancer decision queue."); + balancerDecisionQueue.clear(); + return true; + } + + @Override + public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { + if (!isBalancerDecisionRecording) { + return null; + } + List balancerDecisions = + Arrays.stream(balancerDecisionQueue.toArray(new RecentLogs.BalancerDecision[0])) + .collect(Collectors.toList()); + // latest records should be displayed first, hence reverse order sorting + Collections.reverse(balancerDecisions); + int limit = balancerDecisions.size(); + if (request.getBalancerDecisionsRequest().hasLimit()) { + limit = Math.min(request.getBalancerDecisionsRequest().getLimit(), balancerDecisions.size()); + } + // filter limit if provided + balancerDecisions = balancerDecisions.subList(0, limit); + final NamedQueueGetResponse namedQueueGetResponse = new NamedQueueGetResponse(); + namedQueueGetResponse.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT); + namedQueueGetResponse.setBalancerDecisions(balancerDecisions); + return namedQueueGetResponse; + } + + @Override + public void persistAll() { + // no-op for now + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java index f26ff51f9b1..c0a8e1dc4a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java @@ -19,6 +19,12 @@ package org.apache.hadoop.hbase.namequeues.impl; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.stream.Collectors; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -31,22 +37,19 @@ import 
org.apache.hadoop.hbase.namequeues.RpcLogDetails; import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService; import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; import org.apache.hbase.thirdparty.com.google.common.collect.Queues; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; import org.apache.hbase.thirdparty.com.google.protobuf.Message; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Queue; -import java.util.stream.Collectors; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; /** * In-memory Queue service provider for Slow/LargeLog events @@ -201,7 +204,7 @@ public class SlowLogQueueService implements NamedQueueService { slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest); } NamedQueueGetResponse response = new NamedQueueGetResponse(); - response.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + response.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); response.setSlowLogPayloads(slowLogPayloads); return response; } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java index 6e88bf474d4..182cfd1def0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.namequeues.request; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.yetus.audience.InterfaceAudience; /** @@ -36,6 +37,7 @@ public class NamedQueueGetRequest { private AdminProtos.SlowLogResponseRequest slowLogResponseRequest; private NamedQueuePayload.NamedQueueEvent namedQueueEvent; + private MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest; public AdminProtos.SlowLogResponseRequest getSlowLogResponseRequest() { return slowLogResponseRequest; @@ -46,12 +48,21 @@ public class NamedQueueGetRequest { this.slowLogResponseRequest = slowLogResponseRequest; } + public MasterProtos.BalancerDecisionsRequest getBalancerDecisionsRequest() { + return balancerDecisionsRequest; + } + + public void setBalancerDecisionsRequest( + MasterProtos.BalancerDecisionsRequest balancerDecisionsRequest) { + this.balancerDecisionsRequest = balancerDecisionsRequest; + } + public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() { return namedQueueEvent; } - public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { - this.namedQueueEvent = namedQueueEvent; + public void setNamedQueueEvent(int eventOrdinal) { + this.namedQueueEvent = NamedQueuePayload.NamedQueueEvent.getEventByOrdinal(eventOrdinal); } @Override @@ -59,7 +70,7 @@ public class 
NamedQueueGetRequest { return new ToStringBuilder(this) .append("slowLogResponseRequest", slowLogResponseRequest) .append("namedQueueEvent", namedQueueEvent) + .append("balancerDecisionsRequest", balancerDecisionsRequest) .toString(); } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java index ee4ed4394e0..224402a0079 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.namequeues.response; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs; import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; import org.apache.yetus.audience.InterfaceAudience; import java.util.List; @@ -32,6 +33,7 @@ import java.util.List; public class NamedQueueGetResponse { private List slowLogPayloads; + private List balancerDecisions; private NamedQueuePayload.NamedQueueEvent namedQueueEvent; public List getSlowLogPayloads() { @@ -42,18 +44,27 @@ public class NamedQueueGetResponse { this.slowLogPayloads = slowLogPayloads; } + public List getBalancerDecisions() { + return balancerDecisions; + } + + public void setBalancerDecisions(List balancerDecisions) { + this.balancerDecisions = balancerDecisions; + } + public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() { return namedQueueEvent; } - public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { - this.namedQueueEvent = namedQueueEvent; + public void setNamedQueueEvent(int eventOrdinal) { + this.namedQueueEvent = NamedQueuePayload.NamedQueueEvent.getEventByOrdinal(eventOrdinal); } @Override 
public String toString() { return new ToStringBuilder(this) .append("slowLogPayloads", slowLogPayloads) + .append("balancerDecisions", balancerDecisions) .append("namedQueueEvent", namedQueueEvent) .toString(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 41d6b20bc27..ad21f50220c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -121,6 +121,7 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.mob.MobFileCache; import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; @@ -596,14 +597,7 @@ public class HRegionServer extends Thread implements this.abortRequested = new AtomicBoolean(false); this.stopped = false; - if (!(this instanceof HMaster)) { - final boolean isOnlineLogProviderEnabled = conf.getBoolean( - HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, - HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); - if (isOnlineLogProviderEnabled) { - this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf); - } - } + initNamedQueueRecorder(conf); rpcServices = createRpcServices(); useThisHostnameInstead = getUseThisHostnameInstead(conf); String hostName = @@ -679,6 +673,24 @@ public class HRegionServer extends Thread implements } } + private void initNamedQueueRecorder(Configuration conf) { + if (!(this instanceof HMaster)) { + final boolean isOnlineLogProviderEnabled = conf.getBoolean( + HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, + 
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); + if (isOnlineLogProviderEnabled) { + this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf); + } + } else { + final boolean isBalancerDecisionRecording = conf + .getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED, + BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED); + if (isBalancerDecisionRecording) { + this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf); + } + } + } + // HMaster should override this method to load the specific config for master protected String getUseThisHostnameInstead(Configuration conf) throws IOException { String hostname = conf.get(RS_HOSTNAME_KEY); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index ba818d2a596..2fc584a0ffc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -21,6 +21,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.BindException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -106,6 +107,7 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterRpcServices; import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; +import org.apache.hadoop.hbase.namequeues.RpcLogDetails; import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.net.Address; @@ -241,6 +243,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanReques import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameInt64Pair; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; @@ -3787,7 +3790,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } List slowLogPayloads; NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); - namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); namedQueueGetRequest.setSlowLogResponseRequest(request); NamedQueueGetResponse namedQueueGetResponse = namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); @@ -3826,6 +3829,36 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return clearSlowLogResponses; } + @Override + public HBaseProtos.LogEntry getLogEntries(RpcController controller, + HBaseProtos.LogRequest request) throws ServiceException { + try { + final String logClassName = request.getLogClassName(); + Class logClass = Class.forName(logClassName) + .asSubclass(Message.class); + Method method = logClass.getMethod("parseFrom", ByteString.class); + if (logClassName.contains("SlowLogResponseRequest")) { + SlowLogResponseRequest slowLogResponseRequest = + (SlowLogResponseRequest) method.invoke(null, request.getLogMessage()); + final NamedQueueRecorder namedQueueRecorder = + this.regionServer.getNamedQueueRecorder(); + final List slowLogPayloads = + getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder); + SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder() + 
.addAllSlowLogPayloads(slowLogPayloads) + .build(); + return HBaseProtos.LogEntry.newBuilder() + .setLogClassName(slowLogResponses.getClass().getName()) + .setLogMessage(slowLogResponses.toByteString()).build(); + } + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException + | InvocationTargetException e) { + LOG.error("Error while retrieving log entries.", e); + throw new ServiceException(e); + } + throw new ServiceException("Invalid request params"); + } + @VisibleForTesting public RpcScheduler getRpcScheduler() { return rpcServer.getScheduler(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index 2e8632fb59e..90eb594efc5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -888,12 +888,13 @@ public class TestAdmin2 extends TestAdminBase { } Assert.assertEquals(countFailedClearSlowResponse, 0); - LogQueryFilter logQueryFilter = new LogQueryFilter(); - List onlineLogRecords = ADMIN.getSlowLogResponses(new HashSet<>(serverNames), - logQueryFilter); - + List onlineLogRecords = ADMIN.getLogEntries(new HashSet<>(serverNames), + "SLOW_LOG", ServerType.REGION_SERVER, 100, null); // after cleanup of slowlog responses, total count of slowlog payloads should be 0 Assert.assertEquals(onlineLogRecords.size(), 0); + List balancerDecisionRecords = + ADMIN.getLogEntries(null, "BALANCER_DECISION", ServerType.MASTER, 100, null); + Assert.assertEquals(balancerDecisionRecords.size(), 0); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java index eedfcf2359f..23ab43de074 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.Threads; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -74,13 +75,10 @@ public class TestAsyncTableGetMultiThreaded { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static TableName TABLE_NAME = TableName.valueOf("async"); - - private static byte[] FAMILY = Bytes.toBytes("cf"); - - private static byte[] QUALIFIER = Bytes.toBytes("cq"); - - private static int COUNT = 1000; + private static final TableName TABLE_NAME = TableName.valueOf("async"); + private static final byte[] FAMILY = Bytes.toBytes("cf"); + private static final byte[] QUALIFIER = Bytes.toBytes("cq"); + private static final int COUNT = 1000; private static AsyncConnection CONN; @@ -99,6 +97,7 @@ public class TestAsyncTableGetMultiThreaded { TEST_UTIL.getConfiguration().setInt(MAX_BUFFER_COUNT_KEY, 100); TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(memoryCompaction)); + TEST_UTIL.getConfiguration().setBoolean("hbase.master.balancer.decision.buffer.enabled", true); TEST_UTIL.startMiniCluster(3); SPLIT_KEYS = new byte[8][]; @@ -211,6 +210,9 @@ public class TestAsyncTableGetMultiThreaded { LOG.info("====== Move meta done ======"); Thread.sleep(5000); } + List balancerDecisionRecords = + admin.getLogEntries(null, "BALANCER_DECISION", ServerType.MASTER, 2, null); + Assert.assertEquals(balancerDecisionRecords.size(), 2); LOG.info("====== Read test finished, shutdown thread pool ======"); stop.set(true); executor.shutdown(); diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index aaeae01952f..5c4eea86f3b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -137,6 +137,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; @@ -699,6 +700,12 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface, return null; } + @Override + public HBaseProtos.LogEntry getLogEntries(RpcController controller, + HBaseProtos.LogRequest request) throws ServiceException { + return null; + } + @Override public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots( RpcController controller, GetSpaceQuotaSnapshotsRequest request) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerDecision.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerDecision.java new file mode 100644 index 00000000000..cfeeefefd6e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerDecision.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.balancer; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.LogEntry; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; +import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs; + +/** + * Test BalancerDecision ring buffer using namedQueue interface + */ +@Category({ MasterTests.class, MediumTests.class }) +public class 
TestBalancerDecision extends BalancerTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBalancerDecision.class); + + @Test + public void testBalancerDecisions() { + conf.setBoolean("hbase.master.balancer.decision.buffer.enabled", true); + loadBalancer.setConf(conf); + float minCost = conf.getFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f); + conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 1.0f); + try { + // Test with/without per table balancer. + boolean[] perTableBalancerConfigs = {true, false}; + for (boolean isByTable : perTableBalancerConfigs) { + conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable); + loadBalancer.setConf(conf); + for (int[] mockCluster : clusterStateMocks) { + Map<ServerName, List<RegionInfo>> servers = mockClusterServers(mockCluster); + Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable = + (Map) mockClusterServersWithTables(servers); + List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable); + boolean emptyPlans = plans == null || plans.isEmpty(); + Assert.assertTrue(emptyPlans || needsBalanceIdleRegion(mockCluster)); + } + } + final NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); + namedQueueGetRequest.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT); + namedQueueGetRequest + .setBalancerDecisionsRequest(MasterProtos.BalancerDecisionsRequest.getDefaultInstance()); + NamedQueueGetResponse namedQueueGetResponse = + loadBalancer.namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); + List<RecentLogs.BalancerDecision> balancerDecisions = + namedQueueGetResponse.getBalancerDecisions(); + MasterProtos.BalancerDecisionsResponse response = + MasterProtos.BalancerDecisionsResponse.newBuilder() + .addAllBalancerDecision(balancerDecisions) + .build(); + List<LogEntry> balancerDecisionRecords = + ProtobufUtil.getBalancerDecisionEntries(response); + Assert.assertTrue(balancerDecisionRecords.size() > 160); + } finally { + // reset config + 
conf.unset(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE); + conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", minCost); + loadBalancer.setConf(conf); + } + } + + private static boolean needsBalanceIdleRegion(int[] cluster) { + return (Arrays.stream(cluster).anyMatch(x -> x > 1)) && (Arrays.stream(cluster) + .anyMatch(x -> x < 1)); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index f9e6a9ed418..83444eb4769 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -151,6 +151,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { String regionNameAsString = RegionInfo.getRegionNameAsString(Bytes.toBytes(REGION_KEY)); assertTrue(loadBalancer.loads.get(regionNameAsString) != null); assertTrue(loadBalancer.loads.get(regionNameAsString).size() == 15); + assertNull(loadBalancer.namedQueueRecorder); Queue loads = loadBalancer.loads.get(regionNameAsString); int i = 0; @@ -203,6 +204,7 @@ public class TestStochasticLoadBalancer extends BalancerTestBase { double expected = 1 - expectedLocalities[test]; assertEquals(expected, cost, 0.001); } + assertNull(loadBalancer.namedQueueRecorder); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 542efc3cf6f..161bcc11a20 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -234,7 +234,7 @@ public class TestNamedQueueRecorder { private List 
getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) { NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); - namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); namedQueueGetRequest.setSlowLogResponseRequest(request); NamedQueueGetResponse namedQueueGetResponse = namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java index 4ebf2a5aed3..f7bc6fee844 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java @@ -101,7 +101,7 @@ public class TestSlowLogAccessor { private List getSlowLogPayloads( AdminProtos.SlowLogResponseRequest request) { NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest(); - namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); + namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); namedQueueGetRequest.setSlowLogResponseRequest(request); NamedQueueGetResponse namedQueueGetResponse = namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 6c29e548761..512f96aea66 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -1496,7 +1496,7 @@ module Hbase #---------------------------------------------------------------------------------------------- # Retrieve SlowLog Responses from RegionServers - def get_slowlog_responses(server_names, args) + def get_slowlog_responses(server_names, args, is_large_log = false) unless server_names.is_a?(Array) || server_names.is_a?(String) 
raise(ArgumentError, "#{server_names.class} of #{server_names.inspect} is not of Array/String type") @@ -1508,38 +1508,44 @@ module Hbase server_names = getServerNames(server_names_list, false) end filter_params = get_filter_params(args) - filter_params.setType(org.apache.hadoop.hbase.client.LogQueryFilter::Type::SLOW_LOG) - slow_log_responses = @admin.getSlowLogResponses(java.util.HashSet.new(server_names), - filter_params) - slow_log_responses_arr = [] - for slow_log_response in slow_log_responses - slow_log_responses_arr << slow_log_response.toJsonPrettyPrint + if args.key? 'LIMIT' + limit = args['LIMIT'] + else + limit = 10 end - puts 'Retrieved SlowLog Responses from RegionServers' - puts slow_log_responses_arr + if is_large_log + log_type = 'LARGE_LOG' + else + log_type = 'SLOW_LOG' + end + log_dest = org.apache.hadoop.hbase.client.ServerType::REGION_SERVER + server_names_set = java.util.HashSet.new(server_names) + slow_log_responses = @admin.getLogEntries(server_names_set, log_type, log_dest, limit, + filter_params) + slow_log_responses_arr = [] + slow_log_responses.each { |slow_log_response| + slow_log_responses_arr << slow_log_response.toJsonPrettyPrint + } + slow_log_responses_arr end def get_filter_params(args) - filter_params = org.apache.hadoop.hbase.client.LogQueryFilter.new + filter_params = java.util.HashMap.new if args.key? 'REGION_NAME' region_name = args['REGION_NAME'] - filter_params.setRegionName(region_name) + filter_params.put('regionName', region_name) end if args.key? 'TABLE_NAME' table_name = args['TABLE_NAME'] - filter_params.setTableName(table_name) + filter_params.put('tableName', table_name) end if args.key? 'CLIENT_IP' - client_ip = args['CLIENT_IP'] - filter_params.setClientAddress(client_ip) + client_address = args['CLIENT_IP'] + filter_params.put('clientAddress', client_address) end if args.key? 'USER' user = args['USER'] - filter_params.setUserName(user) - end - if args.key? 
'LIMIT' - limit = args['LIMIT'] - filter_params.setLimit(limit) + filter_params.put('userName', user) end if args.key? 'FILTER_BY_OP' filter_by_op = args['FILTER_BY_OP'] @@ -1547,38 +1553,12 @@ module Hbase raise(ArgumentError, "FILTER_BY_OP should be either OR / AND") end if filter_by_op == 'AND' - filter_params.setFilterByOperator( - org.apache.hadoop.hbase.client.LogQueryFilter::FilterByOperator::AND) + filter_params.put('filterByOperator', 'AND') end end filter_params end - #---------------------------------------------------------------------------------------------- - # Retrieve LargeLog Responses from RegionServers - def get_largelog_responses(server_names, args) - unless server_names.is_a?(Array) || server_names.is_a?(String) - raise(ArgumentError, - "#{server_names.class} of #{server_names.inspect} is not of Array/String type") - end - if server_names == '*' - server_names = getServerNames([], true) - else - server_names_list = to_server_names(server_names) - server_names = getServerNames(server_names_list, false) - end - filter_params = get_filter_params(args) - filter_params.setType(org.apache.hadoop.hbase.client.LogQueryFilter::Type::LARGE_LOG) - large_log_responses = @admin.getSlowLogResponses(java.util.HashSet.new(server_names), - filter_params) - large_log_responses_arr = [] - for large_log_response in large_log_responses - large_log_responses_arr << large_log_response.toJsonPrettyPrint - end - puts 'Retrieved LargeLog Responses from RegionServers' - puts large_log_responses_arr - end - #---------------------------------------------------------------------------------------------- # Clears SlowLog Responses from RegionServers def clear_slowlog_responses(server_names) @@ -1667,6 +1647,24 @@ module Hbase @admin.recommissionRegionServer(server_name, region_names_in_bytes) end + #---------------------------------------------------------------------------------------------- + # Retrieve latest balancer decisions made by LoadBalancers + def 
get_balancer_decisions(args) + if args.key? 'LIMIT' + limit = args['LIMIT'] + else + limit = 250 + end + log_type = 'BALANCER_DECISION' + log_dest = org.apache.hadoop.hbase.client.ServerType::MASTER + balancer_decisions_responses = @admin.getLogEntries(nil, log_type, log_dest, limit, nil) + balancer_decisions_resp_arr = [] + balancer_decisions_responses.each { |balancer_dec_resp| + balancer_decisions_resp_arr << balancer_dec_resp.toJsonPrettyPrint + } + balancer_decisions_resp_arr + end + #---------------------------------------------------------------------------------------------- # Stop the active Master def stop_master diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index b638bbeebb5..97d2abef3bc 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -338,6 +338,7 @@ Shell.load_command_group( compact compaction_switch flush + get_balancer_decisions get_slowlog_responses get_largelog_responses major_compact diff --git a/hbase-shell/src/main/ruby/shell/commands/get_balancer_decisions.rb b/hbase-shell/src/main/ruby/shell/commands/get_balancer_decisions.rb new file mode 100644 index 00000000000..801166eb204 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/get_balancer_decisions.rb @@ -0,0 +1,49 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with this +# work for additional information regarding copyright ownership. The ASF +# licenses this file to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +# License for the specific language governing permissions and limitations +# under the License. + +# Retrieve latest balancer decisions maintained in memory by HMaster + +module Shell + module Commands + # Retrieve latest balancer decisions made by LoadBalancers + class GetBalancerDecisions < Command + def help + <<-EOF +Retrieve latest balancer decisions made by LoadBalancers. + +Examples: + + hbase> get_balancer_decisions => Retrieve recent balancer decisions with + region plans + hbase> get_balancer_decisions LIMIT => 10 => Retrieve 10 most recent balancer decisions + with region plans + + EOF + end + + def command(args = {}) + unless args.is_a? Hash + raise 'Filter parameters are not Hash' + end + + balancer_decisions_resp_arr = admin.get_balancer_decisions(args) + puts 'Retrieved BalancerDecision Responses' + puts balancer_decisions_resp_arr + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/get_largelog_responses.rb b/hbase-shell/src/main/ruby/shell/commands/get_largelog_responses.rb index f7effe7a6d1..8ed55abfc10 100644 --- a/hbase-shell/src/main/ruby/shell/commands/get_largelog_responses.rb +++ b/hbase-shell/src/main/ruby/shell/commands/get_largelog_responses.rb @@ -90,8 +90,9 @@ echo "get_largelog_responses '*'" | hbase shell > xyz.out 2>&1 unless args.is_a?
Hash raise 'Filter parameters are not Hash' end - - admin.get_largelog_responses(server_names, args) + large_log_responses_arr = admin.get_slowlog_responses(server_names, args, true) + puts 'Retrieved LargeLog Responses from RegionServers' + puts large_log_responses_arr end end end diff --git a/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb b/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb index 23a3ec2576d..2dc108b1d68 100644 --- a/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb +++ b/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb @@ -90,8 +90,9 @@ echo "get_slowlog_responses '*'" | hbase shell > xyz.out 2>&1 unless args.is_a? Hash raise 'Filter parameters are not Hash' end - - admin.get_slowlog_responses(server_names, args) + slow_log_responses_arr = admin.get_slowlog_responses(server_names, args) + puts 'Retrieved SlowLog Responses from RegionServers' + puts slow_log_responses_arr end end end diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb b/hbase-shell/src/test/ruby/hbase/admin_test.rb index 3f89383efca..d32e32ef09c 100644 --- a/hbase-shell/src/test/ruby/hbase/admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb @@ -386,8 +386,8 @@ module Hbase #------------------------------------------------------------------------------- define_test 'get slowlog responses should work' do - output = capture_stdout { command(:get_slowlog_responses, '*', {}) } - assert(output.include?('Retrieved SlowLog Responses from RegionServers')) + output = command(:get_slowlog_responses, '*', {}) + assert(output.nil?) 
end #------------------------------------------------------------------------------- diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index 9359d4360ee..d4bc5698259 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -44,10 +44,12 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.CompactType; import org.apache.hadoop.hbase.client.CompactionState; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.LogEntry; import org.apache.hadoop.hbase.client.LogQueryFilter; import org.apache.hadoop.hbase.client.NormalizeTableFilterParams; import org.apache.hadoop.hbase.client.OnlineLogRecord; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.ServerType; import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.SnapshotType; import org.apache.hadoop.hbase.client.TableDescriptor; @@ -1438,4 +1440,10 @@ public class ThriftAdmin implements Admin { return splitRegionAsync(regionName, null); } + @Override + public List getLogEntries(Set serverNames, String logType, + ServerType serverType, int limit, Map filterParams) + throws IOException { + throw new NotImplementedException("getLogEntries not supported in ThriftAdmin"); + } } diff --git a/src/main/asciidoc/_chapters/hbase-default.adoc b/src/main/asciidoc/_chapters/hbase-default.adoc index 5768add56e0..14723811ee8 100644 --- a/src/main/asciidoc/_chapters/hbase-default.adoc +++ b/src/main/asciidoc/_chapters/hbase-default.adoc @@ -2308,3 +2308,17 @@ The percent of region server RPC threads failed to abort RS. 
.Default `false` +[[hbase.master.balancer.decision.buffer.enabled]] +*`hbase.master.balancer.decision.buffer.enabled`*:: ++ +.Description + + Indicates whether active HMaster has ring buffer running for storing + balancer decisions in FIFO manner with limited entries. The size of + the ring buffer is indicated by config: + hbase.master.balancer.decision.queue.size + ++ +.Default +`false` +