HBASE-24718 : Generic NamedQueue framework for multiple use-cases (Refactor SlowLog responses)

Closes #2052

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
Viraj Jasani 2020-07-17 19:04:26 +05:30
parent ec3beaf5a2
commit 8ae3480e70
No known key found for this signature in database
GPG Key ID: B3D6C0B41C8ADFD5
23 changed files with 1070 additions and 529 deletions

View File

@ -2055,6 +2055,11 @@ public final class RequestConverter {
} else {
builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR);
}
if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) {
builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG);
} else {
builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG);
}
return builder.setLimit(logQueryFilter.getLimit()).build();
}

View File

@ -1996,4 +1996,16 @@ possible configurations would overwhelm and obscure the important.
too large batch request.
</description>
</property>
<property>
<name>hbase.namedqueue.provider.classes</name>
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService</value>
<description>
Default values for NamedQueueService implementors. This comma separated full class names
represent all implementors of NamedQueueService that we would like to be invoked by
LogEvent handler service. One example of NamedQueue service is SlowLogQueueService which
is used to store slow/large RPC logs in ringbuffer at each RegionServer.
All implementors of NamedQueueService should be found under package:
"org.apache.hadoop.hbase.namequeues.impl"
</description>
</property>
</configuration>

View File

@ -287,12 +287,18 @@ message SlowLogResponseRequest {
OR = 1;
}
enum LogType {
SLOW_LOG = 0;
LARGE_LOG = 1;
}
optional string region_name = 1;
optional string table_name = 2;
optional string client_address = 3;
optional string user_name = 4;
optional uint32 limit = 5 [default = 10];
optional FilterByOperator filter_by_operator = 6 [default = OR];
optional LogType log_type = 7;
}
message SlowLogResponses {

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
@ -46,8 +47,8 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@ -96,6 +97,7 @@ public abstract class RpcServer implements RpcServerInterface,
private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
private final boolean authorize;
private final boolean isOnlineLogProviderEnabled;
protected boolean isSecurityEnabled;
public static final byte CURRENT_VERSION = 0;
@ -227,7 +229,7 @@ public abstract class RpcServer implements RpcServerInterface,
/**
* Use to add online slowlog responses
*/
private SlowLogRecorder slowLogRecorder;
private NamedQueueRecorder namedQueueRecorder;
@FunctionalInterface
protected interface CallCleanup {
@ -302,6 +304,8 @@ public abstract class RpcServer implements RpcServerInterface,
saslProps = Collections.emptyMap();
}
this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
this.scheduler = scheduler;
}
@ -430,11 +434,11 @@ public abstract class RpcServer implements RpcServerInterface,
tooLarge, tooSlow,
status.getClient(), startTime, processingTime, qTime,
responseSize, userName);
if (this.slowLogRecorder != null) {
if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
// send logs to ring buffer owned by slowLogRecorder
final String className = server == null ? StringUtils.EMPTY :
server.getClass().getSimpleName();
this.slowLogRecorder.addSlowLogPayload(
final String className =
server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
this.namedQueueRecorder.addRecord(
new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
tooLarge));
}
@ -817,12 +821,8 @@ public abstract class RpcServer implements RpcServerInterface,
}
@Override
public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) {
this.slowLogRecorder = slowLogRecorder;
public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
this.namedQueueRecorder = namedQueueRecorder;
}
@Override
public SlowLogRecorder getSlowLogRecorder() {
return slowLogRecorder;
}
}

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.yetus.audience.InterfaceAudience;
@ -102,12 +102,8 @@ public interface RpcServerInterface {
/**
* Set Online SlowLog Provider
*
* @param slowLogRecorder instance of {@link SlowLogRecorder}
* @param namedQueueRecorder instance of {@link NamedQueueRecorder}
*/
void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder);
void setNamedQueueRecorder(final NamedQueueRecorder namedQueueRecorder);
/**
* @return Retrieve instance of {@link SlowLogRecorder} maintained by RpcServer
*/
SlowLogRecorder getSlowLogRecorder();
}

View File

@ -17,7 +17,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
package org.apache.hadoop.hbase.namequeues;
import com.lmax.disruptor.ExceptionHandler;
import org.apache.yetus.audience.InterfaceAudience;

View File

@ -0,0 +1,130 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Event Handler run by disruptor ringbuffer consumer.
* Although this is generic implementation for namedQueue, it can have individual queue specific
* logic.
*/
@InterfaceAudience.Private
class LogEventHandler implements EventHandler<RingBufferEnvelope> {
private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
// Map that binds namedQueues to corresponding queue service implementation.
// If NamedQueue of specific type is enabled, corresponding service will be used to
// insert and retrieve records.
// Individual queue sizes should be determined based on their individual configs within
// each service.
private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices =
new HashMap<>();
private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes";
LogEventHandler(final Configuration conf) {
for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) {
Class<?> clz;
try {
clz = Class.forName(implName);
} catch (ClassNotFoundException e) {
LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e);
continue;
}
if (!NamedQueueService.class.isAssignableFrom(clz)) {
LOG.warn("Class {} is not implementor of NamedQueueService.", clz);
continue;
}
// add all service mappings here
try {
NamedQueueService namedQueueService =
(NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf);
namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException
| InvocationTargetException e) {
LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.",
clz);
}
}
}
/**
* Called when a publisher has published an event to the {@link RingBuffer}.
* This is generic consumer of disruptor ringbuffer and for each new namedQueue that we
* add, we should also provide specific consumer logic here.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from
* the {@link RingBuffer}
*/
@Override
public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) {
final NamedQueuePayload namedQueuePayload = event.getPayload();
// consume ringbuffer payload based on event type
namedQueueServices.get(namedQueuePayload.getNamedQueueEvent())
.consumeEventFromDisruptor(namedQueuePayload);
}
/**
* Cleans up queues maintained by services.
*
* @param namedQueueEvent type of queue to clear
* @return true if queue is cleaned up, false otherwise
*/
boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
return namedQueueServices.get(namedQueueEvent).clearNamedQueue();
}
/**
* Add all in memory queue records to system table. The implementors can use system table
* or direct HDFS file or ZK as persistence system.
*/
void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
namedQueueServices.get(namedQueueEvent).persistAll();
}
/**
* Retrieve in memory queue records from ringbuffer
*
* @param request namedQueue request with event type
* @return queue records from ringbuffer after filter (if applied)
*/
NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request);
}
}

