Revert "HBASE-24718 : Generic NamedQueue framework for multiple use-cases (Refactor SlowLog responses)"

Causes TestAdminShell and TestThriftHBaseServiceHandler to fail 100% of the time in branch-2.

This reverts commit 8ae3480e70.
This commit is contained in:
stack 2020-07-20 14:50:03 -07:00
parent 61e3945990
commit 620470607e
23 changed files with 529 additions and 1070 deletions

View File

@ -2055,11 +2055,6 @@ public final class RequestConverter {
} else {
builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR);
}
if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) {
builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG);
} else {
builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG);
}
return builder.setLimit(logQueryFilter.getLimit()).build();
}

View File

@ -1996,16 +1996,4 @@ possible configurations would overwhelm and obscure the important.
too large batch request.
</description>
</property>
<property>
<name>hbase.namedqueue.provider.classes</name>
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService</value>
<description>
Default values for NamedQueueService implementors. This comma separated full class names
represent all implementors of NamedQueueService that we would like to be invoked by
LogEvent handler service. One example of NamedQueue service is SlowLogQueueService which
is used to store slow/large RPC logs in ringbuffer at each RegionServer.
All implementors of NamedQueueService should be found under package:
"org.apache.hadoop.hbase.namequeues.impl"
</description>
</property>
</configuration>

View File

@ -287,18 +287,12 @@ message SlowLogResponseRequest {
OR = 1;
}
enum LogType {
SLOW_LOG = 0;
LARGE_LOG = 1;
}
optional string region_name = 1;
optional string table_name = 2;
optional string client_address = 3;
optional string user_name = 4;
optional uint32 limit = 5 [default = 10];
optional FilterByOperator filter_by_operator = 6 [default = OR];
optional LogType log_type = 7;
}
message SlowLogResponses {

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
@ -47,8 +46,8 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@ -97,7 +96,6 @@ public abstract class RpcServer implements RpcServerInterface,
private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
private final boolean authorize;
private final boolean isOnlineLogProviderEnabled;
protected boolean isSecurityEnabled;
public static final byte CURRENT_VERSION = 0;
@ -229,7 +227,7 @@ public abstract class RpcServer implements RpcServerInterface,
/**
* Use to add online slowlog responses
*/
private NamedQueueRecorder namedQueueRecorder;
private SlowLogRecorder slowLogRecorder;
@FunctionalInterface
protected interface CallCleanup {
@ -304,8 +302,6 @@ public abstract class RpcServer implements RpcServerInterface,
saslProps = Collections.emptyMap();
}
this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
this.scheduler = scheduler;
}
@ -434,11 +430,11 @@ public abstract class RpcServer implements RpcServerInterface,
tooLarge, tooSlow,
status.getClient(), startTime, processingTime, qTime,
responseSize, userName);
if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
if (this.slowLogRecorder != null) {
// send logs to ring buffer owned by slowLogRecorder
final String className =
server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
this.namedQueueRecorder.addRecord(
final String className = server == null ? StringUtils.EMPTY :
server.getClass().getSimpleName();
this.slowLogRecorder.addSlowLogPayload(
new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
tooLarge));
}
@ -821,8 +817,12 @@ public abstract class RpcServer implements RpcServerInterface,
}
@Override
public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
this.namedQueueRecorder = namedQueueRecorder;
public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) {
this.slowLogRecorder = slowLogRecorder;
}
@Override
public SlowLogRecorder getSlowLogRecorder() {
return slowLogRecorder;
}
}

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.yetus.audience.InterfaceAudience;
@ -102,8 +102,12 @@ public interface RpcServerInterface {
/**
* Set Online SlowLog Provider
*
* @param namedQueueRecorder instance of {@link NamedQueueRecorder}
* @param slowLogRecorder instance of {@link SlowLogRecorder}
*/
void setNamedQueueRecorder(final NamedQueueRecorder namedQueueRecorder);
void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder);
/**
* @return Retrieve instance of {@link SlowLogRecorder} maintained by RpcServer
*/
SlowLogRecorder getSlowLogRecorder();
}

View File

@ -1,130 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Event Handler run by disruptor ringbuffer consumer.
* Although this is generic implementation for namedQueue, it can have individual queue specific
* logic.
*/
@InterfaceAudience.Private
class LogEventHandler implements EventHandler<RingBufferEnvelope> {
private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
// Map that binds namedQueues to corresponding queue service implementation.
// If NamedQueue of specific type is enabled, corresponding service will be used to
// insert and retrieve records.
// Individual queue sizes should be determined based on their individual configs within
// each service.
private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices =
new HashMap<>();
private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes";
LogEventHandler(final Configuration conf) {
for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) {
Class<?> clz;
try {
clz = Class.forName(implName);
} catch (ClassNotFoundException e) {
LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e);
continue;
}
if (!NamedQueueService.class.isAssignableFrom(clz)) {
LOG.warn("Class {} is not implementor of NamedQueueService.", clz);
continue;
}
// add all service mappings here
try {
NamedQueueService namedQueueService =
(NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf);
namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException
| InvocationTargetException e) {
LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.",
clz);
}
}
}
/**
* Called when a publisher has published an event to the {@link RingBuffer}.
* This is generic consumer of disruptor ringbuffer and for each new namedQueue that we
* add, we should also provide specific consumer logic here.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from
* the {@link RingBuffer}
*/
@Override
public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) {
final NamedQueuePayload namedQueuePayload = event.getPayload();
// consume ringbuffer payload based on event type
namedQueueServices.get(namedQueuePayload.getNamedQueueEvent())
.consumeEventFromDisruptor(namedQueuePayload);
}
/**
* Cleans up queues maintained by services.
*
* @param namedQueueEvent type of queue to clear
* @return true if queue is cleaned up, false otherwise
*/
boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
return namedQueueServices.get(namedQueueEvent).clearNamedQueue();
}
/**
* Add all in memory queue records to system table. The implementors can use system table
* or direct HDFS file or ZK as persistence system.
*/
void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
namedQueueServices.get(namedQueueEvent).persistAll();
}
/**
* Retrieve in memory queue records from ringbuffer
*
* @param request namedQueue request with event type
* @return queue records from ringbuffer after filter (if applied)
*/
NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request);
}
}

