HBASE-23938 : System table hbase:slowlog to store complete slow/large… (#1681)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org>
Signed-off-by: ramkrish86 <ramkrishna@apache.org>
This commit is contained in:
Viraj Jasani 2020-05-20 15:10:29 +05:30
parent fbe0da2672
commit 6f5e5e4828
No known key found for this signature in database
GPG Key ID: 3AE697641452FC5D
12 changed files with 619 additions and 21 deletions

View File

@ -0,0 +1,150 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.slowlog;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Slowlog Accessor to record slow/large RPC log identified at each RegionServer RpcServer level.
* This can be done only optionally to record the entire history of slow/large rpc calls
* since RingBuffer can handle only limited latest records.
*/
@InterfaceAudience.Private
public class SlowLogTableAccessor {
private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class);
private static final Random RANDOM = new Random();
private static Connection connection;
/**
* hbase:slowlog table name - can be enabled
* with config - hbase.regionserver.slowlog.systable.enabled
*/
public static final TableName SLOW_LOG_TABLE_NAME =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "slowlog");
private static void doPut(final Connection connection, final List<Put> puts)
throws IOException {
try (Table table = connection.getTable(SLOW_LOG_TABLE_NAME)) {
table.put(puts);
}
}
/**
* Add slow/large log records to hbase:slowlog table
* @param slowLogPayloads List of SlowLogPayload to process
* @param configuration Configuration to use for connection
*/
public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowLogPayloads,
final Configuration configuration) {
List<Put> puts = new ArrayList<>(slowLogPayloads.size());
for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) {
final byte[] rowKey = getRowKey(slowLogPayload);
final Put put = new Put(rowKey).setDurability(Durability.SKIP_WAL)
.setPriority(HConstants.NORMAL_QOS)
.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("call_details"),
Bytes.toBytes(slowLogPayload.getCallDetails()))
.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("client_address"),
Bytes.toBytes(slowLogPayload.getClientAddress()))
.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("method_name"),
Bytes.toBytes(slowLogPayload.getMethodName()))
.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("param"),
Bytes.toBytes(slowLogPayload.getParam()))
.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("processing_time"),
Bytes.toBytes(Integer.toString(slowLogPayload.getProcessingTime())))
.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("queue_time"),
Bytes.toBytes(Integer.toString(slowLogPayload.getQueueTime())))
.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("region_name"),
Bytes.toBytes(slowLogPayload.getRegionName()))
.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("response_size"),
Bytes.toBytes(Long.toString(slowLogPayload.getResponseSize())))
.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("server_class"),
Bytes.toBytes(slowLogPayload.getServerClass()))
.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("start_time"),
Bytes.toBytes(Long.toString(slowLogPayload.getStartTime())))
.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("type"),
Bytes.toBytes(slowLogPayload.getType().name()))
.addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("username"),
Bytes.toBytes(slowLogPayload.getUserName()));
puts.add(put);
}
try {
if (connection == null) {
createConnection(configuration);
}
doPut(connection, puts);
} catch (Exception e) {
LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e);
}
}
private static synchronized void createConnection(Configuration configuration)
throws IOException {
Configuration conf = new Configuration(configuration);
// rpc timeout: 20s
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000);
// retry count: 5
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
connection = ConnectionFactory.createConnection(conf);
}
/**
* Create rowKey: currentTimeMillis APPEND slowLogPayload.hashcode
* Scan on slowlog table should keep records with sorted order of time, however records
* added at the very same time (currentTimeMillis) could be in random order.
*
* @param slowLogPayload SlowLogPayload to process
* @return rowKey byte[]
*/
private static byte[] getRowKey(final TooSlowLog.SlowLogPayload slowLogPayload) {
String hashcode = String.valueOf(slowLogPayload.hashCode());
String lastFiveDig =
hashcode.substring((hashcode.length() > 5) ? (hashcode.length() - 5) : 0);
if (lastFiveDig.startsWith("-")) {
lastFiveDig = String.valueOf(RANDOM.nextInt(99999));
}
final long currentTimeMillis = EnvironmentEdgeManager.currentTime();
final String timeAndHashcode = currentTimeMillis + lastFiveDig;
final long rowKeyLong = Long.parseLong(timeAndHashcode);
return Bytes.toBytes(rowKeyLong);
}
}

View File