View File

@ -17,7 +17,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
package org.apache.hadoop.hbase.namequeues;
import java.util.ArrayList;
import java.util.List;
@ -30,7 +30,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* Event Handler utility class
*/
@InterfaceAudience.Private
class LogHandlerUtils {
public class LogHandlerUtils {
private static int getTotalFiltersCount(AdminProtos.SlowLogResponseRequest request) {
int totalFilters = 0;
@ -91,7 +91,7 @@ class LogHandlerUtils {
return filteredSlowLogPayloads;
}
static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
public static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
AdminProtos.SlowLogResponseRequest request, List<TooSlowLog.SlowLogPayload> logPayloadList) {
int totalFilters = getTotalFiltersCount(request);
if (totalFilters > 0) {

View File

@ -0,0 +1,49 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Base payload to be prepared by client to send various namedQueue events for in-memory
* ring buffer storage in either HMaster or RegionServer.
* e.g slowLog responses
*/
@InterfaceAudience.Private
public class NamedQueuePayload {
public enum NamedQueueEvent {
SLOW_LOG
}
private final NamedQueueEvent namedQueueEvent;
public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
if (namedQueueEvent == null) {
throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
}
this.namedQueueEvent = namedQueueEvent;
}
public NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
}

View File

@ -17,87 +17,80 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
package org.apache.hadoop.hbase.namequeues;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
/**
* Online Slow/Large Log Provider Service that keeps slow/large RPC logs in the ring buffer.
* The service uses LMAX Disruptor to save slow records which are then consumed by
* NamedQueue recorder that maintains various named queues.
* The service uses LMAX Disruptor to save queue records which are then consumed by
* a queue and based on the ring buffer size, the available records are then fetched
* from the queue in thread-safe manner.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SlowLogRecorder {
public class NamedQueueRecorder {
private final Disruptor<RingBufferEnvelope> disruptor;
private final LogEventHandler logEventHandler;
private final int eventCount;
private final boolean isOnlineLogProviderEnabled;
private static final String SLOW_LOG_RING_BUFFER_SIZE =
"hbase.regionserver.slowlog.ringbuffer.size";
private static NamedQueueRecorder namedQueueRecorder;
private static boolean isInit = false;
private static final Object LOCK = new Object();
/**
* Initialize disruptor with configurable ringbuffer size
*/
public SlowLogRecorder(Configuration conf) {
isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
if (!isOnlineLogProviderEnabled) {
this.disruptor = null;
this.logEventHandler = null;
this.eventCount = 0;
return;
}
this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE,
HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
private NamedQueueRecorder(Configuration conf) {
// This is the 'writer' -- a single threaded executor. This single thread consumes what is
// put on the ringbuffer.
final String hostingThreadName = Thread.currentThread().getName();
int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);
// disruptor initialization with BlockingWaitStrategy
this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
getEventCount(),
getEventCount(eventCount),
Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"),
ProducerType.MULTI,
new BlockingWaitStrategy());
this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
// initialize ringbuffer event handler
final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
this.logEventHandler = new LogEventHandler(this.eventCount, isSlowLogTableEnabled, conf);
this.logEventHandler = new LogEventHandler(conf);
this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler});
this.disruptor.start();
}
public static NamedQueueRecorder getInstance(Configuration conf) {
if (namedQueueRecorder != null) {
return namedQueueRecorder;
}
synchronized (LOCK) {
if (!isInit) {
namedQueueRecorder = new NamedQueueRecorder(conf);
isInit = true;
}
}
return namedQueueRecorder;
}
// must be power of 2 for disruptor ringbuffer
private int getEventCount() {
Preconditions.checkArgument(eventCount >= 0,
SLOW_LOG_RING_BUFFER_SIZE + " must be > 0");
private int getEventCount(int eventCount) {
Preconditions.checkArgument(eventCount >= 0, "hbase.namedqueue.ringbuffer.size must be > 0");
int floor = Integer.highestOneBit(eventCount);
if (floor == eventCount) {
return floor;
@ -110,66 +103,53 @@ public class SlowLogRecorder {
}
/**
* Retrieve online slow logs from ringbuffer
* Retrieve in memory queue records from ringbuffer
*
* @param request slow log request parameters
* @return online slow logs from ringbuffer
* @param request namedQueue request with event type
* @return queue records from ringbuffer after filter (if applied)
*/
public List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
return isOnlineLogProviderEnabled ? this.logEventHandler.getSlowLogPayloads(request)
: Collections.emptyList();
public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
return this.logEventHandler.getNamedQueueRecords(request);
}
/**
* Retrieve online large logs from ringbuffer
*
* @param request large log request parameters
* @return online large logs from ringbuffer
*/
public List<SlowLogPayload> getLargeLogPayloads(AdminProtos.SlowLogResponseRequest request) {
return isOnlineLogProviderEnabled ? this.logEventHandler.getLargeLogPayloads(request)
: Collections.emptyList();
}
/**
* clears slow log payloads from ringbuffer
* clears queue records from ringbuffer
*
* @param namedQueueEvent type of queue to clear
* @return true if slow log payloads are cleaned up or
* hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to
* clean up slow logs
*/
public boolean clearSlowLogPayloads() {
if (!isOnlineLogProviderEnabled) {
return true;
}
return this.logEventHandler.clearSlowLogs();
public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
return this.logEventHandler.clearNamedQueue(namedQueueEvent);
}
/**
* Add slow log rpcCall details to ringbuffer
* Add various NamedQueue records to ringbuffer. Based on the type of the event (e.g slowLog),
* consumer of disruptor ringbuffer will have specific logic.
* This method is producer of disruptor ringbuffer which is initialized in NamedQueueRecorder
* constructor.
*
* @param rpcLogDetails all details of rpc call that would be useful for ring buffer
* consumers
* @param namedQueuePayload namedQueue payload sent by client of ring buffer
* service
*/
public void addSlowLogPayload(RpcLogDetails rpcLogDetails) {
if (!isOnlineLogProviderEnabled) {
return;
}
public void addRecord(NamedQueuePayload namedQueuePayload) {
RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
long seqId = ringBuffer.next();
try {
ringBuffer.get(seqId).load(rpcLogDetails);
ringBuffer.get(seqId).load(namedQueuePayload);
} finally {
ringBuffer.publish(seqId);
}
}
/**
* Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
* Add all in memory queue records to system table. The implementors can use system table
* or direct HDFS file or ZK as persistence system.
*/
public void addAllLogsToSysTable() {
public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
if (this.logEventHandler != null) {
this.logEventHandler.addAllLogsToSysTable();
this.logEventHandler.persistAll(namedQueueEvent);
}
}

View File

@ -0,0 +1,69 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;
/**
* In-memory Queue service provider for multiple use-cases. Implementers should be
* registered in LogEventHandler
*/
@InterfaceAudience.Private
public interface NamedQueueService {
/**
* Retrieve event type for NamedQueueService implementation.
*
* @return {@link NamedQueuePayload.NamedQueueEvent}
*/
NamedQueuePayload.NamedQueueEvent getEvent();
/**
* This implementation is generic for consuming records from LMAX
* disruptor and inserts records to EvictingQueue which is maintained by each
* ringbuffer provider.
*
* @param namedQueuePayload namedQueue payload from disruptor ring buffer
*/
void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload);
/**
* Cleans up queues maintained by services.
*
* @return true if slow log payloads are cleaned up, false otherwise
*/
boolean clearNamedQueue();
/**
* Retrieve in memory queue records from ringbuffer
*
* @param request namedQueue request with event type
* @return queue records from ringbuffer after filter (if applied)
*/
NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request);
/**
* Add all in memory queue records to system table. The implementors can use system table
* or direct HDFS file or ZK as persistence system.
*/
void persistAll();
}

View File

@ -17,29 +17,29 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
package org.apache.hadoop.hbase.namequeues;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.yetus.audience.InterfaceAudience;
/**
* An envelope to carry payload in the slow log ring buffer that serves as online buffer
* to provide latest TooSlowLog
* An envelope to carry payload in the ring buffer that serves as online buffer
* to provide latest events
*/
@InterfaceAudience.Private
final class RingBufferEnvelope {
private RpcLogDetails rpcLogDetails;
private NamedQueuePayload namedQueuePayload;
/**
* Load the Envelope with {@link RpcCall}
*
* @param rpcLogDetails all details of rpc call that would be useful for ring buffer
* @param namedQueuePayload all details of rpc call that would be useful for ring buffer
* consumers
*/
public void load(RpcLogDetails rpcLogDetails) {
this.rpcLogDetails = rpcLogDetails;
public void load(NamedQueuePayload namedQueuePayload) {
this.namedQueuePayload = namedQueuePayload;
}
/**
@ -48,10 +48,10 @@ final class RingBufferEnvelope {
*
* @return Retrieve rpc log details
*/
public RpcLogDetails getPayload() {
final RpcLogDetails rpcLogDetails = this.rpcLogDetails;
this.rpcLogDetails = null;
return rpcLogDetails;
public NamedQueuePayload getPayload() {
final NamedQueuePayload namedQueuePayload = this.namedQueuePayload;
this.namedQueuePayload = null;
return namedQueuePayload;
}
}

View File

@ -17,7 +17,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
package org.apache.hadoop.hbase.namequeues;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.ipc.RpcCall;
@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* RpcCall details that would be passed on to ring buffer of slow log responses
*/
@InterfaceAudience.Private
public class RpcLogDetails {
public class RpcLogDetails extends NamedQueuePayload {
private final RpcCall rpcCall;
private final Message param;
@ -40,6 +40,7 @@ public class RpcLogDetails {
public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize,
String className, boolean isSlowLog, boolean isLargeLog) {
super(NamedQueueEvent.SLOW_LOG);
this.rpcCall = rpcCall;
this.param = param;
this.clientAddress = clientAddress;

View File

@ -0,0 +1,98 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
/**
* Persistent service provider for Slow/LargeLog events
*/
@InterfaceAudience.Private
public class SlowLogPersistentService {
private static final Logger LOG = LoggerFactory.getLogger(SlowLogPersistentService.class);
private static final ReentrantLock LOCK = new ReentrantLock();
private static final String SYS_TABLE_QUEUE_SIZE =
"hbase.regionserver.slowlog.systable.queue.size";
private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
private final Queue<TooSlowLog.SlowLogPayload> queueForSysTable;
private final Configuration configuration;
public SlowLogPersistentService(final Configuration configuration) {
this.configuration = configuration;
int sysTableQueueSize =
configuration.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueueForTable =
EvictingQueue.create(sysTableQueueSize);
queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
}
public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) {
queueForSysTable.add(slowLogPayload);
}
/**
* Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
*/
public void addAllLogsToSysTable() {
if (queueForSysTable == null) {
LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.");
return;
}
if (LOCK.isLocked()) {
return;
}
LOCK.lock();
try {
List<TooSlowLog.SlowLogPayload> slowLogPayloads = new ArrayList<>();
int i = 0;
while (!queueForSysTable.isEmpty()) {
slowLogPayloads.add(queueForSysTable.poll());
i++;
if (i == SYSTABLE_PUT_BATCH_SIZE) {
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
slowLogPayloads.clear();
i = 0;
}
}
if (slowLogPayloads.size() > 0) {
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
}
} finally {
LOCK.unlock();
}
}
}

View File

@ -17,7 +17,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
package org.apache.hadoop.hbase.namequeues;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
@ -33,7 +33,7 @@ public class SlowLogTableOpsChore extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class);
private final SlowLogRecorder slowLogRecorder;
private final NamedQueueRecorder namedQueueRecorder;
/**
* Chore Constructor
@ -41,12 +41,12 @@ public class SlowLogTableOpsChore extends ScheduledChore {
* @param stopper The stopper - When {@link Stoppable#isStopped()} is true, this chore will
* cancel and cleanup
* @param period Period in millis with which this Chore repeats execution when scheduled
* @param slowLogRecorder {@link SlowLogRecorder} instance
* @param namedQueueRecorder {@link NamedQueueRecorder} instance
*/
public SlowLogTableOpsChore(final Stoppable stopper, final int period,
final SlowLogRecorder slowLogRecorder) {
final NamedQueueRecorder namedQueueRecorder) {
super("SlowLogTableOpsChore", stopper, period);
this.slowLogRecorder = slowLogRecorder;
this.namedQueueRecorder = namedQueueRecorder;
}
@Override
@ -54,7 +54,7 @@ public class SlowLogTableOpsChore extends ScheduledChore {
if (LOG.isTraceEnabled()) {
LOG.trace("SlowLog Table Ops Chore is starting up.");
}
slowLogRecorder.addAllLogsToSysTable();
namedQueueRecorder.persistAll(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
if (LOG.isTraceEnabled()) {
LOG.trace("SlowLog Table Ops Chore is closing.");
}

View File

@ -0,0 +1,264 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.namequeues.LogHandlerUtils;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueService;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
/**
* In-memory Queue service provider for Slow/LargeLog events
*/
@InterfaceAudience.Private
public class SlowLogQueueService implements NamedQueueService {
private static final Logger LOG = LoggerFactory.getLogger(SlowLogQueueService.class);
private static final String SLOW_LOG_RING_BUFFER_SIZE =
"hbase.regionserver.slowlog.ringbuffer.size";
private final boolean isOnlineLogProviderEnabled;
private final boolean isSlowLogTableEnabled;
private final SlowLogPersistentService slowLogPersistentService;
private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue;
public SlowLogQueueService(Configuration conf) {
this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
if (!isOnlineLogProviderEnabled) {
this.isSlowLogTableEnabled = false;
this.slowLogPersistentService = null;
this.slowLogQueue = null;
return;
}
// Initialize SlowLog Queue
int slowLogQueueSize =
conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue =
EvictingQueue.create(slowLogQueueSize);
slowLogQueue = Queues.synchronizedQueue(evictingQueue);
this.isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
if (isSlowLogTableEnabled) {
slowLogPersistentService = new SlowLogPersistentService(conf);
} else {
slowLogPersistentService = null;
}
}
@Override
public NamedQueuePayload.NamedQueueEvent getEvent() {
return NamedQueuePayload.NamedQueueEvent.SLOW_LOG;
}
/**
* This implementation is specific to slowLog event. This consumes slowLog event from
* disruptor and inserts records to EvictingQueue.
*
* @param namedQueuePayload namedQueue payload from disruptor ring buffer
*/
@Override
public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
if (!isOnlineLogProviderEnabled) {
return;
}
if (!(namedQueuePayload instanceof RpcLogDetails)) {
LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type RpcLogDetails.");
return;
}
final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload;
final RpcCall rpcCall = rpcLogDetails.getRpcCall();
final String clientAddress = rpcLogDetails.getClientAddress();
final long responseSize = rpcLogDetails.getResponseSize();
final String className = rpcLogDetails.getClassName();
final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails);
if (type == null) {
return;
}
Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
Message param = rpcLogDetails.getParam();
long receiveTime = rpcCall.getReceiveTime();
long startTime = rpcCall.getStartTime();
long endTime = System.currentTimeMillis();
int processingTime = (int) (endTime - startTime);
int qTime = (int) (startTime - receiveTime);
final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
int numGets = 0;
int numMutations = 0;
int numServiceCalls = 0;
if (param instanceof ClientProtos.MultiRequest) {
ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
for (ClientProtos.Action action : regionAction.getActionList()) {
if (action.hasMutation()) {
numMutations++;
}
if (action.hasGet()) {
numGets++;
}
if (action.hasServiceCall()) {
numServiceCalls++;
}
}
}
}
final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
final String methodDescriptorName =
methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder()
.setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
.setClientAddress(clientAddress)
.setMethodName(methodDescriptorName)
.setMultiGets(numGets)
.setMultiMutations(numMutations)
.setMultiServiceCalls(numServiceCalls)
.setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
.setProcessingTime(processingTime)
.setQueueTime(qTime)
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
.setResponseSize(responseSize)
.setServerClass(className)
.setStartTime(startTime)
.setType(type)
.setUserName(userName)
.build();
slowLogQueue.add(slowLogPayload);
if (isSlowLogTableEnabled) {
if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
slowLogPersistentService.addToQueueForSysTable(slowLogPayload);
}
}
}
@Override
public boolean clearNamedQueue() {
if (!isOnlineLogProviderEnabled) {
return false;
}
LOG.debug("Received request to clean up online slowlog buffer.");
slowLogQueue.clear();
return true;
}
@Override
public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
if (!isOnlineLogProviderEnabled) {
return null;
}
final AdminProtos.SlowLogResponseRequest slowLogResponseRequest =
request.getSlowLogResponseRequest();
final List<TooSlowLog.SlowLogPayload> slowLogPayloads;
if (AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG
.equals(slowLogResponseRequest.getLogType())) {
slowLogPayloads = getLargeLogPayloads(slowLogResponseRequest);
} else {
slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest);
}
NamedQueueGetResponse response = new NamedQueueGetResponse();
response.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
response.setSlowLogPayloads(slowLogPayloads);
return response;
}
private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
final boolean isSlowLog = rpcCallDetails.isSlowLog();
final boolean isLargeLog = rpcCallDetails.isLargeLog();
final TooSlowLog.SlowLogPayload.Type type;
if (!isSlowLog && !isLargeLog) {
LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
rpcCallDetails);
return null;
}
if (isSlowLog && isLargeLog) {
type = TooSlowLog.SlowLogPayload.Type.ALL;
} else if (isSlowLog) {
type = TooSlowLog.SlowLogPayload.Type.SLOW_LOG;
} else {
type = TooSlowLog.SlowLogPayload.Type.LARGE_LOG;
}
return type;
}
/**
* Add all slowLog events to system table. This is only for slowLog event's persistence on
* system table.
*/
@Override
public void persistAll() {
if (!isOnlineLogProviderEnabled) {
return;
}
if (slowLogPersistentService != null) {
slowLogPersistentService.addAllLogsToSysTable();
}
}
private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
final AdminProtos.SlowLogResponseRequest request) {
List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter(
e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
|| e.getType() == TooSlowLog.SlowLogPayload.Type.SLOW_LOG).collect(Collectors.toList());
// latest slow logs first, operator is interested in latest records from in-memory buffer
Collections.reverse(slowLogPayloadList);
return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
}
private List<TooSlowLog.SlowLogPayload> getLargeLogPayloads(
final AdminProtos.SlowLogResponseRequest request) {
List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter(
e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
|| e.getType() == TooSlowLog.SlowLogPayload.Type.LARGE_LOG).collect(Collectors.toList());
// latest large logs first, operator is interested in latest records from in-memory buffer
Collections.reverse(slowLogPayloadList);
return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
}
}