View File

@ -1,49 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Base payload to be prepared by client to send various namedQueue events for in-memory
* ring buffer storage in either HMaster or RegionServer.
* e.g slowLog responses
*/
@InterfaceAudience.Private
public class NamedQueuePayload {
public enum NamedQueueEvent {
SLOW_LOG
}
private final NamedQueueEvent namedQueueEvent;
public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
if (namedQueueEvent == null) {
throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
}
this.namedQueueEvent = namedQueueEvent;
}
public NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
}

View File

@ -1,69 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;
/**
* In-memory Queue service provider for multiple use-cases. Implementers should be
* registered in LogEventHandler
*/
@InterfaceAudience.Private
public interface NamedQueueService {
/**
* Retrieve event type for NamedQueueService implementation.
*
* @return {@link NamedQueuePayload.NamedQueueEvent}
*/
NamedQueuePayload.NamedQueueEvent getEvent();
/**
* This implementation is generic for consuming records from LMAX
* disruptor and inserts records to EvictingQueue which is maintained by each
* ringbuffer provider.
*
* @param namedQueuePayload namedQueue payload from disruptor ring buffer
*/
void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload);
/**
* Cleans up queues maintained by services.
*
* @return true if slow log payloads are cleaned up, false otherwise
*/
boolean clearNamedQueue();
/**
* Retrieve in memory queue records from ringbuffer
*
* @param request namedQueue request with event type
* @return queue records from ringbuffer after filter (if applied)
*/
NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request);
/**
* Add all in memory queue records to system table. The implementors can use system table
* or direct HDFS file or ZK as persistence system.
*/
void persistAll();
}

View File

@ -1,98 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
/**
* Persistent service provider for Slow/LargeLog events
*/
@InterfaceAudience.Private
public class SlowLogPersistentService {
private static final Logger LOG = LoggerFactory.getLogger(SlowLogPersistentService.class);
private static final ReentrantLock LOCK = new ReentrantLock();
private static final String SYS_TABLE_QUEUE_SIZE =
"hbase.regionserver.slowlog.systable.queue.size";
private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
private final Queue<TooSlowLog.SlowLogPayload> queueForSysTable;
private final Configuration configuration;
public SlowLogPersistentService(final Configuration configuration) {
this.configuration = configuration;
int sysTableQueueSize =
configuration.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueueForTable =
EvictingQueue.create(sysTableQueueSize);
queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
}
public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) {
queueForSysTable.add(slowLogPayload);
}
/**
* Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
*/
public void addAllLogsToSysTable() {
if (queueForSysTable == null) {
LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.");
return;
}
if (LOCK.isLocked()) {
return;
}
LOCK.lock();
try {
List<TooSlowLog.SlowLogPayload> slowLogPayloads = new ArrayList<>();
int i = 0;
while (!queueForSysTable.isEmpty()) {
slowLogPayloads.add(queueForSysTable.poll());
i++;
if (i == SYSTABLE_PUT_BATCH_SIZE) {
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
slowLogPayloads.clear();
i = 0;
}
}
if (slowLogPayloads.size() > 0) {
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
}
} finally {
LOCK.unlock();
}
}
}

View File