@ -1563,6 +1563,16 @@ public final class HConstants {
"hbase.regionserver.slowlog.buffer.enabled";
public static final boolean DEFAULT_ONLINE_LOG_PROVIDER_ENABLED = false;
/** The slowlog info family as a string*/
private static final String SLOWLOG_INFO_FAMILY_STR = "info";
/** The slowlog info family */
public static final byte [] SLOWLOG_INFO_FAMILY = Bytes.toBytes(SLOWLOG_INFO_FAMILY_STR);
public static final String SLOW_LOG_SYS_TABLE_ENABLED_KEY =
"hbase.regionserver.slowlog.systable.enabled";
public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
private HConstants() {
// Can't be instantiated with this ctor.
}

View File

@ -435,7 +435,7 @@ public abstract class RpcServer implements RpcServerInterface,
final String className = server == null ? StringUtils.EMPTY :
server.getClass().getSimpleName();
this.slowLogRecorder.addSlowLogPayload(
new RpcLogDetails(call, status.getClient(), responseSize, className, tooSlow,
new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
tooLarge));
}
}

View File

@ -143,6 +143,7 @@ import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
@ -1139,6 +1140,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// Start the chore to read snapshots and add their usage to table/NS quotas
getChoreService().scheduleChore(snapshotQuotaChore);
}
final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
slowLogMasterService.init();
// clear the dead servers with same host name and port of online server because we are not
// removing dead server with same hostname and port of rs which is trying to check in before

View File

@ -0,0 +1,73 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.slowlog;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Slowlog Master services - Table creation to be used by HMaster
*/
@InterfaceAudience.Private
public class SlowLogMasterService {
private static final Logger LOG = LoggerFactory.getLogger(SlowLogMasterService.class);
private final boolean slowlogTableEnabled;
private final MasterServices masterServices;
private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER =
TableDescriptorBuilder.newBuilder(SlowLogTableAccessor.SLOW_LOG_TABLE_NAME)
.setRegionReplication(1)
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(HConstants.SLOWLOG_INFO_FAMILY)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
.setBlockCacheEnabled(false)
.setMaxVersions(1).build());
public SlowLogMasterService(final Configuration configuration,
final MasterServices masterServices) {
slowlogTableEnabled = configuration.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
this.masterServices = masterServices;
}
public void init() throws IOException {
if (!slowlogTableEnabled) {
LOG.info("Slow/Large requests logging to system table hbase:slowlog is disabled. Quitting.");
return;
}
if (!MetaTableAccessor.tableExists(masterServices.getConnection(),
SlowLogTableAccessor.SLOW_LOG_TABLE_NAME)) {
LOG.info("slowlog table not found. Creating.");
this.masterServices.createSystemTable(TABLE_DESCRIPTOR_BUILDER.build());
}
}
}

View File

@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@ -427,6 +428,8 @@ public class HRegionServer extends HasThread implements
private final RegionServerAccounting regionServerAccounting;
private SlowLogTableOpsChore slowLogTableOpsChore = null;
// Block cache
private BlockCache blockCache;
// The cache for mob files
@ -2011,6 +2014,9 @@ public class HRegionServer extends HasThread implements
if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore);
if (this.slowLogTableOpsChore != null) {
choreService.scheduleChore(slowLogTableOpsChore);
}
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
@ -2056,6 +2062,14 @@ public class HRegionServer extends HasThread implements
this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
this.leaseManager = new LeaseManager(this.threadWakeFrequency);
final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
if (isSlowLogTableEnabled) {
// default chore duration: 10 min
final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.slowLogRecorder);
}
// Create the thread to clean the moved regions list
movedRegionsCleaner = MovedRegionsCleaner.create(this);
@ -2559,6 +2573,7 @@ public class HRegionServer extends HasThread implements
choreService.cancelChore(storefileRefresher);
choreService.cancelChore(movedRegionsCleaner);
choreService.cancelChore(fsUtilizationChore);
choreService.cancelChore(slowLogTableOpsChore);
// clean up the remaining scheduled chores (in case we missed out any)
choreService.shutdown();
}

View File

