HBASE-22978 : Online slow response log (#1228)

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
Viraj Jasani 2020-03-01 00:32:35 +05:30 committed by GitHub
parent 836e1a1caf
commit 6366b73134
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 2677 additions and 21 deletions

View File

@ -3134,4 +3134,29 @@ public interface Admin extends Abortable, Closeable {
*/
boolean isSnapshotCleanupEnabled() throws IOException;
/**
* Retrieves online slow RPC logs from the provided list of
* RegionServers
*
* @param serverNames Server names to get slowlog responses from
* @param slowLogQueryFilter filter to be used if provided
* @return online slowlog response list
* @throws IOException if a remote or network exception occurs
*/
List<SlowLogRecord> getSlowLogResponses(final Set<ServerName> serverNames,
final SlowLogQueryFilter slowLogQueryFilter) throws IOException;
/**
* Clears online slow RPC logs from the provided list of
* RegionServers
*
* @param serverNames Set of Server names to clean slowlog responses from
* @return List of booleans representing if online slowlog response buffer is cleaned
* from each RegionServer
* @throws IOException if a remote or network exception occurs
*/
List<Boolean> clearSlowLogResponses(final Set<ServerName> serverNames)
throws IOException;
}

View File

@ -1484,4 +1484,25 @@ public interface AsyncAdmin {
*/
CompletableFuture<Boolean> isSnapshotCleanupEnabled();
/**
* Retrieves online slow RPC logs from the provided list of
* RegionServers
*
* @param serverNames Server names to get slowlog responses from
* @param slowLogQueryFilter filter to be used if provided
* @return Online slowlog response list. The return value wrapped by a {@link CompletableFuture}
*/
CompletableFuture<List<SlowLogRecord>> getSlowLogResponses(final Set<ServerName> serverNames,
final SlowLogQueryFilter slowLogQueryFilter);
/**
* Clears online slow RPC logs from the provided list of
* RegionServers
*
* @param serverNames Set of Server names to clean slowlog responses from
* @return List of booleans representing if online slowlog response buffer is cleaned
* from each RegionServer. The return value wrapped by a {@link CompletableFuture}
*/
CompletableFuture<List<Boolean>> clearSlowLogResponses(final Set<ServerName> serverNames);
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import com.google.protobuf.RpcChannel;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -836,4 +837,15 @@ class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.isSnapshotCleanupEnabled());
}
@Override
public CompletableFuture<List<SlowLogRecord>> getSlowLogResponses(
final Set<ServerName> serverNames, final SlowLogQueryFilter slowLogQueryFilter) {
return wrap(rawAdmin.getSlowLogResponses(serverNames, slowLogQueryFilter));
}
@Override
public CompletableFuture<List<Boolean>> clearSlowLogResponses(Set<ServerName> serverNames) {
return wrap(rawAdmin.clearSlowLogResponses(serverNames));
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
@ -111,6 +112,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@ -4358,4 +4360,68 @@ public class HBaseAdmin implements Admin {
}
@Override
public List<SlowLogRecord> getSlowLogResponses(@Nullable final Set<ServerName> serverNames,
final SlowLogQueryFilter slowLogQueryFilter) throws IOException {
if (CollectionUtils.isEmpty(serverNames)) {
return Collections.emptyList();
}
return serverNames.stream().map(serverName -> {
try {
return getSlowLogResponseFromServer(serverName, slowLogQueryFilter);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
).flatMap(List::stream).collect(Collectors.toList());
}
private List<SlowLogRecord> getSlowLogResponseFromServer(final ServerName serverName,
final SlowLogQueryFilter slowLogQueryFilter) throws IOException {
return getSlowLogResponsesFromServer(this.connection.getAdmin(serverName), slowLogQueryFilter);
}
private List<SlowLogRecord> getSlowLogResponsesFromServer(AdminService.BlockingInterface admin,
SlowLogQueryFilter slowLogQueryFilter) throws IOException {
return executeCallable(new RpcRetryingCallable<List<SlowLogRecord>>() {
@Override
protected List<SlowLogRecord> rpcCall(int callTimeout) throws Exception {
HBaseRpcController controller = rpcControllerFactory.newController();
AdminProtos.SlowLogResponses slowLogResponses =
admin.getSlowLogResponses(controller,
RequestConverter.buildSlowLogResponseRequest(slowLogQueryFilter));
return ProtobufUtil.toSlowLogPayloads(slowLogResponses);
}
});
}
@Override
public List<Boolean> clearSlowLogResponses(@Nullable final Set<ServerName> serverNames)
throws IOException {
if (CollectionUtils.isEmpty(serverNames)) {
return Collections.emptyList();
}
return serverNames.stream().map(serverName -> {
try {
return clearSlowLogsResponses(serverName);
} catch (IOException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
}
private Boolean clearSlowLogsResponses(final ServerName serverName) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
return executeCallable(new RpcRetryingCallable<Boolean>() {
@Override
protected Boolean rpcCall(int callTimeout) throws Exception {
HBaseRpcController controller = rpcControllerFactory.newController();
AdminProtos.ClearSlowLogResponses clearSlowLogResponses =
admin.clearSlowLogsResponses(controller,
RequestConverter.buildClearSlowLogResponseRequest());
return ProtobufUtil.toClearSlowLogPayload(clearSlowLogResponses);
}
});
}
}

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -45,6 +46,7 @@ import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.CacheEvictionStats;
@ -102,6 +104,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
@ -3884,4 +3888,63 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.call();
}
@Override
public CompletableFuture<List<SlowLogRecord>> getSlowLogResponses(
@Nullable final Set<ServerName> serverNames,
final SlowLogQueryFilter slowLogQueryFilter) {
if (CollectionUtils.isEmpty(serverNames)) {
return CompletableFuture.completedFuture(Collections.emptyList());
}
return CompletableFuture.supplyAsync(() -> serverNames.stream()
.map((ServerName serverName) ->
getSlowLogResponseFromServer(serverName, slowLogQueryFilter))
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList()));
}
private CompletableFuture<List<SlowLogRecord>> getSlowLogResponseFromServer(
final ServerName serverName, final SlowLogQueryFilter slowLogQueryFilter) {
return this.<List<SlowLogRecord>>newAdminCaller()
.action((controller, stub) -> this
.adminCall(
controller, stub, RequestConverter.buildSlowLogResponseRequest(slowLogQueryFilter),
AdminService.Interface::getSlowLogResponses,
ProtobufUtil::toSlowLogPayloads))
.serverName(serverName).call();
}
@Override
public CompletableFuture<List<Boolean>> clearSlowLogResponses(
@Nullable Set<ServerName> serverNames) {
if (CollectionUtils.isEmpty(serverNames)) {
return CompletableFuture.completedFuture(Collections.emptyList());
}
List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream()
.map(this::clearSlowLogsResponses)
.collect(Collectors.toList());
return convertToFutureOfList(clearSlowLogResponseList);
}
private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
return this.<Boolean>newAdminCaller()
.action(((controller, stub) -> this
.adminCall(
controller, stub, RequestConverter.buildClearSlowLogResponseRequest(),
AdminService.Interface::clearSlowLogsResponses,
ProtobufUtil::toClearSlowLogPayload))
).serverName(serverName).call();
}
private static <T> CompletableFuture<List<T>> convertToFutureOfList(
List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDoneFuture =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return allDoneFuture.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
}

View File

@ -0,0 +1,89 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.yetus.audience.InterfaceAudience;
/**
* SlowLog params object that contains detailed info as params and region name : to be used
* for filter purpose
*/
@InterfaceAudience.Private
public class SlowLogParams {
private final String regionName;
private final String params;
public SlowLogParams(String regionName, String params) {
this.regionName = regionName;
this.params = params;
}
public SlowLogParams(String params) {
this.regionName = StringUtils.EMPTY;
this.params = params;
}
public String getRegionName() {
return regionName;
}
public String getParams() {
return params;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("regionName", regionName)
.append("params", params)
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SlowLogParams that = (SlowLogParams) o;
return new EqualsBuilder()
.append(regionName, that.regionName)
.append(params, that.params)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(regionName)
.append(params)
.toHashCode();
}
}

View File

@ -0,0 +1,122 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.yetus.audience.InterfaceAudience;
/**
* SlowLog Query Filter with all filter and limit parameters
*/
@InterfaceAudience.Private
public class SlowLogQueryFilter {
private String regionName;
private String clientAddress;
private String tableName;
private String userName;
private int limit = 10;
public String getRegionName() {
return regionName;
}
public void setRegionName(String regionName) {
this.regionName = regionName;
}
public String getClientAddress() {
return clientAddress;
}
public void setClientAddress(String clientAddress) {
this.clientAddress = clientAddress;
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public int getLimit() {
return limit;
}
public void setLimit(int limit) {
this.limit = limit;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SlowLogQueryFilter that = (SlowLogQueryFilter) o;
return new EqualsBuilder()
.append(limit, that.limit)
.append(regionName, that.regionName)
.append(clientAddress, that.clientAddress)
.append(tableName, that.tableName)
.append(userName, that.userName)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(regionName)
.append(clientAddress)
.append(tableName)
.append(userName)
.append(limit)
.toHashCode();
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("regionName", regionName)
.append("clientAddress", clientAddress)
.append("tableName", tableName)
.append("userName", userName)
.append("limit", limit)
.toString();
}
}

View File

@ -0,0 +1,319 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.gson.Gson;
import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;
/**
* SlowLog payload for hbase-client, to be used by Admin API get_slow_responses
*/
@InterfaceAudience.Private
final public class SlowLogRecord {
// used to convert object to pretty printed format
// used by toJsonPrettyPrint()
private static final Gson GSON = GsonUtil.createGson()
.setPrettyPrinting()
.registerTypeAdapter(SlowLogRecord.class, (JsonSerializer<SlowLogRecord>)
(slowLogPayload, type, jsonSerializationContext) -> {
Gson gson = new Gson();
JsonObject jsonObj = (JsonObject) gson.toJsonTree(slowLogPayload);
if (slowLogPayload.getMultiGetsCount() == 0) {
jsonObj.remove("multiGetsCount");
}
if (slowLogPayload.getMultiMutationsCount() == 0) {
jsonObj.remove("multiMutationsCount");
}
if (slowLogPayload.getMultiServiceCalls() == 0) {
jsonObj.remove("multiServiceCalls");
}
return jsonObj;
}).create();
private long startTime;
private int processingTime;
private int queueTime;
private long responseSize;
private String clientAddress;
private String serverClass;
private String methodName;
private String callDetails;
private String param;
// we don't want to serialize region name, it is just for the filter purpose
// hence avoiding deserialization
private transient String regionName;
private String userName;
private int multiGetsCount;
private int multiMutationsCount;
private int multiServiceCalls;
public long getStartTime() {
return startTime;
}
public int getProcessingTime() {
return processingTime;
}
public int getQueueTime() {
return queueTime;
}
public long getResponseSize() {
return responseSize;
}
public String getClientAddress() {
return clientAddress;
}
public String getServerClass() {
return serverClass;
}
public String getMethodName() {
return methodName;
}
public String getCallDetails() {
return callDetails;
}
public String getParam() {
return param;
}
public String getRegionName() {
return regionName;
}
public String getUserName() {
return userName;
}
public int getMultiGetsCount() {
return multiGetsCount;
}
public int getMultiMutationsCount() {
return multiMutationsCount;
}
public int getMultiServiceCalls() {
return multiServiceCalls;
}
private SlowLogRecord(final long startTime, final int processingTime, final int queueTime,
final long responseSize, final String clientAddress, final String serverClass,
final String methodName, final String callDetails, final String param,
final String regionName, final String userName, final int multiGetsCount,
final int multiMutationsCount, final int multiServiceCalls) {
this.startTime = startTime;
this.processingTime = processingTime;
this.queueTime = queueTime;
this.responseSize = responseSize;
this.clientAddress = clientAddress;
this.serverClass = serverClass;
this.methodName = methodName;
this.callDetails = callDetails;
this.param = param;
this.regionName = regionName;
this.userName = userName;
this.multiGetsCount = multiGetsCount;
this.multiMutationsCount = multiMutationsCount;
this.multiServiceCalls = multiServiceCalls;
}
public static class SlowLogRecordBuilder {
private long startTime;
private int processingTime;
private int queueTime;
private long responseSize;
private String clientAddress;
private String serverClass;
private String methodName;
private String callDetails;
private String param;
private String regionName;
private String userName;
private int multiGetsCount;
private int multiMutationsCount;
private int multiServiceCalls;
public SlowLogRecordBuilder setStartTime(long startTime) {
this.startTime = startTime;
return this;
}
public SlowLogRecordBuilder setProcessingTime(int processingTime) {
this.processingTime = processingTime;
return this;
}
public SlowLogRecordBuilder setQueueTime(int queueTime) {
this.queueTime = queueTime;
return this;
}
public SlowLogRecordBuilder setResponseSize(long responseSize) {
this.responseSize = responseSize;
return this;
}
public SlowLogRecordBuilder setClientAddress(String clientAddress) {
this.clientAddress = clientAddress;
return this;
}
public SlowLogRecordBuilder setServerClass(String serverClass) {
this.serverClass = serverClass;
return this;
}
public SlowLogRecordBuilder setMethodName(String methodName) {
this.methodName = methodName;
return this;
}
public SlowLogRecordBuilder setCallDetails(String callDetails) {
this.callDetails = callDetails;
return this;
}
public SlowLogRecordBuilder setParam(String param) {
this.param = param;
return this;
}
public SlowLogRecordBuilder setRegionName(String regionName) {
this.regionName = regionName;
return this;
}
public SlowLogRecordBuilder setUserName(String userName) {
this.userName = userName;
return this;
}
public SlowLogRecordBuilder setMultiGetsCount(int multiGetsCount) {
this.multiGetsCount = multiGetsCount;
return this;
}
public SlowLogRecordBuilder setMultiMutationsCount(int multiMutationsCount) {
this.multiMutationsCount = multiMutationsCount;
return this;
}
public SlowLogRecordBuilder setMultiServiceCalls(int multiServiceCalls) {
this.multiServiceCalls = multiServiceCalls;
return this;
}
public SlowLogRecord build() {
return new SlowLogRecord(startTime, processingTime, queueTime, responseSize,
clientAddress, serverClass, methodName, callDetails, param, regionName,
userName, multiGetsCount, multiMutationsCount, multiServiceCalls);
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SlowLogRecord that = (SlowLogRecord) o;
return new EqualsBuilder()
.append(startTime, that.startTime)
.append(processingTime, that.processingTime)
.append(queueTime, that.queueTime)
.append(responseSize, that.responseSize)
.append(multiGetsCount, that.multiGetsCount)
.append(multiMutationsCount, that.multiMutationsCount)
.append(multiServiceCalls, that.multiServiceCalls)
.append(clientAddress, that.clientAddress)
.append(serverClass, that.serverClass)
.append(methodName, that.methodName)
.append(callDetails, that.callDetails)
.append(param, that.param)
.append(regionName, that.regionName)
.append(userName, that.userName)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(startTime)
.append(processingTime)
.append(queueTime)
.append(responseSize)
.append(clientAddress)
.append(serverClass)
.append(methodName)
.append(callDetails)
.append(param)
.append(regionName)
.append(userName)
.append(multiGetsCount)
.append(multiMutationsCount)
.append(multiServiceCalls)
.toHashCode();
}
public String toJsonPrettyPrint() {
return GSON.toJson(this);
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("startTime", startTime)
.append("processingTime", processingTime)
.append("queueTime", queueTime)
.append("responseSize", responseSize)
.append("clientAddress", clientAddress)
.append("serverClass", serverClass)
.append("methodName", methodName)
.append("callDetails", callDetails)
.append("param", param)
.append("regionName", regionName)
.append("userName", userName)
.append("multiGetsCount", multiGetsCount)
.append("multiMutationsCount", multiMutationsCount)
.append("multiServiceCalls", multiServiceCalls)
.toString();
}
}

View File

@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.client.SlowLogRecord;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.TableDescriptor;
@ -128,7 +130,10 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
@ -145,12 +150,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegio
import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Column;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.DeleteType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
@ -182,6 +191,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@ -2221,6 +2231,57 @@ public final class ProtobufUtil {
return Bytes.toStringBinary(bs.toByteArray());
}
/**
* Return SlowLogParams to maintain recent online slowlog responses
*
* @param message Message object {@link Message}
* @return SlowLogParams with regionName(for filter queries) and params
*/
public static SlowLogParams getSlowLogParams(Message message) {
if (message == null) {
return null;
}
if (message instanceof ScanRequest) {
ScanRequest scanRequest = (ScanRequest) message;
String regionName = getStringForByteString(scanRequest.getRegion().getValue());
String params = TextFormat.shortDebugString(message);
return new SlowLogParams(regionName, params);
} else if (message instanceof MutationProto) {
MutationProto mutationProto = (MutationProto) message;
String params = "type= " + mutationProto.getMutateType().toString();
return new SlowLogParams(params);
} else if (message instanceof GetRequest) {
GetRequest getRequest = (GetRequest) message;
String regionName = getStringForByteString(getRequest.getRegion().getValue());
String params = "region= " + regionName + ", row= "
+ getStringForByteString(getRequest.getGet().getRow());
return new SlowLogParams(regionName, params);
} else if (message instanceof MultiRequest) {
MultiRequest multiRequest = (MultiRequest) message;
int actionsCount = multiRequest.getRegionActionList()
.stream()
.mapToInt(ClientProtos.RegionAction::getActionCount)
.sum();
RegionAction actions = multiRequest.getRegionActionList().get(0);
String regionName = getStringForByteString(actions.getRegion().getValue());
String params = "region= " + regionName + ", for " + actionsCount + " action(s)";
return new SlowLogParams(regionName, params);
} else if (message instanceof MutateRequest) {
MutateRequest mutateRequest = (MutateRequest) message;
String regionName = getStringForByteString(mutateRequest.getRegion().getValue());
String params = "region= " + regionName;
return new SlowLogParams(regionName, params);
} else if (message instanceof CoprocessorServiceRequest) {
CoprocessorServiceRequest coprocessorServiceRequest = (CoprocessorServiceRequest) message;
String params = "coprocessorService= "
+ coprocessorServiceRequest.getCall().getServiceName()
+ ":" + coprocessorServiceRequest.getCall().getMethodName();
return new SlowLogParams(params);
}
String params = message.getClass().toString();
return new SlowLogParams(params);
}
/**
* Print out some subset of a MutationProto rather than all of it and its data
* @param proto Protobuf to print out
@ -3414,4 +3475,56 @@ public final class ProtobufUtil {
.build();
}
/**
* Convert Protobuf class
* {@link org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload}
* To client SlowLog Payload class {@link SlowLogRecord}
*
* @param slowLogPayload SlowLog Payload protobuf instance
* @return SlowLog Payload for client usecase
*/
private static SlowLogRecord getSlowLogRecord(
final TooSlowLog.SlowLogPayload slowLogPayload) {
SlowLogRecord clientSlowLogRecord = new SlowLogRecord.SlowLogRecordBuilder()
.setCallDetails(slowLogPayload.getCallDetails())
.setClientAddress(slowLogPayload.getClientAddress())
.setMethodName(slowLogPayload.getMethodName())
.setMultiGetsCount(slowLogPayload.getMultiGets())
.setMultiMutationsCount(slowLogPayload.getMultiMutations())
.setMultiServiceCalls(slowLogPayload.getMultiServiceCalls())
.setParam(slowLogPayload.getParam())
.setProcessingTime(slowLogPayload.getProcessingTime())
.setQueueTime(slowLogPayload.getQueueTime())
.setRegionName(slowLogPayload.getRegionName())
.setResponseSize(slowLogPayload.getResponseSize())
.setServerClass(slowLogPayload.getServerClass())
.setStartTime(slowLogPayload.getStartTime())
.setUserName(slowLogPayload.getUserName())
.build();
return clientSlowLogRecord;
}
/**
* Convert AdminProtos#SlowLogResponses to list of {@link SlowLogRecord}
*
* @param slowLogResponses slowlog response protobuf instance
* @return list of SlowLog payloads for client usecase
*/
public static List<SlowLogRecord> toSlowLogPayloads(
final AdminProtos.SlowLogResponses slowLogResponses) {
List<SlowLogRecord> slowLogRecords = slowLogResponses.getSlowLogPayloadsList()
.stream().map(ProtobufUtil::getSlowLogRecord).collect(Collectors.toList());
return slowLogRecords;
}
/**
* Convert {@link ClearSlowLogResponses} to boolean
*
* @param clearSlowLogResponses Clear slowlog response protobuf instance
* @return boolean representing clear slowlog response
*/
public static boolean toClearSlowLogPayload(final ClearSlowLogResponses clearSlowLogResponses) {
return clearSlowLogResponses.getIsCleaned();
}
}

View File

@ -27,6 +27,7 @@ import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.SlowLogQueryFilter;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
@ -67,6 +69,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@ -76,6 +79,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo;
@ -1940,4 +1944,44 @@ public final class RequestConverter {
return IsSnapshotCleanupEnabledRequest.newBuilder().build();
}
/**
* Create a protocol buffer {@link SlowLogResponseRequest}
*
* @param slowLogQueryFilter filter to use if provided
* @return a protocol buffer SlowLogResponseRequest
*/
public static SlowLogResponseRequest buildSlowLogResponseRequest(
final SlowLogQueryFilter slowLogQueryFilter) {
SlowLogResponseRequest.Builder builder = SlowLogResponseRequest.newBuilder();
if (slowLogQueryFilter == null) {
return builder.build();
}
final String clientAddress = slowLogQueryFilter.getClientAddress();
if (StringUtils.isNotEmpty(clientAddress)) {
builder.setClientAddress(clientAddress);
}
final String regionName = slowLogQueryFilter.getRegionName();
if (StringUtils.isNotEmpty(regionName)) {
builder.setRegionName(regionName);
}
final String tableName = slowLogQueryFilter.getTableName();
if (StringUtils.isNotEmpty(tableName)) {
builder.setTableName(tableName);
}
final String userName = slowLogQueryFilter.getUserName();
if (StringUtils.isNotEmpty(userName)) {
builder.setUserName(userName);
}
return builder.setLimit(slowLogQueryFilter.getLimit()).build();
}
/**
* Create a protocol buffer {@link ClearSlowLogResponseRequest}
*
* @return a protocol buffer ClearSlowLogResponseRequest
*/
public static ClearSlowLogResponseRequest buildClearSlowLogResponseRequest() {
return ClearSlowLogResponseRequest.newBuilder().build();
}
}

View File

@ -1566,6 +1566,12 @@ public final class HConstants {
"hbase.master.executor.logreplayops.threads";
public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;
public static final int DEFAULT_SLOW_LOG_RING_BUFFER_SIZE = 256;
public static final String SLOW_LOG_BUFFER_ENABLED_KEY =
"hbase.regionserver.slowlog.buffer.enabled";
public static final boolean DEFAULT_ONLINE_LOG_PROVIDER_ENABLED = false;
private HConstants() {
// Can't be instantiated with this ctor.
}

View File

@ -1926,4 +1926,27 @@ possible configurations would overwhelm and obscure the important.
enable this feature.
</description>
</property>
<property>
<name>hbase.regionserver.slowlog.ringbuffer.size</name>
<value>256</value>
<description>
Default size of ringbuffer to be maintained by each RegionServer in order
to store online slowlog responses. This is an in-memory ring buffer of
requests that were judged to be too slow in addition to the responseTooSlow
logging. The in-memory representation would be complete.
For more details, please look into Doc Section:
Get Slow Response Log from shell
</description>
</property>
<property>
<name>hbase.regionserver.slowlog.buffer.enabled</name>
<value>false</value>
<description>
Indicates whether RegionServers have ring buffer running for storing
Online Slow logs in FIFO manner with limited entries. The size of
the ring buffer is indicated by config: hbase.regionserver.slowlog.ringbuffer.size
The default value is false, turn this on and get latest slowlog
responses with complete data.
</description>
</property>
</configuration>

View File

@ -29,6 +29,7 @@ import "ClusterStatus.proto";
import "HBase.proto";
import "WAL.proto";
import "Quota.proto";
import "TooSlowLog.proto";
message GetRegionInfoRequest {
required RegionSpecifier region = 1;
@ -280,6 +281,26 @@ message ExecuteProceduresRequest {
message ExecuteProceduresResponse {
}
message SlowLogResponseRequest {
optional string region_name = 1;
optional string table_name = 2;
optional string client_address = 3;
optional string user_name = 4;
optional uint32 limit = 5 [default = 10];
}
message SlowLogResponses {
repeated SlowLogPayload slow_log_payloads = 1;
}
message ClearSlowLogResponseRequest {
}
message ClearSlowLogResponses {
required bool is_cleaned = 1;
}
service AdminService {
rpc GetRegionInfo(GetRegionInfoRequest)
returns(GetRegionInfoResponse);
@ -344,4 +365,10 @@ service AdminService {
rpc ExecuteProcedures(ExecuteProceduresRequest)
returns(ExecuteProceduresResponse);
rpc GetSlowLogResponses(SlowLogResponseRequest)
returns(SlowLogResponses);
rpc ClearSlowLogsResponses(ClearSlowLogResponseRequest)
returns(ClearSlowLogResponses);
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
// This file contains protocol buffers that are used for Online TooSlowLogs
// To be used as Ring Buffer payload
package hbase.pb;
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "TooSlowLog";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message SlowLogPayload {
required int64 start_time = 1;
required int32 processing_time = 2;
required int32 queue_time = 3;
required int64 response_size = 4;
required string client_address = 5;
required string server_class = 6;
required string method_name = 7;
required string call_details = 8;
optional string param = 9;
required string user_name = 10;
optional string region_name = 11;
optional int32 multi_gets = 12 [default = 0];
optional int32 multi_mutations = 13 [default = 0];
optional int32 multi_service_calls = 14 [default = 0];
}

View File

@ -33,6 +33,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.User;
@ -85,6 +89,10 @@ public abstract class RpcServer implements RpcServerInterface,
protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
= new CallQueueTooBigException();
private static final String MULTI_GETS = "multi.gets";
private static final String MULTI_MUTATIONS = "multi.mutations";
private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
private final boolean authorize;
protected boolean isSecurityEnabled;
@ -215,6 +223,12 @@ public abstract class RpcServer implements RpcServerInterface,
*/
private RSRpcServices rsRpcServices;
/**
* Use to add online slowlog responses
*/
private SlowLogRecorder slowLogRecorder;
@FunctionalInterface
protected interface CallCleanup {
void run();
@ -403,13 +417,21 @@ public abstract class RpcServer implements RpcServerInterface,
boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
if (tooSlow || tooLarge) {
final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
// when tagging, we let TooLarge trump TooSmall to keep output simple
// note that large responses will often also be slow.
logResponse(param,
md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
(tooLarge ? "TooLarge" : "TooSlow"),
status.getClient(), startTime, processingTime, qTime,
responseSize);
md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
tooLarge, tooSlow,
status.getClient(), startTime, processingTime, qTime,
responseSize, userName);
if (tooSlow && this.slowLogRecorder != null) {
// send logs to ring buffer owned by slowLogRecorder
final String className = server == null ? StringUtils.EMPTY :
server.getClass().getSimpleName();
this.slowLogRecorder.addSlowLogPayload(
new RpcLogDetails(call, status.getClient(), responseSize, className));
}
}
return new Pair<>(result, controller.cellScanner());
} catch (Throwable e) {
@ -440,17 +462,21 @@ public abstract class RpcServer implements RpcServerInterface,
* @param param The parameters received in the call.
* @param methodName The name of the method invoked
* @param call The string representation of the call
* @param tag The tag that will be used to indicate this event in the log.
* @param clientAddress The address of the client who made this call.
* @param startTime The time that the call was initiated, in ms.
* @param processingTime The duration that the call took to run, in ms.
* @param qTime The duration that the call spent on the queue
* prior to being initiated, in ms.
* @param responseSize The size in bytes of the response buffer.
* @param tooLarge To indicate if the event is tooLarge
* @param tooSlow To indicate if the event is tooSlow
* @param clientAddress The address of the client who made this call.
* @param startTime The time that the call was initiated, in ms.
* @param processingTime The duration that the call took to run, in ms.
* @param qTime The duration that the call spent on the queue
* prior to being initiated, in ms.
* @param responseSize The size in bytes of the response buffer.
* @param userName UserName of the current RPC Call
*/
void logResponse(Message param, String methodName, String call, String tag,
String clientAddress, long startTime, int processingTime, int qTime,
long responseSize) throws IOException {
void logResponse(Message param, String methodName, String call, boolean tooLarge,
boolean tooSlow, String clientAddress, long startTime, int processingTime, int qTime,
long responseSize, String userName) {
final String className = server == null ? StringUtils.EMPTY :
server.getClass().getSimpleName();
// base information that is reported regardless of type of call
Map<String, Object> responseInfo = new HashMap<>();
responseInfo.put("starttimems", startTime);
@ -458,7 +484,7 @@ public abstract class RpcServer implements RpcServerInterface,
responseInfo.put("queuetimems", qTime);
responseInfo.put("responsesize", responseSize);
responseInfo.put("client", clientAddress);
responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
responseInfo.put("class", className);
responseInfo.put("method", methodName);
responseInfo.put("call", call);
// The params could be really big, make sure they don't kill us at WARN
@ -496,13 +522,16 @@ public abstract class RpcServer implements RpcServerInterface,
}
}
}
responseInfo.put("multi.gets", numGets);
responseInfo.put("multi.mutations", numMutations);
responseInfo.put("multi.servicecalls", numServiceCalls);
responseInfo.put(MULTI_GETS, numGets);
responseInfo.put(MULTI_MUTATIONS, numMutations);
responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls);
}
final String tag = (tooLarge && tooSlow) ? "TooLarge & TooSlow"
: (tooSlow ? "TooSlow" : "TooLarge");
LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
}
/**
* Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length
* if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
@ -758,4 +787,14 @@ public abstract class RpcServer implements RpcServerInterface,
public void setRsRpcServices(RSRpcServices rsRpcServices) {
this.rsRpcServices = rsRpcServices;
}
@Override
public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) {
this.slowLogRecorder = slowLogRecorder;
}
@Override
public SlowLogRecorder getSlowLogRecorder() {
return slowLogRecorder;
}
}

View File

@ -22,13 +22,14 @@ package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
@ -96,4 +97,16 @@ public interface RpcServerInterface {
ByteBuffAllocator getByteBuffAllocator();
void setRsRpcServices(RSRpcServices rsRpcServices);
/**
* Set Online SlowLog Provider
*
* @param slowLogRecorder instance of {@link SlowLogRecorder}
*/
void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder);
/**
* @return Retrieve instance of {@link SlowLogRecorder} maintained by RpcServer
*/
SlowLogRecorder getSlowLogRecorder();
}