@ -1,264 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.namequeues.LogHandlerUtils;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueService;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
/**
* In-memory Queue service provider for Slow/LargeLog events
*/
@InterfaceAudience.Private
public class SlowLogQueueService implements NamedQueueService {
private static final Logger LOG = LoggerFactory.getLogger(SlowLogQueueService.class);
private static final String SLOW_LOG_RING_BUFFER_SIZE =
"hbase.regionserver.slowlog.ringbuffer.size";
private final boolean isOnlineLogProviderEnabled;
private final boolean isSlowLogTableEnabled;
private final SlowLogPersistentService slowLogPersistentService;
private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue;
public SlowLogQueueService(Configuration conf) {
this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
if (!isOnlineLogProviderEnabled) {
this.isSlowLogTableEnabled = false;
this.slowLogPersistentService = null;
this.slowLogQueue = null;
return;
}
// Initialize SlowLog Queue
int slowLogQueueSize =
conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue =
EvictingQueue.create(slowLogQueueSize);
slowLogQueue = Queues.synchronizedQueue(evictingQueue);
this.isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
if (isSlowLogTableEnabled) {
slowLogPersistentService = new SlowLogPersistentService(conf);
} else {
slowLogPersistentService = null;
}
}
@Override
public NamedQueuePayload.NamedQueueEvent getEvent() {
return NamedQueuePayload.NamedQueueEvent.SLOW_LOG;
}
/**
* This implementation is specific to slowLog event. This consumes slowLog event from
* disruptor and inserts records to EvictingQueue.
*
* @param namedQueuePayload namedQueue payload from disruptor ring buffer
*/
@Override
public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
if (!isOnlineLogProviderEnabled) {
return;
}
if (!(namedQueuePayload instanceof RpcLogDetails)) {
LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type RpcLogDetails.");
return;
}
final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload;
final RpcCall rpcCall = rpcLogDetails.getRpcCall();
final String clientAddress = rpcLogDetails.getClientAddress();
final long responseSize = rpcLogDetails.getResponseSize();
final String className = rpcLogDetails.getClassName();
final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails);
if (type == null) {
return;
}
Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
Message param = rpcLogDetails.getParam();
long receiveTime = rpcCall.getReceiveTime();
long startTime = rpcCall.getStartTime();
long endTime = System.currentTimeMillis();
int processingTime = (int) (endTime - startTime);
int qTime = (int) (startTime - receiveTime);
final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
int numGets = 0;
int numMutations = 0;
int numServiceCalls = 0;
if (param instanceof ClientProtos.MultiRequest) {
ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
for (ClientProtos.Action action : regionAction.getActionList()) {
if (action.hasMutation()) {
numMutations++;
}
if (action.hasGet()) {
numGets++;
}
if (action.hasServiceCall()) {
numServiceCalls++;
}
}
}
}
final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
final String methodDescriptorName =
methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder()
.setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
.setClientAddress(clientAddress)
.setMethodName(methodDescriptorName)
.setMultiGets(numGets)
.setMultiMutations(numMutations)
.setMultiServiceCalls(numServiceCalls)
.setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
.setProcessingTime(processingTime)
.setQueueTime(qTime)
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
.setResponseSize(responseSize)
.setServerClass(className)
.setStartTime(startTime)
.setType(type)
.setUserName(userName)
.build();
slowLogQueue.add(slowLogPayload);
if (isSlowLogTableEnabled) {
if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
slowLogPersistentService.addToQueueForSysTable(slowLogPayload);
}
}
}
@Override
public boolean clearNamedQueue() {
if (!isOnlineLogProviderEnabled) {
return false;
}
LOG.debug("Received request to clean up online slowlog buffer.");
slowLogQueue.clear();
return true;
}
@Override
public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
if (!isOnlineLogProviderEnabled) {
return null;
}
final AdminProtos.SlowLogResponseRequest slowLogResponseRequest =
request.getSlowLogResponseRequest();
final List<TooSlowLog.SlowLogPayload> slowLogPayloads;
if (AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG
.equals(slowLogResponseRequest.getLogType())) {
slowLogPayloads = getLargeLogPayloads(slowLogResponseRequest);
} else {
slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest);
}
NamedQueueGetResponse response = new NamedQueueGetResponse();
response.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
response.setSlowLogPayloads(slowLogPayloads);
return response;
}
private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
final boolean isSlowLog = rpcCallDetails.isSlowLog();
final boolean isLargeLog = rpcCallDetails.isLargeLog();
final TooSlowLog.SlowLogPayload.Type type;
if (!isSlowLog && !isLargeLog) {
LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
rpcCallDetails);
return null;
}
if (isSlowLog && isLargeLog) {
type = TooSlowLog.SlowLogPayload.Type.ALL;
} else if (isSlowLog) {
type = TooSlowLog.SlowLogPayload.Type.SLOW_LOG;
} else {
type = TooSlowLog.SlowLogPayload.Type.LARGE_LOG;
}
return type;
}
/**
* Add all slowLog events to system table. This is only for slowLog event's persistence on
* system table.
*/
@Override
public void persistAll() {
if (!isOnlineLogProviderEnabled) {
return;
}
if (slowLogPersistentService != null) {
slowLogPersistentService.addAllLogsToSysTable();
}
}
private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
final AdminProtos.SlowLogResponseRequest request) {
List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter(
e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
|| e.getType() == TooSlowLog.SlowLogPayload.Type.SLOW_LOG).collect(Collectors.toList());
// latest slow logs first, operator is interested in latest records from in-memory buffer
Collections.reverse(slowLogPayloadList);
return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
}
private List<TooSlowLog.SlowLogPayload> getLargeLogPayloads(
final AdminProtos.SlowLogResponseRequest request) {
List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter(
e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
|| e.getType() == TooSlowLog.SlowLogPayload.Type.LARGE_LOG).collect(Collectors.toList());
// latest large logs first, operator is interested in latest records from in-memory buffer
Collections.reverse(slowLogPayloadList);
return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
}
}

View File

@ -1,65 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues.request;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Request object to be used by ring buffer use-cases. Clients get records by sending
* this request object.
* For each ring buffer use-case, add request payload to this class, client should set
* namedQueueEvent based on use-case.
* Protobuf does not support inheritance, hence we need to work with
*/
@InterfaceAudience.Private
public class NamedQueueGetRequest {
private AdminProtos.SlowLogResponseRequest slowLogResponseRequest;
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
public AdminProtos.SlowLogResponseRequest getSlowLogResponseRequest() {
return slowLogResponseRequest;
}
public void setSlowLogResponseRequest(
AdminProtos.SlowLogResponseRequest slowLogResponseRequest) {
this.slowLogResponseRequest = slowLogResponseRequest;
}
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
this.namedQueueEvent = namedQueueEvent;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("slowLogResponseRequest", slowLogResponseRequest)
.append("namedQueueEvent", namedQueueEvent)
.toString();
}
}

View File

@ -1,61 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues.response;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.yetus.audience.InterfaceAudience;
import java.util.List;
/**
* Response object to be sent by namedQueue service back to caller
*/
@InterfaceAudience.Private
public class NamedQueueGetResponse {
private List<TooSlowLog.SlowLogPayload> slowLogPayloads;
private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
public List<TooSlowLog.SlowLogPayload> getSlowLogPayloads() {
return slowLogPayloads;
}
public void setSlowLogPayloads(List<TooSlowLog.SlowLogPayload> slowLogPayloads) {
this.slowLogPayloads = slowLogPayloads;
}
public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
return namedQueueEvent;
}
public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
this.namedQueueEvent = namedQueueEvent;
}
@Override
public String toString() {
return new ToStringBuilder(this)
.append("slowLogPayloads", slowLogPayloads)
.append("namedQueueEvent", namedQueueEvent)
.toString();
}
}

View File