@ -27,11 +27,14 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,11 +57,32 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
private final Queue<SlowLogPayload> queue;
private static final String SYS_TABLE_QUEUE_SIZE =
"hbase.regionserver.slowlog.systable.queue.size";
private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
LogEventHandler(int eventCount) {
private final Queue<SlowLogPayload> queueForRingBuffer;
private final Queue<SlowLogPayload> queueForSysTable;
private final boolean isSlowLogTableEnabled;
private Configuration configuration;
private static final ReentrantLock LOCK = new ReentrantLock();
LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration conf) {
this.configuration = conf;
EvictingQueue<SlowLogPayload> evictingQueue = EvictingQueue.create(eventCount);
queue = Queues.synchronizedQueue(evictingQueue);
queueForRingBuffer = Queues.synchronizedQueue(evictingQueue);
this.isSlowLogTableEnabled = isSlowLogTableEnabled;
if (isSlowLogTableEnabled) {
int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
EvictingQueue<SlowLogPayload> evictingQueueForTable =
EvictingQueue.create(sysTableQueueSize);
queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
} else {
queueForSysTable = null;
}
}
/**
@ -83,7 +107,7 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
return;
}
Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
Message param = rpcCall.getParam();
Message param = rpcCallDetails.getParam();
long receiveTime = rpcCall.getReceiveTime();
long startTime = rpcCall.getStartTime();
long endTime = System.currentTimeMillis();
@ -129,7 +153,12 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
.setType(type)
.setUserName(userName)
.build();
queue.add(slowLogPayload);
queueForRingBuffer.add(slowLogPayload);
if (isSlowLogTableEnabled) {
if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
queueForSysTable.add(slowLogPayload);
}
}
}
private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
@ -160,7 +189,7 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
if (LOG.isDebugEnabled()) {
LOG.debug("Received request to clean up online slowlog buffer..");
}
queue.clear();
queueForRingBuffer.clear();
return true;
}
@ -172,7 +201,7 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
*/
List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
List<SlowLogPayload> slowLogPayloadList =
Arrays.stream(queue.toArray(new SlowLogPayload[0]))
Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
.filter(e -> e.getType() == SlowLogPayload.Type.ALL
|| e.getType() == SlowLogPayload.Type.SLOW_LOG)
.collect(Collectors.toList());
@ -191,7 +220,7 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
*/
List<SlowLogPayload> getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
List<SlowLogPayload> slowLogPayloadList =
Arrays.stream(queue.toArray(new SlowLogPayload[0]))
Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
.filter(e -> e.getType() == SlowLogPayload.Type.ALL
|| e.getType() == SlowLogPayload.Type.LARGE_LOG)
.collect(Collectors.toList());
@ -207,8 +236,7 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
if (isFilterProvided(request)) {
logPayloadList = filterLogs(request, logPayloadList);
}
int limit = request.getLimit() >= logPayloadList.size() ? logPayloadList.size()
: request.getLimit();
int limit = Math.min(request.getLimit(), logPayloadList.size());
return logPayloadList.subList(0, limit);
}
@ -256,4 +284,36 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
return filteredSlowLogPayloads;
}
/**
* Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
*/
void addAllLogsToSysTable() {
if (queueForSysTable == null) {
// hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.
return;
}
if (LOCK.isLocked()) {
return;
}
LOCK.lock();
try {
List<SlowLogPayload> slowLogPayloads = new ArrayList<>();
int i = 0;
while (!queueForSysTable.isEmpty()) {
slowLogPayloads.add(queueForSysTable.poll());
i++;
if (i == SYSTABLE_PUT_BATCH_SIZE) {
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
slowLogPayloads.clear();
i = 0;
}
}
if (slowLogPayloads.size() > 0) {
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
}
} finally {
LOCK.unlock();
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.slowlog;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -30,15 +31,17 @@ import org.apache.yetus.audience.InterfaceAudience;
public class RpcLogDetails {
private final RpcCall rpcCall;
private final Message param;
private final String clientAddress;
private final long responseSize;
private final String className;
private final boolean isSlowLog;
private final boolean isLargeLog;
public RpcLogDetails(RpcCall rpcCall, String clientAddress, long responseSize,
public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize,
String className, boolean isSlowLog, boolean isLargeLog) {
this.rpcCall = rpcCall;
this.param = param;
this.clientAddress = clientAddress;
this.responseSize = responseSize;
this.className = className;
@ -70,10 +73,15 @@ public class RpcLogDetails {
return isLargeLog;
}
public Message getParam() {
return param;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("rpcCall", rpcCall)
.append("param", param)
.append("clientAddress", clientAddress)
.append("responseSize", responseSize)
.append("className", className)
@ -81,5 +89,4 @@ public class RpcLogDetails {
.append("isLargeLog", isLargeLog)
.toString();
}
}

View File

@ -29,6 +29,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@ -86,7 +87,9 @@ public class SlowLogRecorder {
this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
// initialize ringbuffer event handler
this.logEventHandler = new LogEventHandler(this.eventCount);
final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
this.logEventHandler = new LogEventHandler(this.eventCount, isSlowLogTableEnabled, conf);
this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler});
this.disruptor.start();
}
@ -161,4 +164,13 @@ public class SlowLogRecorder {
}
}
/**
* Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
*/
public void addAllLogsToSysTable() {
if (this.logEventHandler != null) {
this.logEventHandler.addAllLogsToSysTable();
}
}
}

View File

@ -0,0 +1,63 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Chore to insert multiple accumulated slow/large logs to hbase:slowlog system table
*/
@InterfaceAudience.Private
public class SlowLogTableOpsChore extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class);
private final SlowLogRecorder slowLogRecorder;
/**
* Chore Constructor
*
* @param stopper The stopper - When {@link Stoppable#isStopped()} is true, this chore will
* cancel and cleanup
* @param period Period in millis with which this Chore repeats execution when scheduled
* @param slowLogRecorder {@link SlowLogRecorder} instance
*/
public SlowLogTableOpsChore(final Stoppable stopper, final int period,
final SlowLogRecorder slowLogRecorder) {
super("SlowLogTableOpsChore", stopper, period);
this.slowLogRecorder = slowLogRecorder;
}
@Override
protected void chore() {
if (LOG.isTraceEnabled()) {
LOG.trace("SlowLog Table Ops Chore is starting up.");
}
slowLogRecorder.addAllLogsToSysTable();
if (LOG.isTraceEnabled()) {
LOG.trace("SlowLog Table Ops Chore is closing.");
}
}
}