View File

@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@ -527,6 +528,11 @@ public class HRegionServer extends HasThread implements
private final NettyEventLoopGroupConfig eventLoopGroupConfig;
/**
* Provide online slow log responses from ringbuffer
*/
private SlowLogRecorder slowLogRecorder;
/**
* True if this RegionServer is coming up in a cluster where there is no Master;
* means it needs to just come up and make do without a Master to talk to: e.g. in test or
@ -586,6 +592,9 @@ public class HRegionServer extends HasThread implements
this.abortRequested = false;
this.stopped = false;
if (!(this instanceof HMaster)) {
this.slowLogRecorder = new SlowLogRecorder(this.conf);
}
rpcServices = createRpcServices();
useThisHostnameInstead = getUseThisHostnameInstead(conf);
String hostName =
@ -1494,6 +1503,15 @@ public class HRegionServer extends HasThread implements
}
/**
* get Online SlowLog Provider to add slow logs to ringbuffer
*
* @return Online SlowLog Provider
*/
public SlowLogRecorder getSlowLogRecorder() {
return this.slowLogRecorder;
}
/*
* Run init. Sets up wal and starts up all server threads.
*
* @param c Extra configuration.

View File

@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@ -104,6 +105,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
@ -125,6 +127,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
@ -167,6 +170,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompac
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
@ -196,6 +201,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWA
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
@ -243,6 +250,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
@ -1245,6 +1253,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
rpcServer.setRsRpcServices(this);
if (!(rs instanceof HMaster)) {
rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder());
}
scannerLeaseTimeoutPeriod = conf.getInt(
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
@ -3730,6 +3741,36 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public SlowLogResponses getSlowLogResponses(final RpcController controller,
final SlowLogResponseRequest request) {
final SlowLogRecorder slowLogRecorder =
this.regionServer.getSlowLogRecorder();
final List<SlowLogPayload> slowLogPayloads;
slowLogPayloads = slowLogRecorder != null
? slowLogRecorder.getSlowLogPayloads(request)
: Collections.emptyList();
SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
.addAllSlowLogPayloads(slowLogPayloads)
.build();
return slowLogResponses;
}
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
final ClearSlowLogResponseRequest request) {
final SlowLogRecorder slowLogRecorder =
this.regionServer.getSlowLogRecorder();
boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder)
.map(SlowLogRecorder::clearSlowLogPayloads).orElse(false);
ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder()
.setIsCleaned(slowLogsCleaned)
.build();
return clearSlowLogResponses;
}
@VisibleForTesting
public RpcScheduler getRpcScheduler() {
return rpcServer.getScheduler();

View File

@ -0,0 +1,50 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
import com.lmax.disruptor.ExceptionHandler;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Exception Handler for Online Slow Log Ring Buffer
*/
@InterfaceAudience.Private
class DisruptorExceptionHandler implements ExceptionHandler<RingBufferEnvelope> {
private static final Logger LOG = LoggerFactory.getLogger(DisruptorExceptionHandler.class);
@Override
public void handleEventException(Throwable e, long sequence, RingBufferEnvelope event) {
LOG.error("Sequence={}, event={}", sequence, event, e);
}
@Override
public void handleOnStartException(Throwable e) {
LOG.error("Disruptor onStartException: ", e);
}
@Override
public void handleOnShutdownException(Throwable e) {
LOG.error("Disruptor onShutdownException: ", e);
}
}