View File

@ -0,0 +1,65 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues.request;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Request object to be used by ring buffer use-cases. Clients get records by sending
* this request object.
* For each ring buffer use-case, add request payload to this class, client should set
* namedQueueEvent based on use-case.
* Protobuf does not support inheritance, hence we need to work with
*/
@InterfaceAudience.Private
public class NamedQueueGetRequest {
private AdminProtos.SlowLogResponseRequest slowLogResponseRequest;
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
public AdminProtos.SlowLogResponseRequest getSlowLogResponseRequest() {
return slowLogResponseRequest;
}
public void setSlowLogResponseRequest(
AdminProtos.SlowLogResponseRequest slowLogResponseRequest) {
this.slowLogResponseRequest = slowLogResponseRequest;
}
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
this.namedQueueEvent = namedQueueEvent;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("slowLogResponseRequest", slowLogResponseRequest)
.append("namedQueueEvent", namedQueueEvent)
.toString();
}
}

View File

@ -0,0 +1,61 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues.response;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.yetus.audience.InterfaceAudience;
import java.util.List;
/**
* Response object to be sent by namedQueue service back to caller
*/
@InterfaceAudience.Private
public class NamedQueueGetResponse {
private List<TooSlowLog.SlowLogPayload> slowLogPayloads;
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
public List<TooSlowLog.SlowLogPayload> getSlowLogPayloads() {
return slowLogPayloads;
}
public void setSlowLogPayloads(List<TooSlowLog.SlowLogPayload> slowLogPayloads) {
this.slowLogPayloads = slowLogPayloads;
}
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
this.namedQueueEvent = namedQueueEvent;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("slowLogPayloads", slowLogPayloads)
.append("namedQueueEvent", namedQueueEvent)
.toString();
}
}