View File

@ -0,0 +1,204 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests for SlowLog System Table
*/
@Category({ MasterTests.class, MediumTests.class })
public class TestSlowLogAccessor {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSlowLogAccessor.class);
private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
private SlowLogRecorder slowLogRecorder;
@BeforeClass
public static void setup() throws Exception {
try {
HBASE_TESTING_UTILITY.shutdownMiniHBaseCluster();
} catch (IOException e) {
LOG.debug("No worries.");
}
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
conf.setBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, true);
conf.setInt("hbase.slowlog.systable.chore.duration", 900);
conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", 50000);
HBASE_TESTING_UTILITY.startMiniCluster();
}
@AfterClass
public static void teardown() throws Exception {
HBASE_TESTING_UTILITY.shutdownMiniHBaseCluster();
}
@Before
public void setUp() throws Exception {
HRegionServer hRegionServer = HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0);
Field slowLogRecorder = HRegionServer.class.getDeclaredField("slowLogRecorder");
slowLogRecorder.setAccessible(true);
this.slowLogRecorder = (SlowLogRecorder) slowLogRecorder.get(hRegionServer);
}
@Test
public void testSlowLogRecords() throws Exception {
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
slowLogRecorder.clearSlowLogPayloads();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
int i = 0;
Connection connection = waitForSlowLogTableCreation();
// add 5 records initially
for (; i < 5; i++) {
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
// add 2 more records
for (; i < 7; i++) {
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
// add 3 more records
for (; i < 10; i++) {
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
// add 4 more records
for (; i < 14; i++) {
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY
.waitFor(3000, () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1,
HBASE_TESTING_UTILITY.waitFor(3000, () -> getTableCount(connection) == 14));
}
private int getTableCount(Connection connection) {
try (Table table = connection.getTable(SlowLogTableAccessor.SLOW_LOG_TABLE_NAME)) {
ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM));
int count = 0;
for (Result result : resultScanner) {
++count;
}
return count;
} catch (Exception e) {
return 0;
}
}
private Connection waitForSlowLogTableCreation() {
Connection connection =
HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0).getConnection();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(2000, () -> {
try {
return MetaTableAccessor.tableExists(connection, SlowLogTableAccessor.SLOW_LOG_TABLE_NAME);
} catch (IOException e) {
return false;
}
}));
return connection;
}
@Test
public void testHigherSlowLogs() throws Exception {
Connection connection = waitForSlowLogTableCreation();
slowLogRecorder.clearSlowLogPayloads();
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
for (int j = 0; j < 100; j++) {
CompletableFuture.runAsync(() -> {
for (int i = 0; i < 350; i++) {
if (i == 300) {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
}
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
});
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> {
int count = slowLogRecorder.getSlowLogPayloads(request).size();
LOG.debug("RingBuffer records count: {}", count);
return count > 2000;
}));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> {
int count = getTableCount(connection);
LOG.debug("SlowLog Table records count: {}", count);
return count > 2000;
}));
}
}

View File

@ -486,18 +486,19 @@ public class TestSlowLogRecorder {
}
private RpcLogDetails getRpcLogDetails(String userName, String clientAddress,
String className) {
return new RpcLogDetails(getRpcCall(userName), clientAddress, 0, className, true, true);
static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
RpcCall rpcCall = getRpcCall(userName);
return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, true, true);
}
private RpcLogDetails getRpcLogDetails(String userName, String clientAddress,
String className, boolean isSlowLog, boolean isLargeLog) {
return new RpcLogDetails(getRpcCall(userName), clientAddress, 0, className, isSlowLog,
RpcCall rpcCall = getRpcCall(userName);
return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, isSlowLog,
isLargeLog);
}
private RpcCall getRpcCall(String userName) {
private static RpcCall getRpcCall(String userName) {
RpcCall rpcCall = new RpcCall() {
@Override
public BlockingService getService() {
@ -646,7 +647,7 @@ public class TestSlowLogRecorder {
return rpcCall;
}
private Message getMessage() {
private static Message getMessage() {
i = (i + 1) % 3;
@ -693,7 +694,7 @@ public class TestSlowLogRecorder {
}
private Optional<User> getUser(String userName) {
private static Optional<User> getUser(String userName) {
return Optional.of(new User() {