View File

@ -0,0 +1,57 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.yetus.audience.InterfaceAudience;
/**
* An envelope to carry payload in the slow log ring buffer that serves as online buffer
* to provide latest TooSlowLog
*/
@InterfaceAudience.Private
final class RingBufferEnvelope {
private RpcLogDetails rpcLogDetails;
/**
* Load the Envelope with {@link RpcCall}
*
* @param rpcLogDetails all details of rpc call that would be useful for ring buffer
* consumers
*/
public void load(RpcLogDetails rpcLogDetails) {
this.rpcLogDetails = rpcLogDetails;
}
/**
* Retrieve current rpcCall details {@link RpcLogDetails} available on Envelope and
* free up the Envelope
*
* @return Retrieve rpc log details
*/
public RpcLogDetails getPayload() {
final RpcLogDetails rpcLogDetails = this.rpcLogDetails;
this.rpcLogDetails = null;
return rpcLogDetails;
}
}

View File

@ -0,0 +1,71 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.yetus.audience.InterfaceAudience;
/**
* RpcCall details that would be passed on to ring buffer of slow log responses
*/
@InterfaceAudience.Private
public class RpcLogDetails {
private RpcCall rpcCall;
private String clientAddress;
private long responseSize;
private String className;
public RpcLogDetails(RpcCall rpcCall, String clientAddress, long responseSize,
String className) {
this.rpcCall = rpcCall;
this.clientAddress = clientAddress;
this.responseSize = responseSize;
this.className = className;
}
public RpcCall getRpcCall() {
return rpcCall;
}
public String getClientAddress() {
return clientAddress;
}
public long getResponseSize() {
return responseSize;
}
public String getClassName() {
return className;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("rpcCall", rpcCall)
.append("clientAddress", clientAddress)
.append("responseSize", responseSize)
.append("className", className)
.toString();
}
}