View File

@ -138,8 +138,8 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@ -535,7 +535,7 @@ public class HRegionServer extends Thread implements
/**
* Provide online slow log responses from ringbuffer
*/
private SlowLogRecorder slowLogRecorder;
private NamedQueueRecorder namedQueueRecorder = null;
/**
* True if this RegionServer is coming up in a cluster where there is no Master;
@ -597,7 +597,12 @@ public class HRegionServer extends Thread implements
this.stopped = false;
if (!(this instanceof HMaster)) {
this.slowLogRecorder = new SlowLogRecorder(this.conf);
final boolean isOnlineLogProviderEnabled = conf.getBoolean(
HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
if (isOnlineLogProviderEnabled) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
}
}
rpcServices = createRpcServices();
useThisHostnameInstead = getUseThisHostnameInstead(conf);
@ -1506,12 +1511,12 @@ public class HRegionServer extends Thread implements
}
/**
* get Online SlowLog Provider to add slow logs to ringbuffer
* get NamedQueue Provider to add different logs to ringbuffer
*
* @return Online SlowLog Provider
* @return NamedQueueRecorder
*/
public SlowLogRecorder getSlowLogRecorder() {
return this.slowLogRecorder;
public NamedQueueRecorder getNamedQueueRecorder() {
return this.namedQueueRecorder;
}
/*
@ -2070,7 +2075,7 @@ public class HRegionServer extends Thread implements
if (isSlowLogTableEnabled) {
// default chore duration: 10 min
final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.slowLogRecorder);
slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
}
// Create the thread to clean the moved regions list

View File

@ -108,6 +108,9 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
@ -128,7 +131,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
@ -1301,7 +1304,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
rpcServer.setRsRpcServices(this);
if (!(rs instanceof HMaster)) {
rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder());
rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
}
scannerLeaseTimeoutPeriod = conf.getInt(
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
@ -3983,28 +3986,39 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
@QosPriority(priority = HConstants.ADMIN_QOS)
public SlowLogResponses getSlowLogResponses(final RpcController controller,
final SlowLogResponseRequest request) {
final SlowLogRecorder slowLogRecorder =
this.regionServer.getSlowLogRecorder();
final List<SlowLogPayload> slowLogPayloads;
slowLogPayloads = slowLogRecorder != null
? slowLogRecorder.getSlowLogPayloads(request)
: Collections.emptyList();
final NamedQueueRecorder namedQueueRecorder =
this.regionServer.getNamedQueueRecorder();
final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
.addAllSlowLogPayloads(slowLogPayloads)
.build();
return slowLogResponses;
}
private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request,
NamedQueueRecorder namedQueueRecorder) {
if (namedQueueRecorder == null) {
return Collections.emptyList();
}
List<SlowLogPayload> slowLogPayloads;
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
slowLogPayloads = namedQueueGetResponse != null ?
namedQueueGetResponse.getSlowLogPayloads() :
Collections.emptyList();
return slowLogPayloads;
}
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public SlowLogResponses getLargeLogResponses(final RpcController controller,
final SlowLogResponseRequest request) {
final SlowLogRecorder slowLogRecorder =
this.regionServer.getSlowLogRecorder();
final List<SlowLogPayload> slowLogPayloads;
slowLogPayloads = slowLogRecorder != null
? slowLogRecorder.getLargeLogPayloads(request)
: Collections.emptyList();
final NamedQueueRecorder namedQueueRecorder =
this.regionServer.getNamedQueueRecorder();
final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
.addAllSlowLogPayloads(slowLogPayloads)
.build();
@ -4015,10 +4029,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
@QosPriority(priority = HConstants.ADMIN_QOS)
public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
final ClearSlowLogResponseRequest request) {
final SlowLogRecorder slowLogRecorder =
this.regionServer.getSlowLogRecorder();
boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder)
.map(SlowLogRecorder::clearSlowLogPayloads).orElse(false);
final NamedQueueRecorder namedQueueRecorder =
this.regionServer.getNamedQueueRecorder();
boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
.map(queueRecorder ->
queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
.orElse(false);
ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder()
.setIsCleaned(slowLogsCleaned)
.build();

View File

@ -1,266 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
/**
* Event Handler run by disruptor ringbuffer consumer
*/
@InterfaceAudience.Private
class LogEventHandler implements EventHandler<RingBufferEnvelope> {
private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
private static final String SYS_TABLE_QUEUE_SIZE =
"hbase.regionserver.slowlog.systable.queue.size";
private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
private final Queue<SlowLogPayload> queueForRingBuffer;
private final Queue<SlowLogPayload> queueForSysTable;
private final boolean isSlowLogTableEnabled;
private Configuration configuration;
private static final ReentrantLock LOCK = new ReentrantLock();
LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration conf) {
this.configuration = conf;
EvictingQueue<SlowLogPayload> evictingQueue = EvictingQueue.create(eventCount);
queueForRingBuffer = Queues.synchronizedQueue(evictingQueue);
this.isSlowLogTableEnabled = isSlowLogTableEnabled;
if (isSlowLogTableEnabled) {
int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
EvictingQueue<SlowLogPayload> evictingQueueForTable =
EvictingQueue.create(sysTableQueueSize);
queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
} else {
queueForSysTable = null;
}
}
/**
* Called when a publisher has published an event to the {@link RingBuffer}
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from
* the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain
*/
@Override
public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch)
throws Exception {
final RpcLogDetails rpcCallDetails = event.getPayload();
final RpcCall rpcCall = rpcCallDetails.getRpcCall();
final String clientAddress = rpcCallDetails.getClientAddress();
final long responseSize = rpcCallDetails.getResponseSize();
final String className = rpcCallDetails.getClassName();
final SlowLogPayload.Type type = getLogType(rpcCallDetails);
if (type == null) {
return;
}
Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
Message param = rpcCallDetails.getParam();
long receiveTime = rpcCall.getReceiveTime();
long startTime = rpcCall.getStartTime();
long endTime = System.currentTimeMillis();
int processingTime = (int) (endTime - startTime);
int qTime = (int) (startTime - receiveTime);
final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
int numGets = 0;
int numMutations = 0;
int numServiceCalls = 0;
if (param instanceof ClientProtos.MultiRequest) {
ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
for (ClientProtos.Action action : regionAction.getActionList()) {
if (action.hasMutation()) {
numMutations++;
}
if (action.hasGet()) {
numGets++;
}
if (action.hasServiceCall()) {
numServiceCalls++;
}
}
}
}
final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
final String methodDescriptorName =
methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder()
.setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
.setClientAddress(clientAddress)
.setMethodName(methodDescriptorName)
.setMultiGets(numGets)
.setMultiMutations(numMutations)
.setMultiServiceCalls(numServiceCalls)
.setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
.setProcessingTime(processingTime)
.setQueueTime(qTime)
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
.setResponseSize(responseSize)
.setServerClass(className)
.setStartTime(startTime)
.setType(type)
.setUserName(userName)
.build();
queueForRingBuffer.add(slowLogPayload);
if (isSlowLogTableEnabled) {
if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
queueForSysTable.add(slowLogPayload);
}
}
}
private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
final boolean isSlowLog = rpcCallDetails.isSlowLog();
final boolean isLargeLog = rpcCallDetails.isLargeLog();
final SlowLogPayload.Type type;
if (!isSlowLog && !isLargeLog) {
LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
rpcCallDetails);
return null;
}
if (isSlowLog && isLargeLog) {
type = SlowLogPayload.Type.ALL;
} else if (isSlowLog) {
type = SlowLogPayload.Type.SLOW_LOG;
} else {
type = SlowLogPayload.Type.LARGE_LOG;
}
return type;
}
/**
* Cleans up slow log payloads
*
* @return true if slow log payloads are cleaned up, false otherwise
*/
boolean clearSlowLogs() {
if (LOG.isDebugEnabled()) {
LOG.debug("Received request to clean up online slowlog buffer..");
}
queueForRingBuffer.clear();
return true;
}
/**
* Retrieve list of slow log payloads
*
* @param request slow log request parameters
* @return list of slow log payloads
*/
List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
List<SlowLogPayload> slowLogPayloadList =
Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
.filter(e -> e.getType() == SlowLogPayload.Type.ALL
|| e.getType() == SlowLogPayload.Type.SLOW_LOG)
.collect(Collectors.toList());
// latest slow logs first, operator is interested in latest records from in-memory buffer
Collections.reverse(slowLogPayloadList);
return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
}
/**
* Retrieve list of large log payloads
*
* @param request large log request parameters
* @return list of large log payloads
*/
List<SlowLogPayload> getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
List<SlowLogPayload> slowLogPayloadList =
Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
.filter(e -> e.getType() == SlowLogPayload.Type.ALL
|| e.getType() == SlowLogPayload.Type.LARGE_LOG)
.collect(Collectors.toList());
// latest large logs first, operator is interested in latest records from in-memory buffer
Collections.reverse(slowLogPayloadList);
return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
}
/**
* Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
*/
void addAllLogsToSysTable() {
if (queueForSysTable == null) {
// hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.
return;
}
if (LOCK.isLocked()) {
return;
}
LOCK.lock();
try {
List<SlowLogPayload> slowLogPayloads = new ArrayList<>();
int i = 0;
while (!queueForSysTable.isEmpty()) {
slowLogPayloads.add(queueForSysTable.poll());
i++;
if (i == SYSTABLE_PUT_BATCH_SIZE) {
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
slowLogPayloads.clear();
i = 0;
}
}
if (slowLogPayloads.size() > 0) {
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
}
} finally {
LOCK.unlock();
}
}
}

