HBASE-27536: Include more request information in slowlog for Scans (#5155)

Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
This commit is contained in:
Ray Mattingly 2023-04-16 10:03:53 -04:00 committed by GitHub
parent 1e75a2a9a2
commit 2b098b0819
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 237 additions and 21 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import java.util.Optional;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
@ -52,6 +53,11 @@ final public class OnlineLogRecord extends LogEntry {
if (slowLogPayload.getMultiServiceCalls() == 0) {
jsonObj.remove("multiServiceCalls");
}
if (slowLogPayload.getScan().isPresent()) {
jsonObj.add("scan", gson.toJsonTree(slowLogPayload.getScan().get().toMap()));
} else {
jsonObj.remove("scan");
}
return jsonObj;
}).create();
@ -72,6 +78,7 @@ final public class OnlineLogRecord extends LogEntry {
private final int multiGetsCount;
private final int multiMutationsCount;
private final int multiServiceCalls;
private final Optional<Scan> scan;
public long getStartTime() {
return startTime;
@ -136,11 +143,20 @@ final public class OnlineLogRecord extends LogEntry {
return multiServiceCalls;
}
private OnlineLogRecord(final long startTime, final int processingTime, final int queueTime,
/**
* If {@value org.apache.hadoop.hbase.HConstants#SLOW_LOG_SCAN_PAYLOAD_ENABLED} is enabled then
* this value may be present and should represent the Scan that produced the given
* {@link OnlineLogRecord}
*/
public Optional<Scan> getScan() {
return scan;
}
OnlineLogRecord(final long startTime, final int processingTime, final int queueTime,
final long responseSize, final long blockBytesScanned, final String clientAddress,
final String serverClass, final String methodName, final String callDetails, final String param,
final String regionName, final String userName, final int multiGetsCount,
final int multiMutationsCount, final int multiServiceCalls) {
final int multiMutationsCount, final int multiServiceCalls, final Scan scan) {
this.startTime = startTime;
this.processingTime = processingTime;
this.queueTime = queueTime;
@ -156,6 +172,7 @@ final public class OnlineLogRecord extends LogEntry {
this.multiGetsCount = multiGetsCount;
this.multiMutationsCount = multiMutationsCount;
this.multiServiceCalls = multiServiceCalls;
this.scan = Optional.ofNullable(scan);
}
public static class OnlineLogRecordBuilder {
@ -174,6 +191,7 @@ final public class OnlineLogRecord extends LogEntry {
private int multiGetsCount;
private int multiMutationsCount;
private int multiServiceCalls;
private Scan scan = null;
public OnlineLogRecordBuilder setStartTime(long startTime) {
this.startTime = startTime;
@ -253,10 +271,15 @@ final public class OnlineLogRecord extends LogEntry {
return this;
}
public OnlineLogRecordBuilder setScan(Scan scan) {
this.scan = scan;
return this;
}
public OnlineLogRecord build() {
return new OnlineLogRecord(startTime, processingTime, queueTime, responseSize,
blockBytesScanned, clientAddress, serverClass, methodName, callDetails, param, regionName,
userName, multiGetsCount, multiMutationsCount, multiServiceCalls);
userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan);
}
}
@ -280,7 +303,8 @@ final public class OnlineLogRecord extends LogEntry {
.append(multiServiceCalls, that.multiServiceCalls).append(clientAddress, that.clientAddress)
.append(serverClass, that.serverClass).append(methodName, that.methodName)
.append(callDetails, that.callDetails).append(param, that.param)
.append(regionName, that.regionName).append(userName, that.userName).isEquals();
.append(regionName, that.regionName).append(userName, that.userName).append(scan, that.scan)
.isEquals();
}
@Override
@ -288,7 +312,8 @@ final public class OnlineLogRecord extends LogEntry {
return new HashCodeBuilder(17, 37).append(startTime).append(processingTime).append(queueTime)
.append(responseSize).append(blockBytesScanned).append(clientAddress).append(serverClass)
.append(methodName).append(callDetails).append(param).append(regionName).append(userName)
.append(multiGetsCount).append(multiMutationsCount).append(multiServiceCalls).toHashCode();
.append(multiGetsCount).append(multiMutationsCount).append(multiServiceCalls).append(scan)
.toHashCode();
}
@Override
@ -305,7 +330,7 @@ final public class OnlineLogRecord extends LogEntry {
.append("methodName", methodName).append("callDetails", callDetails).append("param", param)
.append("regionName", regionName).append("userName", userName)
.append("multiGetsCount", multiGetsCount).append("multiMutationsCount", multiMutationsCount)
.append("multiServiceCalls", multiServiceCalls).toString();
.append("multiServiceCalls", multiServiceCalls).append("scan", scan).toString();
}
}

View File

@ -23,6 +23,8 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
/**
* SlowLog params object that contains detailed info as params and region name : to be used for
* filter purpose
@ -32,15 +34,24 @@ public class SlowLogParams {
private final String regionName;
private final String params;
private final ClientProtos.Scan scan;
public SlowLogParams(String regionName, String params, ClientProtos.Scan scan) {
this.regionName = regionName;
this.params = params;
this.scan = scan;
}
public SlowLogParams(String regionName, String params) {
this.regionName = regionName;
this.params = params;
this.scan = null;
}
public SlowLogParams(String params) {
this.regionName = StringUtils.EMPTY;
this.params = params;
this.scan = null;
}
public String getRegionName() {
@ -51,10 +62,14 @@ public class SlowLogParams {
return params;
}
public ClientProtos.Scan getScan() {
return scan;
}
@Override
public String toString() {
return new ToStringBuilder(this).append("regionName", regionName).append("params", params)
.toString();
.append("scan", scan).toString();
}
@Override
@ -67,11 +82,11 @@ public class SlowLogParams {
}
SlowLogParams that = (SlowLogParams) o;
return new EqualsBuilder().append(regionName, that.regionName).append(params, that.params)
.isEquals();
.append("scan", scan).isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).append(regionName).append(params).toHashCode();
return new HashCodeBuilder(17, 37).append(regionName).append(params).append(scan).toHashCode();
}
}

View File

@ -130,6 +130,8 @@ import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
@ -231,6 +233,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
@InterfaceAudience.Private // TODO: some clients (Hive, etc) use this class
public final class ProtobufUtil {
private static final Logger LOG = LoggerFactory.getLogger(ProtobufUtil.class.getName());
private ProtobufUtil() {
}
@ -2144,7 +2148,7 @@ public final class ProtobufUtil {
* @param message Message object {@link Message}
* @return SlowLogParams with regionName(for filter queries) and params
*/
public static SlowLogParams getSlowLogParams(Message message) {
public static SlowLogParams getSlowLogParams(Message message, boolean slowLogScanPayloadEnabled) {
if (message == null) {
return null;
}
@ -2152,7 +2156,11 @@ public final class ProtobufUtil {
ScanRequest scanRequest = (ScanRequest) message;
String regionName = getStringForByteString(scanRequest.getRegion().getValue());
String params = TextFormat.shortDebugString(message);
return new SlowLogParams(regionName, params);
if (slowLogScanPayloadEnabled) {
return new SlowLogParams(regionName, params, scanRequest.getScan());
} else {
return new SlowLogParams(regionName, params);
}
} else if (message instanceof MutationProto) {
MutationProto mutationProto = (MutationProto) message;
String params = "type= " + mutationProto.getMutateType().toString();
@ -3367,7 +3375,7 @@ public final class ProtobufUtil {
* @return SlowLog Payload for client usecase
*/
private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLogPayload) {
OnlineLogRecord onlineLogRecord =
OnlineLogRecord.OnlineLogRecordBuilder onlineLogRecord =
new OnlineLogRecord.OnlineLogRecordBuilder().setCallDetails(slowLogPayload.getCallDetails())
.setClientAddress(slowLogPayload.getClientAddress())
.setMethodName(slowLogPayload.getMethodName())
@ -3379,8 +3387,15 @@ public final class ProtobufUtil {
.setResponseSize(slowLogPayload.getResponseSize())
.setBlockBytesScanned(slowLogPayload.getBlockBytesScanned())
.setServerClass(slowLogPayload.getServerClass()).setStartTime(slowLogPayload.getStartTime())
.setUserName(slowLogPayload.getUserName()).build();
return onlineLogRecord;
.setUserName(slowLogPayload.getUserName());
if (slowLogPayload.hasScan()) {
try {
onlineLogRecord.setScan(ProtobufUtil.toScan(slowLogPayload.getScan()));
} catch (Exception e) {
LOG.warn("Failed to convert Scan proto {}", slowLogPayload.getScan(), e);
}
}
return onlineLogRecord.build();
}
/**

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ClientTests.class, SmallTests.class })
public class TestOnlineLogRecord {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestOnlineLogRecord.class);
@Test
public void itSerializesScan() {
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(123));
scan.withStopRow(Bytes.toBytes(456));
String expectedOutput = "{\n" + " \"startTime\": 1,\n" + " \"processingTime\": 2,\n"
+ " \"queueTime\": 3,\n" + " \"responseSize\": 4,\n" + " \"blockBytesScanned\": 5,\n"
+ " \"multiGetsCount\": 6,\n" + " \"multiMutationsCount\": 7,\n" + " \"scan\": {\n"
+ " \"startRow\": \"\\\\x00\\\\x00\\\\x00{\",\n"
+ " \"stopRow\": \"\\\\x00\\\\x00\\\\x01\\\\xC8\",\n" + " \"batch\": -1,\n"
+ " \"cacheBlocks\": true,\n" + " \"totalColumns\": 0,\n"
+ " \"maxResultSize\": -1,\n" + " \"families\": {},\n" + " \"caching\": -1,\n"
+ " \"maxVersions\": 1,\n" + " \"timeRange\": [\n" + " 0,\n"
+ " 9223372036854775807\n" + " ]\n" + " }\n" + "}";
OnlineLogRecord o =
new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, 6, 7, 0, scan);
String actualOutput = o.toJsonPrettyPrint();
System.out.println(actualOutput);
Assert.assertEquals(actualOutput, expectedOutput);
}
}

View File

@ -1569,6 +1569,9 @@ public final class HConstants {
// Default 10 mins.
public static final int DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION = 10 * 60 * 1000;
public static final String SLOW_LOG_SCAN_PAYLOAD_ENABLED = "hbase.slowlog.scan.payload.enabled";
public static final boolean SLOW_LOG_SCAN_PAYLOAD_ENABLED_DEFAULT = false;
public static final String SHELL_TIMESTAMP_FORMAT_EPOCH_KEY =
"hbase.shell.timestamp.format.epoch";

View File

@ -27,6 +27,8 @@ option java_outer_classname = "TooSlowLog";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "client/Client.proto";
message SlowLogPayload {
required int64 start_time = 1;
required int32 processing_time = 2;
@ -45,6 +47,7 @@ message SlowLogPayload {
required Type type = 15;
optional int64 block_bytes_scanned = 16;
optional Scan scan = 17;
// SLOW_LOG is RPC call slow in nature whereas LARGE_LOG is RPC call quite large.
// Majority of times, slow logs are also large logs and hence, ALL is combination of

View File

@ -65,10 +65,13 @@ public class SlowLogQueueService implements NamedQueueService {
private final boolean isSlowLogTableEnabled;
private final SlowLogPersistentService slowLogPersistentService;
private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue;
private final boolean slowLogScanPayloadEnabled;
public SlowLogQueueService(Configuration conf) {
this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
this.slowLogScanPayloadEnabled = conf.getBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED,
HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED_DEFAULT);
if (!isOnlineLogProviderEnabled) {
this.isSlowLogTableEnabled = false;
@ -129,7 +132,8 @@ public class SlowLogQueueService implements NamedQueueService {
long endTime = EnvironmentEdgeManager.currentTime();
int processingTime = (int) (endTime - startTime);
int qTime = (int) (startTime - receiveTime);
final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
final SlowLogParams slowLogParams =
ProtobufUtil.getSlowLogParams(param, slowLogScanPayloadEnabled);
int numGets = 0;
int numMutations = 0;
int numServiceCalls = 0;
@ -152,7 +156,7 @@ public class SlowLogQueueService implements NamedQueueService {
final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
final String methodDescriptorName =
methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder()
TooSlowLog.SlowLogPayload.Builder slowLogPayloadBuilder = TooSlowLog.SlowLogPayload.newBuilder()
.setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
.setClientAddress(clientAddress).setMethodName(methodDescriptorName).setMultiGets(numGets)
.setMultiMutations(numMutations).setMultiServiceCalls(numServiceCalls)
@ -160,8 +164,11 @@ public class SlowLogQueueService implements NamedQueueService {
.setProcessingTime(processingTime).setQueueTime(qTime)
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
.setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned)
.setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName)
.build();
.setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName);
if (slowLogParams != null && slowLogParams.getScan() != null) {
slowLogPayloadBuilder.setScan(slowLogParams.getScan());
}
TooSlowLog.SlowLogPayload slowLogPayload = slowLogPayloadBuilder.build();
slowLogQueue.add(slowLogPayload);
if (isSlowLogTableEnabled) {
if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {

View File

@ -527,12 +527,96 @@ public class TestNamedQueueRecorder {
HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(requestSlowLog).size() == 15));
}
@Test
public void testOnlineSlowLogScanPayloadDefaultDisabled() throws Exception {
Configuration conf = applySlowLogRecorderConf(1);
conf.unset(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
namedQueueRecorder.addRecord(rpcLogDetails);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
if (slowLogPayload.isPresent()) {
return !slowLogPayload.get().hasScan();
}
return false;
}));
}
@Test
public void testOnlineSlowLogScanPayloadExplicitlyDisabled() throws Exception {
Configuration conf = applySlowLogRecorderConf(1);
conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, false);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
namedQueueRecorder.addRecord(rpcLogDetails);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
if (slowLogPayload.isPresent()) {
return !slowLogPayload.get().hasScan();
}
return false;
}));
}
@Test
public void testOnlineSlowLogScanPayloadExplicitlyEnabled() throws Exception {
Configuration conf = applySlowLogRecorderConf(1);
conf.setBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, true);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
namedQueueRecorder.addRecord(rpcLogDetails);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
if (slowLogPayload.isPresent()) {
return slowLogPayload.get().hasScan();
}
return false;
}));
}
static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className,
int forcedParamIndex) {
RpcCall rpcCall = getRpcCall(userName, forcedParamIndex);
return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, className, true,
true);
}
static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
RpcCall rpcCall = getRpcCall(userName);
return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, 0, className, true,
true);
}
private static RpcLogDetails getRpcLogDetailsOfScan() {
// forcedParamIndex of 0 results in a ScanRequest
return getRpcLogDetails("userName_1", "client_1", "class_1", 0);
}
private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className,
boolean isSlowLog, boolean isLargeLog) {
RpcCall rpcCall = getRpcCall(userName);
@ -541,6 +625,14 @@ public class TestNamedQueueRecorder {
}
private static RpcCall getRpcCall(String userName) {
return getRpcCall(userName, Optional.empty());
}
private static RpcCall getRpcCall(String userName, int forcedParamIndex) {
return getRpcCall(userName, Optional.of(forcedParamIndex));
}
private static RpcCall getRpcCall(String userName, Optional<Integer> forcedParamIndex) {
RpcCall rpcCall = new RpcCall() {
@Override
public BlockingService getService() {
@ -554,7 +646,7 @@ public class TestNamedQueueRecorder {
@Override
public Message getParam() {
return getMessage();
return getMessage(forcedParamIndex);
}
@Override
@ -689,13 +781,13 @@ public class TestNamedQueueRecorder {
return rpcCall;
}
private static Message getMessage() {
private static Message getMessage(Optional<Integer> forcedParamIndex) {
i = (i + 1) % 3;
Message message = null;
switch (i) {
switch (forcedParamIndex.orElse(i)) {
case 0: {
message = ClientProtos.ScanRequest.newBuilder()