View File

@ -0,0 +1,208 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
/**
* Event Handler run by disruptor ringbuffer consumer
*/
@InterfaceAudience.Private
class SlowLogEventHandler implements EventHandler<RingBufferEnvelope> {
private static final Logger LOG = LoggerFactory.getLogger(SlowLogEventHandler.class);
private final Queue<SlowLogPayload> queue;
SlowLogEventHandler(int eventCount) {
EvictingQueue<SlowLogPayload> evictingQueue = EvictingQueue.create(eventCount);
queue = Queues.synchronizedQueue(evictingQueue);
}
/**
* Called when a publisher has published an event to the {@link RingBuffer}
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from
* the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain
*/
@Override
public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch)
throws Exception {
final RpcLogDetails rpcCallDetails = event.getPayload();
final RpcCall rpcCall = rpcCallDetails.getRpcCall();
final String clientAddress = rpcCallDetails.getClientAddress();
final long responseSize = rpcCallDetails.getResponseSize();
final String className = rpcCallDetails.getClassName();
Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
Message param = rpcCall.getParam();
long receiveTime = rpcCall.getReceiveTime();
long startTime = rpcCall.getStartTime();
long endTime = System.currentTimeMillis();
int processingTime = (int) (endTime - startTime);
int qTime = (int) (startTime - receiveTime);
final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
int numGets = 0;
int numMutations = 0;
int numServiceCalls = 0;
if (param instanceof ClientProtos.MultiRequest) {
ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
for (ClientProtos.Action action : regionAction.getActionList()) {
if (action.hasMutation()) {
numMutations++;
}
if (action.hasGet()) {
numGets++;
}
if (action.hasServiceCall()) {
numServiceCalls++;
}
}
}
}
final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
final String methodDescriptorName =
methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder()
.setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
.setClientAddress(clientAddress)
.setMethodName(methodDescriptorName)
.setMultiGets(numGets)
.setMultiMutations(numMutations)
.setMultiServiceCalls(numServiceCalls)
.setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
.setProcessingTime(processingTime)
.setQueueTime(qTime)
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
.setResponseSize(responseSize)
.setServerClass(className)
.setStartTime(startTime)
.setUserName(userName)
.build();
queue.add(slowLogPayload);
}
/**
* Cleans up slow log payloads
*
* @return true if slow log payloads are cleaned up, false otherwise
*/
boolean clearSlowLogs() {
if (LOG.isDebugEnabled()) {
LOG.debug("Received request to clean up online slowlog buffer..");
}
queue.clear();
return true;
}
/**
* Retrieve list of slow log payloads
*
* @param request slow log request parameters
* @return list of slow log payloads
*/
List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
List<SlowLogPayload> slowLogPayloadList =
Arrays.stream(queue.toArray(new SlowLogPayload[0])).collect(Collectors.toList());
// latest slow logs first, operator is interested in latest records from in-memory buffer
Collections.reverse(slowLogPayloadList);
if (isFilterProvided(request)) {
slowLogPayloadList = filterSlowLogs(request, slowLogPayloadList);
}
int limit = request.getLimit() >= slowLogPayloadList.size() ? slowLogPayloadList.size()
: request.getLimit();
return slowLogPayloadList.subList(0, limit);
}
private boolean isFilterProvided(AdminProtos.SlowLogResponseRequest request) {
if (StringUtils.isNotEmpty(request.getUserName())) {
return true;
}
if (StringUtils.isNotEmpty(request.getTableName())) {
return true;
}
if (StringUtils.isNotEmpty(request.getClientAddress())) {
return true;
}
return StringUtils.isNotEmpty(request.getRegionName());
}
private List<SlowLogPayload> filterSlowLogs(AdminProtos.SlowLogResponseRequest request,
List<SlowLogPayload> slowLogPayloadList) {
List<SlowLogPayload> filteredSlowLogPayloads = new ArrayList<>();
for (SlowLogPayload slowLogPayload : slowLogPayloadList) {
if (StringUtils.isNotEmpty(request.getRegionName())) {
if (slowLogPayload.getRegionName().equals(request.getRegionName())) {
filteredSlowLogPayloads.add(slowLogPayload);
continue;
}
}
if (StringUtils.isNotEmpty(request.getTableName())) {
if (slowLogPayload.getRegionName().startsWith(request.getTableName())) {
filteredSlowLogPayloads.add(slowLogPayload);
continue;
}
}
if (StringUtils.isNotEmpty(request.getClientAddress())) {
if (slowLogPayload.getClientAddress().equals(request.getClientAddress())) {
filteredSlowLogPayloads.add(slowLogPayload);
continue;
}
}
if (StringUtils.isNotEmpty(request.getUserName())) {
if (slowLogPayload.getUserName().equals(request.getUserName())) {
filteredSlowLogPayloads.add(slowLogPayload);
}
}
}
return filteredSlowLogPayloads;
}
}

