diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index a8029f8cba9..4576d7d8525 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -3134,4 +3134,29 @@ public interface Admin extends Abortable, Closeable { */ boolean isSnapshotCleanupEnabled() throws IOException; + + /** + * Retrieves online slow RPC logs from the provided list of + * RegionServers + * + * @param serverNames Server names to get slowlog responses from + * @param slowLogQueryFilter filter to be used if provided + * @return online slowlog response list + * @throws IOException if a remote or network exception occurs + */ + List getSlowLogResponses(final Set serverNames, + final SlowLogQueryFilter slowLogQueryFilter) throws IOException; + + /** + * Clears online slow RPC logs from the provided list of + * RegionServers + * + * @param serverNames Set of Server names to clean slowlog responses from + * @return List of booleans representing if online slowlog response buffer is cleaned + * from each RegionServer + * @throws IOException if a remote or network exception occurs + */ + List clearSlowLogResponses(final Set serverNames) + throws IOException; + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 1aa76232293..9e61a38c96b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -1484,4 +1484,25 @@ public interface AsyncAdmin { */ CompletableFuture isSnapshotCleanupEnabled(); + /** + * Retrieves online slow RPC logs from the provided list of + * RegionServers + * + * @param serverNames Server names to get slowlog responses from + * @param slowLogQueryFilter filter to be used if provided + * @return Online slowlog response list. The return value wrapped by a {@link CompletableFuture} + */ + CompletableFuture> getSlowLogResponses(final Set serverNames, + final SlowLogQueryFilter slowLogQueryFilter); + + /** + * Clears online slow RPC logs from the provided list of + * RegionServers + * + * @param serverNames Set of Server names to clean slowlog responses from + * @return List of booleans representing if online slowlog response buffer is cleaned + * from each RegionServer. The return value wrapped by a {@link CompletableFuture} + */ + CompletableFuture> clearSlowLogResponses(final Set serverNames); + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 3b5127958eb..d5c9d09ac35 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import com.google.protobuf.RpcChannel; + import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -836,4 +837,15 @@ class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.isSnapshotCleanupEnabled()); } + @Override + public CompletableFuture> getSlowLogResponses( + final Set serverNames, final SlowLogQueryFilter slowLogQueryFilter) { + return wrap(rawAdmin.getSlowLogResponses(serverNames, slowLogQueryFilter)); + } + + @Override + public CompletableFuture> clearSlowLogResponses(Set serverNames) { + return wrap(rawAdmin.clearSlowLogResponses(serverNames)); + } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 0b2be195581..7ef5d4d84da 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.RpcController; +import edu.umd.cs.findbugs.annotations.Nullable; import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; @@ -111,6 +112,7 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; @@ -4358,4 +4360,68 @@ public class HBaseAdmin implements Admin { } + @Override + public List getSlowLogResponses(@Nullable final Set serverNames, + final SlowLogQueryFilter slowLogQueryFilter) throws IOException { + if (CollectionUtils.isEmpty(serverNames)) { + return Collections.emptyList(); + } + return serverNames.stream().map(serverName -> { + try { + return getSlowLogResponseFromServer(serverName, slowLogQueryFilter); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + ).flatMap(List::stream).collect(Collectors.toList()); + } + + private List getSlowLogResponseFromServer(final ServerName serverName, + final SlowLogQueryFilter slowLogQueryFilter) throws IOException { + return getSlowLogResponsesFromServer(this.connection.getAdmin(serverName), slowLogQueryFilter); + } + + private List getSlowLogResponsesFromServer(AdminService.BlockingInterface admin, + SlowLogQueryFilter slowLogQueryFilter) throws IOException { + return executeCallable(new RpcRetryingCallable>() { + @Override + protected List rpcCall(int callTimeout) throws Exception { + HBaseRpcController controller = rpcControllerFactory.newController(); + AdminProtos.SlowLogResponses slowLogResponses = + admin.getSlowLogResponses(controller, + RequestConverter.buildSlowLogResponseRequest(slowLogQueryFilter)); + return ProtobufUtil.toSlowLogPayloads(slowLogResponses); + } + }); + } + + @Override + public List clearSlowLogResponses(@Nullable final Set serverNames) + throws IOException { + if (CollectionUtils.isEmpty(serverNames)) { + return Collections.emptyList(); + } + return serverNames.stream().map(serverName -> { + try { + return clearSlowLogsResponses(serverName); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + } + + private Boolean clearSlowLogsResponses(final ServerName serverName) throws IOException { + AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); + return executeCallable(new RpcRetryingCallable() { + @Override + protected Boolean rpcCall(int callTimeout) throws Exception { + HBaseRpcController controller = rpcControllerFactory.newController(); + AdminProtos.ClearSlowLogResponses clearSlowLogResponses = + admin.clearSlowLogsResponses(controller, + RequestConverter.buildClearSlowLogResponseRequest()); + return ProtobufUtil.toClearSlowLogPayload(clearSlowLogResponses); + } + }); + } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index ee32e429eea..ddaf786c913 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; import com.google.protobuf.Message; import com.google.protobuf.RpcChannel; +import edu.umd.cs.findbugs.annotations.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -45,6 +46,7 @@ import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.CacheEvictionStats; @@ -102,6 +104,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hbase.thirdparty.io.netty.util.TimerTask; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; @@ -3884,4 +3888,63 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { .call(); } + @Override + public CompletableFuture> getSlowLogResponses( + @Nullable final Set serverNames, + final SlowLogQueryFilter slowLogQueryFilter) { + if (CollectionUtils.isEmpty(serverNames)) { + return CompletableFuture.completedFuture(Collections.emptyList()); + } + return CompletableFuture.supplyAsync(() -> serverNames.stream() + .map((ServerName serverName) -> + getSlowLogResponseFromServer(serverName, slowLogQueryFilter)) + .map(CompletableFuture::join) + .flatMap(List::stream) + .collect(Collectors.toList())); + } + + private CompletableFuture> getSlowLogResponseFromServer( + final ServerName serverName, final SlowLogQueryFilter slowLogQueryFilter) { + return this.>newAdminCaller() + .action((controller, stub) -> this + .adminCall( + controller, stub, RequestConverter.buildSlowLogResponseRequest(slowLogQueryFilter), + AdminService.Interface::getSlowLogResponses, + ProtobufUtil::toSlowLogPayloads)) + .serverName(serverName).call(); + } + + @Override + public CompletableFuture> clearSlowLogResponses( + @Nullable Set serverNames) { + if (CollectionUtils.isEmpty(serverNames)) { + return CompletableFuture.completedFuture(Collections.emptyList()); + } + List> clearSlowLogResponseList = serverNames.stream() + .map(this::clearSlowLogsResponses) + .collect(Collectors.toList()); + return convertToFutureOfList(clearSlowLogResponseList); + } + + private CompletableFuture clearSlowLogsResponses(final ServerName serverName) { + return this.newAdminCaller() + .action(((controller, stub) -> this + .adminCall( + controller, stub, RequestConverter.buildClearSlowLogResponseRequest(), + AdminService.Interface::clearSlowLogsResponses, + ProtobufUtil::toClearSlowLogPayload)) + ).serverName(serverName).call(); + } + + private static CompletableFuture> convertToFutureOfList( + List> futures) { + CompletableFuture allDoneFuture = + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + return allDoneFuture.thenApply(v -> + futures.stream() + .map(CompletableFuture::join) + .collect(Collectors.toList()) + ); + } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java new file mode 100644 index 00000000000..86df9fda207 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * SlowLog params object that contains detailed info as params and region name : to be used + * for filter purpose + */ +@InterfaceAudience.Private +public class SlowLogParams { + + private final String regionName; + private final String params; + + public SlowLogParams(String regionName, String params) { + this.regionName = regionName; + this.params = params; + } + + public SlowLogParams(String params) { + this.regionName = StringUtils.EMPTY; + this.params = params; + } + + public String getRegionName() { + return regionName; + } + + public String getParams() { + return params; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("regionName", regionName) + .append("params", params) + .toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + SlowLogParams that = (SlowLogParams) o; + + return new EqualsBuilder() + .append(regionName, that.regionName) + .append(params, that.params) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(regionName) + .append(params) + .toHashCode(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogQueryFilter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogQueryFilter.java new file mode 100644 index 00000000000..aa56a8afab0 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogQueryFilter.java @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * SlowLog Query Filter with all filter and limit parameters + */ +@InterfaceAudience.Private +public class SlowLogQueryFilter { + + private String regionName; + private String clientAddress; + private String tableName; + private String userName; + private int limit = 10; + + public String getRegionName() { + return regionName; + } + + public void setRegionName(String regionName) { + this.regionName = regionName; + } + + public String getClientAddress() { + return clientAddress; + } + + public void setClientAddress(String clientAddress) { + this.clientAddress = clientAddress; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public int getLimit() { + return limit; + } + + public void setLimit(int limit) { + this.limit = limit; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + SlowLogQueryFilter that = (SlowLogQueryFilter) o; + + return new EqualsBuilder() + .append(limit, that.limit) + .append(regionName, that.regionName) + .append(clientAddress, that.clientAddress) + .append(tableName, that.tableName) + .append(userName, that.userName) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(regionName) + .append(clientAddress) + .append(tableName) + .append(userName) + .append(limit) + .toHashCode(); + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("regionName", regionName) + .append("clientAddress", clientAddress) + .append("tableName", tableName) + .append("userName", userName) + .append("limit", limit) + .toString(); + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogRecord.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogRecord.java new file mode 100644 index 00000000000..95936186047 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogRecord.java @@ -0,0 +1,319 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.hadoop.hbase.util.GsonUtil; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.gson.Gson; +import org.apache.hbase.thirdparty.com.google.gson.JsonObject; +import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer; + +/** + * SlowLog payload for hbase-client, to be used by Admin API get_slow_responses + */ +@InterfaceAudience.Private +final public class SlowLogRecord { + + // used to convert object to pretty printed format + // used by toJsonPrettyPrint() + private static final Gson GSON = GsonUtil.createGson() + .setPrettyPrinting() + .registerTypeAdapter(SlowLogRecord.class, (JsonSerializer) + (slowLogPayload, type, jsonSerializationContext) -> { + Gson gson = new Gson(); + JsonObject jsonObj = (JsonObject) gson.toJsonTree(slowLogPayload); + if (slowLogPayload.getMultiGetsCount() == 0) { + jsonObj.remove("multiGetsCount"); + } + if (slowLogPayload.getMultiMutationsCount() == 0) { + jsonObj.remove("multiMutationsCount"); + } + if (slowLogPayload.getMultiServiceCalls() == 0) { + jsonObj.remove("multiServiceCalls"); + } + return jsonObj; + }).create(); + + private long startTime; + private int processingTime; + private int queueTime; + private long responseSize; + private String clientAddress; + private String serverClass; + private String methodName; + private String callDetails; + private String param; + // we don't want to serialize region name, it is just for the filter purpose + // hence avoiding deserialization + private transient String regionName; + private String userName; + private int multiGetsCount; + private int multiMutationsCount; + private int multiServiceCalls; + + public long getStartTime() { + return startTime; + } + + public int getProcessingTime() { + return processingTime; + } + + public int getQueueTime() { + return queueTime; + } + + public long getResponseSize() { + return responseSize; + } + + public String getClientAddress() { + return clientAddress; + } + + public String getServerClass() { + return serverClass; + } + + public String getMethodName() { + return methodName; + } + + public String getCallDetails() { + return callDetails; + } + + public String getParam() { + return param; + } + + public String getRegionName() { + return regionName; + } + + public String getUserName() { + return userName; + } + + public int getMultiGetsCount() { + return multiGetsCount; + } + + public int getMultiMutationsCount() { + return multiMutationsCount; + } + + public int getMultiServiceCalls() { + return multiServiceCalls; + } + + private SlowLogRecord(final long startTime, final int processingTime, final int queueTime, + final long responseSize, final String clientAddress, final String serverClass, + final String methodName, final String callDetails, final String param, + final String regionName, final String userName, final int multiGetsCount, + final int multiMutationsCount, final int multiServiceCalls) { + this.startTime = startTime; + this.processingTime = processingTime; + this.queueTime = queueTime; + this.responseSize = responseSize; + this.clientAddress = clientAddress; + this.serverClass = serverClass; + this.methodName = methodName; + this.callDetails = callDetails; + this.param = param; + this.regionName = regionName; + this.userName = userName; + this.multiGetsCount = multiGetsCount; + this.multiMutationsCount = multiMutationsCount; + this.multiServiceCalls = multiServiceCalls; + } + + public static class SlowLogRecordBuilder { + private long startTime; + private int processingTime; + private int queueTime; + private long responseSize; + private String clientAddress; + private String serverClass; + private String methodName; + private String callDetails; + private String param; + private String regionName; + private String userName; + private int multiGetsCount; + private int multiMutationsCount; + private int multiServiceCalls; + + public SlowLogRecordBuilder setStartTime(long startTime) { + this.startTime = startTime; + return this; + } + + public SlowLogRecordBuilder setProcessingTime(int processingTime) { + this.processingTime = processingTime; + return this; + } + + public SlowLogRecordBuilder setQueueTime(int queueTime) { + this.queueTime = queueTime; + return this; + } + + public SlowLogRecordBuilder setResponseSize(long responseSize) { + this.responseSize = responseSize; + return this; + } + + public SlowLogRecordBuilder setClientAddress(String clientAddress) { + this.clientAddress = clientAddress; + return this; + } + + public SlowLogRecordBuilder setServerClass(String serverClass) { + this.serverClass = serverClass; + return this; + } + + public SlowLogRecordBuilder setMethodName(String methodName) { + this.methodName = methodName; + return this; + } + + public SlowLogRecordBuilder setCallDetails(String callDetails) { + this.callDetails = callDetails; + return this; + } + + public SlowLogRecordBuilder setParam(String param) { + this.param = param; + return this; + } + + public SlowLogRecordBuilder setRegionName(String regionName) { + this.regionName = regionName; + return this; + } + + public SlowLogRecordBuilder setUserName(String userName) { + this.userName = userName; + return this; + } + + public SlowLogRecordBuilder setMultiGetsCount(int multiGetsCount) { + this.multiGetsCount = multiGetsCount; + return this; + } + + public SlowLogRecordBuilder setMultiMutationsCount(int multiMutationsCount) { + this.multiMutationsCount = multiMutationsCount; + return this; + } + + public SlowLogRecordBuilder setMultiServiceCalls(int multiServiceCalls) { + this.multiServiceCalls = multiServiceCalls; + return this; + } + + public SlowLogRecord build() { + return new SlowLogRecord(startTime, processingTime, queueTime, responseSize, + clientAddress, serverClass, methodName, callDetails, param, regionName, + userName, multiGetsCount, multiMutationsCount, multiServiceCalls); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + SlowLogRecord that = (SlowLogRecord) o; + + return new EqualsBuilder() + .append(startTime, that.startTime) + .append(processingTime, that.processingTime) + .append(queueTime, that.queueTime) + .append(responseSize, that.responseSize) + .append(multiGetsCount, that.multiGetsCount) + .append(multiMutationsCount, that.multiMutationsCount) + .append(multiServiceCalls, that.multiServiceCalls) + .append(clientAddress, that.clientAddress) + .append(serverClass, that.serverClass) + .append(methodName, that.methodName) + .append(callDetails, that.callDetails) + .append(param, that.param) + .append(regionName, that.regionName) + .append(userName, that.userName) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(startTime) + .append(processingTime) + .append(queueTime) + .append(responseSize) + .append(clientAddress) + .append(serverClass) + .append(methodName) + .append(callDetails) + .append(param) + .append(regionName) + .append(userName) + .append(multiGetsCount) + .append(multiMutationsCount) + .append(multiServiceCalls) + .toHashCode(); + } + + public String toJsonPrettyPrint() { + return GSON.toJson(this); + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("startTime", startTime) + .append("processingTime", processingTime) + .append("queueTime", queueTime) + .append("responseSize", responseSize) + .append("clientAddress", clientAddress) + .append("serverClass", serverClass) + .append("methodName", methodName) + .append("callDetails", callDetails) + .append("param", param) + .append("regionName", regionName) + .append("userName", userName) + .append("multiGetsCount", multiGetsCount) + .append("multiMutationsCount", multiMutationsCount) + .append("multiServiceCalls", multiServiceCalls) + .toString(); + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 46cef7bfc2f..dc4091b485b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RegionStatesCount; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SlowLogParams; +import org.apache.hadoop.hbase.client.SlowLogRecord; import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.SnapshotType; import org.apache.hadoop.hbase.client.TableDescriptor; @@ -128,7 +130,10 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; @@ -145,12 +150,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegio import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Column; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.DeleteType; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; @@ -182,6 +191,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; @@ -2221,6 +2231,57 @@ public final class ProtobufUtil { return Bytes.toStringBinary(bs.toByteArray()); } + /** + * Return SlowLogParams to maintain recent online slowlog responses + * + * @param message Message object {@link Message} + * @return SlowLogParams with regionName(for filter queries) and params + */ + public static SlowLogParams getSlowLogParams(Message message) { + if (message == null) { + return null; + } + if (message instanceof ScanRequest) { + ScanRequest scanRequest = (ScanRequest) message; + String regionName = getStringForByteString(scanRequest.getRegion().getValue()); + String params = TextFormat.shortDebugString(message); + return new SlowLogParams(regionName, params); + } else if (message instanceof MutationProto) { + MutationProto mutationProto = (MutationProto) message; + String params = "type= " + mutationProto.getMutateType().toString(); + return new SlowLogParams(params); + } else if (message instanceof GetRequest) { + GetRequest getRequest = (GetRequest) message; + String regionName = getStringForByteString(getRequest.getRegion().getValue()); + String params = "region= " + regionName + ", row= " + + getStringForByteString(getRequest.getGet().getRow()); + return new SlowLogParams(regionName, params); + } else if (message instanceof MultiRequest) { + MultiRequest multiRequest = (MultiRequest) message; + int actionsCount = multiRequest.getRegionActionList() + .stream() + .mapToInt(ClientProtos.RegionAction::getActionCount) + .sum(); + RegionAction actions = multiRequest.getRegionActionList().get(0); + String regionName = getStringForByteString(actions.getRegion().getValue()); + String params = "region= " + regionName + ", for " + actionsCount + " action(s)"; + return new SlowLogParams(regionName, params); + } else if (message instanceof MutateRequest) { + MutateRequest mutateRequest = (MutateRequest) message; + String regionName = getStringForByteString(mutateRequest.getRegion().getValue()); + String params = "region= " + regionName; + return new SlowLogParams(regionName, params); + } else if (message instanceof CoprocessorServiceRequest) { + CoprocessorServiceRequest coprocessorServiceRequest = (CoprocessorServiceRequest) message; + String params = "coprocessorService= " + + coprocessorServiceRequest.getCall().getServiceName() + + ":" + coprocessorServiceRequest.getCall().getMethodName(); + return new SlowLogParams(params); + } + String params = message.getClass().toString(); + return new SlowLogParams(params); + } + /** * Print out some subset of a MutationProto rather than all of it and its data * @param proto Protobuf to print out @@ -3414,4 +3475,56 @@ public final class ProtobufUtil { .build(); } + /** + * Convert Protobuf class + * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload} + * To client SlowLog Payload class {@link SlowLogRecord} + * + * @param slowLogPayload SlowLog Payload protobuf instance + * @return SlowLog Payload for client usecase + */ + private static SlowLogRecord getSlowLogRecord( + final TooSlowLog.SlowLogPayload slowLogPayload) { + SlowLogRecord clientSlowLogRecord = new SlowLogRecord.SlowLogRecordBuilder() + .setCallDetails(slowLogPayload.getCallDetails()) + .setClientAddress(slowLogPayload.getClientAddress()) + .setMethodName(slowLogPayload.getMethodName()) + .setMultiGetsCount(slowLogPayload.getMultiGets()) + .setMultiMutationsCount(slowLogPayload.getMultiMutations()) + .setMultiServiceCalls(slowLogPayload.getMultiServiceCalls()) + .setParam(slowLogPayload.getParam()) + .setProcessingTime(slowLogPayload.getProcessingTime()) + .setQueueTime(slowLogPayload.getQueueTime()) + .setRegionName(slowLogPayload.getRegionName()) + .setResponseSize(slowLogPayload.getResponseSize()) + .setServerClass(slowLogPayload.getServerClass()) + .setStartTime(slowLogPayload.getStartTime()) + .setUserName(slowLogPayload.getUserName()) + .build(); + return clientSlowLogRecord; + } + + /** + * Convert AdminProtos#SlowLogResponses to list of {@link SlowLogRecord} + * + * @param slowLogResponses slowlog response protobuf instance + * @return list of SlowLog payloads for client usecase + */ + public static List toSlowLogPayloads( + final AdminProtos.SlowLogResponses slowLogResponses) { + List slowLogRecords = slowLogResponses.getSlowLogPayloadsList() + .stream().map(ProtobufUtil::getSlowLogRecord).collect(Collectors.toList()); + return slowLogRecords; + } + + /** + * Convert {@link ClearSlowLogResponses} to boolean + * + * @param clearSlowLogResponses Clear slowlog response protobuf instance + * @return boolean representing clear slowlog response + */ + public static boolean toClearSlowLogPayload(final ClearSlowLogResponses clearSlowLogResponses) { + return clearSlowLogResponses.getIsCleaned(); + } + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 24f8009caf9..afdd653a016 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.ClusterMetricsBuilder; @@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SlowLogQueryFilter; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; @@ -67,6 +69,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; @@ -76,6 +79,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo; @@ -1940,4 +1944,44 @@ public final class RequestConverter { return IsSnapshotCleanupEnabledRequest.newBuilder().build(); } + /** + * Create a protocol buffer {@link SlowLogResponseRequest} + * + * @param slowLogQueryFilter filter to use if provided + * @return a protocol buffer SlowLogResponseRequest + */ + public static SlowLogResponseRequest buildSlowLogResponseRequest( + final SlowLogQueryFilter slowLogQueryFilter) { + SlowLogResponseRequest.Builder builder = SlowLogResponseRequest.newBuilder(); + if (slowLogQueryFilter == null) { + return builder.build(); + } + final String clientAddress = slowLogQueryFilter.getClientAddress(); + if (StringUtils.isNotEmpty(clientAddress)) { + builder.setClientAddress(clientAddress); + } + final String regionName = slowLogQueryFilter.getRegionName(); + if (StringUtils.isNotEmpty(regionName)) { + builder.setRegionName(regionName); + } + final String tableName = slowLogQueryFilter.getTableName(); + if (StringUtils.isNotEmpty(tableName)) { + builder.setTableName(tableName); + } + final String userName = slowLogQueryFilter.getUserName(); + if (StringUtils.isNotEmpty(userName)) { + builder.setUserName(userName); + } + return builder.setLimit(slowLogQueryFilter.getLimit()).build(); + } + + /** + * Create a protocol buffer {@link ClearSlowLogResponseRequest} + * + * @return a protocol buffer ClearSlowLogResponseRequest + */ + public static ClearSlowLogResponseRequest buildClearSlowLogResponseRequest() { + return ClearSlowLogResponseRequest.newBuilder().build(); + } + } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index d6b96670fdf..6f1bb6ad68e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1566,6 +1566,12 @@ public final class HConstants { "hbase.master.executor.logreplayops.threads"; public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10; + public static final int DEFAULT_SLOW_LOG_RING_BUFFER_SIZE = 256; + + public static final String SLOW_LOG_BUFFER_ENABLED_KEY = + "hbase.regionserver.slowlog.buffer.enabled"; + public static final boolean DEFAULT_ONLINE_LOG_PROVIDER_ENABLED = false; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 8fda74d047e..a7d6898b211 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -1926,4 +1926,27 @@ possible configurations would overwhelm and obscure the important. enable this feature. + + hbase.regionserver.slowlog.ringbuffer.size + 256 + + Default size of ringbuffer to be maintained by each RegionServer in order + to store online slowlog responses. This is an in-memory ring buffer of + requests that were judged to be too slow in addition to the responseTooSlow + logging. The in-memory representation would be complete. + For more details, please look into Doc Section: + Get Slow Response Log from shell + + + + hbase.regionserver.slowlog.buffer.enabled + false + + Indicates whether RegionServers have ring buffer running for storing + Online Slow logs in FIFO manner with limited entries. The size of + the ring buffer is indicated by config: hbase.regionserver.slowlog.ringbuffer.size + The default value is false, turn this on and get latest slowlog + responses with complete data. + + diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto index 85b9113a27b..34c9806c554 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto @@ -29,6 +29,7 @@ import "ClusterStatus.proto"; import "HBase.proto"; import "WAL.proto"; import "Quota.proto"; +import "TooSlowLog.proto"; message GetRegionInfoRequest { required RegionSpecifier region = 1; @@ -280,6 +281,26 @@ message ExecuteProceduresRequest { message ExecuteProceduresResponse { } +message SlowLogResponseRequest { + optional string region_name = 1; + optional string table_name = 2; + optional string client_address = 3; + optional string user_name = 4; + optional uint32 limit = 5 [default = 10]; +} + +message SlowLogResponses { + repeated SlowLogPayload slow_log_payloads = 1; +} + +message ClearSlowLogResponseRequest { + +} + +message ClearSlowLogResponses { + required bool is_cleaned = 1; +} + service AdminService { rpc GetRegionInfo(GetRegionInfoRequest) returns(GetRegionInfoResponse); @@ -344,4 +365,10 @@ service AdminService { rpc ExecuteProcedures(ExecuteProceduresRequest) returns(ExecuteProceduresResponse); + + rpc GetSlowLogResponses(SlowLogResponseRequest) + returns(SlowLogResponses); + + rpc ClearSlowLogsResponses(ClearSlowLogResponseRequest) + returns(ClearSlowLogResponses); } diff --git a/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto b/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto new file mode 100644 index 00000000000..26dabde901c --- /dev/null +++ b/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto2"; + +// This file contains protocol buffers that are used for Online TooSlowLogs +// To be used as Ring Buffer payload +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated"; +option java_outer_classname = "TooSlowLog"; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +message SlowLogPayload { + required int64 start_time = 1; + required int32 processing_time = 2; + required int32 queue_time = 3; + required int64 response_size = 4; + required string client_address = 5; + required string server_class = 6; + required string method_name = 7; + required string call_details = 8; + optional string param = 9; + required string user_name = 10; + optional string region_name = 11; + optional int32 multi_gets = 12 [default = 0]; + optional int32 multi_mutations = 13 [default = 0]; + optional int32 multi_service_calls = 14 [default = 0]; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index f0409fd31f6..97b4990fbf0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -33,6 +33,8 @@ import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.LongAdder; + +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CellScanner; @@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails; +import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; import org.apache.hadoop.hbase.security.User; @@ -85,6 +89,10 @@ public abstract class RpcServer implements RpcServerInterface, protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION = new CallQueueTooBigException(); + private static final String MULTI_GETS = "multi.gets"; + private static final String MULTI_MUTATIONS = "multi.mutations"; + private static final String MULTI_SERVICE_CALLS = "multi.service_calls"; + private final boolean authorize; protected boolean isSecurityEnabled; @@ -215,6 +223,12 @@ public abstract class RpcServer implements RpcServerInterface, */ private RSRpcServices rsRpcServices; + + /** + * Use to add online slowlog responses + */ + private SlowLogRecorder slowLogRecorder; + @FunctionalInterface protected interface CallCleanup { void run(); @@ -403,13 +417,21 @@ public abstract class RpcServer implements RpcServerInterface, boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1); boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1); if (tooSlow || tooLarge) { + final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY); // when tagging, we let TooLarge trump TooSmall to keep output simple // note that large responses will often also be slow. logResponse(param, - md.getName(), md.getName() + "(" + param.getClass().getName() + ")", - (tooLarge ? "TooLarge" : "TooSlow"), - status.getClient(), startTime, processingTime, qTime, - responseSize); + md.getName(), md.getName() + "(" + param.getClass().getName() + ")", + tooLarge, tooSlow, + status.getClient(), startTime, processingTime, qTime, + responseSize, userName); + if (tooSlow && this.slowLogRecorder != null) { + // send logs to ring buffer owned by slowLogRecorder + final String className = server == null ? StringUtils.EMPTY : + server.getClass().getSimpleName(); + this.slowLogRecorder.addSlowLogPayload( + new RpcLogDetails(call, status.getClient(), responseSize, className)); + } } return new Pair<>(result, controller.cellScanner()); } catch (Throwable e) { @@ -440,17 +462,21 @@ public abstract class RpcServer implements RpcServerInterface, * @param param The parameters received in the call. * @param methodName The name of the method invoked * @param call The string representation of the call - * @param tag The tag that will be used to indicate this event in the log. - * @param clientAddress The address of the client who made this call. - * @param startTime The time that the call was initiated, in ms. - * @param processingTime The duration that the call took to run, in ms. - * @param qTime The duration that the call spent on the queue - * prior to being initiated, in ms. - * @param responseSize The size in bytes of the response buffer. + * @param tooLarge To indicate if the event is tooLarge + * @param tooSlow To indicate if the event is tooSlow + * @param clientAddress The address of the client who made this call. + * @param startTime The time that the call was initiated, in ms. + * @param processingTime The duration that the call took to run, in ms. + * @param qTime The duration that the call spent on the queue + * prior to being initiated, in ms. + * @param responseSize The size in bytes of the response buffer. + * @param userName UserName of the current RPC Call */ - void logResponse(Message param, String methodName, String call, String tag, - String clientAddress, long startTime, int processingTime, int qTime, - long responseSize) throws IOException { + void logResponse(Message param, String methodName, String call, boolean tooLarge, + boolean tooSlow, String clientAddress, long startTime, int processingTime, int qTime, + long responseSize, String userName) { + final String className = server == null ? StringUtils.EMPTY : + server.getClass().getSimpleName(); // base information that is reported regardless of type of call Map responseInfo = new HashMap<>(); responseInfo.put("starttimems", startTime); @@ -458,7 +484,7 @@ public abstract class RpcServer implements RpcServerInterface, responseInfo.put("queuetimems", qTime); responseInfo.put("responsesize", responseSize); responseInfo.put("client", clientAddress); - responseInfo.put("class", server == null? "": server.getClass().getSimpleName()); + responseInfo.put("class", className); responseInfo.put("method", methodName); responseInfo.put("call", call); // The params could be really big, make sure they don't kill us at WARN @@ -496,13 +522,16 @@ public abstract class RpcServer implements RpcServerInterface, } } } - responseInfo.put("multi.gets", numGets); - responseInfo.put("multi.mutations", numMutations); - responseInfo.put("multi.servicecalls", numServiceCalls); + responseInfo.put(MULTI_GETS, numGets); + responseInfo.put(MULTI_MUTATIONS, numMutations); + responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls); } + final String tag = (tooLarge && tooSlow) ? "TooLarge & TooSlow" + : (tooSlow ? "TooSlow" : "TooLarge"); LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo)); } + /** * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length * if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942 @@ -758,4 +787,14 @@ public abstract class RpcServer implements RpcServerInterface, public void setRsRpcServices(RSRpcServices rsRpcServices) { this.rsRpcServices = rsRpcServices; } + + @Override + public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) { + this.slowLogRecorder = slowLogRecorder; + } + + @Override + public SlowLogRecorder getSlowLogRecorder() { + return slowLogRecorder; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index 0f875d8c2d3..31556796697 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -22,13 +22,14 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; import java.net.InetSocketAddress; -import org.apache.hadoop.hbase.io.ByteBuffAllocator; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; @@ -96,4 +97,16 @@ public interface RpcServerInterface { ByteBuffAllocator getByteBuffAllocator(); void setRsRpcServices(RSRpcServices rsRpcServices); + + /** + * Set Online SlowLog Provider + * + * @param slowLogRecorder instance of {@link SlowLogRecorder} + */ + void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder); + + /** + * @return Retrieve instance of {@link SlowLogRecorder} maintained by RpcServer + */ + SlowLogRecorder getSlowLogRecorder(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index df81413a1a9..75122c45ab5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; +import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; @@ -527,6 +528,11 @@ public class HRegionServer extends HasThread implements private final NettyEventLoopGroupConfig eventLoopGroupConfig; + /** + * Provide online slow log responses from ringbuffer + */ + private SlowLogRecorder slowLogRecorder; + /** * True if this RegionServer is coming up in a cluster where there is no Master; * means it needs to just come up and make do without a Master to talk to: e.g. in test or @@ -586,6 +592,9 @@ public class HRegionServer extends HasThread implements this.abortRequested = false; this.stopped = false; + if (!(this instanceof HMaster)) { + this.slowLogRecorder = new SlowLogRecorder(this.conf); + } rpcServices = createRpcServices(); useThisHostnameInstead = getUseThisHostnameInstead(conf); String hostName = @@ -1494,6 +1503,15 @@ public class HRegionServer extends HasThread implements } /** + * get Online SlowLog Provider to add slow logs to ringbuffer + * + * @return Online SlowLog Provider + */ + public SlowLogRecorder getSlowLogRecorder() { + return this.slowLogRecorder; + } + + /* * Run init. Sets up wal and starts up all server threads. * * @param c Extra configuration. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index b90c6fc5ab1..ed4ead0e589 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; +import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -104,6 +105,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.log.HBaseMarkers; +import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterRpcServices; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; @@ -125,6 +127,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; +import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.AccessChecker; @@ -167,6 +170,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompac import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; @@ -196,6 +201,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWA import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; @@ -243,6 +250,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; @@ -1245,6 +1253,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG); rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name); rpcServer.setRsRpcServices(this); + if (!(rs instanceof HMaster)) { + rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder()); + } scannerLeaseTimeoutPeriod = conf.getInt( HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); @@ -3730,6 +3741,36 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + @Override + @QosPriority(priority = HConstants.ADMIN_QOS) + public SlowLogResponses getSlowLogResponses(final RpcController controller, + final SlowLogResponseRequest request) { + final SlowLogRecorder slowLogRecorder = + this.regionServer.getSlowLogRecorder(); + final List slowLogPayloads; + slowLogPayloads = slowLogRecorder != null + ? slowLogRecorder.getSlowLogPayloads(request) + : Collections.emptyList(); + SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder() + .addAllSlowLogPayloads(slowLogPayloads) + .build(); + return slowLogResponses; + } + + @Override + @QosPriority(priority = HConstants.ADMIN_QOS) + public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller, + final ClearSlowLogResponseRequest request) { + final SlowLogRecorder slowLogRecorder = + this.regionServer.getSlowLogRecorder(); + boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder) + .map(SlowLogRecorder::clearSlowLogPayloads).orElse(false); + ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder() + .setIsCleaned(slowLogsCleaned) + .build(); + return clearSlowLogResponses; + } + @VisibleForTesting public RpcScheduler getRpcScheduler() { return rpcServer.getScheduler(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java new file mode 100644 index 00000000000..53a2ef131f4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.slowlog; + +import com.lmax.disruptor.ExceptionHandler; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Exception Handler for Online Slow Log Ring Buffer + */ +@InterfaceAudience.Private +class DisruptorExceptionHandler implements ExceptionHandler { + + private static final Logger LOG = LoggerFactory.getLogger(DisruptorExceptionHandler.class); + + @Override + public void handleEventException(Throwable e, long sequence, RingBufferEnvelope event) { + LOG.error("Sequence={}, event={}", sequence, event, e); + } + + @Override + public void handleOnStartException(Throwable e) { + LOG.error("Disruptor onStartException: ", e); + } + + @Override + public void handleOnShutdownException(Throwable e) { + LOG.error("Disruptor onShutdownException: ", e); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java new file mode 100644 index 00000000000..d308670f059 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.slowlog; + +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.yetus.audience.InterfaceAudience; + + +/** + * An envelope to carry payload in the slow log ring buffer that serves as online buffer + * to provide latest TooSlowLog + */ +@InterfaceAudience.Private +final class RingBufferEnvelope { + + private RpcLogDetails rpcLogDetails; + + /** + * Load the Envelope with {@link RpcCall} + * + * @param rpcLogDetails all details of rpc call that would be useful for ring buffer + * consumers + */ + public void load(RpcLogDetails rpcLogDetails) { + this.rpcLogDetails = rpcLogDetails; + } + + /** + * Retrieve current rpcCall details {@link RpcLogDetails} available on Envelope and + * free up the Envelope + * + * @return Retrieve rpc log details + */ + public RpcLogDetails getPayload() { + final RpcLogDetails rpcLogDetails = this.rpcLogDetails; + this.rpcLogDetails = null; + return rpcLogDetails; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java new file mode 100644 index 00000000000..e7ab7d499b3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.slowlog; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * RpcCall details that would be passed on to ring buffer of slow log responses + */ +@InterfaceAudience.Private +public class RpcLogDetails { + + private RpcCall rpcCall; + private String clientAddress; + private long responseSize; + private String className; + + public RpcLogDetails(RpcCall rpcCall, String clientAddress, long responseSize, + String className) { + this.rpcCall = rpcCall; + this.clientAddress = clientAddress; + this.responseSize = responseSize; + this.className = className; + } + + public RpcCall getRpcCall() { + return rpcCall; + } + + public String getClientAddress() { + return clientAddress; + } + + public long getResponseSize() { + return responseSize; + } + + public String getClassName() { + return className; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("rpcCall", rpcCall) + .append("clientAddress", clientAddress) + .append("responseSize", responseSize) + .append("className", className) + .toString(); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogEventHandler.java new file mode 100644 index 00000000000..24e8460e2b4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogEventHandler.java @@ -0,0 +1,208 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.slowlog; + +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hbase.client.SlowLogParams; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; +import org.apache.hbase.thirdparty.com.google.common.collect.Queues; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; + +/** + * Event Handler run by disruptor ringbuffer consumer + */ +@InterfaceAudience.Private +class SlowLogEventHandler implements EventHandler { + + private static final Logger LOG = LoggerFactory.getLogger(SlowLogEventHandler.class); + + private final Queue queue; + + SlowLogEventHandler(int eventCount) { + EvictingQueue evictingQueue = EvictingQueue.create(eventCount); + queue = Queues.synchronizedQueue(evictingQueue); + } + + /** + * Called when a publisher has published an event to the {@link RingBuffer} + * + * @param event published to the {@link RingBuffer} + * @param sequence of the event being processed + * @param endOfBatch flag to indicate if this is the last event in a batch from + * the {@link RingBuffer} + * @throws Exception if the EventHandler would like the exception handled further up the chain + */ + @Override + public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) + throws Exception { + final RpcLogDetails rpcCallDetails = event.getPayload(); + final RpcCall rpcCall = rpcCallDetails.getRpcCall(); + final String clientAddress = rpcCallDetails.getClientAddress(); + final long responseSize = rpcCallDetails.getResponseSize(); + final String className = rpcCallDetails.getClassName(); + Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod(); + Message param = rpcCall.getParam(); + long receiveTime = rpcCall.getReceiveTime(); + long startTime = rpcCall.getStartTime(); + long endTime = System.currentTimeMillis(); + int processingTime = (int) (endTime - startTime); + int qTime = (int) (startTime - receiveTime); + final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param); + int numGets = 0; + int numMutations = 0; + int numServiceCalls = 0; + if (param instanceof ClientProtos.MultiRequest) { + ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param; + for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) { + for (ClientProtos.Action action : regionAction.getActionList()) { + if (action.hasMutation()) { + numMutations++; + } + if (action.hasGet()) { + numGets++; + } + if (action.hasServiceCall()) { + numServiceCalls++; + } + } + } + } + final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY); + final String methodDescriptorName = + methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY; + SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder() + .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")") + .setClientAddress(clientAddress) + .setMethodName(methodDescriptorName) + .setMultiGets(numGets) + .setMultiMutations(numMutations) + .setMultiServiceCalls(numServiceCalls) + .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY) + .setProcessingTime(processingTime) + .setQueueTime(qTime) + .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) + .setResponseSize(responseSize) + .setServerClass(className) + .setStartTime(startTime) + .setUserName(userName) + .build(); + queue.add(slowLogPayload); + } + + /** + * Cleans up slow log payloads + * + * @return true if slow log payloads are cleaned up, false otherwise + */ + boolean clearSlowLogs() { + if (LOG.isDebugEnabled()) { + LOG.debug("Received request to clean up online slowlog buffer.."); + } + queue.clear(); + return true; + } + + /** + * Retrieve list of slow log payloads + * + * @param request slow log request parameters + * @return list of slow log payloads + */ + List getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) { + List slowLogPayloadList = + Arrays.stream(queue.toArray(new SlowLogPayload[0])).collect(Collectors.toList()); + + // latest slow logs first, operator is interested in latest records from in-memory buffer + Collections.reverse(slowLogPayloadList); + + if (isFilterProvided(request)) { + slowLogPayloadList = filterSlowLogs(request, slowLogPayloadList); + } + int limit = request.getLimit() >= slowLogPayloadList.size() ? slowLogPayloadList.size() + : request.getLimit(); + return slowLogPayloadList.subList(0, limit); + } + + private boolean isFilterProvided(AdminProtos.SlowLogResponseRequest request) { + if (StringUtils.isNotEmpty(request.getUserName())) { + return true; + } + if (StringUtils.isNotEmpty(request.getTableName())) { + return true; + } + if (StringUtils.isNotEmpty(request.getClientAddress())) { + return true; + } + return StringUtils.isNotEmpty(request.getRegionName()); + } + + private List filterSlowLogs(AdminProtos.SlowLogResponseRequest request, + List slowLogPayloadList) { + List filteredSlowLogPayloads = new ArrayList<>(); + for (SlowLogPayload slowLogPayload : slowLogPayloadList) { + if (StringUtils.isNotEmpty(request.getRegionName())) { + if (slowLogPayload.getRegionName().equals(request.getRegionName())) { + filteredSlowLogPayloads.add(slowLogPayload); + continue; + } + } + if (StringUtils.isNotEmpty(request.getTableName())) { + if (slowLogPayload.getRegionName().startsWith(request.getTableName())) { + filteredSlowLogPayloads.add(slowLogPayload); + continue; + } + } + if (StringUtils.isNotEmpty(request.getClientAddress())) { + if (slowLogPayload.getClientAddress().equals(request.getClientAddress())) { + filteredSlowLogPayloads.add(slowLogPayload); + continue; + } + } + if (StringUtils.isNotEmpty(request.getUserName())) { + if (slowLogPayload.getUserName().equals(request.getUserName())) { + filteredSlowLogPayloads.add(slowLogPayload); + } + } + } + return filteredSlowLogPayloads; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java new file mode 100644 index 00000000000..d750642d4a1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java @@ -0,0 +1,153 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.slowlog; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; + +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; + +/** + * Online SlowLog Provider Service that keeps slow RPC logs in the ring buffer. + * The service uses LMAX Disruptor to save slow records which are then consumed by + * a queue and based on the ring buffer size, the available records are then fetched + * from the queue in thread-safe manner. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SlowLogRecorder { + + private final Disruptor disruptor; + private final SlowLogEventHandler slowLogEventHandler; + private final int eventCount; + private final boolean isOnlineSlowLogProviderEnabled; + + private static final String SLOW_LOG_RING_BUFFER_SIZE = + "hbase.regionserver.slowlog.ringbuffer.size"; + + /** + * Initialize disruptor with configurable ringbuffer size + */ + public SlowLogRecorder(Configuration conf) { + isOnlineSlowLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, + HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); + + if (!isOnlineSlowLogProviderEnabled) { + this.disruptor = null; + this.slowLogEventHandler = null; + this.eventCount = 0; + return; + } + + this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, + HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE); + + // This is the 'writer' -- a single threaded executor. This single thread consumes what is + // put on the ringbuffer. + final String hostingThreadName = Thread.currentThread().getName(); + + // disruptor initialization with BlockingWaitStrategy + this.disruptor = new Disruptor<>(RingBufferEnvelope::new, + getEventCount(), + Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"), + ProducerType.MULTI, + new BlockingWaitStrategy()); + this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler()); + + // initialize ringbuffer event handler + this.slowLogEventHandler = new SlowLogEventHandler(this.eventCount); + this.disruptor.handleEventsWith(new SlowLogEventHandler[]{this.slowLogEventHandler}); + this.disruptor.start(); + } + + // must be power of 2 for disruptor ringbuffer + private int getEventCount() { + Preconditions.checkArgument(eventCount >= 0, + SLOW_LOG_RING_BUFFER_SIZE + " must be > 0"); + int floor = Integer.highestOneBit(eventCount); + if (floor == eventCount) { + return floor; + } + // max capacity is 1 << 30 + if (floor >= 1 << 29) { + return 1 << 30; + } + return floor << 1; + } + + /** + * Retrieve online slow logs from ringbuffer + * + * @param request slow log request parameters + * @return online slow logs from ringbuffer + */ + public List getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) { + return isOnlineSlowLogProviderEnabled ? this.slowLogEventHandler.getSlowLogPayloads(request) + : Collections.emptyList(); + } + + /** + * clears slow log payloads from ringbuffer + * + * @return true if slow log payloads are cleaned up or + * hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to + * clean up slow logs + */ + public boolean clearSlowLogPayloads() { + if (!isOnlineSlowLogProviderEnabled) { + return true; + } + return this.slowLogEventHandler.clearSlowLogs(); + } + + /** + * Add slow log rpcCall details to ringbuffer + * + * @param rpcLogDetails all details of rpc call that would be useful for ring buffer + * consumers + */ + public void addSlowLogPayload(RpcLogDetails rpcLogDetails) { + if (!isOnlineSlowLogProviderEnabled) { + return; + } + RingBuffer ringBuffer = this.disruptor.getRingBuffer(); + long seqId = ringBuffer.next(); + try { + ringBuffer.get(seqId).load(rpcLogDetails); + } finally { + ringBuffer.publish(seqId); + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index 5beef533c7f..89329c5fa3f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -24,8 +24,10 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; @@ -868,4 +870,29 @@ public class TestAdmin2 extends TestAdminBase { assertEquals(initialState, ADMIN.isSnapshotCleanupEnabled()); } + @Test + public void testSlowLogResponses() throws Exception { + // get all live server names + Collection serverNames = ADMIN.getRegionServers(); + List serverNameList = new ArrayList<>(serverNames); + + // clean up slowlog responses maintained in memory by RegionServers + List areSlowLogsCleared = ADMIN.clearSlowLogResponses(new HashSet<>(serverNameList)); + + int countFailedClearSlowResponse = 0; + for (Boolean isSlowLogCleared : areSlowLogsCleared) { + if (!isSlowLogCleared) { + ++countFailedClearSlowResponse; + } + } + Assert.assertEquals(countFailedClearSlowResponse, 0); + + SlowLogQueryFilter slowLogQueryFilter = new SlowLogQueryFilter(); + List slowLogRecords = ADMIN.getSlowLogResponses(new HashSet<>(serverNames), + slowLogQueryFilter); + + // after cleanup of slowlog responses, total count of slowlog payloads should be 0 + Assert.assertEquals(slowLogRecords.size(), 0); + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdminBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdminBase.java index 373710131f2..c379775a602 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdminBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdminBase.java @@ -46,6 +46,7 @@ public class TestAdminBase { TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true); TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 30); TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 30); + TEST_UTIL.getConfiguration().setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); TEST_UTIL.startMiniCluster(3); ADMIN = TEST_UTIL.getAdmin(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 2797df34b81..f555f4ba7a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -34,6 +34,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; @@ -51,9 +52,11 @@ import org.junit.Assume; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; @@ -524,4 +527,5 @@ public abstract class AbstractTestIPC { rpcServer.stop(); } } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 604797a91df..dbd1d9d3cfd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompac import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; @@ -109,6 +111,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWA import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; @@ -677,6 +681,18 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface, return null; } + @Override + public SlowLogResponses getSlowLogResponses(RpcController controller, + SlowLogResponseRequest request) throws ServiceException { + return null; + } + + @Override + public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller, + ClearSlowLogResponseRequest request) throws ServiceException { + return null; + } + @Override public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots( RpcController controller, GetSpaceQuotaSnapshotsRequest request) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java new file mode 100644 index 00000000000..240230eedec --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java @@ -0,0 +1,593 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.slowlog; + +import java.io.IOException; +import java.net.InetAddress; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcCallback; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; + +/** + * Tests for Online SlowLog Provider Service + */ +@Category({MasterTests.class, MediumTests.class}) +public class TestSlowLogRecorder { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSlowLogRecorder.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class); + + private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); + + private SlowLogRecorder slowLogRecorder; + + private static int i = 0; + + private static Configuration applySlowLogRecorderConf(int eventSize) { + + Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); + conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); + conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize); + return conf; + + } + + /** + * confirm that for a ringbuffer of slow logs, payload on given index of buffer + * has expected elements + * + * @param i index of ringbuffer logs + * @param j data value that was put on index i + * @param slowLogPayloads list of payload retrieved from {@link SlowLogRecorder} + */ + private void confirmPayloadParams(int i, int j, List slowLogPayloads) { + + Assert.assertEquals(slowLogPayloads.get(i).getClientAddress(), "client_" + j); + Assert.assertEquals(slowLogPayloads.get(i).getUserName(), "userName_" + j); + Assert.assertEquals(slowLogPayloads.get(i).getServerClass(), "class_" + j); + } + + @Test + public void testOnlieSlowLogConsumption() throws Exception { + + Configuration conf = applySlowLogRecorderConf(8); + slowLogRecorder = new SlowLogRecorder(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); + + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + + int i = 0; + + // add 5 records initially + for (; i < 5; i++) { + RpcLogDetails rpcLogDetails = + getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 5)); + List slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + confirmPayloadParams(0, 5, slowLogPayloads); + confirmPayloadParams(1, 4, slowLogPayloads); + confirmPayloadParams(2, 3, slowLogPayloads); + confirmPayloadParams(3, 2, slowLogPayloads); + confirmPayloadParams(4, 1, slowLogPayloads); + + // add 2 more records + for (; i < 7; i++) { + RpcLogDetails rpcLogDetails = + getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 7)); + + slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + + Assert.assertEquals(slowLogPayloads.size(), 7); + confirmPayloadParams(0, 7, slowLogPayloads); + confirmPayloadParams(5, 2, slowLogPayloads); + confirmPayloadParams(6, 1, slowLogPayloads); + + // add 3 more records + for (; i < 10; i++) { + RpcLogDetails rpcLogDetails = + getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8)); + + slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + // confirm ringbuffer is full + Assert.assertEquals(slowLogPayloads.size(), 8); + confirmPayloadParams(7, 3, slowLogPayloads); + confirmPayloadParams(0, 10, slowLogPayloads); + confirmPayloadParams(1, 9, slowLogPayloads); + + // add 4 more records + for (; i < 14; i++) { + RpcLogDetails rpcLogDetails = + getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8)); + + slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + // confirm ringbuffer is full + Assert.assertEquals(slowLogPayloads.size(), 8); + confirmPayloadParams(0, 14, slowLogPayloads); + confirmPayloadParams(1, 13, slowLogPayloads); + confirmPayloadParams(2, 12, slowLogPayloads); + confirmPayloadParams(3, 11, slowLogPayloads); + + boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads(); + Assert.assertTrue(isRingBufferCleaned); + + LOG.debug("cleared the ringbuffer of Online Slow Log records"); + + slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + // confirm ringbuffer is empty + Assert.assertEquals(slowLogPayloads.size(), 0); + + } + + @Test + public void testOnlineSlowLogWithHighRecords() throws Exception { + + Configuration conf = applySlowLogRecorderConf(14); + slowLogRecorder = new SlowLogRecorder(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build(); + + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + + for (int i = 0; i < 14 * 11; i++) { + RpcLogDetails rpcLogDetails = + getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records"); + + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14)); + + List slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + Assert.assertEquals(slowLogPayloads.size(), 14); + + // confirm strict order of slow log payloads + confirmPayloadParams(0, 154, slowLogPayloads); + confirmPayloadParams(1, 153, slowLogPayloads); + confirmPayloadParams(2, 152, slowLogPayloads); + confirmPayloadParams(3, 151, slowLogPayloads); + confirmPayloadParams(4, 150, slowLogPayloads); + confirmPayloadParams(5, 149, slowLogPayloads); + confirmPayloadParams(6, 148, slowLogPayloads); + confirmPayloadParams(7, 147, slowLogPayloads); + confirmPayloadParams(8, 146, slowLogPayloads); + confirmPayloadParams(9, 145, slowLogPayloads); + confirmPayloadParams(10, 144, slowLogPayloads); + confirmPayloadParams(11, 143, slowLogPayloads); + confirmPayloadParams(12, 142, slowLogPayloads); + confirmPayloadParams(13, 141, slowLogPayloads); + + boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads(); + Assert.assertTrue(isRingBufferCleaned); + LOG.debug("cleared the ringbuffer of Online Slow Log records"); + slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + + // confirm ringbuffer is empty + Assert.assertEquals(slowLogPayloads.size(), 0); + + } + + @Test + public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception { + + Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); + conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY); + + slowLogRecorder = new SlowLogRecorder(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().build(); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + + for (int i = 0; i < 300; i++) { + RpcLogDetails rpcLogDetails = + getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + List slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + Assert.assertEquals(slowLogPayloads.size(), 0); + + } + + @Test + public void testOnlineSlowLogWithDisableConfig() throws Exception { + + Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); + conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false); + + slowLogRecorder = new SlowLogRecorder(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().build(); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + + for (int i = 0; i < 300; i++) { + RpcLogDetails rpcLogDetails = + getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + List slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request); + Assert.assertEquals(slowLogPayloads.size(), 0); + conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); + + } + + @Test + public void testSlowLogFilters() throws Exception { + + Configuration conf = applySlowLogRecorderConf(30); + slowLogRecorder = new SlowLogRecorder(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder() + .setLimit(15) + .setUserName("userName_87") + .build(); + + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + + for (int i = 0; i < 100; i++) { + RpcLogDetails rpcLogDetails = + getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter"); + + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, + () -> slowLogRecorder.getSlowLogPayloads(request).size() == 1)); + + AdminProtos.SlowLogResponseRequest requestClient = + AdminProtos.SlowLogResponseRequest.newBuilder() + .setLimit(15) + .setClientAddress("client_85") + .build(); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, + () -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1)); + + AdminProtos.SlowLogResponseRequest requestSlowLog = + AdminProtos.SlowLogResponseRequest.newBuilder() + .setLimit(15) + .build(); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, + () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15)); + + } + + @Test + public void testConcurrentSlowLogEvents() throws Exception { + + Configuration conf = applySlowLogRecorderConf(50000); + slowLogRecorder = new SlowLogRecorder(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build(); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + + for (int j = 0; j < 1000; j++) { + + CompletableFuture.runAsync(() -> { + for (int i = 0; i < 3500; i++) { + RpcLogDetails rpcLogDetails = + getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + }); + + } + + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + + slowLogRecorder.clearSlowLogPayloads(); + + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor( + 4000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000)); + } + + private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) { + return new RpcLogDetails(getRpcCall(userName), clientAddress, 0, className); + } + + private RpcCall getRpcCall(String userName) { + RpcCall rpcCall = new RpcCall() { + @Override + public BlockingService getService() { + return null; + } + + @Override + public Descriptors.MethodDescriptor getMethod() { + return null; + } + + @Override + public Message getParam() { + return getMessage(); + } + + @Override + public CellScanner getCellScanner() { + return null; + } + + @Override + public long getReceiveTime() { + return 0; + } + + @Override + public long getStartTime() { + return 0; + } + + @Override + public void setStartTime(long startTime) { + + } + + @Override + public int getTimeout() { + return 0; + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public long getDeadline() { + return 0; + } + + @Override + public long getSize() { + return 0; + } + + @Override + public RPCProtos.RequestHeader getHeader() { + return null; + } + + @Override + public int getRemotePort() { + return 0; + } + + @Override + public void setResponse(Message param, CellScanner cells, + Throwable errorThrowable, String error) { + } + + @Override + public void sendResponseIfReady() throws IOException { + } + + @Override + public void cleanup() { + } + + @Override + public String toShortString() { + return null; + } + + @Override + public long disconnectSince() { + return 0; + } + + @Override + public boolean isClientCellBlockSupported() { + return false; + } + + @Override + public Optional getRequestUser() { + return getUser(userName); + } + + @Override + public InetAddress getRemoteAddress() { + return null; + } + + @Override + public HBaseProtos.VersionInfo getClientVersionInfo() { + return null; + } + + @Override + public void setCallBack(RpcCallback callback) { + } + + @Override + public boolean isRetryImmediatelySupported() { + return false; + } + + @Override + public long getResponseCellSize() { + return 0; + } + + @Override + public void incrementResponseCellSize(long cellSize) { + } + + @Override + public long getResponseBlockSize() { + return 0; + } + + @Override + public void incrementResponseBlockSize(long blockSize) { + } + + @Override + public long getResponseExceptionSize() { + return 0; + } + + @Override + public void incrementResponseExceptionSize(long exceptionSize) { + } + }; + return rpcCall; + } + + private Message getMessage() { + + i = (i + 1) % 3; + + Message message = null; + + switch (i) { + + case 0: { + message = ClientProtos.ScanRequest.newBuilder() + .setRegion(HBaseProtos.RegionSpecifier.newBuilder() + .setValue(ByteString.copyFromUtf8("region1")) + .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME) + .build()) + .build(); + break; + } + case 1: { + message = ClientProtos.MutateRequest.newBuilder() + .setRegion(HBaseProtos.RegionSpecifier.newBuilder() + .setValue(ByteString.copyFromUtf8("region2")) + .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)) + .setMutation(ClientProtos.MutationProto.newBuilder() + .setRow(ByteString.copyFromUtf8("row123")) + .build()) + .build(); + break; + } + case 2: { + message = ClientProtos.GetRequest.newBuilder() + .setRegion(HBaseProtos.RegionSpecifier.newBuilder() + .setValue(ByteString.copyFromUtf8("region2")) + .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)) + .setGet(ClientProtos.Get.newBuilder() + .setRow(ByteString.copyFromUtf8("row123")) + .build()) + .build(); + break; + } + + } + + return message; + + } + + private Optional getUser(String userName) { + + return Optional.of(new User() { + + + @Override + public String getShortName() { + return userName; + } + + + @Override + public T runAs(PrivilegedAction action) { + return null; + } + + + @Override + public T runAs(PrivilegedExceptionAction action) throws + IOException, InterruptedException { + return null; + } + + }); + + } + +} diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index f60838e3f12..c3b4a8efb80 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -1433,6 +1433,79 @@ module Hbase @admin.listDecommissionedRegionServers end + #---------------------------------------------------------------------------------------------- + # Retrieve SlowLog Responses from RegionServers + def get_slowlog_responses(server_names, args) + unless server_names.is_a?(Array) || server_names.is_a?(String) + raise(ArgumentError, + "#{server_names.class} of #{server_names.inspect} is not of Array/String type") + end + if server_names == '*' + server_names = getServerNames([], true) + else + server_names_list = to_server_names(server_names) + server_names = getServerNames(server_names_list, false) + end + filter_params = get_filter_params(args) + slow_log_responses = @admin.getSlowLogResponses(java.util.HashSet.new(server_names), + filter_params) + slow_log_responses_arr = [] + for slow_log_response in slow_log_responses + slow_log_responses_arr << slow_log_response.toJsonPrettyPrint + end + puts 'Retrieved SlowLog Responses from RegionServers' + puts slow_log_responses_arr + end + + def get_filter_params(args) + filter_params = org.apache.hadoop.hbase.client.SlowLogQueryFilter.new + if args.key? 'REGION_NAME' + region_name = args['REGION_NAME'] + filter_params.setRegionName(region_name) + end + if args.key? 'TABLE_NAME' + table_name = args['TABLE_NAME'] + filter_params.setTableName(table_name) + end + if args.key? 'CLIENT_IP' + client_ip = args['CLIENT_IP'] + filter_params.setClientAddress(client_ip) + end + if args.key? 'USER' + user = args['USER'] + filter_params.setUserName(user) + end + if args.key? 'LIMIT' + limit = args['LIMIT'] + filter_params.setLimit(limit) + end + filter_params + end + + #---------------------------------------------------------------------------------------------- + # Clears SlowLog Responses from RegionServers + def clear_slowlog_responses(server_names) + unless server_names.nil? || server_names.is_a?(Array) || server_names.is_a?(String) + raise(ArgumentError, + "#{server_names.class} of #{server_names.inspect} is not of correct type") + end + if server_names.nil? + server_names = getServerNames([], true) + else + server_names_list = to_server_names(server_names) + server_names = getServerNames(server_names_list, false) + end + clear_log_responses = @admin.clearSlowLogResponses(java.util.HashSet.new(server_names)) + clear_log_success_count = 0 + clear_log_responses.each do |response| + if response + clear_log_success_count += 1 + end + end + puts 'Cleared Slowlog responses from ' \ + "#{clear_log_success_count}/#{clear_log_responses.size} RegionServers" + end + #---------------------------------------------------------------------------------------------- # Decommission a list of region servers, optionally offload corresponding regions def decommission_regionservers(host_or_servers, should_offload) @@ -1507,6 +1580,17 @@ module Hbase def stop_regionserver(hostport) @admin.stopRegionServer(hostport) end + + #---------------------------------------------------------------------------------------------- + # Get list of server names + def to_server_names(server_names) + if server_names.is_a?(Array) + server_names + else + java.util.Arrays.asList(server_names) + end + end + end # rubocop:enable Metrics/ClassLength end diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index f09149fd3ca..6b441d65c10 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -333,10 +333,12 @@ Shell.load_command_group( normalizer_switch normalizer_enabled is_in_maintenance_mode + clear_slowlog_responses close_region compact compaction_switch flush + get_slowlog_responses major_compact move split diff --git a/hbase-shell/src/main/ruby/shell/commands/clear_slowlog_responses.rb b/hbase-shell/src/main/ruby/shell/commands/clear_slowlog_responses.rb new file mode 100644 index 00000000000..ea96de3c26d --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/clear_slowlog_responses.rb @@ -0,0 +1,47 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with this +# work for additional information regarding copyright ownership. The ASF +# licenses this file to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# Clear slowlog responses maintained in memory by RegionServers + +module Shell + module Commands + # Clear slowlog responses + class ClearSlowlogResponses < Command + def help + <<-EOF +Clears SlowLog Responses maintained by each or specific RegionServers. +Specify array of server names for specific RS. A server name is +the host, port plus startcode of a RegionServer. +e.g.: host187.example.com,60020,1289493121758 (find servername in +master ui or when you do detailed status in shell) + +Examples: + + hbase> clear_slowlog_responses => clears slowlog responses from all RS + hbase> clear_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'] => clears slowlog responses from SERVER_NAME1, + SERVER_NAME2 + + + EOF + end + + def command(server_names = nil) + admin.clear_slowlog_responses(server_names) + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb b/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb new file mode 100644 index 00000000000..55759ca2840 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb @@ -0,0 +1,78 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with this +# work for additional information regarding copyright ownership. The ASF +# licenses this file to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# Retrieve latest slowlog responses maintained in memory by RegionServers + +module Shell + module Commands + # Retrieve latest slowlog responses + class GetSlowlogResponses < Command + def help + <<-EOF +Retrieve latest SlowLog Responses maintained by each or specific RegionServers. +Specify '*' to include all RS otherwise array of server names for specific +RS. A server name is the host, port plus startcode of a RegionServer. +e.g.: host187.example.com,60020,1289493121758 (find servername in +master ui or when you do detailed status in shell) + +Provide optional filter parameters as Hash. +Default Limit of each server for providing no of slow log records is 10. User can specify +more limit by 'LIMIT' param in case more than 10 records should be retrieved. + +Examples: + + hbase> get_slowlog_responses '*' => get slowlog responses from all RS + hbase> get_slowlog_responses '*', {'LIMIT' => 50} => get slowlog responses from all RS + with 50 records limit (default limit: 10) + hbase> get_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'] => get slowlog responses from SERVER_NAME1, + SERVER_NAME2 + hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1'} + => get slowlog responses only related to meta + region + hbase> get_slowlog_responses '*', {'TABLE_NAME' => 't1'} => get slowlog responses only related to t1 table + hbase> get_slowlog_responses '*', {'CLIENT_IP' => '192.162.1.40:60225', 'LIMIT' => 100} + => get slowlog responses with given client + IP address and get 100 records limit + (default limit: 10) + hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1', 'TABLE_NAME' => 't1'} + => get slowlog responses with given region name + or table name + hbase> get_slowlog_responses '*', {'USER' => 'user_name', 'CLIENT_IP' => '192.162.1.40:60225'} + => get slowlog responses that match either + provided client IP address or user name + +Sometimes output can be long pretty printed json for user to scroll in +a single screen and hence user might prefer +redirecting output of get_slowlog_responses to a file. + +Example: + +echo "get_slowlog_responses '*'" | hbase shell > xyz.out 2>&1 + + EOF + end + + def command(server_names, args = {}) + unless args.is_a? Hash + raise 'Filter parameters are not Hash' + end + + admin.get_slowlog_responses(server_names, args) + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb b/hbase-shell/src/test/ruby/hbase/admin_test.rb index 7d326453915..3264579bf9c 100644 --- a/hbase-shell/src/test/ruby/hbase/admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb @@ -201,6 +201,20 @@ module Hbase #------------------------------------------------------------------------------- + define_test 'get slowlog responses should work' do + output = capture_stdout { command(:get_slowlog_responses, '*', {}) } + assert(output.include?('Retrieved SlowLog Responses from RegionServers')) + end + + #------------------------------------------------------------------------------- + + define_test 'clear slowlog responses should work' do + output = capture_stdout { command(:clear_slowlog_responses, nil) } + assert(output.include?('Cleared Slowlog responses from 1/1 RegionServers')) + end + + #------------------------------------------------------------------------------- + define_test "create should fail with non-string/non-hash column args" do assert_raise(ArgumentError) do command(:create, @create_test_name, 123) diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index 600c35d4a49..5b56e628f98 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.client.CompactType; import org.apache.hadoop.hbase.client.CompactionState; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.SlowLogQueryFilter; +import org.apache.hadoop.hbase.client.SlowLogRecord; import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.SnapshotType; import org.apache.hadoop.hbase.client.TableDescriptor; @@ -1392,6 +1394,17 @@ public class ThriftAdmin implements Admin { throw new NotImplementedException("isSnapshotCleanupEnabled not supported in ThriftAdmin"); } + @Override + public List getSlowLogResponses(final Set serverNames, + final SlowLogQueryFilter slowLogQueryFilter) { + throw new NotImplementedException("getSlowLogResponses not supported in ThriftAdmin"); + } + + @Override + public List clearSlowLogResponses(final Set serverNames) { + throw new NotImplementedException("clearSlowLogsResponses not supported in ThriftAdmin"); + } + @Override public Future splitRegionAsync(byte[] regionName) throws IOException { return splitRegionAsync(regionName, null); diff --git a/src/main/asciidoc/_chapters/hbase-default.adoc b/src/main/asciidoc/_chapters/hbase-default.adoc index ebed2f2f5af..b0cd4bff4c2 100644 --- a/src/main/asciidoc/_chapters/hbase-default.adoc +++ b/src/main/asciidoc/_chapters/hbase-default.adoc @@ -2072,6 +2072,44 @@ A comma-separated list of `-1` +[[hbase.regionserver.slowlog.ringbuffer.size]] +*`hbase.regionserver.slowlog.ringbuffer.size`*:: ++ +.Description + + Default size of ringbuffer to be maintained by each RegionServer in order + to store online slowlog responses. This is an in-memory ring buffer of + requests that were judged to be too slow in addition to the responseTooSlow + logging. The in-memory representation would be complete. + For more details, please look into Doc Section: + <> + + ++ +.Default +`256` + + + +[[hbase.regionserver.slowlog.buffer.enabled]] +*`hbase.regionserver.slowlog.buffer.enabled`*:: ++ +.Description + + Indicates whether RegionServers have ring buffer running for storing + Online Slow logs in FIFO manner with limited entries. The size of + the ring buffer is indicated by config: hbase.regionserver.slowlog.ringbuffer.size + The default value is false, turn this on and get latest slowlog + responses with complete data. + For more details, please look into Doc Section: + <> + + ++ +.Default +`false` + + [[hbase.region.replica.replication.enabled]] *`hbase.region.replica.replication.enabled`*:: diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc index d8afcd549cf..f8f9c2a4614 100644 --- a/src/main/asciidoc/_chapters/ops_mgt.adoc +++ b/src/main/asciidoc/_chapters/ops_mgt.adoc @@ -1813,6 +1813,120 @@ In the case that the call is not a client operation, that detailed fingerprint i This particular example, for example, would indicate that the likely cause of slowness is simply a very large (on the order of 100MB) multiput, as we can tell by the "vlen," or value length, fields of each put in the multiPut. +[[slow_log_responses]] +==== Get Slow Response Log from shell +When an individual RPC exceeds a configurable time bound we log a complaint +by way of the logging subsystem + +e.g. + +---- +2019-10-02 10:10:22,195 WARN [,queue=15,port=60020] ipc.RpcServer - (responseTooSlow): +{"call":"Scan(org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ScanRequest)", +"starttimems":1567203007549, +"responsesize":6819737, +"method":"Scan", +"param":"region { type: REGION_NAME value: \"t1,\\000\\000\\215\\f)o\\\\\\024\\302\\220\\000\\000\\000\\000\\000\\001\\000\\000\\000\\000\\000\\006\\000\\000\\000\\000\\000\\005\\000\\000", +"processingtimems":28646, +"client":"10.253.196.215:41116", +"queuetimems":22453, +"class":"HRegionServer"} +---- + +Unfortunately often the request parameters are truncated as per above Example. +The truncation is unfortunate because it eliminates much of the utility of +the warnings. For example, the region name, the start and end keys, and the +filter hierarchy are all important clues for debugging performance problems +caused by moderate to low selectivity queries or queries made at a high rate. + +HBASE-22978 introduces maintaining an in-memory ring buffer of requests that were judged to +be too slow in addition to the responseTooSlow logging. The in-memory representation can be +complete. There is some chance a high rate of requests will cause information on other +interesting requests to be overwritten before it can be read. This is an acceptable trade off. + +In order to enable the in-memory ring buffer at RegionServers, we need to enable +config: +---- +hbase.regionserver.slowlog.buffer.enabled +---- + +One more config determines the size of the ring buffer: +---- +hbase.regionserver.slowlog.ringbuffer.size +---- + +Check the config section for the detailed description. + +This config would be disabled by default. Turn it on and these shell commands +would provide expected results from the ring-buffers. + + +shell commands to retrieve slowlog responses from RegionServers: + +---- +Retrieve latest SlowLog Responses maintained by each or specific RegionServers. +Specify '*' to include all RS otherwise array of server names for specific +RS. A server name is the host, port plus startcode of a RegionServer. +e.g.: host187.example.com,60020,1289493121758 (find servername in +master ui or when you do detailed status in shell) + +Provide optional filter parameters as Hash. +Default Limit of each server for providing no of slow log records is 10. User can specify +more limit by 'LIMIT' param in case more than 10 records should be retrieved. + +Examples: + + hbase> get_slowlog_responses '*' => get slowlog responses from all RS + hbase> get_slowlog_responses '*', {'LIMIT' => 50} => get slowlog responses from all RS + with 50 records limit (default limit: 10) + hbase> get_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'] => get slowlog responses from SERVER_NAME1, + SERVER_NAME2 + hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1'} + => get slowlog responses only related to meta + region + hbase> get_slowlog_responses '*', {'TABLE_NAME' => 't1'} => get slowlog responses only related to t1 table + hbase> get_slowlog_responses '*', {'CLIENT_IP' => '192.162.1.40:60225', 'LIMIT' => 100} + => get slowlog responses with given client + IP address and get 100 records limit + (default limit: 10) + hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1', 'TABLE_NAME' => 't1'} + => get slowlog responses with given region name + or table name + hbase> get_slowlog_responses '*', {'USER' => 'user_name', 'CLIENT_IP' => '192.162.1.40:60225'} + => get slowlog responses that match either + provided client IP address or user name + + +---- + +Sometimes output can be long pretty printed json for user to scroll in +a single screen and hence user might prefer +redirecting output of get_slowlog_responses to a file. + +Example: +---- +echo "get_slowlog_responses '*'" | hbase shell > xyz.out 2>&1 +---- + + +shell command to clear slowlog responses from RegionServer: + +---- +Clears SlowLog Responses maintained by each or specific RegionServers. +Specify array of server names for specific RS. A server name is +the host, port plus startcode of a RegionServer. +e.g.: host187.example.com,60020,1289493121758 (find servername in +master ui or when you do detailed status in shell) + +Examples: + + hbase> clear_slowlog_responses => clears slowlog responses from all RS + hbase> clear_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'] => clears slowlog responses from SERVER_NAME1, + SERVER_NAME2 + + +---- + === Block Cache Monitoring Starting with HBase 0.98, the HBase Web UI includes the ability to monitor and report on the performance of the block cache.