@ -138,8 +138,8 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@ -535,7 +535,7 @@ public class HRegionServer extends Thread implements
/**
* Provide online slow log responses from ringbuffer
*/
private NamedQueueRecorder namedQueueRecorder = null;
private SlowLogRecorder slowLogRecorder;
/**
* True if this RegionServer is coming up in a cluster where there is no Master;
@ -597,12 +597,7 @@ public class HRegionServer extends Thread implements
this.stopped = false;
if (!(this instanceof HMaster)) {
final boolean isOnlineLogProviderEnabled = conf.getBoolean(
HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
if (isOnlineLogProviderEnabled) {
this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
}
this.slowLogRecorder = new SlowLogRecorder(this.conf);
}
rpcServices = createRpcServices();
useThisHostnameInstead = getUseThisHostnameInstead(conf);
@ -1511,12 +1506,12 @@ public class HRegionServer extends Thread implements
}
/**
* get NamedQueue Provider to add different logs to ringbuffer
* get Online SlowLog Provider to add slow logs to ringbuffer
*
* @return NamedQueueRecorder
* @return Online SlowLog Provider
*/
public NamedQueueRecorder getNamedQueueRecorder() {
return this.namedQueueRecorder;
public SlowLogRecorder getSlowLogRecorder() {
return this.slowLogRecorder;
}
/*
@ -2075,7 +2070,7 @@ public class HRegionServer extends Thread implements
if (isSlowLogTableEnabled) {
// default chore duration: 10 min
final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.slowLogRecorder);
}
// Create the thread to clean the moved regions list

View File

@ -108,9 +108,6 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
@ -131,7 +128,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
@ -1304,7 +1301,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
rpcServer.setRsRpcServices(this);
if (!(rs instanceof HMaster)) {
rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder());
}
scannerLeaseTimeoutPeriod = conf.getInt(
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
@ -3986,39 +3983,28 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
@QosPriority(priority = HConstants.ADMIN_QOS)
public SlowLogResponses getSlowLogResponses(final RpcController controller,
final SlowLogResponseRequest request) {
final NamedQueueRecorder namedQueueRecorder =
this.regionServer.getNamedQueueRecorder();
final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
final SlowLogRecorder slowLogRecorder =
this.regionServer.getSlowLogRecorder();
final List<SlowLogPayload> slowLogPayloads;
slowLogPayloads = slowLogRecorder != null
? slowLogRecorder.getSlowLogPayloads(request)
: Collections.emptyList();
SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
.addAllSlowLogPayloads(slowLogPayloads)
.build();
return slowLogResponses;
}
private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request,
NamedQueueRecorder namedQueueRecorder) {
if (namedQueueRecorder == null) {
return Collections.emptyList();
}
List<SlowLogPayload> slowLogPayloads;
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
slowLogPayloads = namedQueueGetResponse != null ?
namedQueueGetResponse.getSlowLogPayloads() :
Collections.emptyList();
return slowLogPayloads;
}
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public SlowLogResponses getLargeLogResponses(final RpcController controller,
final SlowLogResponseRequest request) {
final NamedQueueRecorder namedQueueRecorder =
this.regionServer.getNamedQueueRecorder();
final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
final SlowLogRecorder slowLogRecorder =
this.regionServer.getSlowLogRecorder();
final List<SlowLogPayload> slowLogPayloads;
slowLogPayloads = slowLogRecorder != null
? slowLogRecorder.getLargeLogPayloads(request)
: Collections.emptyList();
SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
.addAllSlowLogPayloads(slowLogPayloads)
.build();
@ -4029,12 +4015,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
@QosPriority(priority = HConstants.ADMIN_QOS)
public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
final ClearSlowLogResponseRequest request) {
final NamedQueueRecorder namedQueueRecorder =
this.regionServer.getNamedQueueRecorder();
boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
.map(queueRecorder ->
queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
.orElse(false);
final SlowLogRecorder slowLogRecorder =
this.regionServer.getSlowLogRecorder();
boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder)
.map(SlowLogRecorder::clearSlowLogPayloads).orElse(false);
ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder()
.setIsCleaned(slowLogsCleaned)
.build();

View File

@ -17,7 +17,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
package org.apache.hadoop.hbase.regionserver.slowlog;
import com.lmax.disruptor.ExceptionHandler;
import org.apache.yetus.audience.InterfaceAudience;

View File

@ -0,0 +1,266 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.slowlog;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
/**
* Event Handler run by disruptor ringbuffer consumer
*/
@InterfaceAudience.Private
class LogEventHandler implements EventHandler<RingBufferEnvelope> {
private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
private static final String SYS_TABLE_QUEUE_SIZE =
"hbase.regionserver.slowlog.systable.queue.size";
private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
private final Queue<SlowLogPayload> queueForRingBuffer;
private final Queue<SlowLogPayload> queueForSysTable;
private final boolean isSlowLogTableEnabled;
private Configuration configuration;
private static final ReentrantLock LOCK = new ReentrantLock();
LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration conf) {
this.configuration = conf;
EvictingQueue<SlowLogPayload> evictingQueue = EvictingQueue.create(eventCount);
queueForRingBuffer = Queues.synchronizedQueue(evictingQueue);
this.isSlowLogTableEnabled = isSlowLogTableEnabled;
if (isSlowLogTableEnabled) {
int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
EvictingQueue<SlowLogPayload> evictingQueueForTable =
EvictingQueue.create(sysTableQueueSize);
queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
} else {
queueForSysTable = null;
}
}
/**
* Called when a publisher has published an event to the {@link RingBuffer}
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from
* the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain
*/
@Override
public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch)
throws Exception {
final RpcLogDetails rpcCallDetails = event.getPayload();
final RpcCall rpcCall = rpcCallDetails.getRpcCall();
final String clientAddress = rpcCallDetails.getClientAddress();
final long responseSize = rpcCallDetails.getResponseSize();
final String className = rpcCallDetails.getClassName();
final SlowLogPayload.Type type = getLogType(rpcCallDetails);
if (type == null) {
return;
}
Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
Message param = rpcCallDetails.getParam();
long receiveTime = rpcCall.getReceiveTime();
long startTime = rpcCall.getStartTime();
long endTime = System.currentTimeMillis();
int processingTime = (int) (endTime - startTime);
int qTime = (int) (startTime - receiveTime);
final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
int numGets = 0;
int numMutations = 0;
int numServiceCalls = 0;
if (param instanceof ClientProtos.MultiRequest) {
ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
for (ClientProtos.Action action : regionAction.getActionList()) {
if (action.hasMutation()) {
numMutations++;
}
if (action.hasGet()) {
numGets++;
}
if (action.hasServiceCall()) {
numServiceCalls++;
}
}
}
}
final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
final String methodDescriptorName =
methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder()
.setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
.setClientAddress(clientAddress)
.setMethodName(methodDescriptorName)
.setMultiGets(numGets)
.setMultiMutations(numMutations)
.setMultiServiceCalls(numServiceCalls)
.setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
.setProcessingTime(processingTime)
.setQueueTime(qTime)
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
.setResponseSize(responseSize)
.setServerClass(className)
.setStartTime(startTime)
.setType(type)
.setUserName(userName)
.build();
queueForRingBuffer.add(slowLogPayload);
if (isSlowLogTableEnabled) {
if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
queueForSysTable.add(slowLogPayload);
}
}
}
private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
final boolean isSlowLog = rpcCallDetails.isSlowLog();
final boolean isLargeLog = rpcCallDetails.isLargeLog();
final SlowLogPayload.Type type;
if (!isSlowLog && !isLargeLog) {
LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
rpcCallDetails);
return null;
}
if (isSlowLog && isLargeLog) {
type = SlowLogPayload.Type.ALL;
} else if (isSlowLog) {
type = SlowLogPayload.Type.SLOW_LOG;
} else {
type = SlowLogPayload.Type.LARGE_LOG;
}
return type;
}
/**
* Cleans up slow log payloads
*
* @return true if slow log payloads are cleaned up, false otherwise
*/
boolean clearSlowLogs() {
if (LOG.isDebugEnabled()) {
LOG.debug("Received request to clean up online slowlog buffer..");
}
queueForRingBuffer.clear();
return true;
}
/**
* Retrieve list of slow log payloads
*
* @param request slow log request parameters
* @return list of slow log payloads
*/
List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
List<SlowLogPayload> slowLogPayloadList =
Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
.filter(e -> e.getType() == SlowLogPayload.Type.ALL
|| e.getType() == SlowLogPayload.Type.SLOW_LOG)
.collect(Collectors.toList());
// latest slow logs first, operator is interested in latest records from in-memory buffer
Collections.reverse(slowLogPayloadList);
return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
}
/**
* Retrieve list of large log payloads
*
* @param request large log request parameters
* @return list of large log payloads
*/
List<SlowLogPayload> getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
List<SlowLogPayload> slowLogPayloadList =
Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
.filter(e -> e.getType() == SlowLogPayload.Type.ALL
|| e.getType() == SlowLogPayload.Type.LARGE_LOG)
.collect(Collectors.toList());
// latest large logs first, operator is interested in latest records from in-memory buffer
Collections.reverse(slowLogPayloadList);
return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
}
/**
* Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
*/
void addAllLogsToSysTable() {
if (queueForSysTable == null) {
// hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.
return;
}
if (LOCK.isLocked()) {
return;
}
LOCK.lock();
try {
List<SlowLogPayload> slowLogPayloads = new ArrayList<>();
int i = 0;
while (!queueForSysTable.isEmpty()) {
slowLogPayloads.add(queueForSysTable.poll());
i++;
if (i == SYSTABLE_PUT_BATCH_SIZE) {
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
slowLogPayloads.clear();
i = 0;
}
}
if (slowLogPayloads.size() > 0) {
SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
}
} finally {
LOCK.unlock();
}
}
}