View File

@ -0,0 +1,153 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
/**
* Online SlowLog Provider Service that keeps slow RPC logs in the ring buffer.
* The service uses LMAX Disruptor to save slow records which are then consumed by
* a queue and based on the ring buffer size, the available records are then fetched
* from the queue in thread-safe manner.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SlowLogRecorder {
private final Disruptor<RingBufferEnvelope> disruptor;
private final SlowLogEventHandler slowLogEventHandler;
private final int eventCount;
private final boolean isOnlineSlowLogProviderEnabled;
private static final String SLOW_LOG_RING_BUFFER_SIZE =
"hbase.regionserver.slowlog.ringbuffer.size";
/**
* Initialize disruptor with configurable ringbuffer size
*/
public SlowLogRecorder(Configuration conf) {
isOnlineSlowLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
if (!isOnlineSlowLogProviderEnabled) {
this.disruptor = null;
this.slowLogEventHandler = null;
this.eventCount = 0;
return;
}
this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE,
HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
// This is the 'writer' -- a single threaded executor. This single thread consumes what is
// put on the ringbuffer.
final String hostingThreadName = Thread.currentThread().getName();
// disruptor initialization with BlockingWaitStrategy
this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
getEventCount(),
Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"),
ProducerType.MULTI,
new BlockingWaitStrategy());
this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
// initialize ringbuffer event handler
this.slowLogEventHandler = new SlowLogEventHandler(this.eventCount);
this.disruptor.handleEventsWith(new SlowLogEventHandler[]{this.slowLogEventHandler});
this.disruptor.start();
}
// must be power of 2 for disruptor ringbuffer
private int getEventCount() {
Preconditions.checkArgument(eventCount >= 0,
SLOW_LOG_RING_BUFFER_SIZE + " must be > 0");
int floor = Integer.highestOneBit(eventCount);
if (floor == eventCount) {
return floor;
}
// max capacity is 1 << 30
if (floor >= 1 << 29) {
return 1 << 30;
}
return floor << 1;
}
/**
* Retrieve online slow logs from ringbuffer
*
* @param request slow log request parameters
* @return online slow logs from ringbuffer
*/
public List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
return isOnlineSlowLogProviderEnabled ? this.slowLogEventHandler.getSlowLogPayloads(request)
: Collections.emptyList();
}
/**
* clears slow log payloads from ringbuffer
*
* @return true if slow log payloads are cleaned up or
* hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to
* clean up slow logs
*/
public boolean clearSlowLogPayloads() {
if (!isOnlineSlowLogProviderEnabled) {
return true;
}
return this.slowLogEventHandler.clearSlowLogs();
}
/**
* Add slow log rpcCall details to ringbuffer
*
* @param rpcLogDetails all details of rpc call that would be useful for ring buffer
* consumers
*/
public void addSlowLogPayload(RpcLogDetails rpcLogDetails) {
if (!isOnlineSlowLogProviderEnabled) {
return;
}
RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
long seqId = ringBuffer.next();
try {
ringBuffer.get(seqId).load(rpcLogDetails);
} finally {
ringBuffer.publish(seqId);
}
}
}

