HBASE-24718 : Generic NamedQueue framework for multiple use-cases (Refactor SlowLog responses) (#2110)
Closes #2052 Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
33102a1265
commit
ce4e692699
|
@ -2055,6 +2055,11 @@ public final class RequestConverter {
|
|||
} else {
|
||||
builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR);
|
||||
}
|
||||
if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) {
|
||||
builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG);
|
||||
} else {
|
||||
builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG);
|
||||
}
|
||||
return builder.setLimit(logQueryFilter.getLimit()).build();
|
||||
}
|
||||
|
||||
|
|
|
@ -1996,4 +1996,16 @@ possible configurations would overwhelm and obscure the important.
|
|||
too large batch request.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.namedqueue.provider.classes</name>
|
||||
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService</value>
|
||||
<description>
|
||||
Default values for NamedQueueService implementors. This comma separated full class names
|
||||
represent all implementors of NamedQueueService that we would like to be invoked by
|
||||
LogEvent handler service. One example of NamedQueue service is SlowLogQueueService which
|
||||
is used to store slow/large RPC logs in ringbuffer at each RegionServer.
|
||||
All implementors of NamedQueueService should be found under package:
|
||||
"org.apache.hadoop.hbase.namequeues.impl"
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
|
|
@ -287,12 +287,18 @@ message SlowLogResponseRequest {
|
|||
OR = 1;
|
||||
}
|
||||
|
||||
enum LogType {
|
||||
SLOW_LOG = 0;
|
||||
LARGE_LOG = 1;
|
||||
}
|
||||
|
||||
optional string region_name = 1;
|
||||
optional string table_name = 2;
|
||||
optional string client_address = 3;
|
||||
optional string user_name = 4;
|
||||
optional uint32 limit = 5 [default = 10];
|
||||
optional FilterByOperator filter_by_operator = 6 [default = OR];
|
||||
optional LogType log_type = 7;
|
||||
}
|
||||
|
||||
message SlowLogResponses {
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
|
||||
|
@ -46,8 +47,8 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
|||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||
import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails;
|
||||
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
|
||||
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
|
||||
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
|
||||
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
|
||||
import org.apache.hadoop.hbase.security.SaslUtil;
|
||||
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
|
||||
|
@ -96,6 +97,7 @@ public abstract class RpcServer implements RpcServerInterface,
|
|||
private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
|
||||
|
||||
private final boolean authorize;
|
||||
private final boolean isOnlineLogProviderEnabled;
|
||||
protected boolean isSecurityEnabled;
|
||||
|
||||
public static final byte CURRENT_VERSION = 0;
|
||||
|
@ -227,7 +229,7 @@ public abstract class RpcServer implements RpcServerInterface,
|
|||
/**
|
||||
* Use to add online slowlog responses
|
||||
*/
|
||||
private SlowLogRecorder slowLogRecorder;
|
||||
private NamedQueueRecorder namedQueueRecorder;
|
||||
|
||||
@FunctionalInterface
|
||||
protected interface CallCleanup {
|
||||
|
@ -302,6 +304,8 @@ public abstract class RpcServer implements RpcServerInterface,
|
|||
saslProps = Collections.emptyMap();
|
||||
}
|
||||
|
||||
this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
|
||||
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
|
||||
|
@ -430,11 +434,11 @@ public abstract class RpcServer implements RpcServerInterface,
|
|||
tooLarge, tooSlow,
|
||||
status.getClient(), startTime, processingTime, qTime,
|
||||
responseSize, userName);
|
||||
if (this.slowLogRecorder != null) {
|
||||
if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
|
||||
// send logs to ring buffer owned by slowLogRecorder
|
||||
final String className = server == null ? StringUtils.EMPTY :
|
||||
server.getClass().getSimpleName();
|
||||
this.slowLogRecorder.addSlowLogPayload(
|
||||
final String className =
|
||||
server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
|
||||
this.namedQueueRecorder.addRecord(
|
||||
new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
|
||||
tooLarge));
|
||||
}
|
||||
|
@ -817,12 +821,8 @@ public abstract class RpcServer implements RpcServerInterface,
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) {
|
||||
this.slowLogRecorder = slowLogRecorder;
|
||||
public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
|
||||
this.namedQueueRecorder = namedQueueRecorder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SlowLogRecorder getSlowLogRecorder() {
|
||||
return slowLogRecorder;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.CellScanner;
|
|||
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
|
||||
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -102,12 +102,8 @@ public interface RpcServerInterface {
|
|||
/**
|
||||
* Set Online SlowLog Provider
|
||||
*
|
||||
* @param slowLogRecorder instance of {@link SlowLogRecorder}
|
||||
* @param namedQueueRecorder instance of {@link NamedQueueRecorder}
|
||||
*/
|
||||
void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder);
|
||||
void setNamedQueueRecorder(final NamedQueueRecorder namedQueueRecorder);
|
||||
|
||||
/**
|
||||
* @return Retrieve instance of {@link SlowLogRecorder} maintained by RpcServer
|
||||
*/
|
||||
SlowLogRecorder getSlowLogRecorder();
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver.slowlog;
|
||||
package org.apache.hadoop.hbase.namequeues;
|
||||
|
||||
import com.lmax.disruptor.ExceptionHandler;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
@ -0,0 +1,130 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.namequeues;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
|
||||
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Event Handler run by disruptor ringbuffer consumer.
|
||||
* Although this is generic implementation for namedQueue, it can have individual queue specific
|
||||
* logic.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class LogEventHandler implements EventHandler<RingBufferEnvelope> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
|
||||
|
||||
// Map that binds namedQueues to corresponding queue service implementation.
|
||||
// If NamedQueue of specific type is enabled, corresponding service will be used to
|
||||
// insert and retrieve records.
|
||||
// Individual queue sizes should be determined based on their individual configs within
|
||||
// each service.
|
||||
private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices =
|
||||
new HashMap<>();
|
||||
|
||||
private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes";
|
||||
|
||||
LogEventHandler(final Configuration conf) {
|
||||
for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) {
|
||||
Class<?> clz;
|
||||
try {
|
||||
clz = Class.forName(implName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!NamedQueueService.class.isAssignableFrom(clz)) {
|
||||
LOG.warn("Class {} is not implementor of NamedQueueService.", clz);
|
||||
continue;
|
||||
}
|
||||
|
||||
// add all service mappings here
|
||||
try {
|
||||
NamedQueueService namedQueueService =
|
||||
(NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf);
|
||||
namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
|
||||
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException
|
||||
| InvocationTargetException e) {
|
||||
LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.",
|
||||
clz);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a publisher has published an event to the {@link RingBuffer}.
|
||||
* This is generic consumer of disruptor ringbuffer and for each new namedQueue that we
|
||||
* add, we should also provide specific consumer logic here.
|
||||
*
|
||||
* @param event published to the {@link RingBuffer}
|
||||
* @param sequence of the event being processed
|
||||
* @param endOfBatch flag to indicate if this is the last event in a batch from
|
||||
* the {@link RingBuffer}
|
||||
*/
|
||||
@Override
|
||||
public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) {
|
||||
final NamedQueuePayload namedQueuePayload = event.getPayload();
|
||||
// consume ringbuffer payload based on event type
|
||||
namedQueueServices.get(namedQueuePayload.getNamedQueueEvent())
|
||||
.consumeEventFromDisruptor(namedQueuePayload);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleans up queues maintained by services.
|
||||
*
|
||||
* @param namedQueueEvent type of queue to clear
|
||||
* @return true if queue is cleaned up, false otherwise
|
||||
*/
|
||||
boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
|
||||
return namedQueueServices.get(namedQueueEvent).clearNamedQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add all in memory queue records to system table. The implementors can use system table
|
||||
* or direct HDFS file or ZK as persistence system.
|
||||
*/
|
||||
void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
|
||||
namedQueueServices.get(namedQueueEvent).persistAll();
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve in memory queue records from ringbuffer
|
||||
*
|
||||
* @param request namedQueue request with event type
|
||||
* @return queue records from ringbuffer after filter (if applied)
|
||||
*/
|
||||
NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
|
||||
return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request);
|
||||
}
|
||||
|
||||
}
|
|
@ -17,7 +17,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver.slowlog;
|
||||
package org.apache.hadoop.hbase.namequeues;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -30,7 +30,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
* Event Handler utility class
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class LogHandlerUtils {
|
||||
public class LogHandlerUtils {
|
||||
|
||||
private static int getTotalFiltersCount(AdminProtos.SlowLogResponseRequest request) {
|
||||
int totalFilters = 0;
|
||||
|
@ -91,7 +91,7 @@ class LogHandlerUtils {
|
|||
return filteredSlowLogPayloads;
|
||||
}
|
||||
|
||||
static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
|
||||
public static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
|
||||
AdminProtos.SlowLogResponseRequest request, List<TooSlowLog.SlowLogPayload> logPayloadList) {
|
||||
int totalFilters = getTotalFiltersCount(request);
|
||||
if (totalFilters > 0) {
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.namequeues;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Base payload to be prepared by client to send various namedQueue events for in-memory
|
||||
* ring buffer storage in either HMaster or RegionServer.
|
||||
* e.g slowLog responses
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class NamedQueuePayload {
|
||||
|
||||
public enum NamedQueueEvent {
|
||||
SLOW_LOG
|
||||
}
|
||||
|
||||
private final NamedQueueEvent namedQueueEvent;
|
||||
|
||||
public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
|
||||
if (namedQueueEvent == null) {
|
||||
throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
|
||||
}
|
||||
this.namedQueueEvent = namedQueueEvent;
|
||||
}
|
||||
|
||||
public NamedQueueEvent getNamedQueueEvent() {
|
||||
return namedQueueEvent;
|
||||
}
|
||||
|
||||
}
|
|
@ -17,87 +17,80 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver.slowlog;
|
||||
package org.apache.hadoop.hbase.namequeues;
|
||||
|
||||
import com.lmax.disruptor.BlockingWaitStrategy;
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
|
||||
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
|
||||
|
||||
/**
|
||||
* Online Slow/Large Log Provider Service that keeps slow/large RPC logs in the ring buffer.
|
||||
* The service uses LMAX Disruptor to save slow records which are then consumed by
|
||||
* NamedQueue recorder that maintains various named queues.
|
||||
* The service uses LMAX Disruptor to save queue records which are then consumed by
|
||||
* a queue and based on the ring buffer size, the available records are then fetched
|
||||
* from the queue in thread-safe manner.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class SlowLogRecorder {
|
||||
public class NamedQueueRecorder {
|
||||
|
||||
private final Disruptor<RingBufferEnvelope> disruptor;
|
||||
private final LogEventHandler logEventHandler;
|
||||
private final int eventCount;
|
||||
private final boolean isOnlineLogProviderEnabled;
|
||||
|
||||
private static final String SLOW_LOG_RING_BUFFER_SIZE =
|
||||
"hbase.regionserver.slowlog.ringbuffer.size";
|
||||
private static NamedQueueRecorder namedQueueRecorder;
|
||||
private static boolean isInit = false;
|
||||
private static final Object LOCK = new Object();
|
||||
|
||||
/**
|
||||
* Initialize disruptor with configurable ringbuffer size
|
||||
*/
|
||||
public SlowLogRecorder(Configuration conf) {
|
||||
isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
|
||||
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
|
||||
|
||||
if (!isOnlineLogProviderEnabled) {
|
||||
this.disruptor = null;
|
||||
this.logEventHandler = null;
|
||||
this.eventCount = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE,
|
||||
HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
|
||||
private NamedQueueRecorder(Configuration conf) {
|
||||
|
||||
// This is the 'writer' -- a single threaded executor. This single thread consumes what is
|
||||
// put on the ringbuffer.
|
||||
final String hostingThreadName = Thread.currentThread().getName();
|
||||
|
||||
int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);
|
||||
|
||||
// disruptor initialization with BlockingWaitStrategy
|
||||
this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
|
||||
getEventCount(),
|
||||
getEventCount(eventCount),
|
||||
Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"),
|
||||
ProducerType.MULTI,
|
||||
new BlockingWaitStrategy());
|
||||
this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
|
||||
|
||||
// initialize ringbuffer event handler
|
||||
final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
|
||||
HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
|
||||
this.logEventHandler = new LogEventHandler(this.eventCount, isSlowLogTableEnabled, conf);
|
||||
this.logEventHandler = new LogEventHandler(conf);
|
||||
this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler});
|
||||
this.disruptor.start();
|
||||
}
|
||||
|
||||
public static NamedQueueRecorder getInstance(Configuration conf) {
|
||||
if (namedQueueRecorder != null) {
|
||||
return namedQueueRecorder;
|
||||
}
|
||||
synchronized (LOCK) {
|
||||
if (!isInit) {
|
||||
namedQueueRecorder = new NamedQueueRecorder(conf);
|
||||
isInit = true;
|
||||
}
|
||||
}
|
||||
return namedQueueRecorder;
|
||||
}
|
||||
|
||||
// must be power of 2 for disruptor ringbuffer
|
||||
private int getEventCount() {
|
||||
Preconditions.checkArgument(eventCount >= 0,
|
||||
SLOW_LOG_RING_BUFFER_SIZE + " must be > 0");
|
||||
private int getEventCount(int eventCount) {
|
||||
Preconditions.checkArgument(eventCount >= 0, "hbase.namedqueue.ringbuffer.size must be > 0");
|
||||
int floor = Integer.highestOneBit(eventCount);
|
||||
if (floor == eventCount) {
|
||||
return floor;
|
||||
|
@ -110,66 +103,53 @@ public class SlowLogRecorder {
|
|||
}
|
||||
|
||||
/**
|
||||
* Retrieve online slow logs from ringbuffer
|
||||
* Retrieve in memory queue records from ringbuffer
|
||||
*
|
||||
* @param request slow log request parameters
|
||||
* @return online slow logs from ringbuffer
|
||||
* @param request namedQueue request with event type
|
||||
* @return queue records from ringbuffer after filter (if applied)
|
||||
*/
|
||||
public List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
|
||||
return isOnlineLogProviderEnabled ? this.logEventHandler.getSlowLogPayloads(request)
|
||||
: Collections.emptyList();
|
||||
public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
|
||||
return this.logEventHandler.getNamedQueueRecords(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve online large logs from ringbuffer
|
||||
*
|
||||
* @param request large log request parameters
|
||||
* @return online large logs from ringbuffer
|
||||
*/
|
||||
public List<SlowLogPayload> getLargeLogPayloads(AdminProtos.SlowLogResponseRequest request) {
|
||||
return isOnlineLogProviderEnabled ? this.logEventHandler.getLargeLogPayloads(request)
|
||||
: Collections.emptyList();
|
||||
}
|
||||
|
||||
/**
|
||||
* clears slow log payloads from ringbuffer
|
||||
* clears queue records from ringbuffer
|
||||
*
|
||||
* @param namedQueueEvent type of queue to clear
|
||||
* @return true if slow log payloads are cleaned up or
|
||||
* hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to
|
||||
* clean up slow logs
|
||||
*/
|
||||
public boolean clearSlowLogPayloads() {
|
||||
if (!isOnlineLogProviderEnabled) {
|
||||
return true;
|
||||
}
|
||||
return this.logEventHandler.clearSlowLogs();
|
||||
public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
|
||||
return this.logEventHandler.clearNamedQueue(namedQueueEvent);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add slow log rpcCall details to ringbuffer
|
||||
* Add various NamedQueue records to ringbuffer. Based on the type of the event (e.g slowLog),
|
||||
* consumer of disruptor ringbuffer will have specific logic.
|
||||
* This method is producer of disruptor ringbuffer which is initialized in NamedQueueRecorder
|
||||
* constructor.
|
||||
*
|
||||
* @param rpcLogDetails all details of rpc call that would be useful for ring buffer
|
||||
* consumers
|
||||
* @param namedQueuePayload namedQueue payload sent by client of ring buffer
|
||||
* service
|
||||
*/
|
||||
public void addSlowLogPayload(RpcLogDetails rpcLogDetails) {
|
||||
if (!isOnlineLogProviderEnabled) {
|
||||
return;
|
||||
}
|
||||
public void addRecord(NamedQueuePayload namedQueuePayload) {
|
||||
RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
|
||||
long seqId = ringBuffer.next();
|
||||
try {
|
||||
ringBuffer.get(seqId).load(rpcLogDetails);
|
||||
ringBuffer.get(seqId).load(namedQueuePayload);
|
||||
} finally {
|
||||
ringBuffer.publish(seqId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
|
||||
* Add all in memory queue records to system table. The implementors can use system table
|
||||
* or direct HDFS file or ZK as persistence system.
|
||||
*/
|
||||
public void addAllLogsToSysTable() {
|
||||
public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
|
||||
if (this.logEventHandler != null) {
|
||||
this.logEventHandler.addAllLogsToSysTable();
|
||||
this.logEventHandler.persistAll(namedQueueEvent);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.namequeues;
|
||||
|
||||
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
|
||||
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* In-memory Queue service provider for multiple use-cases. Implementers should be
|
||||
* registered in LogEventHandler
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface NamedQueueService {
|
||||
|
||||
/**
|
||||
* Retrieve event type for NamedQueueService implementation.
|
||||
*
|
||||
* @return {@link NamedQueuePayload.NamedQueueEvent}
|
||||
*/
|
||||
NamedQueuePayload.NamedQueueEvent getEvent();
|
||||
|
||||
/**
|
||||
* This implementation is generic for consuming records from LMAX
|
||||
* disruptor and inserts records to EvictingQueue which is maintained by each
|
||||
* ringbuffer provider.
|
||||
*
|
||||
* @param namedQueuePayload namedQueue payload from disruptor ring buffer
|
||||
*/
|
||||
void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload);
|
||||
|
||||
/**
|
||||
* Cleans up queues maintained by services.
|
||||
*
|
||||
* @return true if slow log payloads are cleaned up, false otherwise
|
||||
*/
|
||||
boolean clearNamedQueue();
|
||||
|
||||
/**
|
||||
* Retrieve in memory queue records from ringbuffer
|
||||
*
|
||||
* @param request namedQueue request with event type
|
||||
* @return queue records from ringbuffer after filter (if applied)
|
||||
*/
|
||||
NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request);
|
||||
|
||||
/**
|
||||
* Add all in memory queue records to system table. The implementors can use system table
|
||||
* or direct HDFS file or ZK as persistence system.
|
||||
*/
|
||||
void persistAll();
|
||||
}
|
|
@ -17,29 +17,29 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver.slowlog;
|
||||
package org.apache.hadoop.hbase.namequeues;
|
||||
|
||||
import org.apache.hadoop.hbase.ipc.RpcCall;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
|
||||
/**
|
||||
* An envelope to carry payload in the slow log ring buffer that serves as online buffer
|
||||
* to provide latest TooSlowLog
|
||||
* An envelope to carry payload in the ring buffer that serves as online buffer
|
||||
* to provide latest events
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
final class RingBufferEnvelope {
|
||||
|
||||
private RpcLogDetails rpcLogDetails;
|
||||
private NamedQueuePayload namedQueuePayload;
|
||||
|
||||
/**
|
||||
* Load the Envelope with {@link RpcCall}
|
||||
*
|
||||
* @param rpcLogDetails all details of rpc call that would be useful for ring buffer
|
||||
* @param namedQueuePayload all details of rpc call that would be useful for ring buffer
|
||||
* consumers
|
||||
*/
|
||||
public void load(RpcLogDetails rpcLogDetails) {
|
||||
this.rpcLogDetails = rpcLogDetails;
|
||||
public void load(NamedQueuePayload namedQueuePayload) {
|
||||
this.namedQueuePayload = namedQueuePayload;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -48,10 +48,10 @@ final class RingBufferEnvelope {
|
|||
*
|
||||
* @return Retrieve rpc log details
|
||||
*/
|
||||
public RpcLogDetails getPayload() {
|
||||
final RpcLogDetails rpcLogDetails = this.rpcLogDetails;
|
||||
this.rpcLogDetails = null;
|
||||
return rpcLogDetails;
|
||||
public NamedQueuePayload getPayload() {
|
||||
final NamedQueuePayload namedQueuePayload = this.namedQueuePayload;
|
||||
this.namedQueuePayload = null;
|
||||
return namedQueuePayload;
|
||||
}
|
||||
|
||||
}
|
|
@ -17,7 +17,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver.slowlog;
|
||||
package org.apache.hadoop.hbase.namequeues;
|
||||
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCall;
|
||||
|
@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
* RpcCall details that would be passed on to ring buffer of slow log responses
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RpcLogDetails {
|
||||
public class RpcLogDetails extends NamedQueuePayload {
|
||||
|
||||
private final RpcCall rpcCall;
|
||||
private final Message param;
|
||||
|
@ -40,6 +40,7 @@ public class RpcLogDetails {
|
|||
|
||||
public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize,
|
||||
String className, boolean isSlowLog, boolean isLargeLog) {
|
||||
super(NamedQueueEvent.SLOW_LOG);
|
||||
this.rpcCall = rpcCall;
|
||||
this.param = param;
|
||||
this.clientAddress = clientAddress;
|
|
@ -0,0 +1,98 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.namequeues;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
|
||||
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* Persistent service provider for Slow/LargeLog events
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SlowLogPersistentService {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SlowLogPersistentService.class);
|
||||
|
||||
private static final ReentrantLock LOCK = new ReentrantLock();
|
||||
private static final String SYS_TABLE_QUEUE_SIZE =
|
||||
"hbase.regionserver.slowlog.systable.queue.size";
|
||||
private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
|
||||
private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
|
||||
|
||||
private final Queue<TooSlowLog.SlowLogPayload> queueForSysTable;
|
||||
|
||||
private final Configuration configuration;
|
||||
|
||||
public SlowLogPersistentService(final Configuration configuration) {
|
||||
this.configuration = configuration;
|
||||
int sysTableQueueSize =
|
||||
configuration.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
|
||||
EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueueForTable =
|
||||
EvictingQueue.create(sysTableQueueSize);
|
||||
queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
|
||||
}
|
||||
|
||||
public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) {
|
||||
queueForSysTable.add(slowLogPayload);
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
|
||||
*/
|
||||
public void addAllLogsToSysTable() {
|
||||
if (queueForSysTable == null) {
|
||||
LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.");
|
||||
return;
|
||||
}
|
||||
if (LOCK.isLocked()) {
|
||||
return;
|
||||
}
|
||||
LOCK.lock();
|
||||
try {
|
||||
List<TooSlowLog.SlowLogPayload> slowLogPayloads = new ArrayList<>();
|
||||
int i = 0;
|
||||
while (!queueForSysTable.isEmpty()) {
|
||||
slowLogPayloads.add(queueForSysTable.poll());
|
||||
i++;
|
||||
if (i == SYSTABLE_PUT_BATCH_SIZE) {
|
||||
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
|
||||
slowLogPayloads.clear();
|
||||
i = 0;
|
||||
}
|
||||
}
|
||||
if (slowLogPayloads.size() > 0) {
|
||||
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
|
||||
}
|
||||
} finally {
|
||||
LOCK.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -17,7 +17,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver.slowlog;
|
||||
package org.apache.hadoop.hbase.namequeues;
|
||||
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
|
@ -33,7 +33,7 @@ public class SlowLogTableOpsChore extends ScheduledChore {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class);
|
||||
|
||||
private final SlowLogRecorder slowLogRecorder;
|
||||
private final NamedQueueRecorder namedQueueRecorder;
|
||||
|
||||
/**
|
||||
* Chore Constructor
|
||||
|
@ -41,12 +41,12 @@ public class SlowLogTableOpsChore extends ScheduledChore {
|
|||
* @param stopper The stopper - When {@link Stoppable#isStopped()} is true, this chore will
|
||||
* cancel and cleanup
|
||||
* @param period Period in millis with which this Chore repeats execution when scheduled
|
||||
* @param slowLogRecorder {@link SlowLogRecorder} instance
|
||||
* @param namedQueueRecorder {@link NamedQueueRecorder} instance
|
||||
*/
|
||||
public SlowLogTableOpsChore(final Stoppable stopper, final int period,
|
||||
final SlowLogRecorder slowLogRecorder) {
|
||||
final NamedQueueRecorder namedQueueRecorder) {
|
||||
super("SlowLogTableOpsChore", stopper, period);
|
||||
this.slowLogRecorder = slowLogRecorder;
|
||||
this.namedQueueRecorder = namedQueueRecorder;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -54,7 +54,7 @@ public class SlowLogTableOpsChore extends ScheduledChore {
|
|||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("SlowLog Table Ops Chore is starting up.");
|
||||
}
|
||||
slowLogRecorder.addAllLogsToSysTable();
|
||||
namedQueueRecorder.persistAll(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("SlowLog Table Ops Chore is closing.");
|
||||
}
|
|
@ -0,0 +1,264 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.namequeues.impl;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.SlowLogParams;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCall;
|
||||
import org.apache.hadoop.hbase.namequeues.LogHandlerUtils;
|
||||
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
|
||||
import org.apache.hadoop.hbase.namequeues.NamedQueueService;
|
||||
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
|
||||
import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService;
|
||||
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
|
||||
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* In-memory Queue service provider for Slow/LargeLog events
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SlowLogQueueService implements NamedQueueService {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SlowLogQueueService.class);
|
||||
|
||||
private static final String SLOW_LOG_RING_BUFFER_SIZE =
|
||||
"hbase.regionserver.slowlog.ringbuffer.size";
|
||||
|
||||
private final boolean isOnlineLogProviderEnabled;
|
||||
private final boolean isSlowLogTableEnabled;
|
||||
private final SlowLogPersistentService slowLogPersistentService;
|
||||
private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue;
|
||||
|
||||
public SlowLogQueueService(Configuration conf) {
|
||||
this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
|
||||
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
|
||||
|
||||
if (!isOnlineLogProviderEnabled) {
|
||||
this.isSlowLogTableEnabled = false;
|
||||
this.slowLogPersistentService = null;
|
||||
this.slowLogQueue = null;
|
||||
return;
|
||||
}
|
||||
|
||||
// Initialize SlowLog Queue
|
||||
int slowLogQueueSize =
|
||||
conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
|
||||
|
||||
EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue =
|
||||
EvictingQueue.create(slowLogQueueSize);
|
||||
slowLogQueue = Queues.synchronizedQueue(evictingQueue);
|
||||
|
||||
this.isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
|
||||
HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
|
||||
if (isSlowLogTableEnabled) {
|
||||
slowLogPersistentService = new SlowLogPersistentService(conf);
|
||||
} else {
|
||||
slowLogPersistentService = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public NamedQueuePayload.NamedQueueEvent getEvent() {
|
||||
return NamedQueuePayload.NamedQueueEvent.SLOW_LOG;
|
||||
}
|
||||
|
||||
/**
|
||||
* This implementation is specific to slowLog event. This consumes slowLog event from
|
||||
* disruptor and inserts records to EvictingQueue.
|
||||
*
|
||||
* @param namedQueuePayload namedQueue payload from disruptor ring buffer
|
||||
*/
|
||||
@Override
|
||||
public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
|
||||
if (!isOnlineLogProviderEnabled) {
|
||||
return;
|
||||
}
|
||||
if (!(namedQueuePayload instanceof RpcLogDetails)) {
|
||||
LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type RpcLogDetails.");
|
||||
return;
|
||||
}
|
||||
final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload;
|
||||
final RpcCall rpcCall = rpcLogDetails.getRpcCall();
|
||||
final String clientAddress = rpcLogDetails.getClientAddress();
|
||||
final long responseSize = rpcLogDetails.getResponseSize();
|
||||
final String className = rpcLogDetails.getClassName();
|
||||
final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails);
|
||||
if (type == null) {
|
||||
return;
|
||||
}
|
||||
Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
|
||||
Message param = rpcLogDetails.getParam();
|
||||
long receiveTime = rpcCall.getReceiveTime();
|
||||
long startTime = rpcCall.getStartTime();
|
||||
long endTime = System.currentTimeMillis();
|
||||
int processingTime = (int) (endTime - startTime);
|
||||
int qTime = (int) (startTime - receiveTime);
|
||||
final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
|
||||
int numGets = 0;
|
||||
int numMutations = 0;
|
||||
int numServiceCalls = 0;
|
||||
if (param instanceof ClientProtos.MultiRequest) {
|
||||
ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
|
||||
for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
|
||||
for (ClientProtos.Action action : regionAction.getActionList()) {
|
||||
if (action.hasMutation()) {
|
||||
numMutations++;
|
||||
}
|
||||
if (action.hasGet()) {
|
||||
numGets++;
|
||||
}
|
||||
if (action.hasServiceCall()) {
|
||||
numServiceCalls++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
|
||||
final String methodDescriptorName =
|
||||
methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
|
||||
TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder()
|
||||
.setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
|
||||
.setClientAddress(clientAddress)
|
||||
.setMethodName(methodDescriptorName)
|
||||
.setMultiGets(numGets)
|
||||
.setMultiMutations(numMutations)
|
||||
.setMultiServiceCalls(numServiceCalls)
|
||||
.setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
|
||||
.setProcessingTime(processingTime)
|
||||
.setQueueTime(qTime)
|
||||
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
|
||||
.setResponseSize(responseSize)
|
||||
.setServerClass(className)
|
||||
.setStartTime(startTime)
|
||||
.setType(type)
|
||||
.setUserName(userName)
|
||||
.build();
|
||||
slowLogQueue.add(slowLogPayload);
|
||||
if (isSlowLogTableEnabled) {
|
||||
if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
|
||||
slowLogPersistentService.addToQueueForSysTable(slowLogPayload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean clearNamedQueue() {
|
||||
if (!isOnlineLogProviderEnabled) {
|
||||
return false;
|
||||
}
|
||||
LOG.debug("Received request to clean up online slowlog buffer.");
|
||||
slowLogQueue.clear();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
|
||||
if (!isOnlineLogProviderEnabled) {
|
||||
return null;
|
||||
}
|
||||
final AdminProtos.SlowLogResponseRequest slowLogResponseRequest =
|
||||
request.getSlowLogResponseRequest();
|
||||
final List<TooSlowLog.SlowLogPayload> slowLogPayloads;
|
||||
if (AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG
|
||||
.equals(slowLogResponseRequest.getLogType())) {
|
||||
slowLogPayloads = getLargeLogPayloads(slowLogResponseRequest);
|
||||
} else {
|
||||
slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest);
|
||||
}
|
||||
NamedQueueGetResponse response = new NamedQueueGetResponse();
|
||||
response.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
|
||||
response.setSlowLogPayloads(slowLogPayloads);
|
||||
return response;
|
||||
}
|
||||
|
||||
private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
|
||||
final boolean isSlowLog = rpcCallDetails.isSlowLog();
|
||||
final boolean isLargeLog = rpcCallDetails.isLargeLog();
|
||||
final TooSlowLog.SlowLogPayload.Type type;
|
||||
if (!isSlowLog && !isLargeLog) {
|
||||
LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
|
||||
rpcCallDetails);
|
||||
return null;
|
||||
}
|
||||
if (isSlowLog && isLargeLog) {
|
||||
type = TooSlowLog.SlowLogPayload.Type.ALL;
|
||||
} else if (isSlowLog) {
|
||||
type = TooSlowLog.SlowLogPayload.Type.SLOW_LOG;
|
||||
} else {
|
||||
type = TooSlowLog.SlowLogPayload.Type.LARGE_LOG;
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add all slowLog events to system table. This is only for slowLog event's persistence on
|
||||
* system table.
|
||||
*/
|
||||
@Override
|
||||
public void persistAll() {
|
||||
if (!isOnlineLogProviderEnabled) {
|
||||
return;
|
||||
}
|
||||
if (slowLogPersistentService != null) {
|
||||
slowLogPersistentService.addAllLogsToSysTable();
|
||||
}
|
||||
}
|
||||
|
||||
private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
|
||||
final AdminProtos.SlowLogResponseRequest request) {
|
||||
List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
|
||||
Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter(
|
||||
e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
|
||||
|| e.getType() == TooSlowLog.SlowLogPayload.Type.SLOW_LOG).collect(Collectors.toList());
|
||||
// latest slow logs first, operator is interested in latest records from in-memory buffer
|
||||
Collections.reverse(slowLogPayloadList);
|
||||
return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
|
||||
}
|
||||
|
||||
private List<TooSlowLog.SlowLogPayload> getLargeLogPayloads(
|
||||
final AdminProtos.SlowLogResponseRequest request) {
|
||||
List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
|
||||
Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter(
|
||||
e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
|
||||
|| e.getType() == TooSlowLog.SlowLogPayload.Type.LARGE_LOG).collect(Collectors.toList());
|
||||
// latest large logs first, operator is interested in latest records from in-memory buffer
|
||||
Collections.reverse(slowLogPayloadList);
|
||||
return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.namequeues.request;
|
||||
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Request object to be used by ring buffer use-cases. Clients get records by sending
|
||||
* this request object.
|
||||
* For each ring buffer use-case, add request payload to this class, client should set
|
||||
* namedQueueEvent based on use-case.
|
||||
* Protobuf does not support inheritance, hence we need to work with
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class NamedQueueGetRequest {
|
||||
|
||||
private AdminProtos.SlowLogResponseRequest slowLogResponseRequest;
|
||||
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
|
||||
|
||||
public AdminProtos.SlowLogResponseRequest getSlowLogResponseRequest() {
|
||||
return slowLogResponseRequest;
|
||||
}
|
||||
|
||||
public void setSlowLogResponseRequest(
|
||||
AdminProtos.SlowLogResponseRequest slowLogResponseRequest) {
|
||||
this.slowLogResponseRequest = slowLogResponseRequest;
|
||||
}
|
||||
|
||||
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
|
||||
return namedQueueEvent;
|
||||
}
|
||||
|
||||
public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
|
||||
this.namedQueueEvent = namedQueueEvent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new ToStringBuilder(this)
|
||||
.append("slowLogResponseRequest", slowLogResponseRequest)
|
||||
.append("namedQueueEvent", namedQueueEvent)
|
||||
.toString();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.namequeues.response;
|
||||
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Response object to be sent by namedQueue service back to caller
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class NamedQueueGetResponse {
|
||||
|
||||
private List<TooSlowLog.SlowLogPayload> slowLogPayloads;
|
||||
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
|
||||
|
||||
public List<TooSlowLog.SlowLogPayload> getSlowLogPayloads() {
|
||||
return slowLogPayloads;
|
||||
}
|
||||
|
||||
public void setSlowLogPayloads(List<TooSlowLog.SlowLogPayload> slowLogPayloads) {
|
||||
this.slowLogPayloads = slowLogPayloads;
|
||||
}
|
||||
|
||||
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
|
||||
return namedQueueEvent;
|
||||
}
|
||||
|
||||
public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
|
||||
this.namedQueueEvent = namedQueueEvent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return new ToStringBuilder(this)
|
||||
.append("slowLogPayloads", slowLogPayloads)
|
||||
.append("namedQueueEvent", namedQueueEvent)
|
||||
.toString();
|
||||
}
|
||||
|
||||
}
|
|
@ -138,8 +138,8 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
|
|||
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
|
||||
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore;
|
||||
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
|
||||
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
|
||||
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
|
||||
|
@ -535,7 +535,7 @@ public class HRegionServer extends Thread implements
|
|||
/**
|
||||
* Provide online slow log responses from ringbuffer
|
||||
*/
|
||||
private SlowLogRecorder slowLogRecorder;
|
||||
private NamedQueueRecorder namedQueueRecorder = null;
|
||||
|
||||
/**
|
||||
* True if this RegionServer is coming up in a cluster where there is no Master;
|
||||
|
@ -597,7 +597,12 @@ public class HRegionServer extends Thread implements
|
|||
this.stopped = false;
|
||||
|
||||
if (!(this instanceof HMaster)) {
|
||||
this.slowLogRecorder = new SlowLogRecorder(this.conf);
|
||||
final boolean isOnlineLogProviderEnabled = conf.getBoolean(
|
||||
HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
|
||||
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
|
||||
if (isOnlineLogProviderEnabled) {
|
||||
this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
|
||||
}
|
||||
}
|
||||
rpcServices = createRpcServices();
|
||||
useThisHostnameInstead = getUseThisHostnameInstead(conf);
|
||||
|
@ -1506,12 +1511,12 @@ public class HRegionServer extends Thread implements
|
|||
}
|
||||
|
||||
/**
|
||||
* get Online SlowLog Provider to add slow logs to ringbuffer
|
||||
* get NamedQueue Provider to add different logs to ringbuffer
|
||||
*
|
||||
* @return Online SlowLog Provider
|
||||
* @return NamedQueueRecorder
|
||||
*/
|
||||
public SlowLogRecorder getSlowLogRecorder() {
|
||||
return this.slowLogRecorder;
|
||||
public NamedQueueRecorder getNamedQueueRecorder() {
|
||||
return this.namedQueueRecorder;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -2070,7 +2075,7 @@ public class HRegionServer extends Thread implements
|
|||
if (isSlowLogTableEnabled) {
|
||||
// default chore duration: 10 min
|
||||
final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
|
||||
slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.slowLogRecorder);
|
||||
slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
|
||||
}
|
||||
|
||||
// Create the thread to clean the moved regions list
|
||||
|
|
|
@ -108,6 +108,9 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
|||
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MasterRpcServices;
|
||||
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
|
||||
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
|
||||
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
|
||||
import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
|
||||
|
@ -128,7 +131,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
|
|||
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
|
||||
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
|
||||
import org.apache.hadoop.hbase.security.Superusers;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.access.AccessChecker;
|
||||
|
@ -1301,7 +1304,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
|
||||
rpcServer.setRsRpcServices(this);
|
||||
if (!(rs instanceof HMaster)) {
|
||||
rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder());
|
||||
rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
|
||||
}
|
||||
scannerLeaseTimeoutPeriod = conf.getInt(
|
||||
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
|
||||
|
@ -3983,28 +3986,39 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
@QosPriority(priority = HConstants.ADMIN_QOS)
|
||||
public SlowLogResponses getSlowLogResponses(final RpcController controller,
|
||||
final SlowLogResponseRequest request) {
|
||||
final SlowLogRecorder slowLogRecorder =
|
||||
this.regionServer.getSlowLogRecorder();
|
||||
final List<SlowLogPayload> slowLogPayloads;
|
||||
slowLogPayloads = slowLogRecorder != null
|
||||
? slowLogRecorder.getSlowLogPayloads(request)
|
||||
: Collections.emptyList();
|
||||
final NamedQueueRecorder namedQueueRecorder =
|
||||
this.regionServer.getNamedQueueRecorder();
|
||||
final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
|
||||
SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
|
||||
.addAllSlowLogPayloads(slowLogPayloads)
|
||||
.build();
|
||||
return slowLogResponses;
|
||||
}
|
||||
|
||||
private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request,
|
||||
NamedQueueRecorder namedQueueRecorder) {
|
||||
if (namedQueueRecorder == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
List<SlowLogPayload> slowLogPayloads;
|
||||
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
|
||||
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
|
||||
namedQueueGetRequest.setSlowLogResponseRequest(request);
|
||||
NamedQueueGetResponse namedQueueGetResponse =
|
||||
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
|
||||
slowLogPayloads = namedQueueGetResponse != null ?
|
||||
namedQueueGetResponse.getSlowLogPayloads() :
|
||||
Collections.emptyList();
|
||||
return slowLogPayloads;
|
||||
}
|
||||
|
||||
@Override
|
||||
@QosPriority(priority = HConstants.ADMIN_QOS)
|
||||
public SlowLogResponses getLargeLogResponses(final RpcController controller,
|
||||
final SlowLogResponseRequest request) {
|
||||
final SlowLogRecorder slowLogRecorder =
|
||||
this.regionServer.getSlowLogRecorder();
|
||||
final List<SlowLogPayload> slowLogPayloads;
|
||||
slowLogPayloads = slowLogRecorder != null
|
||||
? slowLogRecorder.getLargeLogPayloads(request)
|
||||
: Collections.emptyList();
|
||||
final NamedQueueRecorder namedQueueRecorder =
|
||||
this.regionServer.getNamedQueueRecorder();
|
||||
final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
|
||||
SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
|
||||
.addAllSlowLogPayloads(slowLogPayloads)
|
||||
.build();
|
||||
|
@ -4015,10 +4029,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
@QosPriority(priority = HConstants.ADMIN_QOS)
|
||||
public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
|
||||
final ClearSlowLogResponseRequest request) {
|
||||
final SlowLogRecorder slowLogRecorder =
|
||||
this.regionServer.getSlowLogRecorder();
|
||||
boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder)
|
||||
.map(SlowLogRecorder::clearSlowLogPayloads).orElse(false);
|
||||
final NamedQueueRecorder namedQueueRecorder =
|
||||
this.regionServer.getNamedQueueRecorder();
|
||||
boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
|
||||
.map(queueRecorder ->
|
||||
queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
|
||||
.orElse(false);
|
||||
ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder()
|
||||
.setIsCleaned(slowLogsCleaned)
|
||||
.build();
|
||||
|
|
|
@ -1,266 +0,0 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver.slowlog;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.client.SlowLogParams;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCall;
|
||||
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
|
||||
|
||||
/**
|
||||
* Event Handler run by disruptor ringbuffer consumer
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class LogEventHandler implements EventHandler<RingBufferEnvelope> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
|
||||
|
||||
private static final String SYS_TABLE_QUEUE_SIZE =
|
||||
"hbase.regionserver.slowlog.systable.queue.size";
|
||||
private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
|
||||
private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
|
||||
|
||||
private final Queue<SlowLogPayload> queueForRingBuffer;
|
||||
private final Queue<SlowLogPayload> queueForSysTable;
|
||||
private final boolean isSlowLogTableEnabled;
|
||||
|
||||
private Configuration configuration;
|
||||
|
||||
private static final ReentrantLock LOCK = new ReentrantLock();
|
||||
|
||||
LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration conf) {
|
||||
this.configuration = conf;
|
||||
EvictingQueue<SlowLogPayload> evictingQueue = EvictingQueue.create(eventCount);
|
||||
queueForRingBuffer = Queues.synchronizedQueue(evictingQueue);
|
||||
this.isSlowLogTableEnabled = isSlowLogTableEnabled;
|
||||
if (isSlowLogTableEnabled) {
|
||||
int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
|
||||
EvictingQueue<SlowLogPayload> evictingQueueForTable =
|
||||
EvictingQueue.create(sysTableQueueSize);
|
||||
queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
|
||||
} else {
|
||||
queueForSysTable = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a publisher has published an event to the {@link RingBuffer}
|
||||
*
|
||||
* @param event published to the {@link RingBuffer}
|
||||
* @param sequence of the event being processed
|
||||
* @param endOfBatch flag to indicate if this is the last event in a batch from
|
||||
* the {@link RingBuffer}
|
||||
* @throws Exception if the EventHandler would like the exception handled further up the chain
|
||||
*/
|
||||
@Override
|
||||
public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch)
|
||||
throws Exception {
|
||||
final RpcLogDetails rpcCallDetails = event.getPayload();
|
||||
final RpcCall rpcCall = rpcCallDetails.getRpcCall();
|
||||
final String clientAddress = rpcCallDetails.getClientAddress();
|
||||
final long responseSize = rpcCallDetails.getResponseSize();
|
||||
final String className = rpcCallDetails.getClassName();
|
||||
final SlowLogPayload.Type type = getLogType(rpcCallDetails);
|
||||
if (type == null) {
|
||||
return;
|
||||
}
|
||||
Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
|
||||
Message param = rpcCallDetails.getParam();
|
||||
long receiveTime = rpcCall.getReceiveTime();
|
||||
long startTime = rpcCall.getStartTime();
|
||||
long endTime = System.currentTimeMillis();
|
||||
int processingTime = (int) (endTime - startTime);
|
||||
int qTime = (int) (startTime - receiveTime);
|
||||
final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
|
||||
int numGets = 0;
|
||||
int numMutations = 0;
|
||||
int numServiceCalls = 0;
|
||||
if (param instanceof ClientProtos.MultiRequest) {
|
||||
ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
|
||||
for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
|
||||
for (ClientProtos.Action action : regionAction.getActionList()) {
|
||||
if (action.hasMutation()) {
|
||||
numMutations++;
|
||||
}
|
||||
if (action.hasGet()) {
|
||||
numGets++;
|
||||
}
|
||||
if (action.hasServiceCall()) {
|
||||
numServiceCalls++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
|
||||
final String methodDescriptorName =
|
||||
methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
|
||||
SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder()
|
||||
.setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
|
||||
.setClientAddress(clientAddress)
|
||||
.setMethodName(methodDescriptorName)
|
||||
.setMultiGets(numGets)
|
||||
.setMultiMutations(numMutations)
|
||||
.setMultiServiceCalls(numServiceCalls)
|
||||
.setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
|
||||
.setProcessingTime(processingTime)
|
||||
.setQueueTime(qTime)
|
||||
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
|
||||
.setResponseSize(responseSize)
|
||||
.setServerClass(className)
|
||||
.setStartTime(startTime)
|
||||
.setType(type)
|
||||
.setUserName(userName)
|
||||
.build();
|
||||
queueForRingBuffer.add(slowLogPayload);
|
||||
if (isSlowLogTableEnabled) {
|
||||
if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
|
||||
queueForSysTable.add(slowLogPayload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
|
||||
final boolean isSlowLog = rpcCallDetails.isSlowLog();
|
||||
final boolean isLargeLog = rpcCallDetails.isLargeLog();
|
||||
final SlowLogPayload.Type type;
|
||||
if (!isSlowLog && !isLargeLog) {
|
||||
LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
|
||||
rpcCallDetails);
|
||||
return null;
|
||||
}
|
||||
if (isSlowLog && isLargeLog) {
|
||||
type = SlowLogPayload.Type.ALL;
|
||||
} else if (isSlowLog) {
|
||||
type = SlowLogPayload.Type.SLOW_LOG;
|
||||
} else {
|
||||
type = SlowLogPayload.Type.LARGE_LOG;
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleans up slow log payloads
|
||||
*
|
||||
* @return true if slow log payloads are cleaned up, false otherwise
|
||||
*/
|
||||
boolean clearSlowLogs() {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Received request to clean up online slowlog buffer..");
|
||||
}
|
||||
queueForRingBuffer.clear();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve list of slow log payloads
|
||||
*
|
||||
* @param request slow log request parameters
|
||||
* @return list of slow log payloads
|
||||
*/
|
||||
List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
|
||||
List<SlowLogPayload> slowLogPayloadList =
|
||||
Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
|
||||
.filter(e -> e.getType() == SlowLogPayload.Type.ALL
|
||||
|| e.getType() == SlowLogPayload.Type.SLOW_LOG)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// latest slow logs first, operator is interested in latest records from in-memory buffer
|
||||
Collections.reverse(slowLogPayloadList);
|
||||
|
||||
return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve list of large log payloads
|
||||
*
|
||||
* @param request large log request parameters
|
||||
* @return list of large log payloads
|
||||
*/
|
||||
List<SlowLogPayload> getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
|
||||
List<SlowLogPayload> slowLogPayloadList =
|
||||
Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
|
||||
.filter(e -> e.getType() == SlowLogPayload.Type.ALL
|
||||
|| e.getType() == SlowLogPayload.Type.LARGE_LOG)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// latest large logs first, operator is interested in latest records from in-memory buffer
|
||||
Collections.reverse(slowLogPayloadList);
|
||||
|
||||
return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
|
||||
*/
|
||||
void addAllLogsToSysTable() {
|
||||
if (queueForSysTable == null) {
|
||||
// hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.
|
||||
return;
|
||||
}
|
||||
if (LOCK.isLocked()) {
|
||||
return;
|
||||
}
|
||||
LOCK.lock();
|
||||
try {
|
||||
List<SlowLogPayload> slowLogPayloads = new ArrayList<>();
|
||||
int i = 0;
|
||||
while (!queueForSysTable.isEmpty()) {
|
||||
slowLogPayloads.add(queueForSysTable.poll());
|
||||
i++;
|
||||
if (i == SYSTABLE_PUT_BATCH_SIZE) {
|
||||
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
|
||||
slowLogPayloads.clear();
|
||||
i = 0;
|
||||
}
|
||||
}
|
||||
if (slowLogPayloads.size() > 0) {
|
||||
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
|
||||
}
|
||||
} finally {
|
||||
LOCK.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -17,12 +17,14 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver.slowlog;
|
||||
package org.apache.hadoop.hbase.namequeues;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.net.InetAddress;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
@ -35,6 +37,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCall;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCallback;
|
||||
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
|
||||
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -61,27 +65,25 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPaylo
|
|||
* Tests for Online SlowLog Provider Service
|
||||
*/
|
||||
@Category({MasterTests.class, MediumTests.class})
|
||||
public class TestSlowLogRecorder {
|
||||
public class TestNamedQueueRecorder {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSlowLogRecorder.class);
|
||||
HBaseClassTestRule.forClass(TestNamedQueueRecorder.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
|
||||
|
||||
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
|
||||
|
||||
private SlowLogRecorder slowLogRecorder;
|
||||
private NamedQueueRecorder namedQueueRecorder;
|
||||
|
||||
private static int i = 0;
|
||||
|
||||
private static Configuration applySlowLogRecorderConf(int eventSize) {
|
||||
|
||||
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
|
||||
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
|
||||
conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
|
||||
return conf;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -90,11 +92,10 @@ public class TestSlowLogRecorder {
|
|||
*
|
||||
* @param i index of ringbuffer logs
|
||||
* @param j data value that was put on index i
|
||||
* @param slowLogPayloads list of payload retrieved from {@link SlowLogRecorder}
|
||||
* @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder}
|
||||
* @return if actual values are as per expectations
|
||||
*/
|
||||
private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
|
||||
|
||||
boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j);
|
||||
boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j);
|
||||
boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j);
|
||||
|
@ -102,15 +103,18 @@ public class TestSlowLogRecorder {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testOnlieSlowLogConsumption() throws Exception {
|
||||
public void testOnlieSlowLogConsumption() throws Exception{
|
||||
|
||||
Configuration conf = applySlowLogRecorderConf(8);
|
||||
slowLogRecorder = new SlowLogRecorder(conf);
|
||||
Constructor<NamedQueueRecorder> constructor =
|
||||
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
|
||||
constructor.setAccessible(true);
|
||||
namedQueueRecorder = constructor.newInstance(conf);
|
||||
AdminProtos.SlowLogResponseRequest request =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
|
||||
|
||||
slowLogRecorder.clearSlowLogPayloads();
|
||||
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
|
||||
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
|
||||
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
|
||||
LOG.debug("Initially ringbuffer of Slow Log records is empty");
|
||||
|
||||
int i = 0;
|
||||
|
@ -119,12 +123,12 @@ public class TestSlowLogRecorder {
|
|||
for (; i < 5; i++) {
|
||||
RpcLogDetails rpcLogDetails =
|
||||
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 5));
|
||||
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
|
||||
Assert.assertNotEquals(-1,
|
||||
HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5));
|
||||
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
|
||||
Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
|
||||
Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
|
||||
Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
|
||||
|
@ -135,15 +139,15 @@ public class TestSlowLogRecorder {
|
|||
for (; i < 7; i++) {
|
||||
RpcLogDetails rpcLogDetails =
|
||||
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 7));
|
||||
() -> getSlowLogPayloads(request).size() == 7));
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> {
|
||||
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
|
||||
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
|
||||
return slowLogPayloadsList.size() == 7
|
||||
&& confirmPayloadParams(0, 7, slowLogPayloadsList)
|
||||
&& confirmPayloadParams(5, 2, slowLogPayloadsList)
|
||||
|
@ -155,15 +159,15 @@ public class TestSlowLogRecorder {
|
|||
for (; i < 10; i++) {
|
||||
RpcLogDetails rpcLogDetails =
|
||||
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
|
||||
() -> getSlowLogPayloads(request).size() == 8));
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> {
|
||||
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
|
||||
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
|
||||
// confirm ringbuffer is full
|
||||
return slowLogPayloadsList.size() == 8
|
||||
&& confirmPayloadParams(7, 3, slowLogPayloadsList)
|
||||
|
@ -176,15 +180,33 @@ public class TestSlowLogRecorder {
|
|||
for (; i < 14; i++) {
|
||||
RpcLogDetails rpcLogDetails =
|
||||
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
|
||||
() -> getSlowLogPayloads(request).size() == 8));
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> {
|
||||
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
|
||||
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
|
||||
// confirm ringbuffer is full
|
||||
// and ordered events
|
||||
return slowLogPayloadsList.size() == 8
|
||||
&& confirmPayloadParams(0, 14, slowLogPayloadsList)
|
||||
&& confirmPayloadParams(1, 13, slowLogPayloadsList)
|
||||
&& confirmPayloadParams(2, 12, slowLogPayloadsList)
|
||||
&& confirmPayloadParams(3, 11, slowLogPayloadsList);
|
||||
})
|
||||
);
|
||||
|
||||
AdminProtos.SlowLogResponseRequest largeLogRequest =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder()
|
||||
.setLimit(15)
|
||||
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
|
||||
.build();
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> {
|
||||
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest);
|
||||
// confirm ringbuffer is full
|
||||
// and ordered events
|
||||
return slowLogPayloadsList.size() == 8
|
||||
|
@ -197,24 +219,12 @@ public class TestSlowLogRecorder {
|
|||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> {
|
||||
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getLargeLogPayloads(request);
|
||||
// confirm ringbuffer is full
|
||||
// and ordered events
|
||||
return slowLogPayloadsList.size() == 8
|
||||
&& confirmPayloadParams(0, 14, slowLogPayloadsList)
|
||||
&& confirmPayloadParams(1, 13, slowLogPayloadsList)
|
||||
&& confirmPayloadParams(2, 12, slowLogPayloadsList)
|
||||
&& confirmPayloadParams(3, 11, slowLogPayloadsList);
|
||||
})
|
||||
);
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> {
|
||||
boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
|
||||
boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
|
||||
NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
|
||||
|
||||
LOG.debug("cleared the ringbuffer of Online Slow Log records");
|
||||
|
||||
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
|
||||
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
|
||||
// confirm ringbuffer is empty
|
||||
return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
|
||||
})
|
||||
|
@ -222,30 +232,43 @@ public class TestSlowLogRecorder {
|
|||
|
||||
}
|
||||
|
||||
private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
|
||||
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
|
||||
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
|
||||
namedQueueGetRequest.setSlowLogResponseRequest(request);
|
||||
NamedQueueGetResponse namedQueueGetResponse =
|
||||
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
|
||||
return namedQueueGetResponse == null ?
|
||||
Collections.emptyList() : namedQueueGetResponse.getSlowLogPayloads();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnlineSlowLogWithHighRecords() throws Exception {
|
||||
|
||||
Configuration conf = applySlowLogRecorderConf(14);
|
||||
slowLogRecorder = new SlowLogRecorder(conf);
|
||||
Constructor<NamedQueueRecorder> constructor =
|
||||
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
|
||||
constructor.setAccessible(true);
|
||||
namedQueueRecorder = constructor.newInstance(conf);
|
||||
AdminProtos.SlowLogResponseRequest request =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
|
||||
|
||||
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
|
||||
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
|
||||
LOG.debug("Initially ringbuffer of Slow Log records is empty");
|
||||
|
||||
for (int i = 0; i < 14 * 11; i++) {
|
||||
RpcLogDetails rpcLogDetails =
|
||||
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
|
||||
() -> getSlowLogPayloads(request).size() == 14));
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> {
|
||||
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
|
||||
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
|
||||
|
||||
// confirm strict order of slow log payloads
|
||||
return slowLogPayloads.size() == 14
|
||||
|
@ -266,37 +289,37 @@ public class TestSlowLogRecorder {
|
|||
})
|
||||
);
|
||||
|
||||
boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
|
||||
boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
|
||||
NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
|
||||
Assert.assertTrue(isRingBufferCleaned);
|
||||
LOG.debug("cleared the ringbuffer of Online Slow Log records");
|
||||
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
|
||||
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
|
||||
|
||||
// confirm ringbuffer is empty
|
||||
Assert.assertEquals(slowLogPayloads.size(), 0);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
|
||||
|
||||
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
|
||||
conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
|
||||
|
||||
slowLogRecorder = new SlowLogRecorder(conf);
|
||||
Constructor<NamedQueueRecorder> constructor =
|
||||
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
|
||||
constructor.setAccessible(true);
|
||||
namedQueueRecorder = constructor.newInstance(conf);
|
||||
AdminProtos.SlowLogResponseRequest request =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder().build();
|
||||
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
|
||||
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
|
||||
LOG.debug("Initially ringbuffer of Slow Log records is empty");
|
||||
|
||||
for (int i = 0; i < 300; i++) {
|
||||
RpcLogDetails rpcLogDetails =
|
||||
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> {
|
||||
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
|
||||
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
|
||||
return slowLogPayloads.size() == 0;
|
||||
})
|
||||
);
|
||||
|
@ -305,56 +328,58 @@ public class TestSlowLogRecorder {
|
|||
|
||||
@Test
|
||||
public void testOnlineSlowLogWithDisableConfig() throws Exception {
|
||||
|
||||
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
|
||||
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
|
||||
Constructor<NamedQueueRecorder> constructor =
|
||||
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
|
||||
constructor.setAccessible(true);
|
||||
namedQueueRecorder = constructor.newInstance(conf);
|
||||
|
||||
slowLogRecorder = new SlowLogRecorder(conf);
|
||||
AdminProtos.SlowLogResponseRequest request =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder().build();
|
||||
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
|
||||
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
|
||||
LOG.debug("Initially ringbuffer of Slow Log records is empty");
|
||||
|
||||
for (int i = 0; i < 300; i++) {
|
||||
RpcLogDetails rpcLogDetails =
|
||||
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> {
|
||||
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
|
||||
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
|
||||
return slowLogPayloads.size() == 0;
|
||||
})
|
||||
);
|
||||
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSlowLogFilters() throws Exception {
|
||||
|
||||
Configuration conf = applySlowLogRecorderConf(30);
|
||||
slowLogRecorder = new SlowLogRecorder(conf);
|
||||
Constructor<NamedQueueRecorder> constructor =
|
||||
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
|
||||
constructor.setAccessible(true);
|
||||
namedQueueRecorder = constructor.newInstance(conf);
|
||||
AdminProtos.SlowLogResponseRequest request =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder()
|
||||
.setLimit(15)
|
||||
.setUserName("userName_87")
|
||||
.build();
|
||||
|
||||
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
|
||||
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
|
||||
|
||||
LOG.debug("Initially ringbuffer of Slow Log records is empty");
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
RpcLogDetails rpcLogDetails =
|
||||
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 1));
|
||||
() -> getSlowLogPayloads(request).size() == 1));
|
||||
|
||||
AdminProtos.SlowLogResponseRequest requestClient =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder()
|
||||
|
@ -362,25 +387,32 @@ public class TestSlowLogRecorder {
|
|||
.setClientAddress("client_85")
|
||||
.build();
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1));
|
||||
() -> getSlowLogPayloads(requestClient).size() == 1));
|
||||
|
||||
AdminProtos.SlowLogResponseRequest requestSlowLog =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder()
|
||||
.setLimit(15)
|
||||
.build();
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
|
||||
|
||||
() -> getSlowLogPayloads(requestSlowLog).size() == 15));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConcurrentSlowLogEvents() throws Exception {
|
||||
|
||||
Configuration conf = applySlowLogRecorderConf(50000);
|
||||
slowLogRecorder = new SlowLogRecorder(conf);
|
||||
Constructor<NamedQueueRecorder> constructor =
|
||||
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
|
||||
constructor.setAccessible(true);
|
||||
namedQueueRecorder = constructor.newInstance(conf);
|
||||
AdminProtos.SlowLogResponseRequest request =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
|
||||
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
|
||||
AdminProtos.SlowLogResponseRequest largeLogRequest =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder()
|
||||
.setLimit(500000)
|
||||
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
|
||||
.build();
|
||||
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
|
||||
LOG.debug("Initially ringbuffer of Slow Log records is empty");
|
||||
|
||||
for (int j = 0; j < 1000; j++) {
|
||||
|
@ -389,7 +421,7 @@ public class TestSlowLogRecorder {
|
|||
for (int i = 0; i < 3500; i++) {
|
||||
RpcLogDetails rpcLogDetails =
|
||||
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -397,22 +429,24 @@ public class TestSlowLogRecorder {
|
|||
|
||||
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
|
||||
|
||||
slowLogRecorder.clearSlowLogPayloads();
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
|
||||
5000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000));
|
||||
5000, () -> getSlowLogPayloads(request).size() > 10000));
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
|
||||
5000, () -> slowLogRecorder.getLargeLogPayloads(request).size() > 10000));
|
||||
5000, () -> getSlowLogPayloads(largeLogRequest).size() > 10000));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSlowLargeLogEvents() throws Exception {
|
||||
Configuration conf = applySlowLogRecorderConf(28);
|
||||
slowLogRecorder = new SlowLogRecorder(conf);
|
||||
Constructor<NamedQueueRecorder> constructor =
|
||||
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
|
||||
constructor.setAccessible(true);
|
||||
namedQueueRecorder = constructor.newInstance(conf);
|
||||
|
||||
AdminProtos.SlowLogResponseRequest request =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
|
||||
|
||||
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
|
||||
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
|
||||
LOG.debug("Initially ringbuffer of Slow Log records is empty");
|
||||
|
||||
boolean isSlowLog;
|
||||
|
@ -428,16 +462,16 @@ public class TestSlowLogRecorder {
|
|||
RpcLogDetails rpcLogDetails =
|
||||
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1),
|
||||
isSlowLog, isLargeLog);
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
|
||||
() -> getSlowLogPayloads(request).size() == 14));
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> {
|
||||
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
|
||||
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
|
||||
|
||||
// confirm strict order of slow log payloads
|
||||
return slowLogPayloads.size() == 14
|
||||
|
@ -458,12 +492,18 @@ public class TestSlowLogRecorder {
|
|||
})
|
||||
);
|
||||
|
||||
AdminProtos.SlowLogResponseRequest largeLogRequest =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder()
|
||||
.setLimit(14 * 11)
|
||||
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
|
||||
.build();
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> slowLogRecorder.getLargeLogPayloads(request).size() == 14));
|
||||
() -> getSlowLogPayloads(largeLogRequest).size() == 14));
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> {
|
||||
List<SlowLogPayload> largeLogPayloads = slowLogRecorder.getLargeLogPayloads(request);
|
||||
List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest);
|
||||
|
||||
// confirm strict order of slow log payloads
|
||||
return largeLogPayloads.size() == 14
|
||||
|
@ -483,14 +523,16 @@ public class TestSlowLogRecorder {
|
|||
&& confirmPayloadParams(13, 128, largeLogPayloads);
|
||||
})
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSlowLogMixedFilters() throws Exception {
|
||||
|
||||
Configuration conf = applySlowLogRecorderConf(30);
|
||||
slowLogRecorder = new SlowLogRecorder(conf);
|
||||
Constructor<NamedQueueRecorder> constructor =
|
||||
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
|
||||
constructor.setAccessible(true);
|
||||
namedQueueRecorder = constructor.newInstance(conf);
|
||||
AdminProtos.SlowLogResponseRequest request =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder()
|
||||
.setLimit(15)
|
||||
|
@ -498,23 +540,23 @@ public class TestSlowLogRecorder {
|
|||
.setClientAddress("client_88")
|
||||
.build();
|
||||
|
||||
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
|
||||
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
RpcLogDetails rpcLogDetails =
|
||||
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 2));
|
||||
() -> getSlowLogPayloads(request).size() == 2));
|
||||
|
||||
AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder()
|
||||
.setLimit(15)
|
||||
.setUserName("userName_1")
|
||||
.setClientAddress("client_2")
|
||||
.build();
|
||||
Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request2).size());
|
||||
Assert.assertEquals(0, getSlowLogPayloads(request2).size());
|
||||
|
||||
AdminProtos.SlowLogResponseRequest request3 =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder()
|
||||
|
@ -523,7 +565,7 @@ public class TestSlowLogRecorder {
|
|||
.setClientAddress("client_88")
|
||||
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
|
||||
.build();
|
||||
Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request3).size());
|
||||
Assert.assertEquals(0, getSlowLogPayloads(request3).size());
|
||||
|
||||
AdminProtos.SlowLogResponseRequest request4 =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder()
|
||||
|
@ -532,7 +574,7 @@ public class TestSlowLogRecorder {
|
|||
.setClientAddress("client_87")
|
||||
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
|
||||
.build();
|
||||
Assert.assertEquals(1, slowLogRecorder.getSlowLogPayloads(request4).size());
|
||||
Assert.assertEquals(1, getSlowLogPayloads(request4).size());
|
||||
|
||||
AdminProtos.SlowLogResponseRequest request5 =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder()
|
||||
|
@ -541,14 +583,14 @@ public class TestSlowLogRecorder {
|
|||
.setClientAddress("client_89")
|
||||
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR)
|
||||
.build();
|
||||
Assert.assertEquals(2, slowLogRecorder.getSlowLogPayloads(request5).size());
|
||||
Assert.assertEquals(2, getSlowLogPayloads(request5).size());
|
||||
|
||||
AdminProtos.SlowLogResponseRequest requestSlowLog =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder()
|
||||
.setLimit(15)
|
||||
.build();
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
|
||||
() -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
|
||||
() -> getSlowLogPayloads(requestSlowLog).size() == 15));
|
||||
}
|
||||
|
||||
static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
|
||||
|
@ -762,26 +804,20 @@ public class TestSlowLogRecorder {
|
|||
private static Optional<User> getUser(String userName) {
|
||||
|
||||
return Optional.of(new User() {
|
||||
|
||||
|
||||
@Override
|
||||
public String getShortName() {
|
||||
return userName;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public <T> T runAs(PrivilegedAction<T> action) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public <T> T runAs(PrivilegedExceptionAction<T> action) throws
|
||||
IOException, InterruptedException {
|
||||
public <T> T runAs(PrivilegedExceptionAction<T> action) {
|
||||
return null;
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
}
|
|
@ -17,10 +17,11 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver.slowlog;
|
||||
package org.apache.hadoop.hbase.namequeues;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -33,8 +34,11 @@ import org.apache.hadoop.hbase.client.Result;
|
|||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
|
||||
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
|
||||
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -59,11 +63,11 @@ public class TestSlowLogAccessor {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSlowLogAccessor.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
|
||||
|
||||
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
|
||||
|
||||
private SlowLogRecorder slowLogRecorder;
|
||||
private NamedQueueRecorder namedQueueRecorder;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
|
@ -88,9 +92,19 @@ public class TestSlowLogAccessor {
|
|||
@Before
|
||||
public void setUp() throws Exception {
|
||||
HRegionServer hRegionServer = HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0);
|
||||
Field slowLogRecorder = HRegionServer.class.getDeclaredField("slowLogRecorder");
|
||||
Field slowLogRecorder = HRegionServer.class.getDeclaredField("namedQueueRecorder");
|
||||
slowLogRecorder.setAccessible(true);
|
||||
this.slowLogRecorder = (SlowLogRecorder) slowLogRecorder.get(hRegionServer);
|
||||
this.namedQueueRecorder = (NamedQueueRecorder) slowLogRecorder.get(hRegionServer);
|
||||
}
|
||||
|
||||
private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
|
||||
AdminProtos.SlowLogResponseRequest request) {
|
||||
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
|
||||
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
|
||||
namedQueueGetRequest.setSlowLogResponseRequest(request);
|
||||
NamedQueueGetResponse namedQueueGetResponse =
|
||||
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
|
||||
return namedQueueGetResponse.getSlowLogPayloads();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -99,42 +113,42 @@ public class TestSlowLogAccessor {
|
|||
AdminProtos.SlowLogResponseRequest request =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
|
||||
|
||||
slowLogRecorder.clearSlowLogPayloads();
|
||||
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
|
||||
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
|
||||
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
|
||||
|
||||
int i = 0;
|
||||
|
||||
Connection connection = waitForSlowLogTableCreation();
|
||||
// add 5 records initially
|
||||
for (; i < 5; i++) {
|
||||
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
|
||||
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
|
||||
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
|
||||
// add 2 more records
|
||||
for (; i < 7; i++) {
|
||||
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
|
||||
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
|
||||
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
|
||||
// add 3 more records
|
||||
for (; i < 10; i++) {
|
||||
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
|
||||
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
|
||||
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
|
||||
// add 4 more records
|
||||
for (; i < 14; i++) {
|
||||
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
|
||||
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
|
||||
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY
|
||||
.waitFor(3000, () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
|
||||
.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
|
||||
|
||||
Assert.assertNotEquals(-1,
|
||||
HBASE_TESTING_UTILITY.waitFor(3000, () -> getTableCount(connection) == 14));
|
||||
|
@ -170,10 +184,10 @@ public class TestSlowLogAccessor {
|
|||
public void testHigherSlowLogs() throws Exception {
|
||||
Connection connection = waitForSlowLogTableCreation();
|
||||
|
||||
slowLogRecorder.clearSlowLogPayloads();
|
||||
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
|
||||
AdminProtos.SlowLogResponseRequest request =
|
||||
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
|
||||
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
|
||||
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
|
||||
|
||||
for (int j = 0; j < 100; j++) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
|
@ -181,15 +195,15 @@ public class TestSlowLogAccessor {
|
|||
if (i == 300) {
|
||||
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
|
||||
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
|
||||
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
|
||||
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
|
||||
namedQueueRecorder.addRecord(rpcLogDetails);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> {
|
||||
int count = slowLogRecorder.getSlowLogPayloads(request).size();
|
||||
int count = getSlowLogPayloads(request).size();
|
||||
LOG.debug("RingBuffer records count: {}", count);
|
||||
return count > 2000;
|
||||
}));
|
|
@ -394,7 +394,7 @@ module Hbase
|
|||
|
||||
define_test 'clear slowlog responses should work' do
|
||||
output = capture_stdout { command(:clear_slowlog_responses, nil) }
|
||||
assert(output.include?('Cleared Slowlog responses from 1/1 RegionServers'))
|
||||
assert(output.include?('Cleared Slowlog responses from 0/1 RegionServers'))
|
||||
end
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
|
|
|
@ -197,6 +197,7 @@ public class TestThriftHBaseServiceHandler {
|
|||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
UTIL.getConfiguration().set("hbase.client.retries.number", "3");
|
||||
UTIL.getConfiguration().setBoolean("hbase.regionserver.slowlog.buffer.enabled", true);
|
||||
UTIL.startMiniCluster();
|
||||
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableAname));
|
||||
for (HColumnDescriptor family : families) {
|
||||
|
|
Loading…
Reference in New Issue