View File

@ -17,12 +17,14 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
package org.apache.hadoop.hbase.namequeues;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@ -35,6 +37,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -61,27 +65,25 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPaylo
* Tests for Online SlowLog Provider Service
*/
@Category({MasterTests.class, MediumTests.class})
public class TestSlowLogRecorder {
public class TestNamedQueueRecorder {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSlowLogRecorder.class);
HBaseClassTestRule.forClass(TestNamedQueueRecorder.class);
private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
private SlowLogRecorder slowLogRecorder;
private NamedQueueRecorder namedQueueRecorder;
private static int i = 0;
private static Configuration applySlowLogRecorderConf(int eventSize) {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
return conf;
}
/**
@ -90,11 +92,10 @@ public class TestSlowLogRecorder {
*
* @param i index of ringbuffer logs
* @param j data value that was put on index i
* @param slowLogPayloads list of payload retrieved from {@link SlowLogRecorder}
* @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder}
* @return if actual values are as per expectations
*/
private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j);
boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j);
boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j);
@ -105,12 +106,15 @@ public class TestSlowLogRecorder {
public void testOnlieSlowLogConsumption() throws Exception{
Configuration conf = applySlowLogRecorderConf(8);
slowLogRecorder = new SlowLogRecorder(conf);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
slowLogRecorder.clearSlowLogPayloads();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
int i = 0;
@ -119,12 +123,12 @@ public class TestSlowLogRecorder {
for (; i < 5; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 5));
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
Assert.assertNotEquals(-1,
HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5));
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
@ -135,15 +139,15 @@ public class TestSlowLogRecorder {
for (; i < 7; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 7));
() -> getSlowLogPayloads(request).size() == 7));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
return slowLogPayloadsList.size() == 7
&& confirmPayloadParams(0, 7, slowLogPayloadsList)
&& confirmPayloadParams(5, 2, slowLogPayloadsList)
@ -155,15 +159,15 @@ public class TestSlowLogRecorder {
for (; i < 10; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
() -> getSlowLogPayloads(request).size() == 8));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
// confirm ringbuffer is full
return slowLogPayloadsList.size() == 8
&& confirmPayloadParams(7, 3, slowLogPayloadsList)
@ -176,15 +180,33 @@ public class TestSlowLogRecorder {
for (; i < 14; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
() -> getSlowLogPayloads(request).size() == 8));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
// confirm ringbuffer is full
// and ordered events
return slowLogPayloadsList.size() == 8
&& confirmPayloadParams(0, 14, slowLogPayloadsList)
&& confirmPayloadParams(1, 13, slowLogPayloadsList)
&& confirmPayloadParams(2, 12, slowLogPayloadsList)
&& confirmPayloadParams(3, 11, slowLogPayloadsList);
})
);
AdminProtos.SlowLogResponseRequest largeLogRequest =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest);
// confirm ringbuffer is full
// and ordered events
return slowLogPayloadsList.size() == 8
@ -197,24 +219,12 @@ public class TestSlowLogRecorder {
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getLargeLogPayloads(request);
// confirm ringbuffer is full
// and ordered events
return slowLogPayloadsList.size() == 8
&& confirmPayloadParams(0, 14, slowLogPayloadsList)
&& confirmPayloadParams(1, 13, slowLogPayloadsList)
&& confirmPayloadParams(2, 12, slowLogPayloadsList)
&& confirmPayloadParams(3, 11, slowLogPayloadsList);
})
);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
LOG.debug("cleared the ringbuffer of Online Slow Log records");
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
// confirm ringbuffer is empty
return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
})
@ -222,30 +232,43 @@ public class TestSlowLogRecorder {
}
private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
return namedQueueGetResponse == null ?
Collections.emptyList() : namedQueueGetResponse.getSlowLogPayloads();
}
@Test
public void testOnlineSlowLogWithHighRecords() throws Exception {
Configuration conf = applySlowLogRecorderConf(14);
slowLogRecorder = new SlowLogRecorder(conf);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 14 * 11; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
() -> getSlowLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
// confirm strict order of slow log payloads
return slowLogPayloads.size() == 14
@ -266,37 +289,37 @@ public class TestSlowLogRecorder {
})
);
boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
Assert.assertTrue(isRingBufferCleaned);
LOG.debug("cleared the ringbuffer of Online Slow Log records");
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
// confirm ringbuffer is empty
Assert.assertEquals(slowLogPayloads.size(), 0);
}
@Test
public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
slowLogRecorder = new SlowLogRecorder(conf);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 300; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
return slowLogPayloads.size() == 0;
})
);
@ -305,56 +328,58 @@ public class TestSlowLogRecorder {
@Test
public void testOnlineSlowLogWithDisableConfig() throws Exception {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 300; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
return slowLogPayloads.size() == 0;
})
);
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
}
@Test
public void testSlowLogFilters() throws Exception {
Configuration conf = applySlowLogRecorderConf(30);
slowLogRecorder = new SlowLogRecorder(conf);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_87")
.build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 100; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 1));
() -> getSlowLogPayloads(request).size() == 1));
AdminProtos.SlowLogResponseRequest requestClient =
AdminProtos.SlowLogResponseRequest.newBuilder()
@ -362,25 +387,32 @@ public class TestSlowLogRecorder {
.setClientAddress("client_85")
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1));
() -> getSlowLogPayloads(requestClient).size() == 1));
AdminProtos.SlowLogResponseRequest requestSlowLog =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
() -> getSlowLogPayloads(requestSlowLog).size() == 15));
}
@Test
public void testConcurrentSlowLogEvents() throws Exception {
Configuration conf = applySlowLogRecorderConf(50000);
slowLogRecorder = new SlowLogRecorder(conf);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
AdminProtos.SlowLogResponseRequest largeLogRequest =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(500000)
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int j = 0; j < 1000; j++) {
@ -389,7 +421,7 @@ public class TestSlowLogRecorder {
for (int i = 0; i < 3500; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
});
@ -397,22 +429,24 @@ public class TestSlowLogRecorder {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
slowLogRecorder.clearSlowLogPayloads();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
5000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000));
5000, () -> getSlowLogPayloads(request).size() > 10000));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
5000, () -> slowLogRecorder.getLargeLogPayloads(request).size() > 10000));
5000, () -> getSlowLogPayloads(largeLogRequest).size() > 10000));
}
@Test
public void testSlowLargeLogEvents() throws Exception {
Configuration conf = applySlowLogRecorderConf(28);
slowLogRecorder = new SlowLogRecorder(conf);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
boolean isSlowLog;
@ -428,16 +462,16 @@ public class TestSlowLogRecorder {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1),
isSlowLog, isLargeLog);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
() -> getSlowLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
// confirm strict order of slow log payloads
return slowLogPayloads.size() == 14
@ -458,12 +492,18 @@ public class TestSlowLogRecorder {
})
);
AdminProtos.SlowLogResponseRequest largeLogRequest =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(14 * 11)
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getLargeLogPayloads(request).size() == 14));
() -> getSlowLogPayloads(largeLogRequest).size() == 14));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> largeLogPayloads = slowLogRecorder.getLargeLogPayloads(request);
List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest);
// confirm strict order of slow log payloads
return largeLogPayloads.size() == 14
@ -483,14 +523,16 @@ public class TestSlowLogRecorder {
&& confirmPayloadParams(13, 128, largeLogPayloads);
})
);
}
@Test
public void testSlowLogMixedFilters() throws Exception {
Configuration conf = applySlowLogRecorderConf(30);
slowLogRecorder = new SlowLogRecorder(conf);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
@ -498,23 +540,23 @@ public class TestSlowLogRecorder {
.setClientAddress("client_88")
.build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
for (int i = 0; i < 100; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 2));
() -> getSlowLogPayloads(request).size() == 2));
AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_1")
.setClientAddress("client_2")
.build();
Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request2).size());
Assert.assertEquals(0, getSlowLogPayloads(request2).size());
AdminProtos.SlowLogResponseRequest request3 =
AdminProtos.SlowLogResponseRequest.newBuilder()
@ -523,7 +565,7 @@ public class TestSlowLogRecorder {
.setClientAddress("client_88")
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
.build();
Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request3).size());
Assert.assertEquals(0, getSlowLogPayloads(request3).size());
AdminProtos.SlowLogResponseRequest request4 =
AdminProtos.SlowLogResponseRequest.newBuilder()
@ -532,7 +574,7 @@ public class TestSlowLogRecorder {
.setClientAddress("client_87")
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
.build();
Assert.assertEquals(1, slowLogRecorder.getSlowLogPayloads(request4).size());
Assert.assertEquals(1, getSlowLogPayloads(request4).size());
AdminProtos.SlowLogResponseRequest request5 =
AdminProtos.SlowLogResponseRequest.newBuilder()
@ -541,14 +583,14 @@ public class TestSlowLogRecorder {
.setClientAddress("client_89")
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR)
.build();
Assert.assertEquals(2, slowLogRecorder.getSlowLogPayloads(request5).size());
Assert.assertEquals(2, getSlowLogPayloads(request5).size());
AdminProtos.SlowLogResponseRequest requestSlowLog =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
() -> getSlowLogPayloads(requestSlowLog).size() == 15));
}
static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
@ -762,26 +804,20 @@ public class TestSlowLogRecorder {
private static Optional<User> getUser(String userName) {
return Optional.of(new User() {
@Override
public String getShortName() {
return userName;
}
@Override
public <T> T runAs(PrivilegedAction<T> action) {
return null;
}
@Override
public <T> T runAs(PrivilegedExceptionAction<T> action) throws
IOException, InterruptedException {
public <T> T runAs(PrivilegedExceptionAction<T> action) {
return null;
}
});
}

View File

@ -17,10 +17,11 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
package org.apache.hadoop.hbase.namequeues;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
@ -33,8 +34,11 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -59,11 +63,11 @@ public class TestSlowLogAccessor {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSlowLogAccessor.class);
private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
private SlowLogRecorder slowLogRecorder;
private NamedQueueRecorder namedQueueRecorder;
@BeforeClass
public static void setup() throws Exception {
@ -88,9 +92,19 @@ public class TestSlowLogAccessor {
@Before
public void setUp() throws Exception {
HRegionServer hRegionServer = HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0);
Field slowLogRecorder = HRegionServer.class.getDeclaredField("slowLogRecorder");
Field slowLogRecorder = HRegionServer.class.getDeclaredField("namedQueueRecorder");
slowLogRecorder.setAccessible(true);
this.slowLogRecorder = (SlowLogRecorder) slowLogRecorder.get(hRegionServer);
this.namedQueueRecorder = (NamedQueueRecorder) slowLogRecorder.get(hRegionServer);
}
private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
AdminProtos.SlowLogResponseRequest request) {
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
return namedQueueGetResponse.getSlowLogPayloads();
}
@Test
@ -99,42 +113,42 @@ public class TestSlowLogAccessor {
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
slowLogRecorder.clearSlowLogPayloads();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
int i = 0;
Connection connection = waitForSlowLogTableCreation();
// add 5 records initially
for (; i < 5; i++) {
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
// add 2 more records
for (; i < 7; i++) {
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
// add 3 more records
for (; i < 10; i++) {
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
// add 4 more records
for (; i < 14; i++) {
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY
.waitFor(3000, () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1,
HBASE_TESTING_UTILITY.waitFor(3000, () -> getTableCount(connection) == 14));
@ -170,10 +184,10 @@ public class TestSlowLogAccessor {
public void testHigherSlowLogs() throws Exception {
Connection connection = waitForSlowLogTableCreation();
slowLogRecorder.clearSlowLogPayloads();
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
for (int j = 0; j < 100; j++) {
CompletableFuture.runAsync(() -> {
@ -181,15 +195,15 @@ public class TestSlowLogAccessor {
if (i == 300) {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
}
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
namedQueueRecorder.addRecord(rpcLogDetails);
}
});
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> {
int count = slowLogRecorder.getSlowLogPayloads(request).size();
int count = getSlowLogPayloads(request).size();
LOG.debug("RingBuffer records count: {}", count);
return count > 2000;
}));