View File

@ -24,8 +24,10 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
@ -868,4 +870,29 @@ public class TestAdmin2 extends TestAdminBase {
assertEquals(initialState, ADMIN.isSnapshotCleanupEnabled());
}
@Test
public void testSlowLogResponses() throws Exception {
// get all live server names
Collection<ServerName> serverNames = ADMIN.getRegionServers();
List<ServerName> serverNameList = new ArrayList<>(serverNames);
// clean up slowlog responses maintained in memory by RegionServers
List<Boolean> areSlowLogsCleared = ADMIN.clearSlowLogResponses(new HashSet<>(serverNameList));
int countFailedClearSlowResponse = 0;
for (Boolean isSlowLogCleared : areSlowLogsCleared) {
if (!isSlowLogCleared) {
++countFailedClearSlowResponse;
}
}
Assert.assertEquals(countFailedClearSlowResponse, 0);
SlowLogQueryFilter slowLogQueryFilter = new SlowLogQueryFilter();
List<SlowLogRecord> slowLogRecords = ADMIN.getSlowLogResponses(new HashSet<>(serverNames),
slowLogQueryFilter);
// after cleanup of slowlog responses, total count of slowlog payloads should be 0
Assert.assertEquals(slowLogRecords.size(), 0);
}
}

View File

@ -46,6 +46,7 @@ public class TestAdminBase {
TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 30);
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 30);
TEST_UTIL.getConfiguration().setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
TEST_UTIL.startMiniCluster(3);
ADMIN = TEST_UTIL.getAdmin();
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -34,6 +34,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@ -51,9 +52,11 @@ import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@ -524,4 +527,5 @@ public abstract class AbstractTestIPC {
rpcServer.stop();
}
}
}

View File

@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompac
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
@ -109,6 +111,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWA
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
@ -677,6 +681,18 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
return null;
}
@Override
public SlowLogResponses getSlowLogResponses(RpcController controller,
SlowLogResponseRequest request) throws ServiceException {
return null;
}
@Override
public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller,
ClearSlowLogResponseRequest request) throws ServiceException {
return null;
}
@Override
public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
RpcController controller, GetSpaceQuotaSnapshotsRequest request)

View File

@ -0,0 +1,593 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
import java.io.IOException;
import java.net.InetAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
/**
* Tests for Online SlowLog Provider Service
*/
@Category({MasterTests.class, MediumTests.class})
public class TestSlowLogRecorder {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSlowLogRecorder.class);
private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
private SlowLogRecorder slowLogRecorder;
private static int i = 0;
private static Configuration applySlowLogRecorderConf(int eventSize) {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
return conf;
}
/**
* confirm that for a ringbuffer of slow logs, payload on given index of buffer
* has expected elements
*
* @param i index of ringbuffer logs
* @param j data value that was put on index i
* @param slowLogPayloads list of payload retrieved from {@link SlowLogRecorder}
*/
private void confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
Assert.assertEquals(slowLogPayloads.get(i).getClientAddress(), "client_" + j);
Assert.assertEquals(slowLogPayloads.get(i).getUserName(), "userName_" + j);
Assert.assertEquals(slowLogPayloads.get(i).getServerClass(), "class_" + j);
}
@Test
public void testOnlieSlowLogConsumption() throws Exception {
Configuration conf = applySlowLogRecorderConf(8);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
int i = 0;
// add 5 records initially
for (; i < 5; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 5));
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
confirmPayloadParams(0, 5, slowLogPayloads);
confirmPayloadParams(1, 4, slowLogPayloads);
confirmPayloadParams(2, 3, slowLogPayloads);
confirmPayloadParams(3, 2, slowLogPayloads);
confirmPayloadParams(4, 1, slowLogPayloads);
// add 2 more records
for (; i < 7; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 7));
slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
Assert.assertEquals(slowLogPayloads.size(), 7);
confirmPayloadParams(0, 7, slowLogPayloads);
confirmPayloadParams(5, 2, slowLogPayloads);
confirmPayloadParams(6, 1, slowLogPayloads);
// add 3 more records
for (; i < 10; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
// confirm ringbuffer is full
Assert.assertEquals(slowLogPayloads.size(), 8);
confirmPayloadParams(7, 3, slowLogPayloads);
confirmPayloadParams(0, 10, slowLogPayloads);
confirmPayloadParams(1, 9, slowLogPayloads);
// add 4 more records
for (; i < 14; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
// confirm ringbuffer is full
Assert.assertEquals(slowLogPayloads.size(), 8);
confirmPayloadParams(0, 14, slowLogPayloads);
confirmPayloadParams(1, 13, slowLogPayloads);
confirmPayloadParams(2, 12, slowLogPayloads);
confirmPayloadParams(3, 11, slowLogPayloads);
boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
Assert.assertTrue(isRingBufferCleaned);
LOG.debug("cleared the ringbuffer of Online Slow Log records");
slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
// confirm ringbuffer is empty
Assert.assertEquals(slowLogPayloads.size(), 0);
}
@Test
public void testOnlineSlowLogWithHighRecords() throws Exception {
Configuration conf = applySlowLogRecorderConf(14);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 14 * 11; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
Assert.assertEquals(slowLogPayloads.size(), 14);
// confirm strict order of slow log payloads
confirmPayloadParams(0, 154, slowLogPayloads);
confirmPayloadParams(1, 153, slowLogPayloads);
confirmPayloadParams(2, 152, slowLogPayloads);
confirmPayloadParams(3, 151, slowLogPayloads);
confirmPayloadParams(4, 150, slowLogPayloads);
confirmPayloadParams(5, 149, slowLogPayloads);
confirmPayloadParams(6, 148, slowLogPayloads);
confirmPayloadParams(7, 147, slowLogPayloads);
confirmPayloadParams(8, 146, slowLogPayloads);
confirmPayloadParams(9, 145, slowLogPayloads);
confirmPayloadParams(10, 144, slowLogPayloads);
confirmPayloadParams(11, 143, slowLogPayloads);
confirmPayloadParams(12, 142, slowLogPayloads);
confirmPayloadParams(13, 141, slowLogPayloads);
boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
Assert.assertTrue(isRingBufferCleaned);
LOG.debug("cleared the ringbuffer of Online Slow Log records");
slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
// confirm ringbuffer is empty
Assert.assertEquals(slowLogPayloads.size(), 0);
}
@Test
public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 300; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
Assert.assertEquals(slowLogPayloads.size(), 0);
}
@Test
public void testOnlineSlowLogWithDisableConfig() throws Exception {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 300; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
Assert.assertEquals(slowLogPayloads.size(), 0);
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
}
@Test
public void testSlowLogFilters() throws Exception {
Configuration conf = applySlowLogRecorderConf(30);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_87")
.build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 100; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 1));
AdminProtos.SlowLogResponseRequest requestClient =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setClientAddress("client_85")
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1));
AdminProtos.SlowLogResponseRequest requestSlowLog =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
}
@Test
public void testConcurrentSlowLogEvents() throws Exception {
Configuration conf = applySlowLogRecorderConf(50000);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int j = 0; j < 1000; j++) {
CompletableFuture.runAsync(() -> {
for (int i = 0; i < 3500; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
});
}
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
slowLogRecorder.clearSlowLogPayloads();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
4000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000));
}
private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
return new RpcLogDetails(getRpcCall(userName), clientAddress, 0, className);
}
private RpcCall getRpcCall(String userName) {
RpcCall rpcCall = new RpcCall() {
@Override
public BlockingService getService() {
return null;
}
@Override
public Descriptors.MethodDescriptor getMethod() {
return null;
}
@Override
public Message getParam() {
return getMessage();
}
@Override
public CellScanner getCellScanner() {
return null;
}
@Override
public long getReceiveTime() {
return 0;
}
@Override
public long getStartTime() {
return 0;
}
@Override
public void setStartTime(long startTime) {
}
@Override
public int getTimeout() {
return 0;
}
@Override
public int getPriority() {
return 0;
}
@Override
public long getDeadline() {
return 0;
}
@Override
public long getSize() {
return 0;
}
@Override
public RPCProtos.RequestHeader getHeader() {
return null;
}
@Override
public int getRemotePort() {
return 0;
}
@Override
public void setResponse(Message param, CellScanner cells,
Throwable errorThrowable, String error) {
}
@Override
public void sendResponseIfReady() throws IOException {
}
@Override
public void cleanup() {
}
@Override
public String toShortString() {
return null;
}
@Override
public long disconnectSince() {
return 0;
}
@Override
public boolean isClientCellBlockSupported() {
return false;
}
@Override
public Optional<User> getRequestUser() {
return getUser(userName);
}
@Override
public InetAddress getRemoteAddress() {
return null;
}
@Override
public HBaseProtos.VersionInfo getClientVersionInfo() {
return null;
}
@Override
public void setCallBack(RpcCallback callback) {
}
@Override
public boolean isRetryImmediatelySupported() {
return false;
}
@Override
public long getResponseCellSize() {
return 0;
}
@Override
public void incrementResponseCellSize(long cellSize) {
}
@Override
public long getResponseBlockSize() {
return 0;
}
@Override
public void incrementResponseBlockSize(long blockSize) {
}
@Override
public long getResponseExceptionSize() {
return 0;
}
@Override
public void incrementResponseExceptionSize(long exceptionSize) {
}
};
return rpcCall;
}
private Message getMessage() {
i = (i + 1) % 3;
Message message = null;
switch (i) {
case 0: {
message = ClientProtos.ScanRequest.newBuilder()
.setRegion(HBaseProtos.RegionSpecifier.newBuilder()
.setValue(ByteString.copyFromUtf8("region1"))
.setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)
.build())
.build();
break;
}
case 1: {
message = ClientProtos.MutateRequest.newBuilder()
.setRegion(HBaseProtos.RegionSpecifier.newBuilder()
.setValue(ByteString.copyFromUtf8("region2"))
.setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
.setMutation(ClientProtos.MutationProto.newBuilder()
.setRow(ByteString.copyFromUtf8("row123"))
.build())
.build();
break;
}
case 2: {
message = ClientProtos.GetRequest.newBuilder()
.setRegion(HBaseProtos.RegionSpecifier.newBuilder()
.setValue(ByteString.copyFromUtf8("region2"))
.setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
.setGet(ClientProtos.Get.newBuilder()
.setRow(ByteString.copyFromUtf8("row123"))
.build())
.build();
break;
}
}
return message;
}
private Optional<User> getUser(String userName) {
return Optional.of(new User() {
@Override
public String getShortName() {
return userName;
}
@Override
public <T> T runAs(PrivilegedAction<T> action) {
return null;
}
@Override
public <T> T runAs(PrivilegedExceptionAction<T> action) throws
IOException, InterruptedException {
return null;
}
});
}
}