View File

@ -17,7 +17,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
package org.apache.hadoop.hbase.regionserver.slowlog;
import java.util.ArrayList;
import java.util.List;
@ -30,7 +30,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* Event Handler utility class
*/
@InterfaceAudience.Private
public class LogHandlerUtils {
class LogHandlerUtils {
private static int getTotalFiltersCount(AdminProtos.SlowLogResponseRequest request) {
int totalFilters = 0;
@ -91,7 +91,7 @@ public class LogHandlerUtils {
return filteredSlowLogPayloads;
}
public static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
AdminProtos.SlowLogResponseRequest request, List<TooSlowLog.SlowLogPayload> logPayloadList) {
int totalFilters = getTotalFiltersCount(request);
if (totalFilters > 0) {

View File

@ -17,29 +17,29 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
package org.apache.hadoop.hbase.regionserver.slowlog;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.yetus.audience.InterfaceAudience;
/**
* An envelope to carry payload in the ring buffer that serves as online buffer
* to provide latest events
* An envelope to carry payload in the slow log ring buffer that serves as online buffer
* to provide latest TooSlowLog
*/
@InterfaceAudience.Private
final class RingBufferEnvelope {
private NamedQueuePayload namedQueuePayload;
private RpcLogDetails rpcLogDetails;
/**
* Load the Envelope with {@link RpcCall}
*
* @param namedQueuePayload all details of rpc call that would be useful for ring buffer
* @param rpcLogDetails all details of rpc call that would be useful for ring buffer
* consumers
*/
public void load(NamedQueuePayload namedQueuePayload) {
this.namedQueuePayload = namedQueuePayload;
public void load(RpcLogDetails rpcLogDetails) {
this.rpcLogDetails = rpcLogDetails;
}
/**
@ -48,10 +48,10 @@ final class RingBufferEnvelope {
*
* @return Retrieve rpc log details
*/
public NamedQueuePayload getPayload() {
final NamedQueuePayload namedQueuePayload = this.namedQueuePayload;
this.namedQueuePayload = null;
return namedQueuePayload;
public RpcLogDetails getPayload() {
final RpcLogDetails rpcLogDetails = this.rpcLogDetails;
this.rpcLogDetails = null;
return rpcLogDetails;
}
}

View File

@ -17,7 +17,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
package org.apache.hadoop.hbase.regionserver.slowlog;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.ipc.RpcCall;
@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* RpcCall details that would be passed on to ring buffer of slow log responses
*/
@InterfaceAudience.Private
public class RpcLogDetails extends NamedQueuePayload {
public class RpcLogDetails {
private final RpcCall rpcCall;
private final Message param;
@ -40,7 +40,6 @@ public class RpcLogDetails extends NamedQueuePayload {
public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize,
String className, boolean isSlowLog, boolean isLargeLog) {
super(NamedQueueEvent.SLOW_LOG);
this.rpcCall = rpcCall;
this.param = param;
this.clientAddress = clientAddress;

View File

@ -17,80 +17,87 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
package org.apache.hadoop.hbase.regionserver.slowlog;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
/**
* NamedQueue recorder that maintains various named queues.
* The service uses LMAX Disruptor to save queue records which are then consumed by
* Online Slow/Large Log Provider Service that keeps slow/large RPC logs in the ring buffer.
* The service uses LMAX Disruptor to save slow records which are then consumed by
* a queue and based on the ring buffer size, the available records are then fetched
* from the queue in thread-safe manner.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class NamedQueueRecorder {
public class SlowLogRecorder {
private final Disruptor<RingBufferEnvelope> disruptor;
private final LogEventHandler logEventHandler;
private final int eventCount;
private final boolean isOnlineLogProviderEnabled;
private static NamedQueueRecorder namedQueueRecorder;
private static boolean isInit = false;
private static final Object LOCK = new Object();
private static final String SLOW_LOG_RING_BUFFER_SIZE =
"hbase.regionserver.slowlog.ringbuffer.size";
/**
* Initialize disruptor with configurable ringbuffer size
*/
private NamedQueueRecorder(Configuration conf) {
public SlowLogRecorder(Configuration conf) {
isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
if (!isOnlineLogProviderEnabled) {
this.disruptor = null;
this.logEventHandler = null;
this.eventCount = 0;
return;
}
this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE,
HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
// This is the 'writer' -- a single threaded executor. This single thread consumes what is
// put on the ringbuffer.
final String hostingThreadName = Thread.currentThread().getName();
int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);
// disruptor initialization with BlockingWaitStrategy
this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
getEventCount(eventCount),
getEventCount(),
Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"),
ProducerType.MULTI,
new BlockingWaitStrategy());
this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
// initialize ringbuffer event handler
this.logEventHandler = new LogEventHandler(conf);
final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
this.logEventHandler = new LogEventHandler(this.eventCount, isSlowLogTableEnabled, conf);
this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler});
this.disruptor.start();
}
public static NamedQueueRecorder getInstance(Configuration conf) {
if (namedQueueRecorder != null) {
return namedQueueRecorder;
}
synchronized (LOCK) {
if (!isInit) {
namedQueueRecorder = new NamedQueueRecorder(conf);
isInit = true;
}
}
return namedQueueRecorder;
}
// must be power of 2 for disruptor ringbuffer
private int getEventCount(int eventCount) {
Preconditions.checkArgument(eventCount >= 0, "hbase.namedqueue.ringbuffer.size must be > 0");
private int getEventCount() {
Preconditions.checkArgument(eventCount >= 0,
SLOW_LOG_RING_BUFFER_SIZE + " must be > 0");
int floor = Integer.highestOneBit(eventCount);
if (floor == eventCount) {
return floor;
@ -103,53 +110,66 @@ public class NamedQueueRecorder {
}
/**
* Retrieve in memory queue records from ringbuffer
* Retrieve online slow logs from ringbuffer
*
* @param request namedQueue request with event type
* @return queue records from ringbuffer after filter (if applied)
* @param request slow log request parameters
* @return online slow logs from ringbuffer
*/
public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
return this.logEventHandler.getNamedQueueRecords(request);
public List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
return isOnlineLogProviderEnabled ? this.logEventHandler.getSlowLogPayloads(request)
: Collections.emptyList();
}
/**
* clears queue records from ringbuffer
* Retrieve online large logs from ringbuffer
*
* @param request large log request parameters
* @return online large logs from ringbuffer
*/
public List<SlowLogPayload> getLargeLogPayloads(AdminProtos.SlowLogResponseRequest request) {
return isOnlineLogProviderEnabled ? this.logEventHandler.getLargeLogPayloads(request)
: Collections.emptyList();
}
/**
* clears slow log payloads from ringbuffer
*
* @param namedQueueEvent type of queue to clear
* @return true if slow log payloads are cleaned up or
* hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to
* clean up slow logs
*/
public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
return this.logEventHandler.clearNamedQueue(namedQueueEvent);
public boolean clearSlowLogPayloads() {
if (!isOnlineLogProviderEnabled) {
return true;
}
return this.logEventHandler.clearSlowLogs();
}
/**
* Add various NamedQueue records to ringbuffer. Based on the type of the event (e.g slowLog),
* consumer of disruptor ringbuffer will have specific logic.
* This method is producer of disruptor ringbuffer which is initialized in NamedQueueRecorder
* constructor.
* Add slow log rpcCall details to ringbuffer
*
* @param namedQueuePayload namedQueue payload sent by client of ring buffer
* service
* @param rpcLogDetails all details of rpc call that would be useful for ring buffer
* consumers
*/
public void addRecord(NamedQueuePayload namedQueuePayload) {
public void addSlowLogPayload(RpcLogDetails rpcLogDetails) {
if (!isOnlineLogProviderEnabled) {
return;
}
RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
long seqId = ringBuffer.next();
try {
ringBuffer.get(seqId).load(namedQueuePayload);
ringBuffer.get(seqId).load(rpcLogDetails);
} finally {
ringBuffer.publish(seqId);
}
}
/**
* Add all in memory queue records to system table. The implementors can use system table
* or direct HDFS file or ZK as persistence system.
* Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
*/
public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
public void addAllLogsToSysTable() {
if (this.logEventHandler != null) {
this.logEventHandler.persistAll(namedQueueEvent);
this.logEventHandler.addAllLogsToSysTable();
}
}

View File

@ -17,7 +17,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
package org.apache.hadoop.hbase.regionserver.slowlog;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
@ -33,7 +33,7 @@ public class SlowLogTableOpsChore extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class);
private final NamedQueueRecorder namedQueueRecorder;
private final SlowLogRecorder slowLogRecorder;
/**
* Chore Constructor
@ -41,12 +41,12 @@ public class SlowLogTableOpsChore extends ScheduledChore {
* @param stopper The stopper - When {@link Stoppable#isStopped()} is true, this chore will
* cancel and cleanup
* @param period Period in millis with which this Chore repeats execution when scheduled
* @param namedQueueRecorder {@link NamedQueueRecorder} instance
* @param slowLogRecorder {@link SlowLogRecorder} instance
*/
public SlowLogTableOpsChore(final Stoppable stopper, final int period,
final NamedQueueRecorder namedQueueRecorder) {
final SlowLogRecorder slowLogRecorder) {
super("SlowLogTableOpsChore", stopper, period);
this.namedQueueRecorder = namedQueueRecorder;
this.slowLogRecorder = slowLogRecorder;
}
@Override
@ -54,7 +54,7 @@ public class SlowLogTableOpsChore extends ScheduledChore {
if (LOG.isTraceEnabled()) {
LOG.trace("SlowLog Table Ops Chore is starting up.");
}
namedQueueRecorder.persistAll(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
slowLogRecorder.addAllLogsToSysTable();
if (LOG.isTraceEnabled()) {
LOG.trace("SlowLog Table Ops Chore is closing.");
}

View File

@ -17,11 +17,10 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
package org.apache.hadoop.hbase.regionserver.slowlog;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
@ -34,11 +33,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -63,11 +59,11 @@ public class TestSlowLogAccessor {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSlowLogAccessor.class);
private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
private NamedQueueRecorder namedQueueRecorder;
private SlowLogRecorder slowLogRecorder;
@BeforeClass
public static void setup() throws Exception {
@ -92,19 +88,9 @@ public class TestSlowLogAccessor {
@Before
public void setUp() throws Exception {
HRegionServer hRegionServer = HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0);
Field slowLogRecorder = HRegionServer.class.getDeclaredField("namedQueueRecorder");
Field slowLogRecorder = HRegionServer.class.getDeclaredField("slowLogRecorder");
slowLogRecorder.setAccessible(true);
this.namedQueueRecorder = (NamedQueueRecorder) slowLogRecorder.get(hRegionServer);
}
private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
AdminProtos.SlowLogResponseRequest request) {
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
return namedQueueGetResponse.getSlowLogPayloads();
this.slowLogRecorder = (SlowLogRecorder) slowLogRecorder.get(hRegionServer);
}
@Test
@ -113,42 +99,42 @@ public class TestSlowLogAccessor {
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
slowLogRecorder.clearSlowLogPayloads();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
int i = 0;
Connection connection = waitForSlowLogTableCreation();
// add 5 records initially
for (; i < 5; i++) {
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
// add 2 more records
for (; i < 7; i++) {
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
// add 3 more records
for (; i < 10; i++) {
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
// add 4 more records
for (; i < 14; i++) {
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY
.waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
.waitFor(3000, () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1,
HBASE_TESTING_UTILITY.waitFor(3000, () -> getTableCount(connection) == 14));
@ -184,10 +170,10 @@ public class TestSlowLogAccessor {
public void testHigherSlowLogs() throws Exception {
Connection connection = waitForSlowLogTableCreation();
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
slowLogRecorder.clearSlowLogPayloads();
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
for (int j = 0; j < 100; j++) {
CompletableFuture.runAsync(() -> {
@ -195,15 +181,15 @@ public class TestSlowLogAccessor {
if (i == 300) {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
}
RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
RpcLogDetails rpcLogDetails = TestSlowLogRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
});
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> {
int count = getSlowLogPayloads(request).size();
int count = slowLogRecorder.getSlowLogPayloads(request).size();
LOG.debug("RingBuffer records count: {}", count);
return count > 2000;
}));

View File

@ -17,14 +17,12 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
package org.apache.hadoop.hbase.regionserver.slowlog;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@ -37,8 +35,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -65,25 +61,27 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPaylo
* Tests for Online SlowLog Provider Service
*/
@Category({MasterTests.class, MediumTests.class})
public class TestNamedQueueRecorder {
public class TestSlowLogRecorder {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestNamedQueueRecorder.class);
HBaseClassTestRule.forClass(TestSlowLogRecorder.class);
private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
private NamedQueueRecorder namedQueueRecorder;
private SlowLogRecorder slowLogRecorder;
private static int i = 0;
private static Configuration applySlowLogRecorderConf(int eventSize) {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
return conf;
}
/**
@ -92,10 +90,11 @@ public class TestNamedQueueRecorder {
*
* @param i index of ringbuffer logs
* @param j data value that was put on index i
* @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder}
* @param slowLogPayloads list of payload retrieved from {@link SlowLogRecorder}
* @return if actual values are as per expectations
*/
private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j);
boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j);
boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j);
@ -103,18 +102,15 @@ public class TestNamedQueueRecorder {
}
@Test
public void testOnlieSlowLogConsumption() throws Exception{
public void testOnlieSlowLogConsumption() throws Exception {
Configuration conf = applySlowLogRecorderConf(8);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
slowLogRecorder.clearSlowLogPayloads();
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
int i = 0;
@ -123,12 +119,12 @@ public class TestNamedQueueRecorder {
for (; i < 5; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1,
HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5));
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 5));
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
@ -139,15 +135,15 @@ public class TestNamedQueueRecorder {
for (; i < 7; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 7));
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 7));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
return slowLogPayloadsList.size() == 7
&& confirmPayloadParams(0, 7, slowLogPayloadsList)
&& confirmPayloadParams(5, 2, slowLogPayloadsList)
@ -159,15 +155,15 @@ public class TestNamedQueueRecorder {
for (; i < 10; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 8));
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
// confirm ringbuffer is full
return slowLogPayloadsList.size() == 8
&& confirmPayloadParams(7, 3, slowLogPayloadsList)
@ -180,33 +176,15 @@ public class TestNamedQueueRecorder {
for (; i < 14; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 8));
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
// confirm ringbuffer is full
// and ordered events
return slowLogPayloadsList.size() == 8
&& confirmPayloadParams(0, 14, slowLogPayloadsList)
&& confirmPayloadParams(1, 13, slowLogPayloadsList)
&& confirmPayloadParams(2, 12, slowLogPayloadsList)
&& confirmPayloadParams(3, 11, slowLogPayloadsList);
})
);
AdminProtos.SlowLogResponseRequest largeLogRequest =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest);
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
// confirm ringbuffer is full
// and ordered events
return slowLogPayloadsList.size() == 8
@ -219,12 +197,24 @@ public class TestNamedQueueRecorder {
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getLargeLogPayloads(request);
// confirm ringbuffer is full
// and ordered events
return slowLogPayloadsList.size() == 8
&& confirmPayloadParams(0, 14, slowLogPayloadsList)
&& confirmPayloadParams(1, 13, slowLogPayloadsList)
&& confirmPayloadParams(2, 12, slowLogPayloadsList)
&& confirmPayloadParams(3, 11, slowLogPayloadsList);
})
);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
LOG.debug("cleared the ringbuffer of Online Slow Log records");
List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
// confirm ringbuffer is empty
return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
})
@ -232,43 +222,30 @@ public class TestNamedQueueRecorder {
}
private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
namedQueueGetRequest.setSlowLogResponseRequest(request);
NamedQueueGetResponse namedQueueGetResponse =
namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
return namedQueueGetResponse == null ?
Collections.emptyList() : namedQueueGetResponse.getSlowLogPayloads();
}
@Test
public void testOnlineSlowLogWithHighRecords() throws Exception {
Configuration conf = applySlowLogRecorderConf(14);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 14 * 11; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 14));
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
// confirm strict order of slow log payloads
return slowLogPayloads.size() == 14
@ -289,37 +266,37 @@ public class TestNamedQueueRecorder {
})
);
boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
Assert.assertTrue(isRingBufferCleaned);
LOG.debug("cleared the ringbuffer of Online Slow Log records");
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
// confirm ringbuffer is empty
Assert.assertEquals(slowLogPayloads.size(), 0);
}
@Test
public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 300; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
return slowLogPayloads.size() == 0;
})
);
@ -328,58 +305,56 @@ public class TestNamedQueueRecorder {
@Test
public void testOnlineSlowLogWithDisableConfig() throws Exception {
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 300; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
return slowLogPayloads.size() == 0;
})
);
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
}
@Test
public void testSlowLogFilters() throws Exception {
Configuration conf = applySlowLogRecorderConf(30);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_87")
.build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 100; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 1));
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 1));
AdminProtos.SlowLogResponseRequest requestClient =
AdminProtos.SlowLogResponseRequest.newBuilder()
@ -387,32 +362,25 @@ public class TestNamedQueueRecorder {
.setClientAddress("client_85")
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(requestClient).size() == 1));
() -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1));
AdminProtos.SlowLogResponseRequest requestSlowLog =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(requestSlowLog).size() == 15));
() -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
}
@Test
public void testConcurrentSlowLogEvents() throws Exception {
Configuration conf = applySlowLogRecorderConf(50000);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
AdminProtos.SlowLogResponseRequest largeLogRequest =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(500000)
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int j = 0; j < 1000; j++) {
@ -421,7 +389,7 @@ public class TestNamedQueueRecorder {
for (int i = 0; i < 3500; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
});
@ -429,24 +397,22 @@ public class TestNamedQueueRecorder {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
slowLogRecorder.clearSlowLogPayloads();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
5000, () -> getSlowLogPayloads(request).size() > 10000));
5000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
5000, () -> getSlowLogPayloads(largeLogRequest).size() > 10000));
5000, () -> slowLogRecorder.getLargeLogPayloads(request).size() > 10000));
}
@Test
public void testSlowLargeLogEvents() throws Exception {
Configuration conf = applySlowLogRecorderConf(28);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
boolean isSlowLog;
@ -462,16 +428,16 @@ public class TestNamedQueueRecorder {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1),
isSlowLog, isLargeLog);
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 14));
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
// confirm strict order of slow log payloads
return slowLogPayloads.size() == 14
@ -492,18 +458,12 @@ public class TestNamedQueueRecorder {
})
);
AdminProtos.SlowLogResponseRequest largeLogRequest =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(14 * 11)
.setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(largeLogRequest).size() == 14));
() -> slowLogRecorder.getLargeLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest);
List<SlowLogPayload> largeLogPayloads = slowLogRecorder.getLargeLogPayloads(request);
// confirm strict order of slow log payloads
return largeLogPayloads.size() == 14
@ -523,16 +483,14 @@ public class TestNamedQueueRecorder {
&& confirmPayloadParams(13, 128, largeLogPayloads);
})
);
}
@Test
public void testSlowLogMixedFilters() throws Exception {
Configuration conf = applySlowLogRecorderConf(30);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
@ -540,23 +498,23 @@ public class TestNamedQueueRecorder {
.setClientAddress("client_88")
.build();
Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
for (int i = 0; i < 100; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
namedQueueRecorder.addRecord(rpcLogDetails);
slowLogRecorder.addSlowLogPayload(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(request).size() == 2));
() -> slowLogRecorder.getSlowLogPayloads(request).size() == 2));
AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_1")
.setClientAddress("client_2")
.build();
Assert.assertEquals(0, getSlowLogPayloads(request2).size());
Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request2).size());
AdminProtos.SlowLogResponseRequest request3 =
AdminProtos.SlowLogResponseRequest.newBuilder()
@ -565,7 +523,7 @@ public class TestNamedQueueRecorder {
.setClientAddress("client_88")
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
.build();
Assert.assertEquals(0, getSlowLogPayloads(request3).size());
Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request3).size());
AdminProtos.SlowLogResponseRequest request4 =
AdminProtos.SlowLogResponseRequest.newBuilder()
@ -574,7 +532,7 @@ public class TestNamedQueueRecorder {
.setClientAddress("client_87")
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
.build();
Assert.assertEquals(1, getSlowLogPayloads(request4).size());
Assert.assertEquals(1, slowLogRecorder.getSlowLogPayloads(request4).size());
AdminProtos.SlowLogResponseRequest request5 =
AdminProtos.SlowLogResponseRequest.newBuilder()
@ -583,14 +541,14 @@ public class TestNamedQueueRecorder {
.setClientAddress("client_89")
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR)
.build();
Assert.assertEquals(2, getSlowLogPayloads(request5).size());
Assert.assertEquals(2, slowLogRecorder.getSlowLogPayloads(request5).size());
AdminProtos.SlowLogResponseRequest requestSlowLog =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> getSlowLogPayloads(requestSlowLog).size() == 15));
() -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
}
static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
@ -804,20 +762,26 @@ public class TestNamedQueueRecorder {
private static Optional<User> getUser(String userName) {
return Optional.of(new User() {
@Override
public String getShortName() {
return userName;
}
@Override
public <T> T runAs(PrivilegedAction<T> action) {
return null;
}
@Override
public <T> T runAs(PrivilegedExceptionAction<T> action) {
public <T> T runAs(PrivilegedExceptionAction<T> action) throws
IOException, InterruptedException {
return null;
}
});
}