From 6f5e5e4828e965994ab20311979002d708bfc59f Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Wed, 20 May 2020 15:10:29 +0530 Subject: [PATCH] =?UTF-8?q?HBASE-23938=20:=20System=20table=20hbase:slowlo?= =?UTF-8?q?g=20to=20store=20complete=20slow/large=E2=80=A6=20(#1681)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Duo Zhang Signed-off-by: Anoop Sam John Signed-off-by: ramkrish86 --- .../hbase/slowlog/SlowLogTableAccessor.java | 150 +++++++++++++ .../org/apache/hadoop/hbase/HConstants.java | 10 + .../apache/hadoop/hbase/ipc/RpcServer.java | 2 +- .../apache/hadoop/hbase/master/HMaster.java | 3 + .../master/slowlog/SlowLogMasterService.java | 73 +++++++ .../hbase/regionserver/HRegionServer.java | 15 ++ .../regionserver/slowlog/LogEventHandler.java | 80 ++++++- .../regionserver/slowlog/RpcLogDetails.java | 11 +- .../regionserver/slowlog/SlowLogRecorder.java | 14 +- .../slowlog/SlowLogTableOpsChore.java | 63 ++++++ .../slowlog/TestSlowLogAccessor.java | 204 ++++++++++++++++++ .../slowlog/TestSlowLogRecorder.java | 15 +- 12 files changed, 619 insertions(+), 21 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/slowlog/SlowLogMasterService.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java new file mode 100644 index 00000000000..f4f29c6579e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java @@ -0,0 +1,150 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.slowlog; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Slowlog Accessor to record slow/large RPC log identified at each RegionServer RpcServer level. + * This can be done only optionally to record the entire history of slow/large rpc calls + * since RingBuffer can handle only limited latest records. + */ +@InterfaceAudience.Private +public class SlowLogTableAccessor { + + private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class); + + private static final Random RANDOM = new Random(); + + private static Connection connection; + + /** + * hbase:slowlog table name - can be enabled + * with config - hbase.regionserver.slowlog.systable.enabled + */ + public static final TableName SLOW_LOG_TABLE_NAME = + TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "slowlog"); + + private static void doPut(final Connection connection, final List puts) + throws IOException { + try (Table table = connection.getTable(SLOW_LOG_TABLE_NAME)) { + table.put(puts); + } + } + + /** + * Add slow/large log records to hbase:slowlog table + * @param slowLogPayloads List of SlowLogPayload to process + * @param configuration Configuration to use for connection + */ + public static void addSlowLogRecords(final List slowLogPayloads, + final Configuration configuration) { + List puts = new ArrayList<>(slowLogPayloads.size()); + for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) { + final byte[] rowKey = getRowKey(slowLogPayload); + final Put put = new Put(rowKey).setDurability(Durability.SKIP_WAL) + .setPriority(HConstants.NORMAL_QOS) + .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("call_details"), + Bytes.toBytes(slowLogPayload.getCallDetails())) + .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("client_address"), + Bytes.toBytes(slowLogPayload.getClientAddress())) + .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("method_name"), + Bytes.toBytes(slowLogPayload.getMethodName())) + .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("param"), + Bytes.toBytes(slowLogPayload.getParam())) + .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("processing_time"), + Bytes.toBytes(Integer.toString(slowLogPayload.getProcessingTime()))) + .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("queue_time"), + Bytes.toBytes(Integer.toString(slowLogPayload.getQueueTime()))) + .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("region_name"), + Bytes.toBytes(slowLogPayload.getRegionName())) + .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("response_size"), + Bytes.toBytes(Long.toString(slowLogPayload.getResponseSize()))) + .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("server_class"), + Bytes.toBytes(slowLogPayload.getServerClass())) + .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("start_time"), + Bytes.toBytes(Long.toString(slowLogPayload.getStartTime()))) + .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("type"), + Bytes.toBytes(slowLogPayload.getType().name())) + .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("username"), + Bytes.toBytes(slowLogPayload.getUserName())); + puts.add(put); + } + try { + if (connection == null) { + createConnection(configuration); + } + doPut(connection, puts); + } catch (Exception e) { + LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e); + } + } + + private static synchronized void createConnection(Configuration configuration) + throws IOException { + Configuration conf = new Configuration(configuration); + // rpc timeout: 20s + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000); + // retry count: 5 + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); + conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); + connection = ConnectionFactory.createConnection(conf); + } + + /** + * Create rowKey: currentTimeMillis APPEND slowLogPayload.hashcode + * Scan on slowlog table should keep records with sorted order of time, however records + * added at the very same time (currentTimeMillis) could be in random order. + * + * @param slowLogPayload SlowLogPayload to process + * @return rowKey byte[] + */ + private static byte[] getRowKey(final TooSlowLog.SlowLogPayload slowLogPayload) { + String hashcode = String.valueOf(slowLogPayload.hashCode()); + String lastFiveDig = + hashcode.substring((hashcode.length() > 5) ? (hashcode.length() - 5) : 0); + if (lastFiveDig.startsWith("-")) { + lastFiveDig = String.valueOf(RANDOM.nextInt(99999)); + } + final long currentTimeMillis = EnvironmentEdgeManager.currentTime(); + final String timeAndHashcode = currentTimeMillis + lastFiveDig; + final long rowKeyLong = Long.parseLong(timeAndHashcode); + return Bytes.toBytes(rowKeyLong); + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index fdc35325d36..8c3d2951326 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1563,6 +1563,16 @@ public final class HConstants { "hbase.regionserver.slowlog.buffer.enabled"; public static final boolean DEFAULT_ONLINE_LOG_PROVIDER_ENABLED = false; + /** The slowlog info family as a string*/ + private static final String SLOWLOG_INFO_FAMILY_STR = "info"; + + /** The slowlog info family */ + public static final byte [] SLOWLOG_INFO_FAMILY = Bytes.toBytes(SLOWLOG_INFO_FAMILY_STR); + + public static final String SLOW_LOG_SYS_TABLE_ENABLED_KEY = + "hbase.regionserver.slowlog.systable.enabled"; + public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index d49122a4516..3a59a4b840c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -435,7 +435,7 @@ public abstract class RpcServer implements RpcServerInterface, final String className = server == null ? StringUtils.EMPTY : server.getClass().getSimpleName(); this.slowLogRecorder.addSlowLogPayload( - new RpcLogDetails(call, status.getClient(), responseSize, className, tooSlow, + new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow, tooLarge)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 482e9bb5cb4..26f0092a25c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -143,6 +143,7 @@ import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; +import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer; import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer; @@ -1139,6 +1140,8 @@ public class HMaster extends HRegionServer implements MasterServices { // Start the chore to read snapshots and add their usage to table/NS quotas getChoreService().scheduleChore(snapshotQuotaChore); } + final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this); + slowLogMasterService.init(); // clear the dead servers with same host name and port of online server because we are not // removing dead server with same hostname and port of rs which is trying to check in before diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/slowlog/SlowLogMasterService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/slowlog/SlowLogMasterService.java new file mode 100644 index 00000000000..554ed88adcc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/slowlog/SlowLogMasterService.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.slowlog; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Slowlog Master services - Table creation to be used by HMaster + */ +@InterfaceAudience.Private +public class SlowLogMasterService { + + private static final Logger LOG = LoggerFactory.getLogger(SlowLogMasterService.class); + + private final boolean slowlogTableEnabled; + private final MasterServices masterServices; + + private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER = + TableDescriptorBuilder.newBuilder(SlowLogTableAccessor.SLOW_LOG_TABLE_NAME) + .setRegionReplication(1) + .setColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(HConstants.SLOWLOG_INFO_FAMILY) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL) + .setBlockCacheEnabled(false) + .setMaxVersions(1).build()); + + public SlowLogMasterService(final Configuration configuration, + final MasterServices masterServices) { + slowlogTableEnabled = configuration.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, + HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); + this.masterServices = masterServices; + } + + public void init() throws IOException { + if (!slowlogTableEnabled) { + LOG.info("Slow/Large requests logging to system table hbase:slowlog is disabled. Quitting."); + return; + } + if (!MetaTableAccessor.tableExists(masterServices.getConnection(), + SlowLogTableAccessor.SLOW_LOG_TABLE_NAME)) { + LOG.info("slowlog table not found. Creating."); + this.masterServices.createSystemTable(TABLE_DESCRIPTOR_BUILDER.build()); + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index c76055e43dc..cd0cc3adcf7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder; +import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; @@ -427,6 +428,8 @@ public class HRegionServer extends HasThread implements private final RegionServerAccounting regionServerAccounting; + private SlowLogTableOpsChore slowLogTableOpsChore = null; + // Block cache private BlockCache blockCache; // The cache for mob files @@ -2011,6 +2014,9 @@ public class HRegionServer extends HasThread implements if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher); if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner); if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore); + if (this.slowLogTableOpsChore != null) { + choreService.scheduleChore(slowLogTableOpsChore); + } // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. @@ -2056,6 +2062,14 @@ public class HRegionServer extends HasThread implements this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); this.leaseManager = new LeaseManager(this.threadWakeFrequency); + final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, + HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); + if (isSlowLogTableEnabled) { + // default chore duration: 10 min + final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000); + slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.slowLogRecorder); + } + // Create the thread to clean the moved regions list movedRegionsCleaner = MovedRegionsCleaner.create(this); @@ -2559,6 +2573,7 @@ public class HRegionServer extends HasThread implements choreService.cancelChore(storefileRefresher); choreService.cancelChore(movedRegionsCleaner); choreService.cancelChore(fsUtilizationChore); + choreService.cancelChore(slowLogTableOpsChore); // clean up the remaining scheduled chores (in case we missed out any) choreService.shutdown(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java index 508f08697ec..8d500de571c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java @@ -27,11 +27,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Queue; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.SlowLogParams; import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,11 +57,32 @@ class LogEventHandler implements EventHandler { private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class); - private final Queue queue; + private static final String SYS_TABLE_QUEUE_SIZE = + "hbase.regionserver.slowlog.systable.queue.size"; + private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000; + private static final int SYSTABLE_PUT_BATCH_SIZE = 100; - LogEventHandler(int eventCount) { + private final Queue queueForRingBuffer; + private final Queue queueForSysTable; + private final boolean isSlowLogTableEnabled; + + private Configuration configuration; + + private static final ReentrantLock LOCK = new ReentrantLock(); + + LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration conf) { + this.configuration = conf; EvictingQueue evictingQueue = EvictingQueue.create(eventCount); - queue = Queues.synchronizedQueue(evictingQueue); + queueForRingBuffer = Queues.synchronizedQueue(evictingQueue); + this.isSlowLogTableEnabled = isSlowLogTableEnabled; + if (isSlowLogTableEnabled) { + int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE); + EvictingQueue evictingQueueForTable = + EvictingQueue.create(sysTableQueueSize); + queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable); + } else { + queueForSysTable = null; + } } /** @@ -83,7 +107,7 @@ class LogEventHandler implements EventHandler { return; } Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod(); - Message param = rpcCall.getParam(); + Message param = rpcCallDetails.getParam(); long receiveTime = rpcCall.getReceiveTime(); long startTime = rpcCall.getStartTime(); long endTime = System.currentTimeMillis(); @@ -129,7 +153,12 @@ class LogEventHandler implements EventHandler { .setType(type) .setUserName(userName) .build(); - queue.add(slowLogPayload); + queueForRingBuffer.add(slowLogPayload); + if (isSlowLogTableEnabled) { + if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) { + queueForSysTable.add(slowLogPayload); + } + } } private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) { @@ -160,7 +189,7 @@ class LogEventHandler implements EventHandler { if (LOG.isDebugEnabled()) { LOG.debug("Received request to clean up online slowlog buffer.."); } - queue.clear(); + queueForRingBuffer.clear(); return true; } @@ -172,7 +201,7 @@ class LogEventHandler implements EventHandler { */ List getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) { List slowLogPayloadList = - Arrays.stream(queue.toArray(new SlowLogPayload[0])) + Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0])) .filter(e -> e.getType() == SlowLogPayload.Type.ALL || e.getType() == SlowLogPayload.Type.SLOW_LOG) .collect(Collectors.toList()); @@ -191,7 +220,7 @@ class LogEventHandler implements EventHandler { */ List getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) { List slowLogPayloadList = - Arrays.stream(queue.toArray(new SlowLogPayload[0])) + Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0])) .filter(e -> e.getType() == SlowLogPayload.Type.ALL || e.getType() == SlowLogPayload.Type.LARGE_LOG) .collect(Collectors.toList()); @@ -207,8 +236,7 @@ class LogEventHandler implements EventHandler { if (isFilterProvided(request)) { logPayloadList = filterLogs(request, logPayloadList); } - int limit = request.getLimit() >= logPayloadList.size() ? logPayloadList.size() - : request.getLimit(); + int limit = Math.min(request.getLimit(), logPayloadList.size()); return logPayloadList.subList(0, limit); } @@ -256,4 +284,36 @@ class LogEventHandler implements EventHandler { return filteredSlowLogPayloads; } + /** + * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch + */ + void addAllLogsToSysTable() { + if (queueForSysTable == null) { + // hbase.regionserver.slowlog.systable.enabled is turned off. Exiting. + return; + } + if (LOCK.isLocked()) { + return; + } + LOCK.lock(); + try { + List slowLogPayloads = new ArrayList<>(); + int i = 0; + while (!queueForSysTable.isEmpty()) { + slowLogPayloads.add(queueForSysTable.poll()); + i++; + if (i == SYSTABLE_PUT_BATCH_SIZE) { + SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); + slowLogPayloads.clear(); + i = 0; + } + } + if (slowLogPayloads.size() > 0) { + SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); + } + } finally { + LOCK.unlock(); + } + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java index 7d5558cde73..b469cdb94b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.slowlog; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.yetus.audience.InterfaceAudience; /** @@ -30,15 +31,17 @@ import org.apache.yetus.audience.InterfaceAudience; public class RpcLogDetails { private final RpcCall rpcCall; + private final Message param; private final String clientAddress; private final long responseSize; private final String className; private final boolean isSlowLog; private final boolean isLargeLog; - public RpcLogDetails(RpcCall rpcCall, String clientAddress, long responseSize, + public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize, String className, boolean isSlowLog, boolean isLargeLog) { this.rpcCall = rpcCall; + this.param = param; this.clientAddress = clientAddress; this.responseSize = responseSize; this.className = className; @@ -70,10 +73,15 @@ public class RpcLogDetails { return isLargeLog; } + public Message getParam() { + return param; + } + @Override public String toString() { return new ToStringBuilder(this) .append("rpcCall", rpcCall) + .append("param", param) .append("clientAddress", clientAddress) .append("responseSize", responseSize) .append("className", className) @@ -81,5 +89,4 @@ public class RpcLogDetails { .append("isLargeLog", isLargeLog) .toString(); } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java index a69b0adb5df..b0fb3e75a56 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java @@ -29,6 +29,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -86,7 +87,9 @@ public class SlowLogRecorder { this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler()); // initialize ringbuffer event handler - this.logEventHandler = new LogEventHandler(this.eventCount); + final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, + HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); + this.logEventHandler = new LogEventHandler(this.eventCount, isSlowLogTableEnabled, conf); this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler}); this.disruptor.start(); } @@ -161,4 +164,13 @@ public class SlowLogRecorder { } } + /** + * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch + */ + public void addAllLogsToSysTable() { + if (this.logEventHandler != null) { + this.logEventHandler.addAllLogsToSysTable(); + } + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java new file mode 100644 index 00000000000..77749f7dd36 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.slowlog; + +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Chore to insert multiple accumulated slow/large logs to hbase:slowlog system table + */ +@InterfaceAudience.Private +public class SlowLogTableOpsChore extends ScheduledChore { + + private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class); + + private final SlowLogRecorder slowLogRecorder; + + /** + * Chore Constructor + * + * @param stopper The stopper - When {@link Stoppable#isStopped()} is true, this chore will + * cancel and cleanup + * @param period Period in millis with which this Chore repeats execution when scheduled + * @param slowLogRecorder {@link SlowLogRecorder} instance + */ + public SlowLogTableOpsChore(final Stoppable stopper, final int period, + final SlowLogRecorder slowLogRecorder) { + super("SlowLogTableOpsChore", stopper, period); + this.slowLogRecorder = slowLogRecorder; + } + + @Override + protected void chore() { + if (LOG.isTraceEnabled()) { + LOG.trace("SlowLog Table Ops Chore is starting up."); + } + slowLogRecorder.addAllLogsToSysTable(); + if (LOG.isTraceEnabled()) { + LOG.trace("SlowLog Table Ops Chore is closing."); + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java new file mode 100644 index 00000000000..e08ad290cfc --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java @@ -0,0 +1,204 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.slowlog; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for SlowLog System Table + */ +@Category({ MasterTests.class, MediumTests.class }) +public class TestSlowLogAccessor { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSlowLogAccessor.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class); + + private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility(); + + private SlowLogRecorder slowLogRecorder; + + @BeforeClass + public static void setup() throws Exception { + try { + HBASE_TESTING_UTILITY.shutdownMiniHBaseCluster(); + } catch (IOException e) { + LOG.debug("No worries."); + } + Configuration conf = HBASE_TESTING_UTILITY.getConfiguration(); + conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true); + conf.setBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, true); + conf.setInt("hbase.slowlog.systable.chore.duration", 900); + conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", 50000); + HBASE_TESTING_UTILITY.startMiniCluster(); + } + + @AfterClass + public static void teardown() throws Exception { + HBASE_TESTING_UTILITY.shutdownMiniHBaseCluster(); + } + + @Before + public void setUp() throws Exception { + HRegionServer hRegionServer = HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0); + Field slowLogRecorder = HRegionServer.class.getDeclaredField("slowLogRecorder"); + slowLogRecorder.setAccessible(true); + this.slowLogRecorder = (SlowLogRecorder) slowLogRecorder.get(hRegionServer); + } + + @Test + public void testSlowLogRecords() throws Exception { + + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build(); + + slowLogRecorder.clearSlowLogPayloads(); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + + int i = 0; + + Connection connection = waitForSlowLogTableCreation(); + // add 5 records initially + for (; i < 5; i++) { + RpcLogDetails rpcLogDetails = TestSlowLogRecorder + .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + + // add 2 more records + for (; i < 7; i++) { + RpcLogDetails rpcLogDetails = TestSlowLogRecorder + .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + + // add 3 more records + for (; i < 10; i++) { + RpcLogDetails rpcLogDetails = TestSlowLogRecorder + .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + + // add 4 more records + for (; i < 14; i++) { + RpcLogDetails rpcLogDetails = TestSlowLogRecorder + .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY + .waitFor(3000, () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14)); + + Assert.assertNotEquals(-1, + HBASE_TESTING_UTILITY.waitFor(3000, () -> getTableCount(connection) == 14)); + } + + private int getTableCount(Connection connection) { + try (Table table = connection.getTable(SlowLogTableAccessor.SLOW_LOG_TABLE_NAME)) { + ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM)); + int count = 0; + for (Result result : resultScanner) { + ++count; + } + return count; + } catch (Exception e) { + return 0; + } + } + + private Connection waitForSlowLogTableCreation() { + Connection connection = + HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0).getConnection(); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(2000, () -> { + try { + return MetaTableAccessor.tableExists(connection, SlowLogTableAccessor.SLOW_LOG_TABLE_NAME); + } catch (IOException e) { + return false; + } + })); + return connection; + } + + @Test + public void testHigherSlowLogs() throws Exception { + Connection connection = waitForSlowLogTableCreation(); + + slowLogRecorder.clearSlowLogPayloads(); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build(); + Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0); + + for (int j = 0; j < 100; j++) { + CompletableFuture.runAsync(() -> { + for (int i = 0; i < 350; i++) { + if (i == 300) { + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + } + RpcLogDetails rpcLogDetails = TestSlowLogRecorder + .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1)); + slowLogRecorder.addSlowLogPayload(rpcLogDetails); + } + }); + } + + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> { + int count = slowLogRecorder.getSlowLogPayloads(request).size(); + LOG.debug("RingBuffer records count: {}", count); + return count > 2000; + })); + + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> { + int count = getTableCount(connection); + LOG.debug("SlowLog Table records count: {}", count); + return count > 2000; + })); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java index bdd5c89be0b..863e27bd297 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java @@ -486,18 +486,19 @@ public class TestSlowLogRecorder { } - private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, - String className) { - return new RpcLogDetails(getRpcCall(userName), clientAddress, 0, className, true, true); + static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) { + RpcCall rpcCall = getRpcCall(userName); + return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, true, true); } private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, boolean isSlowLog, boolean isLargeLog) { - return new RpcLogDetails(getRpcCall(userName), clientAddress, 0, className, isSlowLog, + RpcCall rpcCall = getRpcCall(userName); + return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, isSlowLog, isLargeLog); } - private RpcCall getRpcCall(String userName) { + private static RpcCall getRpcCall(String userName) { RpcCall rpcCall = new RpcCall() { @Override public BlockingService getService() { @@ -646,7 +647,7 @@ public class TestSlowLogRecorder { return rpcCall; } - private Message getMessage() { + private static Message getMessage() { i = (i + 1) % 3; @@ -693,7 +694,7 @@ public class TestSlowLogRecorder { } - private Optional getUser(String userName) { + private static Optional getUser(String userName) { return Optional.of(new User() {