View File

@ -1433,6 +1433,79 @@ module Hbase
@admin.listDecommissionedRegionServers
end
#----------------------------------------------------------------------------------------------
# Retrieve SlowLog Responses from RegionServers
def get_slowlog_responses(server_names, args)
unless server_names.is_a?(Array) || server_names.is_a?(String)
raise(ArgumentError,
"#{server_names.class} of #{server_names.inspect} is not of Array/String type")
end
if server_names == '*'
server_names = getServerNames([], true)
else
server_names_list = to_server_names(server_names)
server_names = getServerNames(server_names_list, false)
end
filter_params = get_filter_params(args)
slow_log_responses = @admin.getSlowLogResponses(java.util.HashSet.new(server_names),
filter_params)
slow_log_responses_arr = []
for slow_log_response in slow_log_responses
slow_log_responses_arr << slow_log_response.toJsonPrettyPrint
end
puts 'Retrieved SlowLog Responses from RegionServers'
puts slow_log_responses_arr
end
def get_filter_params(args)
filter_params = org.apache.hadoop.hbase.client.SlowLogQueryFilter.new
if args.key? 'REGION_NAME'
region_name = args['REGION_NAME']
filter_params.setRegionName(region_name)
end
if args.key? 'TABLE_NAME'
table_name = args['TABLE_NAME']
filter_params.setTableName(table_name)
end
if args.key? 'CLIENT_IP'
client_ip = args['CLIENT_IP']
filter_params.setClientAddress(client_ip)
end
if args.key? 'USER'
user = args['USER']
filter_params.setUserName(user)
end
if args.key? 'LIMIT'
limit = args['LIMIT']
filter_params.setLimit(limit)
end
filter_params
end
#----------------------------------------------------------------------------------------------
# Clears SlowLog Responses from RegionServers
def clear_slowlog_responses(server_names)
unless server_names.nil? || server_names.is_a?(Array) || server_names.is_a?(String)
raise(ArgumentError,
"#{server_names.class} of #{server_names.inspect} is not of correct type")
end
if server_names.nil?
server_names = getServerNames([], true)
else
server_names_list = to_server_names(server_names)
server_names = getServerNames(server_names_list, false)
end
clear_log_responses = @admin.clearSlowLogResponses(java.util.HashSet.new(server_names))
clear_log_success_count = 0
clear_log_responses.each do |response|
if response
clear_log_success_count += 1
end
end
puts 'Cleared Slowlog responses from ' \
"#{clear_log_success_count}/#{clear_log_responses.size} RegionServers"
end
#----------------------------------------------------------------------------------------------
# Decommission a list of region servers, optionally offload corresponding regions
def decommission_regionservers(host_or_servers, should_offload)
@ -1507,6 +1580,17 @@ module Hbase
def stop_regionserver(hostport)
@admin.stopRegionServer(hostport)
end
#----------------------------------------------------------------------------------------------
# Get list of server names
def to_server_names(server_names)
if server_names.is_a?(Array)
server_names
else
java.util.Arrays.asList(server_names)
end
end
end
# rubocop:enable Metrics/ClassLength
end

View File

@ -333,10 +333,12 @@ Shell.load_command_group(
normalizer_switch
normalizer_enabled
is_in_maintenance_mode
clear_slowlog_responses
close_region
compact
compaction_switch
flush
get_slowlog_responses
major_compact
move
split

View File

@ -0,0 +1,47 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with this
# work for additional information regarding copyright ownership. The ASF
# licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Clear slowlog responses maintained in memory by RegionServers
module Shell
module Commands
# Clear slowlog responses
class ClearSlowlogResponses < Command
def help
<<-EOF
Clears SlowLog Responses maintained by each or specific RegionServers.
Specify array of server names for specific RS. A server name is
the host, port plus startcode of a RegionServer.
e.g.: host187.example.com,60020,1289493121758 (find servername in
master ui or when you do detailed status in shell)
Examples:
hbase> clear_slowlog_responses => clears slowlog responses from all RS
hbase> clear_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'] => clears slowlog responses from SERVER_NAME1,
SERVER_NAME2
EOF
end
def command(server_names = nil)
admin.clear_slowlog_responses(server_names)
end
end
end
end

View File

@ -0,0 +1,78 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with this
# work for additional information regarding copyright ownership. The ASF
# licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Retrieve latest slowlog responses maintained in memory by RegionServers
module Shell
module Commands
# Retrieve latest slowlog responses
class GetSlowlogResponses < Command
def help
<<-EOF
Retrieve latest SlowLog Responses maintained by each or specific RegionServers.
Specify '*' to include all RS otherwise array of server names for specific
RS. A server name is the host, port plus startcode of a RegionServer.
e.g.: host187.example.com,60020,1289493121758 (find servername in
master ui or when you do detailed status in shell)
Provide optional filter parameters as Hash.
Default Limit of each server for providing no of slow log records is 10. User can specify
more limit by 'LIMIT' param in case more than 10 records should be retrieved.
Examples:
hbase> get_slowlog_responses '*' => get slowlog responses from all RS
hbase> get_slowlog_responses '*', {'LIMIT' => 50} => get slowlog responses from all RS
with 50 records limit (default limit: 10)
hbase> get_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'] => get slowlog responses from SERVER_NAME1,
SERVER_NAME2
hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1'}
=> get slowlog responses only related to meta
region
hbase> get_slowlog_responses '*', {'TABLE_NAME' => 't1'} => get slowlog responses only related to t1 table
hbase> get_slowlog_responses '*', {'CLIENT_IP' => '192.162.1.40:60225', 'LIMIT' => 100}
=> get slowlog responses with given client
IP address and get 100 records limit
(default limit: 10)
hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1', 'TABLE_NAME' => 't1'}
=> get slowlog responses with given region name
or table name
hbase> get_slowlog_responses '*', {'USER' => 'user_name', 'CLIENT_IP' => '192.162.1.40:60225'}
=> get slowlog responses that match either
provided client IP address or user name
Sometimes output can be long pretty printed json for user to scroll in
a single screen and hence user might prefer
redirecting output of get_slowlog_responses to a file.
Example:
echo "get_slowlog_responses '*'" | hbase shell > xyz.out 2>&1
EOF
end
def command(server_names, args = {})
unless args.is_a? Hash
raise 'Filter parameters are not Hash'
end
admin.get_slowlog_responses(server_names, args)
end
end
end
end

View File

@ -201,6 +201,20 @@ module Hbase
#-------------------------------------------------------------------------------
define_test 'get slowlog responses should work' do
output = capture_stdout { command(:get_slowlog_responses, '*', {}) }
assert(output.include?('Retrieved SlowLog Responses from RegionServers'))
end
#-------------------------------------------------------------------------------
define_test 'clear slowlog responses should work' do
output = capture_stdout { command(:clear_slowlog_responses, nil) }
assert(output.include?('Cleared Slowlog responses from 1/1 RegionServers'))
end
#-------------------------------------------------------------------------------
define_test "create should fail with non-string/non-hash column args" do
assert_raise(ArgumentError) do
command(:create, @create_test_name, 123)

View File

@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.client.CompactType;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.SlowLogQueryFilter;
import org.apache.hadoop.hbase.client.SlowLogRecord;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.TableDescriptor;
@ -1392,6 +1394,17 @@ public class ThriftAdmin implements Admin {
throw new NotImplementedException("isSnapshotCleanupEnabled not supported in ThriftAdmin");
}
@Override
public List<SlowLogRecord> getSlowLogResponses(final Set<ServerName> serverNames,
final SlowLogQueryFilter slowLogQueryFilter) {
throw new NotImplementedException("getSlowLogResponses not supported in ThriftAdmin");
}
@Override
public List<Boolean> clearSlowLogResponses(final Set<ServerName> serverNames) {
throw new NotImplementedException("clearSlowLogsResponses not supported in ThriftAdmin");
}
@Override
public Future<Void> splitRegionAsync(byte[] regionName) throws IOException {
return splitRegionAsync(regionName, null);

View File

@ -2072,6 +2072,44 @@ A comma-separated list of
`-1`
[[hbase.regionserver.slowlog.ringbuffer.size]]
*`hbase.regionserver.slowlog.ringbuffer.size`*::
+
.Description
Default size of ringbuffer to be maintained by each RegionServer in order
to store online slowlog responses. This is an in-memory ring buffer of
requests that were judged to be too slow in addition to the responseTooSlow
logging. The in-memory representation would be complete.
For more details, please look into Doc Section:
<<slow_log_responses, slow_log_responses>>
+
.Default
`256`
[[hbase.regionserver.slowlog.buffer.enabled]]
*`hbase.regionserver.slowlog.buffer.enabled`*::
+
.Description
Indicates whether RegionServers have ring buffer running for storing
Online Slow logs in FIFO manner with limited entries. The size of
the ring buffer is indicated by config: hbase.regionserver.slowlog.ringbuffer.size
The default value is false, turn this on and get latest slowlog
responses with complete data.
For more details, please look into Doc Section:
<<slow_log_responses, slow_log_responses>>
+
.Default
`false`
[[hbase.region.replica.replication.enabled]]
*`hbase.region.replica.replication.enabled`*::

View File

@ -1813,6 +1813,120 @@ In the case that the call is not a client operation, that detailed fingerprint i
This particular example, for example, would indicate that the likely cause of slowness is simply a very large (on the order of 100MB) multiput, as we can tell by the "vlen," or value length, fields of each put in the multiPut.
[[slow_log_responses]]
==== Get Slow Response Log from shell
When an individual RPC exceeds a configurable time bound we log a complaint
by way of the logging subsystem
e.g.
----
2019-10-02 10:10:22,195 WARN [,queue=15,port=60020] ipc.RpcServer - (responseTooSlow):
{"call":"Scan(org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ScanRequest)",
"starttimems":1567203007549,
"responsesize":6819737,
"method":"Scan",
"param":"region { type: REGION_NAME value: \"t1,\\000\\000\\215\\f)o\\\\\\024\\302\\220\\000\\000\\000\\000\\000\\001\\000\\000\\000\\000\\000\\006\\000\\000\\000\\000\\000\\005\\000\\000<TRUNCATED>",
"processingtimems":28646,
"client":"10.253.196.215:41116",
"queuetimems":22453,
"class":"HRegionServer"}
----
Unfortunately often the request parameters are truncated as per above Example.
The truncation is unfortunate because it eliminates much of the utility of
the warnings. For example, the region name, the start and end keys, and the
filter hierarchy are all important clues for debugging performance problems
caused by moderate to low selectivity queries or queries made at a high rate.
HBASE-22978 introduces maintaining an in-memory ring buffer of requests that were judged to
be too slow in addition to the responseTooSlow logging. The in-memory representation can be
complete. There is some chance a high rate of requests will cause information on other
interesting requests to be overwritten before it can be read. This is an acceptable trade off.
In order to enable the in-memory ring buffer at RegionServers, we need to enable
config:
----
hbase.regionserver.slowlog.buffer.enabled
----
One more config determines the size of the ring buffer:
----
hbase.regionserver.slowlog.ringbuffer.size
----
Check the config section for the detailed description.
This config would be disabled by default. Turn it on and these shell commands
would provide expected results from the ring-buffers.
shell commands to retrieve slowlog responses from RegionServers:
----
Retrieve latest SlowLog Responses maintained by each or specific RegionServers.
Specify '*' to include all RS otherwise array of server names for specific
RS. A server name is the host, port plus startcode of a RegionServer.
e.g.: host187.example.com,60020,1289493121758 (find servername in
master ui or when you do detailed status in shell)
Provide optional filter parameters as Hash.
Default Limit of each server for providing no of slow log records is 10. User can specify
more limit by 'LIMIT' param in case more than 10 records should be retrieved.
Examples:
hbase> get_slowlog_responses '*' => get slowlog responses from all RS
hbase> get_slowlog_responses '*', {'LIMIT' => 50} => get slowlog responses from all RS
with 50 records limit (default limit: 10)
hbase> get_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'] => get slowlog responses from SERVER_NAME1,
SERVER_NAME2
hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1'}
=> get slowlog responses only related to meta
region
hbase> get_slowlog_responses '*', {'TABLE_NAME' => 't1'} => get slowlog responses only related to t1 table
hbase> get_slowlog_responses '*', {'CLIENT_IP' => '192.162.1.40:60225', 'LIMIT' => 100}
=> get slowlog responses with given client
IP address and get 100 records limit
(default limit: 10)
hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1', 'TABLE_NAME' => 't1'}
=> get slowlog responses with given region name
or table name
hbase> get_slowlog_responses '*', {'USER' => 'user_name', 'CLIENT_IP' => '192.162.1.40:60225'}
=> get slowlog responses that match either
provided client IP address or user name
----
Sometimes output can be long pretty printed json for user to scroll in
a single screen and hence user might prefer
redirecting output of get_slowlog_responses to a file.
Example:
----
echo "get_slowlog_responses '*'" | hbase shell > xyz.out 2>&1
----
shell command to clear slowlog responses from RegionServer:
----
Clears SlowLog Responses maintained by each or specific RegionServers.
Specify array of server names for specific RS. A server name is
the host, port plus startcode of a RegionServer.
e.g.: host187.example.com,60020,1289493121758 (find servername in
master ui or when you do detailed status in shell)
Examples:
hbase> clear_slowlog_responses => clears slowlog responses from all RS
hbase> clear_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'] => clears slowlog responses from SERVER_NAME1,
SERVER_NAME2
----
=== Block Cache Monitoring
Starting with HBase 0.98, the HBase Web UI includes the ability to monitor and report on the performance of the block cache.